diff options
Diffstat (limited to 'tests/atf_python/sys/netlink/message.py')
| -rw-r--r-- | tests/atf_python/sys/netlink/message.py | 78 |
1 files changed, 69 insertions, 9 deletions
diff --git a/tests/atf_python/sys/netlink/message.py b/tests/atf_python/sys/netlink/message.py index 6bc9f2932868..b6fb2f8e357a 100644 --- a/tests/atf_python/sys/netlink/message.py +++ b/tests/atf_python/sys/netlink/message.py @@ -1,15 +1,35 @@ #!/usr/local/bin/python3 import struct from ctypes import sizeof +from enum import Enum from typing import List +from typing import NamedTuple from atf_python.sys.netlink.attrs import NlAttr from atf_python.sys.netlink.attrs import NlAttrNested +from atf_python.sys.netlink.base_headers import NlmAckFlags +from atf_python.sys.netlink.base_headers import NlmNewFlags +from atf_python.sys.netlink.base_headers import NlmGetFlags +from atf_python.sys.netlink.base_headers import NlmDeleteFlags from atf_python.sys.netlink.base_headers import NlmBaseFlags from atf_python.sys.netlink.base_headers import Nlmsghdr from atf_python.sys.netlink.base_headers import NlMsgType from atf_python.sys.netlink.utils import align4 from atf_python.sys.netlink.utils import enum_or_int +from atf_python.sys.netlink.utils import get_bitmask_str + + +class NlMsgCategory(Enum): + UNKNOWN = 0 + GET = 1 + NEW = 2 + DELETE = 3 + ACK = 4 + + +class NlMsgProps(NamedTuple): + msg: Enum + category: NlMsgCategory class BaseNetlinkMessage(object): @@ -60,18 +80,40 @@ class BaseNetlinkMessage(object): def is_reply(self, hdr): return hdr.nlmsg_type == NlMsgType.NLMSG_ERROR.value - def print_nl_header(self, hdr, prepend=""): + @property + def msg_name(self): + return "msg#{}".format(self._get_msg_type()) + + def _get_nl_category(self): + if self.is_reply(self.nl_hdr): + return NlMsgCategory.ACK + return NlMsgCategory.UNKNOWN + + def get_nlm_flags_str(self): + category = self._get_nl_category() + flags = self.nl_hdr.nlmsg_flags + + if category == NlMsgCategory.UNKNOWN: + return self.helper.get_bitmask_str(NlmBaseFlags, flags) + elif category == NlMsgCategory.GET: + flags_enum = NlmGetFlags + elif category == NlMsgCategory.NEW: + flags_enum = NlmNewFlags + elif category == NlMsgCategory.DELETE: + flags_enum = NlmDeleteFlags + elif category == NlMsgCategory.ACK: + flags_enum = NlmAckFlags + return get_bitmask_str([NlmBaseFlags, flags_enum], flags) + + def print_nl_header(self, prepend=""): # len=44, type=RTM_DELROUTE, flags=NLM_F_REQUEST|NLM_F_ACK, seq=1641163704, pid=0 # noqa: E501 - is_reply = self.is_reply(hdr) - msg_name = self.helper.get_nlmsg_name(hdr.nlmsg_type) + hdr = self.nl_hdr print( "{}len={}, type={}, flags={}(0x{:X}), seq={}, pid={}".format( prepend, hdr.nlmsg_len, - msg_name, - self.helper.get_nlm_flags_str( - msg_name, is_reply, hdr.nlmsg_flags - ), # noqa: E501 + self.msg_name, + self.get_nlm_flags_str(), hdr.nlmsg_flags, hdr.nlmsg_seq, hdr.nlmsg_pid, @@ -92,7 +134,7 @@ class BaseNetlinkMessage(object): return self def print_message(self): - self.print_nl_header(self.nl_hdr) + self.print_nl_header() @staticmethod def print_as_bytes(data: bytes, descr: str): @@ -191,11 +233,29 @@ class StdNetlinkMessage(BaseNetlinkMessage): self.nl_hdr.nlmsg_len = len(ret) + sizeof(Nlmsghdr) return bytes(self.nl_hdr) + ret + def _get_msg_type(self): + return self.nl_hdr.nlmsg_type + + @property + def msg_props(self): + msg_type = self._get_msg_type() + for msg_props in self.messages: + if msg_props.msg.value == msg_type: + return msg_props + return None + + @property + def msg_name(self): + msg_props = self.msg_props + if msg_props is not None: + return msg_props.msg.name + return super().msg_name + def print_base_header(self, hdr, prepend=""): pass def print_message(self): - self.print_nl_header(self.nl_hdr) + self.print_nl_header() self.print_base_header(self.base_hdr, " ") for nla in self.nla_list: nla.print_attr(" ") |
