aboutsummaryrefslogtreecommitdiff
path: root/tests/atf_python/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/atf_python/utils.py')
-rw-r--r--tests/atf_python/utils.py100
1 files changed, 100 insertions, 0 deletions
diff --git a/tests/atf_python/utils.py b/tests/atf_python/utils.py
new file mode 100644
index 000000000000..26911c12aef3
--- /dev/null
+++ b/tests/atf_python/utils.py
@@ -0,0 +1,100 @@
+#!/usr/bin/env python3
+import os
+import pwd
+from ctypes import CDLL
+from ctypes import get_errno
+from ctypes.util import find_library
+from typing import Dict
+from typing import List
+from typing import Optional
+
+import pytest
+
+
+def nodeid_to_method_name(nodeid: str) -> str:
+ """file_name.py::ClassName::method_name[parametrize] -> method_name"""
+ return nodeid.split("::")[-1].split("[")[0]
+
+
+class LibCWrapper(object):
+ def __init__(self):
+ path: Optional[str] = find_library("c")
+ if path is None:
+ raise RuntimeError("libc not found")
+ self._libc = CDLL(path, use_errno=True)
+
+ def modfind(self, mod_name: str) -> int:
+ if self._libc.modfind(bytes(mod_name, encoding="ascii")) == -1:
+ return get_errno()
+ return 0
+
+ def kldload(self, kld_name: str) -> int:
+ if self._libc.kldload(bytes(kld_name, encoding="ascii")) == -1:
+ return get_errno()
+ return 0
+
+ def jail_attach(self, jid: int) -> int:
+ if self._libc.jail_attach(jid) != 0:
+ return get_errno()
+ return 0
+
+
+libc = LibCWrapper()
+
+
+class BaseTest(object):
+ NEED_ROOT: bool = False # True if the class needs root privileges for the setup
+ TARGET_USER = None # Set to the target user by the framework
+ REQUIRED_MODULES: List[str] = []
+ SKIP_MODULES: List[str] = []
+
+ def require_module(self, mod_name: str, skip=True):
+ error_code = libc.modfind(mod_name)
+ if error_code == 0:
+ return
+ err_str = os.strerror(error_code)
+ txt = "kernel module '{}' not available: {}".format(mod_name, err_str)
+ if skip:
+ pytest.skip(txt)
+ else:
+ raise ValueError(txt)
+
+ def skip_module(self, mod_name: str):
+ error_code = libc.modfind(mod_name)
+ if error_code == 0:
+ txt = "kernel module '{}' loaded, skip test".format(mod_name)
+ pytest.skip(txt)
+ return
+
+ def _check_modules(self):
+ for mod_name in self.REQUIRED_MODULES:
+ self.require_module(mod_name)
+ for mod_name in self.SKIP_MODULES:
+ self.skip_module(mod_name)
+
+ @property
+ def atf_vars(self) -> Dict[str, str]:
+ px = "_ATF_VAR_"
+ return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)}
+
+ def drop_privileges_user(self, user: str):
+ uid = pwd.getpwnam(user)[2]
+ print("Dropping privs to {}/{}".format(user, uid))
+ os.setuid(uid)
+
+ def drop_privileges(self):
+ if self.TARGET_USER:
+ if self.TARGET_USER == "unprivileged":
+ user = self.atf_vars["unprivileged-user"]
+ else:
+ user = self.TARGET_USER
+ self.drop_privileges_user(user)
+
+ @property
+ def test_id(self) -> str:
+ # 'test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif] (setup)'
+ return os.environ.get("PYTEST_CURRENT_TEST").split(" ")[0]
+
+ def setup_method(self, method):
+ """Run all pre-requisits for the test execution"""
+ self._check_modules()