aboutsummaryrefslogtreecommitdiff
path: root/tests/atf_python
diff options
context:
space:
mode:
author Alexander V. Chernikov <melifaro@FreeBSD.org> 2023-04-14 15:25:50 +0000
committer Alexander V. Chernikov <melifaro@FreeBSD.org> 2023-04-14 15:47:55 +0000
commit 3e5d0784b9b5296bda801add034b057ad68237f7 (patch)
tree 09c1ac8db59cbdf84d2b457c45ffdca09b1d3bdf /tests/atf_python
parent 2f53b5991ce05b7e6f2b1eb826cd902fb255a9eb (diff)
Diffstat (limited to 'tests/atf_python')
-rw-r--r-- tests/atf_python/Makefile 2
-rw-r--r-- tests/atf_python/atf_pytest.py 6
-rw-r--r-- tests/atf_python/ktest.py 173
-rw-r--r-- tests/atf_python/sys/netlink/attrs.py 2
-rw-r--r-- tests/atf_python/sys/netlink/base_headers.py 7
-rw-r--r-- tests/atf_python/sys/netlink/netlink.py 2
-rw-r--r-- tests/atf_python/sys/netlink/netlink_generic.py 118
-rw-r--r-- tests/atf_python/utils.py 5
8 files changed, 313 insertions, 2 deletions
diff --git a/tests/atf_python/Makefile b/tests/atf_python/Makefile
index 1a2fec387eda..889cdcdf9592 100644
--- a/tests/atf_python/Makefile
+++ b/tests/atf_python/Makefile
@@ -2,7 +2,7 @@
.PATH: ${.CURDIR}
-FILES= __init__.py atf_pytest.py utils.py
+FILES= __init__.py atf_pytest.py ktest.py utils.py
SUBDIR= sys
.include <bsd.own.mk>
diff --git a/tests/atf_python/atf_pytest.py b/tests/atf_python/atf_pytest.py
index 0dd3a225b73d..19b5f88fa200 100644
--- a/tests/atf_python/atf_pytest.py
+++ b/tests/atf_python/atf_pytest.py
@@ -6,6 +6,7 @@ from typing import NamedTuple
from typing import Optional
from typing import Tuple
+from atf_python.ktest import generate_ktests
from atf_python.utils import nodeid_to_method_name
import pytest
@@ -42,6 +43,8 @@ class ATFTestObj(object):
def _get_test_description(self, obj):
"""Returns first non-empty line from func docstring or func name"""
+ if getattr(obj, "descr", None) is not None:
+ return getattr(obj, "descr")
docstr = obj.function.__doc__
if docstr:
for line in docstr.split("\n"):
@@ -163,6 +166,9 @@ class ATFHandler(object):
items.clear()
items.extend(new_items)
+ # Delegates to generate_ktests(): the result is a KtestCollector when obj
+ # defines a non-None KTEST_MODULE_NAME, otherwise None.
+ def expand_tests(self, collector, name, obj):
+ return generate_ktests(collector, name, obj)
+
def modify_tests(self, items, config):
if config.option.atf_cleanup:
self._generate_test_cleanups(items)
diff --git a/tests/atf_python/ktest.py b/tests/atf_python/ktest.py
new file mode 100644
index 000000000000..4cd9970aaec1
--- /dev/null
+++ b/tests/atf_python/ktest.py
@@ -0,0 +1,173 @@
+import logging
+import time
+from typing import NamedTuple
+
+import pytest
+from atf_python.sys.netlink.attrs import NlAttrNested
+from atf_python.sys.netlink.attrs import NlAttrStr
+from atf_python.sys.netlink.netlink import NetlinkMultipartIterator
+from atf_python.sys.netlink.netlink import NlHelper
+from atf_python.sys.netlink.netlink import Nlsock
+from atf_python.sys.netlink.netlink_generic import KtestAttrType
+from atf_python.sys.netlink.netlink_generic import KtestInfoMessage
+from atf_python.sys.netlink.netlink_generic import KtestLogMsgType
+from atf_python.sys.netlink.netlink_generic import KtestMsgAttrType
+from atf_python.sys.netlink.netlink_generic import KtestMsgType
+from atf_python.sys.netlink.netlink_generic import timespec
+from atf_python.sys.netlink.utils import NlConst
+from atf_python.utils import BaseTest
+from atf_python.utils import libc
+from atf_python.utils import nodeid_to_method_name
+
+
+# Log line layout: wall-clock time with milliseconds, then the source location
+# (file:function:line) and the message text.
+datefmt = "%H:%M:%S"
+fmt = "%(asctime)s.%(msecs)03d %(filename)s:%(funcName)s:%(lineno)d %(message)s"
+# NOTE: basicConfig() runs at import time, so importing this module configures
+# the root logger at DEBUG level as a side effect.
+logging.basicConfig(level=logging.DEBUG, format=fmt, datefmt=datefmt)
+logger = logging.getLogger("ktest")
+
+
+# Generic netlink family name used to resolve the family id for ktest requests.
+NETLINK_FAMILY = "ktest"
+
+
+# pytest item for a single kernel-provided test; KtestCollector yields one of
+# these per test returned by KtestLoader.
+class KtestItem(pytest.Item):
+ def __init__(self, *, descr, kcls, **kwargs):
+ super().__init__(**kwargs)
+ # Test description as reported by the kernel; presumably this is the
+ # "descr" attribute ATFTestObj._get_test_description() looks for
+ # -- TODO confirm.
+ self.descr = descr
+ # Test class to instantiate when the item is run.
+ self._kcls = kcls
+
+ def runtest(self):
+ # Create a fresh instance of the test class and call its runtest().
+ self._kcls().runtest()
+
+
+# Collector for a test class that names a kernel test module: yields the
+# class's regular pytest tests followed by one KtestItem per kernel test.
+class KtestCollector(pytest.Class):
+ def collect(self):
+ obj = self.obj
+ # Public attribute names of the class; kernel tests with one of these
+ # names are skipped below (presumably because a Python method of the
+ # same name already wraps that test -- TODO confirm).
+ exclude_names = set([n for n in dir(obj) if not n.startswith("_")])
+
+ autoload = obj.KTEST_MODULE_AUTOLOAD
+ module_name = obj.KTEST_MODULE_NAME
+ loader = KtestLoader(module_name, autoload)
+ ktests = loader.load_ktests()
+ # NOTE(review): returning here also skips the regular Python tests
+ # collected below when the module reports no tests -- confirm intended.
+ if not ktests:
+ return
+
+ # Build a plain pytest.Class for the same object and re-yield whatever
+ # its stock collection produces.
+ orig = pytest.Class.from_parent(self.parent, name=self.name, obj=obj)
+ for py_test in orig.collect():
+ yield py_test
+
+ for ktest in ktests:
+ name = ktest["name"]
+ descr = ktest["desc"]
+ if name in exclude_names:
+ continue
+ yield KtestItem.from_parent(self, name=name, descr=descr, kcls=obj)
+
+
+# Queries the "ktest" generic netlink family for the tests a kernel module
+# provides, optionally loading the module first.
+class KtestLoader(object):
+ def __init__(self, module_name: str, autoload: bool):
+ self.module_name = module_name
+ self.autoload = autoload
+ self.helper = NlHelper()
+ # The constructor opens a generic netlink socket and resolves the family
+ # id right away, so it can raise (see _get_family_id()).
+ self.nlsock = Nlsock(NlConst.NETLINK_GENERIC, self.helper)
+ self.family_id = self._get_family_id()
+
+ # Returns the id of the "ktest" family. If the lookup raises ValueError and
+ # autoload is set, the module is loaded and the lookup retried once;
+ # without autoload the ValueError propagates.
+ def _get_family_id(self):
+ try:
+ family_id = self.nlsock.get_genl_family_id(NETLINK_FAMILY)
+ except ValueError:
+ if self.autoload:
+ # NOTE(review): the value returned by kldload() is ignored here;
+ # a failed load is only noticed if the retry below fails.
+ libc.kldload(self.module_name)
+ family_id = self.nlsock.get_genl_family_id(NETLINK_FAMILY)
+ else:
+ raise
+ return family_id
+
+ # Sends a KTEST_CMD_LIST request for self.module_name and returns a list of
+ # dicts with the keys "mod_name", "name" and "desc", one per received message.
+ def _load_ktests(self):
+ msg = KtestInfoMessage(self.helper, self.family_id, KtestMsgType.KTEST_CMD_LIST)
+ msg.set_request()
+ msg.add_nla(NlAttrStr(KtestAttrType.KTEST_ATTR_MOD_NAME, self.module_name))
+ self.nlsock.write_message(msg, verbose=False)
+ # Sequence number of the request, handed to the iterator below.
+ nlmsg_seq = msg.nl_hdr.nlmsg_seq
+
+ ret = []
+ for rx_msg in NetlinkMultipartIterator(self.nlsock, nlmsg_seq, self.family_id):
+ # test_msg.print_message()
+ tst = {
+ "mod_name": rx_msg.get_nla(KtestAttrType.KTEST_ATTR_MOD_NAME).text,
+ "name": rx_msg.get_nla(KtestAttrType.KTEST_ATTR_TEST_NAME).text,
+ "desc": rx_msg.get_nla(KtestAttrType.KTEST_ATTR_TEST_DESCR).text,
+ }
+ ret.append(tst)
+ return ret
+
+ # Returns the module's test list; when the first query comes back empty and
+ # autoload is set, loads the module and queries once more.
+ def load_ktests(self):
+ ret = self._load_ktests()
+ if not ret and self.autoload:
+ # NOTE(review): kldload() result is ignored here as well.
+ libc.kldload(self.module_name)
+ ret = self._load_ktests()
+ return ret
+
+
+# Returns a KtestCollector for obj when it defines a non-None
+# KTEST_MODULE_NAME attribute; returns None for every other object.
+def generate_ktests(collector, name, obj):
+ if getattr(obj, "KTEST_MODULE_NAME", None) is not None:
+ return KtestCollector.from_parent(collector, name=name, obj=obj)
+ return None
+
+
+class BaseKernelTest(BaseTest):
+    """Base class for Python test classes that drive in-kernel (ktest) tests.
+
+    Class attributes:
+        KTEST_MODULE_AUTOLOAD: handed to KtestLoader as its "autoload" flag
+            by KtestCollector.
+        KTEST_MODULE_NAME: name of the kernel module providing the tests.
+            generate_ktests() ignores classes that leave it as None.
+    """
+
+    KTEST_MODULE_AUTOLOAD = True
+    KTEST_MODULE_NAME = None
+
+    def _get_record_time(self, msg) -> float:
+        """Translate the kernel timestamp of a log message to wall-clock time.
+
+        The first message seen anchors the kernel clock to time.time(). Every
+        later message is placed at that anchor plus the kernel-time delta, so
+        records keep the spacing reported by the kernel.
+
+        Args:
+            msg: message carrying a KTEST_MSG_ATTR_TS attribute whose "ts"
+                value exposes tv_sec and tv_nsec.
+
+        Returns:
+            Seconds since the epoch, suitable for LogRecord.created.
+        """
+        ts = msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_TS).ts
+        epoch_ktime = ts.tv_sec + ts.tv_nsec / 1e9
+        # Test the attribute that is actually assigned below; otherwise the
+        # anchor would be re-created for every message.
+        if not hasattr(self, "_start_ktime"):
+            self._start_ktime = epoch_ktime
+            self._start_time = time.time()
+            return self._start_time
+        return self._start_time + (epoch_ktime - self._start_ktime)
+
+    def _log_message(self, msg):
+        """Re-emit one kernel log message through the "ktest" Python logger.
+
+        Args:
+            msg: message providing the KTEST_MSG_ATTR_LEVEL, _FILE, _LINE,
+                _TEXT, _FUNC and _TS attributes.
+        """
+        # Convert the syslog-type level to a logging level: values up to 6
+        # are reported as INFO, anything above as DEBUG.
+        syslog_level = msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_LEVEL).u8
+        loglevel = logging.INFO if syslog_level <= 6 else logging.DEBUG
+        rec = logging.LogRecord(
+            self.KTEST_MODULE_NAME,
+            loglevel,
+            msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_FILE).text,
+            msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_LINE).u32,
+            "%s",
+            # Must be a real tuple: a bare string is falsy when empty, which
+            # makes LogRecord skip formatting and emit a literal "%s".
+            (msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_TEXT).text,),
+            None,
+            msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_FUNC).text,
+            None,
+        )
+        # Stamp the record with the kernel-derived time instead of "now".
+        rec.created = self._get_record_time(msg)
+        logger.handle(rec)
+
+    def _runtest_name(self, test_name: str, test_data):
+        """Ask the kernel to run one test and log every message it sends back.
+
+        Opens a generic netlink socket, resolves the "ktest" family and sends
+        a KTEST_CMD_RUN request naming the module and the test.
+
+        Args:
+            test_name: value of the KTEST_ATTR_TEST_NAME attribute.
+            test_data: optional payload for a nested KTEST_ATTR_TEST_META
+                attribute; omitted from the request when None.
+        """
+        module_name = self.KTEST_MODULE_NAME
+        helper = NlHelper()
+        nlsock = Nlsock(NlConst.NETLINK_GENERIC, helper)
+        family_id = nlsock.get_genl_family_id(NETLINK_FAMILY)
+        msg = KtestInfoMessage(helper, family_id, KtestMsgType.KTEST_CMD_RUN)
+        msg.set_request()
+        msg.add_nla(NlAttrStr(KtestAttrType.KTEST_ATTR_MOD_NAME, module_name))
+        msg.add_nla(NlAttrStr(KtestAttrType.KTEST_ATTR_TEST_NAME, test_name))
+        if test_data is not None:
+            msg.add_nla(NlAttrNested(KtestAttrType.KTEST_ATTR_TEST_META, test_data))
+        nlsock.write_message(msg, verbose=False)
+
+        # Iterate the messages tied to this request's sequence number.
+        for log_msg in NetlinkMultipartIterator(
+            nlsock, msg.nl_hdr.nlmsg_seq, family_id
+        ):
+            self._log_message(log_msg)
+
+    def runtest(self, test_data=None):
+        """Run the kernel test whose name is derived from self.test_id.
+
+        Args:
+            test_data: optional metadata forwarded to _runtest_name().
+        """
+        self._runtest_name(nodeid_to_method_name(self.test_id), test_data)
diff --git a/tests/atf_python/sys/netlink/attrs.py b/tests/atf_python/sys/netlink/attrs.py
index f6fe9ee43c98..58fbab7fc8db 100644
--- a/tests/atf_python/sys/netlink/attrs.py
+++ b/tests/atf_python/sys/netlink/attrs.py
@@ -7,6 +7,8 @@ from atf_python.sys.netlink.utils import enum_or_int
class NlAttr(object):
+ HDR_LEN = 4 # sizeof(struct nlattr)
+
def __init__(self, nla_type, data):
if isinstance(nla_type, Enum):
self._nla_type = nla_type.value
diff --git a/tests/atf_python/sys/netlink/base_headers.py b/tests/atf_python/sys/netlink/base_headers.py
index 759d8827fb3c..71771a249b3d 100644
--- a/tests/atf_python/sys/netlink/base_headers.py
+++ b/tests/atf_python/sys/netlink/base_headers.py
@@ -15,6 +15,13 @@ class Nlmsghdr(Structure):
]
+# ctypes layout of a netlink attribute header: two unsigned shorts holding the
+# attribute length and the attribute type.
+class Nlattr(Structure):
+ _fields_ = [
+ ("nla_len", c_ushort),
+ ("nla_type", c_ushort),
+ ]
+
+
class NlMsgType(Enum):
NLMSG_NOOP = 1
NLMSG_ERROR = 2
diff --git a/tests/atf_python/sys/netlink/netlink.py b/tests/atf_python/sys/netlink/netlink.py
index f813727d55b4..4bdefc2d5014 100644
--- a/tests/atf_python/sys/netlink/netlink.py
+++ b/tests/atf_python/sys/netlink/netlink.py
@@ -22,8 +22,8 @@ from atf_python.sys.netlink.message import BaseNetlinkMessage
from atf_python.sys.netlink.message import NlMsgCategory
from atf_python.sys.netlink.message import NlMsgProps
from atf_python.sys.netlink.message import StdNetlinkMessage
-from atf_python.sys.netlink.netlink_generic import GenlCtrlMsgType
from atf_python.sys.netlink.netlink_generic import GenlCtrlAttrType
+from atf_python.sys.netlink.netlink_generic import GenlCtrlMsgType
from atf_python.sys.netlink.netlink_generic import handler_classes as genl_classes
from atf_python.sys.netlink.netlink_route import handler_classes as rt_classes
from atf_python.sys.netlink.utils import align4
diff --git a/tests/atf_python/sys/netlink/netlink_generic.py b/tests/atf_python/sys/netlink/netlink_generic.py
index ee75d5bf37f3..06dc8704fe07 100644
--- a/tests/atf_python/sys/netlink/netlink_generic.py
+++ b/tests/atf_python/sys/netlink/netlink_generic.py
@@ -1,10 +1,16 @@
#!/usr/local/bin/python3
+from ctypes import c_int64
+from ctypes import c_long
from ctypes import sizeof
+from ctypes import Structure
from enum import Enum
+import struct
+from atf_python.sys.netlink.attrs import NlAttr
from atf_python.sys.netlink.attrs import NlAttrStr
from atf_python.sys.netlink.attrs import NlAttrU16
from atf_python.sys.netlink.attrs import NlAttrU32
+from atf_python.sys.netlink.attrs import NlAttrU8
from atf_python.sys.netlink.base_headers import GenlMsgHdr
from atf_python.sys.netlink.message import NlMsgCategory
from atf_python.sys.netlink.message import NlMsgProps
@@ -105,6 +111,118 @@ class NetlinkGenlCtrlMessage(NetlinkGenlMessage):
family_name = GenlCtrlFamilyName
+# Generic netlink family name shared by the Ktest message classes below and
+# used as their key in handler_classes.
+KtestFamilyName = "ktest"
+
+
+# Command / message types of the ktest family (LIST and RUN are sent as
+# requests by ktest.py). Values presumably mirror the kernel's definitions
+# -- TODO confirm against the kernel header.
+class KtestMsgType(Enum):
+ KTEST_CMD_UNSPEC = 0
+ KTEST_CMD_LIST = 1
+ KTEST_CMD_RUN = 2
+ KTEST_CMD_NEWTEST = 3
+ KTEST_CMD_NEWMESSAGE = 4
+
+
+# Attribute types carried by KtestInfoMessage (module name, test name, test
+# description, nested test metadata).
+class KtestAttrType(Enum):
+ KTEST_ATTR_MOD_NAME = 1
+ KTEST_ATTR_TEST_NAME = 2
+ KTEST_ATTR_TEST_DESCR = 3
+ KTEST_ATTR_TEST_META = 4
+
+
+# Kinds of ktest log messages.
+# NOTE(review): imported by ktest.py but not otherwise used in the code
+# visible in this change -- confirm where these values are consumed.
+class KtestLogMsgType(Enum):
+ KTEST_MSG_START = 1
+ KTEST_MSG_END = 2
+ KTEST_MSG_LOG = 3
+ KTEST_MSG_FAIL = 4
+
+
+# Attribute types carried by KtestMsgMessage: timestamp, source location
+# (function, file, line), message text, level and metadata.
+class KtestMsgAttrType(Enum):
+ KTEST_MSG_ATTR_TS = 1
+ KTEST_MSG_ATTR_FUNC = 2
+ KTEST_MSG_ATTR_FILE = 3
+ KTEST_MSG_ATTR_LINE = 4
+ KTEST_MSG_ATTR_TEXT = 5
+ KTEST_MSG_ATTR_LEVEL = 6
+ KTEST_MSG_ATTR_META = 7
+
+
+# ctypes structure for a timestamp: a 64-bit tv_sec followed by a C long
+# tv_nsec.
+# NOTE(review): presumably matches the kernel's struct timespec layout on the
+# supported platforms -- confirm for 32-bit targets.
+class timespec(Structure):
+ _fields_ = [
+ ("tv_sec", c_int64),
+ ("tv_nsec", c_long),
+ ]
+
+
+# Netlink attribute whose payload is a timespec structure; the parsed value is
+# available as the "ts" attribute.
+class NlAttrTS(NlAttr):
+ # Payload size in bytes, excluding the attribute header.
+ DATA_LEN = sizeof(timespec)
+
+ def __init__(self, nla_type, val):
+ self.ts = val
+ # The base class receives empty data; the payload lives in self.ts and
+ # is serialized on demand in __bytes__().
+ super().__init__(nla_type, b"")
+
+ # Total attribute length: header plus timespec payload.
+ @property
+ def nla_len(self):
+ return NlAttr.HDR_LEN + self.DATA_LEN
+
+ def _print_attr_value(self):
+ return " tv_sec={} tv_nsec={}".format(self.ts.tv_sec, self.ts.tv_nsec)
+
+ # Checks that data is exactly header + timespec long and that the length
+ # stored in the header agrees.
+ # NOTE(review): relies on assert, which is stripped when Python runs with -O.
+ @staticmethod
+ def _validate(data):
+ assert len(data) == NlAttr.HDR_LEN + NlAttrTS.DATA_LEN
+ nla_len, nla_type = struct.unpack("@HH", data[:NlAttr.HDR_LEN])
+ assert nla_len == NlAttr.HDR_LEN + NlAttrTS.DATA_LEN
+
+ # Builds an instance from raw bytes: the header is unpacked as two
+ # native-order unsigned shorts, the remainder is copied into a timespec.
+ @classmethod
+ def _parse(cls, data):
+ nla_len, nla_type = struct.unpack("@HH", data[:NlAttr.HDR_LEN])
+ val = timespec.from_buffer_copy(data[NlAttr.HDR_LEN:])
+ return cls(nla_type, val)
+
+ # Serializes the attribute by passing the raw timespec bytes to _to_bytes()
+ # (defined outside this view).
+ def __bytes__(self):
+ return self._to_bytes(bytes(self.ts))
+
+
+# Attribute parsers for KtestInfoMessage; all three described attributes are
+# strings. KTEST_ATTR_TEST_META has no descriptor here.
+ktest_info_attrs = prepare_attrs_map(
+ [
+ AttrDescr(KtestAttrType.KTEST_ATTR_MOD_NAME, NlAttrStr),
+ AttrDescr(KtestAttrType.KTEST_ATTR_TEST_NAME, NlAttrStr),
+ AttrDescr(KtestAttrType.KTEST_ATTR_TEST_DESCR, NlAttrStr),
+ ]
+)
+
+
+# Attribute parsers for KtestMsgMessage: strings for function, file and text,
+# a 32-bit line number, an 8-bit level and a timespec timestamp.
+# KTEST_MSG_ATTR_META has no descriptor here.
+ktest_msg_attrs = prepare_attrs_map(
+ [
+ AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_FUNC, NlAttrStr),
+ AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_FILE, NlAttrStr),
+ AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_LINE, NlAttrU32),
+ AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_TEXT, NlAttrStr),
+ AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_LEVEL, NlAttrU8),
+ AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_TS, NlAttrTS),
+ ]
+)
+
+
+# Message class for the LIST, RUN and NEWTEST commands of the "ktest" family,
+# parsed with the ktest_info_attrs map.
+class KtestInfoMessage(NetlinkGenlMessage):
+ messages = [
+ NlMsgProps(KtestMsgType.KTEST_CMD_LIST, NlMsgCategory.GET),
+ NlMsgProps(KtestMsgType.KTEST_CMD_RUN, NlMsgCategory.NEW),
+ NlMsgProps(KtestMsgType.KTEST_CMD_NEWTEST, NlMsgCategory.NEW),
+ ]
+ nl_attrs_map = ktest_info_attrs
+ family_name = KtestFamilyName
+
+
+# Message class for KTEST_CMD_NEWMESSAGE of the "ktest" family, parsed with
+# the ktest_msg_attrs map.
+class KtestMsgMessage(NetlinkGenlMessage):
+ messages = [
+ NlMsgProps(KtestMsgType.KTEST_CMD_NEWMESSAGE, NlMsgCategory.NEW),
+ ]
+ nl_attrs_map = ktest_msg_attrs
+ family_name = KtestFamilyName
+
+
handler_classes = {
GenlCtrlFamilyName: [NetlinkGenlCtrlMessage],
+ KtestFamilyName: [KtestInfoMessage, KtestMsgMessage],
}
diff --git a/tests/atf_python/utils.py b/tests/atf_python/utils.py
index 591a532ca476..1c0a68dad383 100644
--- a/tests/atf_python/utils.py
+++ b/tests/atf_python/utils.py
@@ -28,6 +28,11 @@ class LibCWrapper(object):
return get_errno()
return 0
+ # Calls libc kldload() with the ASCII-encoded module name. Returns 0 unless
+ # the call returns -1, in which case the current errno value is returned.
+ # A name with non-ASCII characters raises UnicodeEncodeError before the call.
+ def kldload(self, kld_name: str) -> int:
+ if self._libc.kldload(bytes(kld_name, encoding="ascii")) == -1:
+ return get_errno()
+ return 0
+
def jail_attach(self, jid: int) -> int:
if self._libc.jail_attach(jid) != 0:
return get_errno()