diff options
Diffstat (limited to 'tests/atf_python/sys/netlink/netlink_generic.py')
-rw-r--r-- | tests/atf_python/sys/netlink/netlink_generic.py | 312 |
1 files changed, 312 insertions, 0 deletions
diff --git a/tests/atf_python/sys/netlink/netlink_generic.py b/tests/atf_python/sys/netlink/netlink_generic.py new file mode 100644 index 000000000000..80c6eea72a93 --- /dev/null +++ b/tests/atf_python/sys/netlink/netlink_generic.py @@ -0,0 +1,312 @@ +#!/usr/local/bin/python3 +import struct +from ctypes import c_int64 +from ctypes import c_long +from ctypes import sizeof +from ctypes import Structure +from enum import Enum + +from atf_python.sys.netlink.attrs import NlAttr +from atf_python.sys.netlink.attrs import NlAttrIp4 +from atf_python.sys.netlink.attrs import NlAttrIp6 +from atf_python.sys.netlink.attrs import NlAttrNested +from atf_python.sys.netlink.attrs import NlAttrS32 +from atf_python.sys.netlink.attrs import NlAttrStr +from atf_python.sys.netlink.attrs import NlAttrU16 +from atf_python.sys.netlink.attrs import NlAttrU32 +from atf_python.sys.netlink.attrs import NlAttrU8 +from atf_python.sys.netlink.base_headers import GenlMsgHdr +from atf_python.sys.netlink.message import NlMsgCategory +from atf_python.sys.netlink.message import NlMsgProps +from atf_python.sys.netlink.message import StdNetlinkMessage +from atf_python.sys.netlink.utils import AttrDescr +from atf_python.sys.netlink.utils import enum_or_int +from atf_python.sys.netlink.utils import prepare_attrs_map + + +class NetlinkGenlMessage(StdNetlinkMessage): + messages = [] + nl_attrs_map = {} + family_name = None + + def __init__(self, helper, family_id, cmd=0): + super().__init__(helper, family_id) + self.base_hdr = GenlMsgHdr(cmd=enum_or_int(cmd)) + + def parse_base_header(self, data): + if len(data) < sizeof(GenlMsgHdr): + raise ValueError("length less than GenlMsgHdr header") + ghdr = GenlMsgHdr.from_buffer_copy(data) + return (ghdr, sizeof(GenlMsgHdr)) + + def _get_msg_type(self): + return self.base_hdr.cmd + + def print_nl_header(self, prepend=""): + # len=44, type=RTM_DELROUTE, flags=NLM_F_REQUEST|NLM_F_ACK, seq=1641163704, pid=0 # noqa: E501 + hdr = self.nl_hdr + print( + "{}len={}, family={}, flags={}(0x{:X}), seq={}, pid={}".format( + prepend, + hdr.nlmsg_len, + self.family_name, + self.get_nlm_flags_str(), + hdr.nlmsg_flags, + hdr.nlmsg_seq, + hdr.nlmsg_pid, + ) + ) + + def print_base_header(self, hdr, prepend=""): + print( + "{}cmd={} version={} reserved={}".format( + prepend, self.msg_name, hdr.version, hdr.reserved + ) + ) + + +GenlCtrlFamilyName = "nlctrl" + + +class GenlCtrlMsgType(Enum): + CTRL_CMD_UNSPEC = 0 + CTRL_CMD_NEWFAMILY = 1 + CTRL_CMD_DELFAMILY = 2 + CTRL_CMD_GETFAMILY = 3 + CTRL_CMD_NEWOPS = 4 + CTRL_CMD_DELOPS = 5 + CTRL_CMD_GETOPS = 6 + CTRL_CMD_NEWMCAST_GRP = 7 + CTRL_CMD_DELMCAST_GRP = 8 + CTRL_CMD_GETMCAST_GRP = 9 + CTRL_CMD_GETPOLICY = 10 + + +class GenlCtrlAttrType(Enum): + CTRL_ATTR_FAMILY_ID = 1 + CTRL_ATTR_FAMILY_NAME = 2 + CTRL_ATTR_VERSION = 3 + CTRL_ATTR_HDRSIZE = 4 + CTRL_ATTR_MAXATTR = 5 + CTRL_ATTR_OPS = 6 + CTRL_ATTR_MCAST_GROUPS = 7 + CTRL_ATTR_POLICY = 8 + CTRL_ATTR_OP_POLICY = 9 + CTRL_ATTR_OP = 10 + + +class GenlCtrlAttrOpType(Enum): + CTRL_ATTR_OP_ID = 1 + CTRL_ATTR_OP_FLAGS = 2 + + +class GenlCtrlAttrMcastGroupsType(Enum): + CTRL_ATTR_MCAST_GRP_NAME = 1 + CTRL_ATTR_MCAST_GRP_ID = 2 + + +genl_ctrl_attrs = prepare_attrs_map( + [ + AttrDescr(GenlCtrlAttrType.CTRL_ATTR_FAMILY_ID, NlAttrU16), + AttrDescr(GenlCtrlAttrType.CTRL_ATTR_FAMILY_NAME, NlAttrStr), + AttrDescr(GenlCtrlAttrType.CTRL_ATTR_VERSION, NlAttrU32), + AttrDescr(GenlCtrlAttrType.CTRL_ATTR_HDRSIZE, NlAttrU32), + AttrDescr(GenlCtrlAttrType.CTRL_ATTR_MAXATTR, NlAttrU32), + AttrDescr( + GenlCtrlAttrType.CTRL_ATTR_OPS, + NlAttrNested, + [ + AttrDescr(GenlCtrlAttrOpType.CTRL_ATTR_OP_ID, NlAttrU32), + AttrDescr(GenlCtrlAttrOpType.CTRL_ATTR_OP_FLAGS, NlAttrU32), + ], + True, + ), + AttrDescr( + GenlCtrlAttrType.CTRL_ATTR_MCAST_GROUPS, + NlAttrNested, + [ + AttrDescr( + GenlCtrlAttrMcastGroupsType.CTRL_ATTR_MCAST_GRP_NAME, NlAttrStr + ), + AttrDescr( + GenlCtrlAttrMcastGroupsType.CTRL_ATTR_MCAST_GRP_ID, NlAttrU32 + ), + ], + True, + ), + ] +) + + +class NetlinkGenlCtrlMessage(NetlinkGenlMessage): + messages = [ + NlMsgProps(GenlCtrlMsgType.CTRL_CMD_NEWFAMILY, NlMsgCategory.NEW), + NlMsgProps(GenlCtrlMsgType.CTRL_CMD_GETFAMILY, NlMsgCategory.GET), + NlMsgProps(GenlCtrlMsgType.CTRL_CMD_DELFAMILY, NlMsgCategory.DELETE), + ] + nl_attrs_map = genl_ctrl_attrs + family_name = GenlCtrlFamilyName + + +CarpFamilyName = "carp" + + +class CarpMsgType(Enum): + CARP_NL_CMD_UNSPEC = 0 + CARP_NL_CMD_GET = 1 + CARP_NL_CMD_SET = 2 + + +class CarpAttrType(Enum): + CARP_NL_UNSPEC = 0 + CARP_NL_VHID = 1 + CARP_NL_STATE = 2 + CARP_NL_ADVBASE = 3 + CARP_NL_ADVSKEW = 4 + CARP_NL_KEY = 5 + CARP_NL_IFINDEX = 6 + CARP_NL_ADDR = 7 + CARP_NL_ADDR6 = 8 + CARP_NL_IFNAME = 9 + + +carp_gen_attrs = prepare_attrs_map( + [ + AttrDescr(CarpAttrType.CARP_NL_VHID, NlAttrU32), + AttrDescr(CarpAttrType.CARP_NL_STATE, NlAttrU32), + AttrDescr(CarpAttrType.CARP_NL_ADVBASE, NlAttrS32), + AttrDescr(CarpAttrType.CARP_NL_ADVSKEW, NlAttrS32), + AttrDescr(CarpAttrType.CARP_NL_KEY, NlAttr), + AttrDescr(CarpAttrType.CARP_NL_IFINDEX, NlAttrU32), + AttrDescr(CarpAttrType.CARP_NL_ADDR, NlAttrIp4), + AttrDescr(CarpAttrType.CARP_NL_ADDR6, NlAttrIp6), + AttrDescr(CarpAttrType.CARP_NL_IFNAME, NlAttrStr), + ] +) + + +class CarpGenMessage(NetlinkGenlMessage): + messages = [ + NlMsgProps(CarpMsgType.CARP_NL_CMD_GET, NlMsgCategory.GET), + NlMsgProps(CarpMsgType.CARP_NL_CMD_SET, NlMsgCategory.NEW), + ] + nl_attrs_map = carp_gen_attrs + family_name = CarpFamilyName + + +KtestFamilyName = "ktest" + + +class KtestMsgType(Enum): + KTEST_CMD_UNSPEC = 0 + KTEST_CMD_LIST = 1 + KTEST_CMD_RUN = 2 + KTEST_CMD_NEWTEST = 3 + KTEST_CMD_NEWMESSAGE = 4 + + +class KtestAttrType(Enum): + KTEST_ATTR_MOD_NAME = 1 + KTEST_ATTR_TEST_NAME = 2 + KTEST_ATTR_TEST_DESCR = 3 + KTEST_ATTR_TEST_META = 4 + + +class KtestLogMsgType(Enum): + KTEST_MSG_START = 1 + KTEST_MSG_END = 2 + KTEST_MSG_LOG = 3 + KTEST_MSG_FAIL = 4 + + +class KtestMsgAttrType(Enum): + KTEST_MSG_ATTR_TS = 1 + KTEST_MSG_ATTR_FUNC = 2 + KTEST_MSG_ATTR_FILE = 3 + KTEST_MSG_ATTR_LINE = 4 + KTEST_MSG_ATTR_TEXT = 5 + KTEST_MSG_ATTR_LEVEL = 6 + KTEST_MSG_ATTR_META = 7 + + +class timespec(Structure): + _fields_ = [ + ("tv_sec", c_int64), + ("tv_nsec", c_long), + ] + + +class NlAttrTS(NlAttr): + DATA_LEN = sizeof(timespec) + + def __init__(self, nla_type, val): + self.ts = val + super().__init__(nla_type, b"") + + @property + def nla_len(self): + return NlAttr.HDR_LEN + self.DATA_LEN + + def _print_attr_value(self): + return " tv_sec={} tv_nsec={}".format(self.ts.tv_sec, self.ts.tv_nsec) + + @staticmethod + def _validate(data): + assert len(data) == NlAttr.HDR_LEN + NlAttrTS.DATA_LEN + nla_len, nla_type = struct.unpack("@HH", data[: NlAttr.HDR_LEN]) + assert nla_len == NlAttr.HDR_LEN + NlAttrTS.DATA_LEN + + @classmethod + def _parse(cls, data): + nla_len, nla_type = struct.unpack("@HH", data[: NlAttr.HDR_LEN]) + val = timespec.from_buffer_copy(data[NlAttr.HDR_LEN :]) + return cls(nla_type, val) + + def __bytes__(self): + return self._to_bytes(bytes(self.ts)) + + +ktest_info_attrs = prepare_attrs_map( + [ + AttrDescr(KtestAttrType.KTEST_ATTR_MOD_NAME, NlAttrStr), + AttrDescr(KtestAttrType.KTEST_ATTR_TEST_NAME, NlAttrStr), + AttrDescr(KtestAttrType.KTEST_ATTR_TEST_DESCR, NlAttrStr), + ] +) + + +ktest_msg_attrs = prepare_attrs_map( + [ + AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_FUNC, NlAttrStr), + AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_FILE, NlAttrStr), + AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_LINE, NlAttrU32), + AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_TEXT, NlAttrStr), + AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_LEVEL, NlAttrU8), + AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_TS, NlAttrTS), + ] +) + + +class KtestInfoMessage(NetlinkGenlMessage): + messages = [ + NlMsgProps(KtestMsgType.KTEST_CMD_LIST, NlMsgCategory.GET), + NlMsgProps(KtestMsgType.KTEST_CMD_RUN, NlMsgCategory.NEW), + NlMsgProps(KtestMsgType.KTEST_CMD_NEWTEST, NlMsgCategory.NEW), + ] + nl_attrs_map = ktest_info_attrs + family_name = KtestFamilyName + + +class KtestMsgMessage(NetlinkGenlMessage): + messages = [ + NlMsgProps(KtestMsgType.KTEST_CMD_NEWMESSAGE, NlMsgCategory.NEW), + ] + nl_attrs_map = ktest_msg_attrs + family_name = KtestFamilyName + + +handler_classes = { + CarpFamilyName: [CarpGenMessage], + GenlCtrlFamilyName: [NetlinkGenlCtrlMessage], + KtestFamilyName: [KtestInfoMessage, KtestMsgMessage], +} |