aboutsummaryrefslogtreecommitdiff
path: root/tests/atf_python/sys/netlink/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/atf_python/sys/netlink/message.py')
-rw-r--r--tests/atf_python/sys/netlink/message.py78
1 files changed, 69 insertions, 9 deletions
diff --git a/tests/atf_python/sys/netlink/message.py b/tests/atf_python/sys/netlink/message.py
index 6bc9f2932868..b6fb2f8e357a 100644
--- a/tests/atf_python/sys/netlink/message.py
+++ b/tests/atf_python/sys/netlink/message.py
@@ -1,15 +1,35 @@
#!/usr/local/bin/python3
import struct
from ctypes import sizeof
+from enum import Enum
from typing import List
+from typing import NamedTuple
from atf_python.sys.netlink.attrs import NlAttr
from atf_python.sys.netlink.attrs import NlAttrNested
+from atf_python.sys.netlink.base_headers import NlmAckFlags
+from atf_python.sys.netlink.base_headers import NlmNewFlags
+from atf_python.sys.netlink.base_headers import NlmGetFlags
+from atf_python.sys.netlink.base_headers import NlmDeleteFlags
from atf_python.sys.netlink.base_headers import NlmBaseFlags
from atf_python.sys.netlink.base_headers import Nlmsghdr
from atf_python.sys.netlink.base_headers import NlMsgType
from atf_python.sys.netlink.utils import align4
from atf_python.sys.netlink.utils import enum_or_int
+from atf_python.sys.netlink.utils import get_bitmask_str
+
+
+class NlMsgCategory(Enum):
+ UNKNOWN = 0
+ GET = 1
+ NEW = 2
+ DELETE = 3
+ ACK = 4
+
+
+class NlMsgProps(NamedTuple):
+ msg: Enum
+ category: NlMsgCategory
class BaseNetlinkMessage(object):
@@ -60,18 +80,40 @@ class BaseNetlinkMessage(object):
def is_reply(self, hdr):
return hdr.nlmsg_type == NlMsgType.NLMSG_ERROR.value
- def print_nl_header(self, hdr, prepend=""):
+ @property
+ def msg_name(self):
+ return "msg#{}".format(self._get_msg_type())
+
+ def _get_nl_category(self):
+ if self.is_reply(self.nl_hdr):
+ return NlMsgCategory.ACK
+ return NlMsgCategory.UNKNOWN
+
+ def get_nlm_flags_str(self):
+ category = self._get_nl_category()
+ flags = self.nl_hdr.nlmsg_flags
+
+ if category == NlMsgCategory.UNKNOWN:
+ return self.helper.get_bitmask_str(NlmBaseFlags, flags)
+ elif category == NlMsgCategory.GET:
+ flags_enum = NlmGetFlags
+ elif category == NlMsgCategory.NEW:
+ flags_enum = NlmNewFlags
+ elif category == NlMsgCategory.DELETE:
+ flags_enum = NlmDeleteFlags
+ elif category == NlMsgCategory.ACK:
+ flags_enum = NlmAckFlags
+ return get_bitmask_str([NlmBaseFlags, flags_enum], flags)
+
+ def print_nl_header(self, prepend=""):
# len=44, type=RTM_DELROUTE, flags=NLM_F_REQUEST|NLM_F_ACK, seq=1641163704, pid=0 # noqa: E501
- is_reply = self.is_reply(hdr)
- msg_name = self.helper.get_nlmsg_name(hdr.nlmsg_type)
+ hdr = self.nl_hdr
print(
"{}len={}, type={}, flags={}(0x{:X}), seq={}, pid={}".format(
prepend,
hdr.nlmsg_len,
- msg_name,
- self.helper.get_nlm_flags_str(
- msg_name, is_reply, hdr.nlmsg_flags
- ), # noqa: E501
+ self.msg_name,
+ self.get_nlm_flags_str(),
hdr.nlmsg_flags,
hdr.nlmsg_seq,
hdr.nlmsg_pid,
@@ -92,7 +134,7 @@ class BaseNetlinkMessage(object):
return self
def print_message(self):
- self.print_nl_header(self.nl_hdr)
+ self.print_nl_header()
@staticmethod
def print_as_bytes(data: bytes, descr: str):
@@ -191,11 +233,29 @@ class StdNetlinkMessage(BaseNetlinkMessage):
self.nl_hdr.nlmsg_len = len(ret) + sizeof(Nlmsghdr)
return bytes(self.nl_hdr) + ret
+ def _get_msg_type(self):
+ return self.nl_hdr.nlmsg_type
+
+ @property
+ def msg_props(self):
+ msg_type = self._get_msg_type()
+ for msg_props in self.messages:
+ if msg_props.msg.value == msg_type:
+ return msg_props
+ return None
+
+ @property
+ def msg_name(self):
+ msg_props = self.msg_props
+ if msg_props is not None:
+ return msg_props.msg.name
+ return super().msg_name
+
def print_base_header(self, hdr, prepend=""):
pass
def print_message(self):
- self.print_nl_header(self.nl_hdr)
+ self.print_nl_header()
self.print_base_header(self.base_hdr, " ")
for nla in self.nla_list:
nla.print_attr(" ")