aboutsummaryrefslogtreecommitdiff
path: root/tests/atf_python/atf_pytest.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/atf_python/atf_pytest.py')
-rw-r--r--tests/atf_python/atf_pytest.py298
1 files changed, 298 insertions, 0 deletions
diff --git a/tests/atf_python/atf_pytest.py b/tests/atf_python/atf_pytest.py
new file mode 100644
index 000000000000..02ed502ace67
--- /dev/null
+++ b/tests/atf_python/atf_pytest.py
@@ -0,0 +1,298 @@
+import types
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+from typing import Tuple
+
+from atf_python.ktest import generate_ktests
+from atf_python.utils import nodeid_to_method_name
+
+import pytest
+import os
+
+
+class ATFCleanupItem(pytest.Item):
+ def runtest(self):
+ """Runs cleanup procedure for the test instead of the test itself"""
+ instance = self.parent.cls()
+ cleanup_name = "cleanup_{}".format(nodeid_to_method_name(self.nodeid))
+ if hasattr(instance, cleanup_name):
+ cleanup = getattr(instance, cleanup_name)
+ cleanup(self.nodeid)
+ elif hasattr(instance, "cleanup"):
+ instance.cleanup(self.nodeid)
+
+ def setup_method_noop(self, method):
+ """Overrides runtest setup method"""
+ pass
+
+ def teardown_method_noop(self, method):
+ """Overrides runtest teardown method"""
+ pass
+
+
+class ATFTestObj(object):
+ def __init__(self, obj, has_cleanup):
+ # Use nodeid without name to properly name class-derived tests
+ self.ident = obj.nodeid.split("::", 1)[1]
+ self.description = self._get_test_description(obj)
+ self.has_cleanup = has_cleanup
+ self.obj = obj
+
+ def _get_test_description(self, obj):
+ """Returns first non-empty line from func docstring or func name"""
+ if getattr(obj, "descr", None) is not None:
+ return getattr(obj, "descr")
+ docstr = obj.function.__doc__
+ if docstr:
+ for line in docstr.split("\n"):
+ if line:
+ return line
+ return obj.name
+
+ @staticmethod
+ def _convert_user_mark(mark, obj, ret: Dict):
+ username = mark.args[0]
+ if username == "unprivileged":
+ # Special unprivileged user requested.
+ # First, require the unprivileged-user config option presence
+ key = "require.config"
+ if key not in ret:
+ ret[key] = "unprivileged_user"
+ else:
+ ret[key] = "{} {}".format(ret[key], "unprivileged_user")
+ # Check if the framework requires root
+ test_cls = ATFHandler.get_test_class(obj)
+ if test_cls and getattr(test_cls, "NEED_ROOT", False):
+ # Yes, so we ask kyua to run us under root instead
+ # It is up to the implementation to switch back to the desired
+ # user
+ ret["require.user"] = "root"
+ else:
+ ret["require.user"] = username
+
+ def _convert_marks(self, obj) -> Dict[str, Any]:
+ wj_func = lambda x: " ".join(x) # noqa: E731
+ _map: Dict[str, Dict] = {
+ "require_user": {"handler": self._convert_user_mark},
+ "require_arch": {"name": "require.arch", "fmt": wj_func},
+ "require_diskspace": {"name": "require.diskspace"},
+ "require_files": {"name": "require.files", "fmt": wj_func},
+ "require_machine": {"name": "require.machine", "fmt": wj_func},
+ "require_memory": {"name": "require.memory"},
+ "require_progs": {"name": "require.progs", "fmt": wj_func},
+ "timeout": {},
+ }
+ ret = {}
+ for mark in obj.iter_markers():
+ if mark.name in _map:
+ if "handler" in _map[mark.name]:
+ _map[mark.name]["handler"](mark, obj, ret)
+ continue
+ name = _map[mark.name].get("name", mark.name)
+ if "fmt" in _map[mark.name]:
+ val = _map[mark.name]["fmt"](mark.args[0])
+ else:
+ val = mark.args[0]
+ ret[name] = val
+ return ret
+
+ def as_lines(self) -> List[str]:
+ """Output test definition in ATF-specific format"""
+ ret = []
+ ret.append("ident: {}".format(self.ident))
+ ret.append("descr: {}".format(self._get_test_description(self.obj)))
+ if self.has_cleanup:
+ ret.append("has.cleanup: true")
+ for key, value in self._convert_marks(self.obj).items():
+ ret.append("{}: {}".format(key, value))
+ return ret
+
+
+class ATFHandler(object):
+ class ReportState(NamedTuple):
+ state: str
+ reason: str
+
+ def __init__(self, report_file_name: Optional[str]):
+ self._tests_state_map: Dict[str, ReportStatus] = {}
+ self._report_file_name = report_file_name
+ self._report_file_handle = None
+
+ def setup_configure(self):
+ fname = self._report_file_name
+ if fname:
+ self._report_file_handle = open(fname, mode="w")
+
+ def setup_method_pre(self, item):
+ """Called before actually running the test setup_method"""
+ # Check if we need to manually drop the privileges
+ for mark in item.iter_markers():
+ if mark.name == "require_user":
+ cls = self.get_test_class(item)
+ cls.TARGET_USER = mark.args[0]
+ break
+
+ def override_runtest(self, obj):
+ # Override basic runtest command
+ obj.runtest = types.MethodType(ATFCleanupItem.runtest, obj)
+ # Override class setup/teardown
+ obj.parent.cls.setup_method = ATFCleanupItem.setup_method_noop
+ obj.parent.cls.teardown_method = ATFCleanupItem.teardown_method_noop
+
+ @staticmethod
+ def get_test_class(obj):
+ if hasattr(obj, "parent") and obj.parent is not None:
+ if hasattr(obj.parent, "cls"):
+ return obj.parent.cls
+
+ def has_object_cleanup(self, obj):
+ cls = self.get_test_class(obj)
+ if cls is not None:
+ method_name = nodeid_to_method_name(obj.nodeid)
+ cleanup_name = "cleanup_{}".format(method_name)
+ if hasattr(cls, "cleanup") or hasattr(cls, cleanup_name):
+ return True
+ return False
+
+ def _generate_test_cleanups(self, items):
+ new_items = []
+ for obj in items:
+ if self.has_object_cleanup(obj):
+ self.override_runtest(obj)
+ new_items.append(obj)
+ items.clear()
+ items.extend(new_items)
+
+ def expand_tests(self, collector, name, obj):
+ return generate_ktests(collector, name, obj)
+
+ def modify_tests(self, items, config):
+ if config.option.atf_cleanup:
+ self._generate_test_cleanups(items)
+
+ def list_tests(self, tests: List[str]):
+ print('Content-Type: application/X-atf-tp; version="1"')
+ print()
+ for test_obj in tests:
+ has_cleanup = self.has_object_cleanup(test_obj)
+ atf_test = ATFTestObj(test_obj, has_cleanup)
+ for line in atf_test.as_lines():
+ print(line)
+ print()
+
+ def set_report_state(self, test_name: str, state: str, reason: str):
+ self._tests_state_map[test_name] = self.ReportState(state, reason)
+
+ def _extract_report_reason(self, report):
+ data = report.longrepr
+ if data is None:
+ return None
+ if isinstance(data, Tuple):
+ # ('/path/to/test.py', 23, 'Skipped: unable to test')
+ reason = data[2]
+ for prefix in "Skipped: ":
+ if reason.startswith(prefix):
+ reason = reason[len(prefix):]
+ return reason
+ else:
+ # string/ traceback / exception report. Capture the last line
+ return str(data).split("\n")[-1]
+ return None
+
+ def add_report(self, report):
+ # MAP pytest report state to the atf-desired state
+ #
+ # ATF test states:
+ # (1) expected_death, (2) expected_exit, (3) expected_failure
+ # (4) expected_signal, (5) expected_timeout, (6) passed
+ # (7) skipped, (8) failed
+ #
+ # Note that ATF don't have the concept of "soft xfail" - xpass
+ # is a failure. It also calls teardown routine in a separate
+ # process, thus teardown states (pytest-only) are handled as
+ # body continuation.
+
+ # (stage, state, wasxfail)
+
+ # Just a passing test: WANT: passed
+ # GOT: (setup, passed, F), (call, passed, F), (teardown, passed, F)
+ #
+ # Failing body test: WHAT: failed
+ # GOT: (setup, passed, F), (call, failed, F), (teardown, passed, F)
+ #
+ # pytest.skip test decorator: WANT: skipped
+ # GOT: (setup,skipped, False), (teardown, passed, False)
+ #
+ # pytest.skip call inside test function: WANT: skipped
+ # GOT: (setup, passed, F), (call, skipped, F), (teardown,passed, F)
+ #
+ # mark.xfail decorator+pytest.xfail: WANT: expected_failure
+ # GOT: (setup, passed, F), (call, skipped, T), (teardown, passed, F)
+ #
+ # mark.xfail decorator+pass: WANT: failed
+ # GOT: (setup, passed, F), (call, passed, T), (teardown, passed, F)
+
+ test_name = report.location[2]
+ stage = report.when
+ state = report.outcome
+ reason = self._extract_report_reason(report)
+
+ # We don't care about strict xfail - it gets translated to False
+
+ if stage == "setup":
+ if state in ("skipped", "failed"):
+ # failed init -> failed test, skipped setup -> xskip
+ # for the whole test
+ self.set_report_state(test_name, state, reason)
+ elif stage == "call":
+ # "call" stage shouldn't matter if setup failed
+ if test_name in self._tests_state_map:
+ if self._tests_state_map[test_name].state == "failed":
+ return
+ if state == "failed":
+ # Record failure & override "skipped" state
+ self.set_report_state(test_name, state, reason)
+ elif state == "skipped":
+ if hasattr(report, "wasxfail"):
+ # xfail() called in the test body
+ state = "expected_failure"
+ else:
+ # skip inside the body
+ pass
+ self.set_report_state(test_name, state, reason)
+ elif state == "passed":
+ if hasattr(report, "wasxfail"):
+ # the test was expected to fail but didn't
+ # mark as hard failure
+ state = "failed"
+ self.set_report_state(test_name, state, reason)
+ elif stage == "teardown":
+ if state == "failed":
+ # teardown should be empty, as the cleanup
+ # procedures should be implemented as a separate
+ # function/method, so mark teardown failure as
+ # global failure
+ self.set_report_state(test_name, state, reason)
+
+ def write_report(self):
+ if self._report_file_handle is None:
+ return
+ if self._tests_state_map:
+ # If we're executing in ATF mode, there has to be just one test
+ # Anyway, deterministically pick the first one
+ first_test_name = next(iter(self._tests_state_map))
+ test = self._tests_state_map[first_test_name]
+ if test.state == "passed":
+ line = test.state
+ else:
+ line = "{}: {}".format(test.state, test.reason)
+ print(line, file=self._report_file_handle)
+ self._report_file_handle.close()
+
+ @staticmethod
+ def get_atf_vars() -> Dict[str, str]:
+ px = "_ATF_VAR_"
+ return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)}