aboutsummaryrefslogtreecommitdiff
path: root/tests/atf_python/sys/netlink/netlink_generic.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/atf_python/sys/netlink/netlink_generic.py')
-rw-r--r--tests/atf_python/sys/netlink/netlink_generic.py312
1 files changed, 312 insertions, 0 deletions
diff --git a/tests/atf_python/sys/netlink/netlink_generic.py b/tests/atf_python/sys/netlink/netlink_generic.py
new file mode 100644
index 000000000000..80c6eea72a93
--- /dev/null
+++ b/tests/atf_python/sys/netlink/netlink_generic.py
@@ -0,0 +1,312 @@
+#!/usr/local/bin/python3
+import struct
+from ctypes import c_int64
+from ctypes import c_long
+from ctypes import sizeof
+from ctypes import Structure
+from enum import Enum
+
+from atf_python.sys.netlink.attrs import NlAttr
+from atf_python.sys.netlink.attrs import NlAttrIp4
+from atf_python.sys.netlink.attrs import NlAttrIp6
+from atf_python.sys.netlink.attrs import NlAttrNested
+from atf_python.sys.netlink.attrs import NlAttrS32
+from atf_python.sys.netlink.attrs import NlAttrStr
+from atf_python.sys.netlink.attrs import NlAttrU16
+from atf_python.sys.netlink.attrs import NlAttrU32
+from atf_python.sys.netlink.attrs import NlAttrU8
+from atf_python.sys.netlink.base_headers import GenlMsgHdr
+from atf_python.sys.netlink.message import NlMsgCategory
+from atf_python.sys.netlink.message import NlMsgProps
+from atf_python.sys.netlink.message import StdNetlinkMessage
+from atf_python.sys.netlink.utils import AttrDescr
+from atf_python.sys.netlink.utils import enum_or_int
+from atf_python.sys.netlink.utils import prepare_attrs_map
+
+
+class NetlinkGenlMessage(StdNetlinkMessage):
+ messages = []
+ nl_attrs_map = {}
+ family_name = None
+
+ def __init__(self, helper, family_id, cmd=0):
+ super().__init__(helper, family_id)
+ self.base_hdr = GenlMsgHdr(cmd=enum_or_int(cmd))
+
+ def parse_base_header(self, data):
+ if len(data) < sizeof(GenlMsgHdr):
+ raise ValueError("length less than GenlMsgHdr header")
+ ghdr = GenlMsgHdr.from_buffer_copy(data)
+ return (ghdr, sizeof(GenlMsgHdr))
+
+ def _get_msg_type(self):
+ return self.base_hdr.cmd
+
+ def print_nl_header(self, prepend=""):
+ # len=44, type=RTM_DELROUTE, flags=NLM_F_REQUEST|NLM_F_ACK, seq=1641163704, pid=0 # noqa: E501
+ hdr = self.nl_hdr
+ print(
+ "{}len={}, family={}, flags={}(0x{:X}), seq={}, pid={}".format(
+ prepend,
+ hdr.nlmsg_len,
+ self.family_name,
+ self.get_nlm_flags_str(),
+ hdr.nlmsg_flags,
+ hdr.nlmsg_seq,
+ hdr.nlmsg_pid,
+ )
+ )
+
+ def print_base_header(self, hdr, prepend=""):
+ print(
+ "{}cmd={} version={} reserved={}".format(
+ prepend, self.msg_name, hdr.version, hdr.reserved
+ )
+ )
+
+
+GenlCtrlFamilyName = "nlctrl"
+
+
+class GenlCtrlMsgType(Enum):
+ CTRL_CMD_UNSPEC = 0
+ CTRL_CMD_NEWFAMILY = 1
+ CTRL_CMD_DELFAMILY = 2
+ CTRL_CMD_GETFAMILY = 3
+ CTRL_CMD_NEWOPS = 4
+ CTRL_CMD_DELOPS = 5
+ CTRL_CMD_GETOPS = 6
+ CTRL_CMD_NEWMCAST_GRP = 7
+ CTRL_CMD_DELMCAST_GRP = 8
+ CTRL_CMD_GETMCAST_GRP = 9
+ CTRL_CMD_GETPOLICY = 10
+
+
+class GenlCtrlAttrType(Enum):
+ CTRL_ATTR_FAMILY_ID = 1
+ CTRL_ATTR_FAMILY_NAME = 2
+ CTRL_ATTR_VERSION = 3
+ CTRL_ATTR_HDRSIZE = 4
+ CTRL_ATTR_MAXATTR = 5
+ CTRL_ATTR_OPS = 6
+ CTRL_ATTR_MCAST_GROUPS = 7
+ CTRL_ATTR_POLICY = 8
+ CTRL_ATTR_OP_POLICY = 9
+ CTRL_ATTR_OP = 10
+
+
+class GenlCtrlAttrOpType(Enum):
+ CTRL_ATTR_OP_ID = 1
+ CTRL_ATTR_OP_FLAGS = 2
+
+
+class GenlCtrlAttrMcastGroupsType(Enum):
+ CTRL_ATTR_MCAST_GRP_NAME = 1
+ CTRL_ATTR_MCAST_GRP_ID = 2
+
+
+genl_ctrl_attrs = prepare_attrs_map(
+ [
+ AttrDescr(GenlCtrlAttrType.CTRL_ATTR_FAMILY_ID, NlAttrU16),
+ AttrDescr(GenlCtrlAttrType.CTRL_ATTR_FAMILY_NAME, NlAttrStr),
+ AttrDescr(GenlCtrlAttrType.CTRL_ATTR_VERSION, NlAttrU32),
+ AttrDescr(GenlCtrlAttrType.CTRL_ATTR_HDRSIZE, NlAttrU32),
+ AttrDescr(GenlCtrlAttrType.CTRL_ATTR_MAXATTR, NlAttrU32),
+ AttrDescr(
+ GenlCtrlAttrType.CTRL_ATTR_OPS,
+ NlAttrNested,
+ [
+ AttrDescr(GenlCtrlAttrOpType.CTRL_ATTR_OP_ID, NlAttrU32),
+ AttrDescr(GenlCtrlAttrOpType.CTRL_ATTR_OP_FLAGS, NlAttrU32),
+ ],
+ True,
+ ),
+ AttrDescr(
+ GenlCtrlAttrType.CTRL_ATTR_MCAST_GROUPS,
+ NlAttrNested,
+ [
+ AttrDescr(
+ GenlCtrlAttrMcastGroupsType.CTRL_ATTR_MCAST_GRP_NAME, NlAttrStr
+ ),
+ AttrDescr(
+ GenlCtrlAttrMcastGroupsType.CTRL_ATTR_MCAST_GRP_ID, NlAttrU32
+ ),
+ ],
+ True,
+ ),
+ ]
+)
+
+
+class NetlinkGenlCtrlMessage(NetlinkGenlMessage):
+ messages = [
+ NlMsgProps(GenlCtrlMsgType.CTRL_CMD_NEWFAMILY, NlMsgCategory.NEW),
+ NlMsgProps(GenlCtrlMsgType.CTRL_CMD_GETFAMILY, NlMsgCategory.GET),
+ NlMsgProps(GenlCtrlMsgType.CTRL_CMD_DELFAMILY, NlMsgCategory.DELETE),
+ ]
+ nl_attrs_map = genl_ctrl_attrs
+ family_name = GenlCtrlFamilyName
+
+
+CarpFamilyName = "carp"
+
+
+class CarpMsgType(Enum):
+ CARP_NL_CMD_UNSPEC = 0
+ CARP_NL_CMD_GET = 1
+ CARP_NL_CMD_SET = 2
+
+
+class CarpAttrType(Enum):
+ CARP_NL_UNSPEC = 0
+ CARP_NL_VHID = 1
+ CARP_NL_STATE = 2
+ CARP_NL_ADVBASE = 3
+ CARP_NL_ADVSKEW = 4
+ CARP_NL_KEY = 5
+ CARP_NL_IFINDEX = 6
+ CARP_NL_ADDR = 7
+ CARP_NL_ADDR6 = 8
+ CARP_NL_IFNAME = 9
+
+
+carp_gen_attrs = prepare_attrs_map(
+ [
+ AttrDescr(CarpAttrType.CARP_NL_VHID, NlAttrU32),
+ AttrDescr(CarpAttrType.CARP_NL_STATE, NlAttrU32),
+ AttrDescr(CarpAttrType.CARP_NL_ADVBASE, NlAttrS32),
+ AttrDescr(CarpAttrType.CARP_NL_ADVSKEW, NlAttrS32),
+ AttrDescr(CarpAttrType.CARP_NL_KEY, NlAttr),
+ AttrDescr(CarpAttrType.CARP_NL_IFINDEX, NlAttrU32),
+ AttrDescr(CarpAttrType.CARP_NL_ADDR, NlAttrIp4),
+ AttrDescr(CarpAttrType.CARP_NL_ADDR6, NlAttrIp6),
+ AttrDescr(CarpAttrType.CARP_NL_IFNAME, NlAttrStr),
+ ]
+)
+
+
+class CarpGenMessage(NetlinkGenlMessage):
+ messages = [
+ NlMsgProps(CarpMsgType.CARP_NL_CMD_GET, NlMsgCategory.GET),
+ NlMsgProps(CarpMsgType.CARP_NL_CMD_SET, NlMsgCategory.NEW),
+ ]
+ nl_attrs_map = carp_gen_attrs
+ family_name = CarpFamilyName
+
+
+KtestFamilyName = "ktest"
+
+
+class KtestMsgType(Enum):
+ KTEST_CMD_UNSPEC = 0
+ KTEST_CMD_LIST = 1
+ KTEST_CMD_RUN = 2
+ KTEST_CMD_NEWTEST = 3
+ KTEST_CMD_NEWMESSAGE = 4
+
+
+class KtestAttrType(Enum):
+ KTEST_ATTR_MOD_NAME = 1
+ KTEST_ATTR_TEST_NAME = 2
+ KTEST_ATTR_TEST_DESCR = 3
+ KTEST_ATTR_TEST_META = 4
+
+
+class KtestLogMsgType(Enum):
+ KTEST_MSG_START = 1
+ KTEST_MSG_END = 2
+ KTEST_MSG_LOG = 3
+ KTEST_MSG_FAIL = 4
+
+
+class KtestMsgAttrType(Enum):
+ KTEST_MSG_ATTR_TS = 1
+ KTEST_MSG_ATTR_FUNC = 2
+ KTEST_MSG_ATTR_FILE = 3
+ KTEST_MSG_ATTR_LINE = 4
+ KTEST_MSG_ATTR_TEXT = 5
+ KTEST_MSG_ATTR_LEVEL = 6
+ KTEST_MSG_ATTR_META = 7
+
+
+class timespec(Structure):
+ _fields_ = [
+ ("tv_sec", c_int64),
+ ("tv_nsec", c_long),
+ ]
+
+
+class NlAttrTS(NlAttr):
+ DATA_LEN = sizeof(timespec)
+
+ def __init__(self, nla_type, val):
+ self.ts = val
+ super().__init__(nla_type, b"")
+
+ @property
+ def nla_len(self):
+ return NlAttr.HDR_LEN + self.DATA_LEN
+
+ def _print_attr_value(self):
+ return " tv_sec={} tv_nsec={}".format(self.ts.tv_sec, self.ts.tv_nsec)
+
+ @staticmethod
+ def _validate(data):
+ assert len(data) == NlAttr.HDR_LEN + NlAttrTS.DATA_LEN
+ nla_len, nla_type = struct.unpack("@HH", data[: NlAttr.HDR_LEN])
+ assert nla_len == NlAttr.HDR_LEN + NlAttrTS.DATA_LEN
+
+ @classmethod
+ def _parse(cls, data):
+ nla_len, nla_type = struct.unpack("@HH", data[: NlAttr.HDR_LEN])
+ val = timespec.from_buffer_copy(data[NlAttr.HDR_LEN :])
+ return cls(nla_type, val)
+
+ def __bytes__(self):
+ return self._to_bytes(bytes(self.ts))
+
+
+ktest_info_attrs = prepare_attrs_map(
+ [
+ AttrDescr(KtestAttrType.KTEST_ATTR_MOD_NAME, NlAttrStr),
+ AttrDescr(KtestAttrType.KTEST_ATTR_TEST_NAME, NlAttrStr),
+ AttrDescr(KtestAttrType.KTEST_ATTR_TEST_DESCR, NlAttrStr),
+ ]
+)
+
+
+ktest_msg_attrs = prepare_attrs_map(
+ [
+ AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_FUNC, NlAttrStr),
+ AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_FILE, NlAttrStr),
+ AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_LINE, NlAttrU32),
+ AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_TEXT, NlAttrStr),
+ AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_LEVEL, NlAttrU8),
+ AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_TS, NlAttrTS),
+ ]
+)
+
+
+class KtestInfoMessage(NetlinkGenlMessage):
+ messages = [
+ NlMsgProps(KtestMsgType.KTEST_CMD_LIST, NlMsgCategory.GET),
+ NlMsgProps(KtestMsgType.KTEST_CMD_RUN, NlMsgCategory.NEW),
+ NlMsgProps(KtestMsgType.KTEST_CMD_NEWTEST, NlMsgCategory.NEW),
+ ]
+ nl_attrs_map = ktest_info_attrs
+ family_name = KtestFamilyName
+
+
+class KtestMsgMessage(NetlinkGenlMessage):
+ messages = [
+ NlMsgProps(KtestMsgType.KTEST_CMD_NEWMESSAGE, NlMsgCategory.NEW),
+ ]
+ nl_attrs_map = ktest_msg_attrs
+ family_name = KtestFamilyName
+
+
+handler_classes = {
+ CarpFamilyName: [CarpGenMessage],
+ GenlCtrlFamilyName: [NetlinkGenlCtrlMessage],
+ KtestFamilyName: [KtestInfoMessage, KtestMsgMessage],
+}