diff options
Diffstat (limited to 'tests/atf_python/sys/netpfil/ipfw/ipfw.py')
| -rw-r--r-- | tests/atf_python/sys/netpfil/ipfw/ipfw.py | 118 |
1 files changed, 118 insertions, 0 deletions
diff --git a/tests/atf_python/sys/netpfil/ipfw/ipfw.py b/tests/atf_python/sys/netpfil/ipfw/ipfw.py new file mode 100644 index 000000000000..0bcc907eeab8 --- /dev/null +++ b/tests/atf_python/sys/netpfil/ipfw/ipfw.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 +import os +import socket +import struct +import subprocess +import sys +from ctypes import c_byte +from ctypes import c_char +from ctypes import c_int +from ctypes import c_long +from ctypes import c_uint32 +from ctypes import c_uint8 +from ctypes import c_ulong +from ctypes import c_ushort +from ctypes import sizeof +from ctypes import Structure +from enum import Enum +from typing import Any +from typing import Dict +from typing import List +from typing import NamedTuple +from typing import Optional +from typing import Union + +from atf_python.sys.netpfil.ipfw.ioctl import get3_classes +from atf_python.sys.netpfil.ipfw.ioctl import legacy_classes +from atf_python.sys.netpfil.ipfw.ioctl import set3_classes +from atf_python.sys.netpfil.ipfw.utils import AttrDescr +from atf_python.sys.netpfil.ipfw.utils import enum_from_int +from atf_python.sys.netpfil.ipfw.utils import enum_or_int +from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map + + +class DebugHeader(Structure): + _fields_ = [ + ("cmd_type", c_ushort), + ("spare1", c_ushort), + ("opt_name", c_uint32), + ("total_len", c_uint32), + ("spare2", c_uint32), + ] + + +class DebugType(Enum): + DO_CMD = 1 + DO_SET3 = 2 + DO_GET3 = 3 + + +class DebugIoReader(object): + HANDLER_CLASSES = { + DebugType.DO_CMD: legacy_classes, + DebugType.DO_SET3: set3_classes, + DebugType.DO_GET3: get3_classes, + } + + def __init__(self, ipfw_path): + self._msgmap = self.build_msgmap() + self.ipfw_path = ipfw_path + + def build_msgmap(self): + xmap = {} + for debug_type, handler_classes in self.HANDLER_CLASSES.items(): + debug_type = enum_or_int(debug_type) + if debug_type not in xmap: + xmap[debug_type] = {} + for handler_class in handler_classes: + for msg in handler_class.messages: + xmap[debug_type][enum_or_int(msg)] = handler_class + return xmap + + def print_obj_header(self, hdr): + debug_type = "#{}".format(hdr.cmd_type) + for _type in self.HANDLER_CLASSES.keys(): + if _type.value == hdr.cmd_type: + debug_type = _type.name.lower() + break + print( + "@@ record for {} len={} optname={}".format( + debug_type, hdr.total_len, hdr.opt_name + ) + ) + + def parse_record(self, data): + hdr = DebugHeader.from_buffer_copy(data[: sizeof(DebugHeader)]) + data = data[sizeof(DebugHeader) :] + cls = self._msgmap[hdr.cmd_type].get(hdr.opt_name) + if cls is not None: + return cls.from_bytes(data) + raise ValueError( + "unsupported cmd_type={} opt_name={}".format(hdr.cmd_type, hdr.opt_name) + ) + + def get_record_from_stdin(self): + data = sys.stdin.buffer.peek(sizeof(DebugHeader)) + if len(data) == 0: + return None + + hdr = DebugHeader.from_buffer_copy(data) + data = sys.stdin.buffer.read(hdr.total_len) + return self.parse_record(data) + + def get_records_from_buffer(self, data): + off = 0 + ret = [] + while off + sizeof(DebugHeader) <= len(data): + hdr = DebugHeader.from_buffer_copy(data[off : off + sizeof(DebugHeader)]) + ret.append(self.parse_record(data[off : off + hdr.total_len])) + off += hdr.total_len + return ret + + def run_ipfw(self, cmd: str) -> bytes: + args = [self.ipfw_path, "-xqn"] + cmd.split() + r = subprocess.run(args, capture_output=True) + return r.stdout + + def get_records(self, cmd: str): + return self.get_records_from_buffer(self.run_ipfw(cmd)) |
