diff options
| author | Alexander V. Chernikov <melifaro@FreeBSD.org> | 2023-04-14 15:25:50 +0000 |
|---|---|---|
| committer | Alexander V. Chernikov <melifaro@FreeBSD.org> | 2023-04-14 15:47:55 +0000 |
| commit | 3e5d0784b9b5296bda801add034b057ad68237f7 (patch) | |
| tree | 09c1ac8db59cbdf84d2b457c45ffdca09b1d3bdf /tests/atf_python | |
| parent | 2f53b5991ce05b7e6f2b1eb826cd902fb255a9eb (diff) | |
Diffstat (limited to 'tests/atf_python')
| -rw-r--r-- | tests/atf_python/Makefile | 2 | ||||
| -rw-r--r-- | tests/atf_python/atf_pytest.py | 6 | ||||
| -rw-r--r-- | tests/atf_python/ktest.py | 173 | ||||
| -rw-r--r-- | tests/atf_python/sys/netlink/attrs.py | 2 | ||||
| -rw-r--r-- | tests/atf_python/sys/netlink/base_headers.py | 7 | ||||
| -rw-r--r-- | tests/atf_python/sys/netlink/netlink.py | 2 | ||||
| -rw-r--r-- | tests/atf_python/sys/netlink/netlink_generic.py | 118 | ||||
| -rw-r--r-- | tests/atf_python/utils.py | 5 |
8 files changed, 313 insertions, 2 deletions
diff --git a/tests/atf_python/Makefile b/tests/atf_python/Makefile index 1a2fec387eda..889cdcdf9592 100644 --- a/tests/atf_python/Makefile +++ b/tests/atf_python/Makefile @@ -2,7 +2,7 @@ .PATH: ${.CURDIR} -FILES= __init__.py atf_pytest.py utils.py +FILES= __init__.py atf_pytest.py ktest.py utils.py SUBDIR= sys .include <bsd.own.mk> diff --git a/tests/atf_python/atf_pytest.py b/tests/atf_python/atf_pytest.py index 0dd3a225b73d..19b5f88fa200 100644 --- a/tests/atf_python/atf_pytest.py +++ b/tests/atf_python/atf_pytest.py @@ -6,6 +6,7 @@ from typing import NamedTuple from typing import Optional from typing import Tuple +from atf_python.ktest import generate_ktests from atf_python.utils import nodeid_to_method_name import pytest @@ -42,6 +43,8 @@ class ATFTestObj(object): def _get_test_description(self, obj): """Returns first non-empty line from func docstring or func name""" + if getattr(obj, "descr", None) is not None: + return getattr(obj, "descr") docstr = obj.function.__doc__ if docstr: for line in docstr.split("\n"): @@ -163,6 +166,9 @@ class ATFHandler(object): items.clear() items.extend(new_items) + def expand_tests(self, collector, name, obj): + return generate_ktests(collector, name, obj) + def modify_tests(self, items, config): if config.option.atf_cleanup: self._generate_test_cleanups(items) diff --git a/tests/atf_python/ktest.py b/tests/atf_python/ktest.py new file mode 100644 index 000000000000..4cd9970aaec1 --- /dev/null +++ b/tests/atf_python/ktest.py @@ -0,0 +1,173 @@ +import logging +import time +from typing import NamedTuple + +import pytest +from atf_python.sys.netlink.attrs import NlAttrNested +from atf_python.sys.netlink.attrs import NlAttrStr +from atf_python.sys.netlink.netlink import NetlinkMultipartIterator +from atf_python.sys.netlink.netlink import NlHelper +from atf_python.sys.netlink.netlink import Nlsock +from atf_python.sys.netlink.netlink_generic import KtestAttrType +from atf_python.sys.netlink.netlink_generic import KtestInfoMessage +from atf_python.sys.netlink.netlink_generic import KtestLogMsgType +from atf_python.sys.netlink.netlink_generic import KtestMsgAttrType +from atf_python.sys.netlink.netlink_generic import KtestMsgType +from atf_python.sys.netlink.netlink_generic import timespec +from atf_python.sys.netlink.utils import NlConst +from atf_python.utils import BaseTest +from atf_python.utils import libc +from atf_python.utils import nodeid_to_method_name + + +datefmt = "%H:%M:%S" +fmt = "%(asctime)s.%(msecs)03d %(filename)s:%(funcName)s:%(lineno)d %(message)s" +logging.basicConfig(level=logging.DEBUG, format=fmt, datefmt=datefmt) +logger = logging.getLogger("ktest") + + +NETLINK_FAMILY = "ktest" + + +class KtestItem(pytest.Item): + def __init__(self, *, descr, kcls, **kwargs): + super().__init__(**kwargs) + self.descr = descr + self._kcls = kcls + + def runtest(self): + self._kcls().runtest() + + +class KtestCollector(pytest.Class): + def collect(self): + obj = self.obj + exclude_names = set([n for n in dir(obj) if not n.startswith("_")]) + + autoload = obj.KTEST_MODULE_AUTOLOAD + module_name = obj.KTEST_MODULE_NAME + loader = KtestLoader(module_name, autoload) + ktests = loader.load_ktests() + if not ktests: + return + + orig = pytest.Class.from_parent(self.parent, name=self.name, obj=obj) + for py_test in orig.collect(): + yield py_test + + for ktest in ktests: + name = ktest["name"] + descr = ktest["desc"] + if name in exclude_names: + continue + yield KtestItem.from_parent(self, name=name, descr=descr, kcls=obj) + + +class KtestLoader(object): + def __init__(self, module_name: str, autoload: bool): + self.module_name = module_name + self.autoload = autoload + self.helper = NlHelper() + self.nlsock = Nlsock(NlConst.NETLINK_GENERIC, self.helper) + self.family_id = self._get_family_id() + + def _get_family_id(self): + try: + family_id = self.nlsock.get_genl_family_id(NETLINK_FAMILY) + except ValueError: + if self.autoload: + libc.kldload(self.module_name) + family_id = self.nlsock.get_genl_family_id(NETLINK_FAMILY) + else: + raise + return family_id + + def _load_ktests(self): + msg = KtestInfoMessage(self.helper, self.family_id, KtestMsgType.KTEST_CMD_LIST) + msg.set_request() + msg.add_nla(NlAttrStr(KtestAttrType.KTEST_ATTR_MOD_NAME, self.module_name)) + self.nlsock.write_message(msg, verbose=False) + nlmsg_seq = msg.nl_hdr.nlmsg_seq + + ret = [] + for rx_msg in NetlinkMultipartIterator(self.nlsock, nlmsg_seq, self.family_id): + # test_msg.print_message() + tst = { + "mod_name": rx_msg.get_nla(KtestAttrType.KTEST_ATTR_MOD_NAME).text, + "name": rx_msg.get_nla(KtestAttrType.KTEST_ATTR_TEST_NAME).text, + "desc": rx_msg.get_nla(KtestAttrType.KTEST_ATTR_TEST_DESCR).text, + } + ret.append(tst) + return ret + + def load_ktests(self): + ret = self._load_ktests() + if not ret and self.autoload: + libc.kldload(self.module_name) + ret = self._load_ktests() + return ret + + +def generate_ktests(collector, name, obj): + if getattr(obj, "KTEST_MODULE_NAME", None) is not None: + return KtestCollector.from_parent(collector, name=name, obj=obj) + return None + + +class BaseKernelTest(BaseTest): + KTEST_MODULE_AUTOLOAD = True + KTEST_MODULE_NAME = None + + def _get_record_time(self, msg) -> float: + timespec = msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_TS).ts + epoch_ktime = timespec.tv_sec * 1.0 + timespec.tv_nsec * 1.0 / 1000000000 + if not hasattr(self, "_start_epoch"): + self._start_ktime = epoch_ktime + self._start_time = time.time() + epoch_time = self._start_time + else: + epoch_time = time.time() - self._start_time + epoch_ktime + return epoch_time + + def _log_message(self, msg): + # Convert syslog-type l + syslog_level = msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_LEVEL).u8 + if syslog_level <= 6: + loglevel = logging.INFO + else: + loglevel = logging.DEBUG + rec = logging.LogRecord( + self.KTEST_MODULE_NAME, + loglevel, + msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_FILE).text, + msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_LINE).u32, + "%s", + (msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_TEXT).text), + None, + msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_FUNC).text, + None, + ) + rec.created = self._get_record_time(msg) + logger.handle(rec) + + def _runtest_name(self, test_name: str, test_data): + module_name = self.KTEST_MODULE_NAME + # print("Running kernel test {} for module {}".format(test_name, module_name)) + helper = NlHelper() + nlsock = Nlsock(NlConst.NETLINK_GENERIC, helper) + family_id = nlsock.get_genl_family_id(NETLINK_FAMILY) + msg = KtestInfoMessage(helper, family_id, KtestMsgType.KTEST_CMD_RUN) + msg.set_request() + msg.add_nla(NlAttrStr(KtestAttrType.KTEST_ATTR_MOD_NAME, module_name)) + msg.add_nla(NlAttrStr(KtestAttrType.KTEST_ATTR_TEST_NAME, test_name)) + if test_data is not None: + msg.add_nla(NlAttrNested(KtestAttrType.KTEST_ATTR_TEST_META, test_data)) + nlsock.write_message(msg, verbose=False) + + for log_msg in NetlinkMultipartIterator( + nlsock, msg.nl_hdr.nlmsg_seq, family_id + ): + self._log_message(log_msg) + + def runtest(self, test_data=None): + self._runtest_name(nodeid_to_method_name(self.test_id), test_data) diff --git a/tests/atf_python/sys/netlink/attrs.py b/tests/atf_python/sys/netlink/attrs.py index f6fe9ee43c98..58fbab7fc8db 100644 --- a/tests/atf_python/sys/netlink/attrs.py +++ b/tests/atf_python/sys/netlink/attrs.py @@ -7,6 +7,8 @@ from atf_python.sys.netlink.utils import enum_or_int class NlAttr(object): + HDR_LEN = 4 # sizeof(struct nlattr) + def __init__(self, nla_type, data): if isinstance(nla_type, Enum): self._nla_type = nla_type.value diff --git a/tests/atf_python/sys/netlink/base_headers.py b/tests/atf_python/sys/netlink/base_headers.py index 759d8827fb3c..71771a249b3d 100644 --- a/tests/atf_python/sys/netlink/base_headers.py +++ b/tests/atf_python/sys/netlink/base_headers.py @@ -15,6 +15,13 @@ class Nlmsghdr(Structure): ] +class Nlattr(Structure): + _fields_ = [ + ("nla_len", c_ushort), + ("nla_type", c_ushort), + ] + + class NlMsgType(Enum): NLMSG_NOOP = 1 NLMSG_ERROR = 2 diff --git a/tests/atf_python/sys/netlink/netlink.py b/tests/atf_python/sys/netlink/netlink.py index f813727d55b4..4bdefc2d5014 100644 --- a/tests/atf_python/sys/netlink/netlink.py +++ b/tests/atf_python/sys/netlink/netlink.py @@ -22,8 +22,8 @@ from atf_python.sys.netlink.message import BaseNetlinkMessage from atf_python.sys.netlink.message import NlMsgCategory from atf_python.sys.netlink.message import NlMsgProps from atf_python.sys.netlink.message import StdNetlinkMessage -from atf_python.sys.netlink.netlink_generic import GenlCtrlMsgType from atf_python.sys.netlink.netlink_generic import GenlCtrlAttrType +from atf_python.sys.netlink.netlink_generic import GenlCtrlMsgType from atf_python.sys.netlink.netlink_generic import handler_classes as genl_classes from atf_python.sys.netlink.netlink_route import handler_classes as rt_classes from atf_python.sys.netlink.utils import align4 diff --git a/tests/atf_python/sys/netlink/netlink_generic.py b/tests/atf_python/sys/netlink/netlink_generic.py index ee75d5bf37f3..06dc8704fe07 100644 --- a/tests/atf_python/sys/netlink/netlink_generic.py +++ b/tests/atf_python/sys/netlink/netlink_generic.py @@ -1,10 +1,16 @@ #!/usr/local/bin/python3 +from ctypes import c_int64 +from ctypes import c_long from ctypes import sizeof +from ctypes import Structure from enum import Enum +import struct +from atf_python.sys.netlink.attrs import NlAttr from atf_python.sys.netlink.attrs import NlAttrStr from atf_python.sys.netlink.attrs import NlAttrU16 from atf_python.sys.netlink.attrs import NlAttrU32 +from atf_python.sys.netlink.attrs import NlAttrU8 from atf_python.sys.netlink.base_headers import GenlMsgHdr from atf_python.sys.netlink.message import NlMsgCategory from atf_python.sys.netlink.message import NlMsgProps @@ -105,6 +111,118 @@ class NetlinkGenlCtrlMessage(NetlinkGenlMessage): family_name = GenlCtrlFamilyName +KtestFamilyName = "ktest" + + +class KtestMsgType(Enum): + KTEST_CMD_UNSPEC = 0 + KTEST_CMD_LIST = 1 + KTEST_CMD_RUN = 2 + KTEST_CMD_NEWTEST = 3 + KTEST_CMD_NEWMESSAGE = 4 + + +class KtestAttrType(Enum): + KTEST_ATTR_MOD_NAME = 1 + KTEST_ATTR_TEST_NAME = 2 + KTEST_ATTR_TEST_DESCR = 3 + KTEST_ATTR_TEST_META = 4 + + +class KtestLogMsgType(Enum): + KTEST_MSG_START = 1 + KTEST_MSG_END = 2 + KTEST_MSG_LOG = 3 + KTEST_MSG_FAIL = 4 + + +class KtestMsgAttrType(Enum): + KTEST_MSG_ATTR_TS = 1 + KTEST_MSG_ATTR_FUNC = 2 + KTEST_MSG_ATTR_FILE = 3 + KTEST_MSG_ATTR_LINE = 4 + KTEST_MSG_ATTR_TEXT = 5 + KTEST_MSG_ATTR_LEVEL = 6 + KTEST_MSG_ATTR_META = 7 + + +class timespec(Structure): + _fields_ = [ + ("tv_sec", c_int64), + ("tv_nsec", c_long), + ] + + +class NlAttrTS(NlAttr): + DATA_LEN = sizeof(timespec) + + def __init__(self, nla_type, val): + self.ts = val + super().__init__(nla_type, b"") + + @property + def nla_len(self): + return NlAttr.HDR_LEN + self.DATA_LEN + + def _print_attr_value(self): + return " tv_sec={} tv_nsec={}".format(self.ts.tv_sec, self.ts.tv_nsec) + + @staticmethod + def _validate(data): + assert len(data) == NlAttr.HDR_LEN + NlAttrTS.DATA_LEN + nla_len, nla_type = struct.unpack("@HH", data[:NlAttr.HDR_LEN]) + assert nla_len == NlAttr.HDR_LEN + NlAttrTS.DATA_LEN + + @classmethod + def _parse(cls, data): + nla_len, nla_type = struct.unpack("@HH", data[:NlAttr.HDR_LEN]) + val = timespec.from_buffer_copy(data[NlAttr.HDR_LEN:]) + return cls(nla_type, val) + + def __bytes__(self): + return self._to_bytes(bytes(self.ts)) + + +ktest_info_attrs = prepare_attrs_map( + [ + AttrDescr(KtestAttrType.KTEST_ATTR_MOD_NAME, NlAttrStr), + AttrDescr(KtestAttrType.KTEST_ATTR_TEST_NAME, NlAttrStr), + AttrDescr(KtestAttrType.KTEST_ATTR_TEST_DESCR, NlAttrStr), + ] +) + + +ktest_msg_attrs = prepare_attrs_map( + [ + AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_FUNC, NlAttrStr), + AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_FILE, NlAttrStr), + AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_LINE, NlAttrU32), + AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_TEXT, NlAttrStr), + AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_LEVEL, NlAttrU8), + AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_TS, NlAttrTS), + ] +) + + +class KtestInfoMessage(NetlinkGenlMessage): + messages = [ + NlMsgProps(KtestMsgType.KTEST_CMD_LIST, NlMsgCategory.GET), + NlMsgProps(KtestMsgType.KTEST_CMD_RUN, NlMsgCategory.NEW), + NlMsgProps(KtestMsgType.KTEST_CMD_NEWTEST, NlMsgCategory.NEW), + ] + nl_attrs_map = ktest_info_attrs + family_name = KtestFamilyName + + +class KtestMsgMessage(NetlinkGenlMessage): + messages = [ + NlMsgProps(KtestMsgType.KTEST_CMD_NEWMESSAGE, NlMsgCategory.NEW), + ] + nl_attrs_map = ktest_msg_attrs + family_name = KtestFamilyName + + handler_classes = { GenlCtrlFamilyName: [NetlinkGenlCtrlMessage], + KtestFamilyName: [KtestInfoMessage, KtestMsgMessage], } diff --git a/tests/atf_python/utils.py b/tests/atf_python/utils.py index 591a532ca476..1c0a68dad383 100644 --- a/tests/atf_python/utils.py +++ b/tests/atf_python/utils.py @@ -28,6 +28,11 @@ class LibCWrapper(object): return get_errno() return 0 + def kldload(self, kld_name: str) -> int: + if self._libc.kldload(bytes(kld_name, encoding="ascii")) == -1: + return get_errno() + return 0 + def jail_attach(self, jid: int) -> int: if self._libc.jail_attach(jid) != 0: return get_errno() |
