aboutsummaryrefslogtreecommitdiff
path: root/tests/atf_python/sys/netpfil/ipfw/ipfw.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/atf_python/sys/netpfil/ipfw/ipfw.py')
-rw-r--r--tests/atf_python/sys/netpfil/ipfw/ipfw.py118
1 files changed, 118 insertions, 0 deletions
diff --git a/tests/atf_python/sys/netpfil/ipfw/ipfw.py b/tests/atf_python/sys/netpfil/ipfw/ipfw.py
new file mode 100644
index 000000000000..0bcc907eeab8
--- /dev/null
+++ b/tests/atf_python/sys/netpfil/ipfw/ipfw.py
@@ -0,0 +1,118 @@
+#!/usr/bin/env python3
+import os
+import socket
+import struct
+import subprocess
+import sys
+from ctypes import c_byte
+from ctypes import c_char
+from ctypes import c_int
+from ctypes import c_long
+from ctypes import c_uint32
+from ctypes import c_uint8
+from ctypes import c_ulong
+from ctypes import c_ushort
+from ctypes import sizeof
+from ctypes import Structure
+from enum import Enum
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+from typing import Union
+
+from atf_python.sys.netpfil.ipfw.ioctl import get3_classes
+from atf_python.sys.netpfil.ipfw.ioctl import legacy_classes
+from atf_python.sys.netpfil.ipfw.ioctl import set3_classes
+from atf_python.sys.netpfil.ipfw.utils import AttrDescr
+from atf_python.sys.netpfil.ipfw.utils import enum_from_int
+from atf_python.sys.netpfil.ipfw.utils import enum_or_int
+from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map
+
+
+class DebugHeader(Structure):
+ _fields_ = [
+ ("cmd_type", c_ushort),
+ ("spare1", c_ushort),
+ ("opt_name", c_uint32),
+ ("total_len", c_uint32),
+ ("spare2", c_uint32),
+ ]
+
+
+class DebugType(Enum):
+ DO_CMD = 1
+ DO_SET3 = 2
+ DO_GET3 = 3
+
+
+class DebugIoReader(object):
+ HANDLER_CLASSES = {
+ DebugType.DO_CMD: legacy_classes,
+ DebugType.DO_SET3: set3_classes,
+ DebugType.DO_GET3: get3_classes,
+ }
+
+ def __init__(self, ipfw_path):
+ self._msgmap = self.build_msgmap()
+ self.ipfw_path = ipfw_path
+
+ def build_msgmap(self):
+ xmap = {}
+ for debug_type, handler_classes in self.HANDLER_CLASSES.items():
+ debug_type = enum_or_int(debug_type)
+ if debug_type not in xmap:
+ xmap[debug_type] = {}
+ for handler_class in handler_classes:
+ for msg in handler_class.messages:
+ xmap[debug_type][enum_or_int(msg)] = handler_class
+ return xmap
+
+ def print_obj_header(self, hdr):
+ debug_type = "#{}".format(hdr.cmd_type)
+ for _type in self.HANDLER_CLASSES.keys():
+ if _type.value == hdr.cmd_type:
+ debug_type = _type.name.lower()
+ break
+ print(
+ "@@ record for {} len={} optname={}".format(
+ debug_type, hdr.total_len, hdr.opt_name
+ )
+ )
+
+ def parse_record(self, data):
+ hdr = DebugHeader.from_buffer_copy(data[: sizeof(DebugHeader)])
+ data = data[sizeof(DebugHeader) :]
+ cls = self._msgmap[hdr.cmd_type].get(hdr.opt_name)
+ if cls is not None:
+ return cls.from_bytes(data)
+ raise ValueError(
+ "unsupported cmd_type={} opt_name={}".format(hdr.cmd_type, hdr.opt_name)
+ )
+
+ def get_record_from_stdin(self):
+ data = sys.stdin.buffer.peek(sizeof(DebugHeader))
+ if len(data) == 0:
+ return None
+
+ hdr = DebugHeader.from_buffer_copy(data)
+ data = sys.stdin.buffer.read(hdr.total_len)
+ return self.parse_record(data)
+
+ def get_records_from_buffer(self, data):
+ off = 0
+ ret = []
+ while off + sizeof(DebugHeader) <= len(data):
+ hdr = DebugHeader.from_buffer_copy(data[off : off + sizeof(DebugHeader)])
+ ret.append(self.parse_record(data[off : off + hdr.total_len]))
+ off += hdr.total_len
+ return ret
+
+ def run_ipfw(self, cmd: str) -> bytes:
+ args = [self.ipfw_path, "-xqn"] + cmd.split()
+ r = subprocess.run(args, capture_output=True)
+ return r.stdout
+
+ def get_records(self, cmd: str):
+ return self.get_records_from_buffer(self.run_ipfw(cmd))