aboutsummaryrefslogtreecommitdiff
path: root/tests/sys/netinet6/test_ip6_output.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/sys/netinet6/test_ip6_output.py')
-rw-r--r--tests/sys/netinet6/test_ip6_output.py540
1 files changed, 540 insertions, 0 deletions
diff --git a/tests/sys/netinet6/test_ip6_output.py b/tests/sys/netinet6/test_ip6_output.py
new file mode 100644
index 000000000000..fc821606a726
--- /dev/null
+++ b/tests/sys/netinet6/test_ip6_output.py
@@ -0,0 +1,540 @@
+import errno
+import ipaddress
+import socket
+import struct
+import time
+from ctypes import c_byte
+from ctypes import c_uint
+from ctypes import Structure
+
+import pytest
+from atf_python.sys.net.rtsock import SaHelper
+from atf_python.sys.net.tools import ToolsHelper
+from atf_python.sys.net.vnet import run_cmd
+from atf_python.sys.net.vnet import SingleVnetTestTemplate
+from atf_python.sys.net.vnet import VnetTestTemplate
+
+
+class In6Pktinfo(Structure):
+ _fields_ = [
+ ("ipi6_addr", c_byte * 16),
+ ("ipi6_ifindex", c_uint),
+ ]
+
+
+class VerboseSocketServer:
+ def __init__(self, ip: str, port: int, ifname: str = None):
+ self.ip = ip
+ self.port = port
+
+ s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+ s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
+ addr = ipaddress.ip_address(ip)
+ if addr.is_link_local and ifname:
+ ifindex = socket.if_nametoindex(ifname)
+ addr_tuple = (ip, port, 0, ifindex)
+ elif addr.is_multicast and ifname:
+ ifindex = socket.if_nametoindex(ifname)
+ mreq = socket.inet_pton(socket.AF_INET6, ip) + struct.pack("I", ifindex)
+ s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
+ print("## JOINED group {} % {}".format(ip, ifname))
+ addr_tuple = ("::", port, 0, ifindex)
+ else:
+ addr_tuple = (ip, port, 0, 0)
+ print("## Listening on [{}]:{}".format(addr_tuple[0], port))
+ s.bind(addr_tuple)
+ self.socket = s
+
+ def recv(self):
+ # data = self.socket.recv(4096)
+ # print("RX: " + data)
+ data, ancdata, msg_flags, address = self.socket.recvmsg(4096, 128)
+ # Assume ancdata has just 1 item
+ info = In6Pktinfo.from_buffer_copy(ancdata[0][2])
+ dst_ip = socket.inet_ntop(socket.AF_INET6, info.ipi6_addr)
+ dst_iface = socket.if_indextoname(info.ipi6_ifindex)
+
+ tx_obj = {
+ "data": data,
+ "src_ip": address[0],
+ "dst_ip": dst_ip,
+ "dst_iface": dst_iface,
+ }
+ return tx_obj
+
+
+class BaseTestIP6Ouput(VnetTestTemplate):
+ TOPOLOGY = {
+ "vnet1": {"ifaces": ["if1", "if2", "if3"]},
+ "vnet2": {"ifaces": ["if1", "if2", "if3"]},
+ "if1": {"prefixes6": [("2001:db8:a::1/64", "2001:db8:a::2/64")]},
+ "if2": {"prefixes6": [("2001:db8:b::1/64", "2001:db8:b::2/64")]},
+ "if3": {"prefixes6": [("2001:db8:c::1/64", "2001:db8:c::2/64")]},
+ }
+ DEFAULT_PORT = 45365
+
+ def _vnet2_handler(self, vnet, ip: str, os_ifname: str = None):
+ """Generic listener that sends first received packet with metadata
+ back to the sender via pipw
+ """
+ ll_data = ToolsHelper.get_linklocals()
+ # Start listener
+ ss = VerboseSocketServer(ip, self.DEFAULT_PORT, os_ifname)
+ vnet.pipe.send(ll_data)
+
+ tx_obj = ss.recv()
+ tx_obj["dst_iface_alias"] = vnet.iface_map[tx_obj["dst_iface"]].alias
+ vnet.pipe.send(tx_obj)
+
+
+class TestIP6Output(BaseTestIP6Ouput):
+ def vnet2_handler(self, vnet):
+ ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip)
+ self._vnet2_handler(vnet, ip, None)
+
+ @pytest.mark.require_user("root")
+ def test_output6_base(self):
+ """Tests simple UDP output"""
+ second_vnet = self.vnet_map["vnet2"]
+
+ # Pick target on if2 vnet2's end
+ ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
+ ip = str(ifaddr.ip)
+
+ s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+ data = bytes("AAAA", "utf-8")
+ print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT))
+
+ # Wait for the child to become ready
+ self.wait_object(second_vnet.pipe)
+ s.sendto(data, (ip, self.DEFAULT_PORT))
+
+ # Wait for the received object
+ rx_obj = self.wait_object(second_vnet.pipe)
+ assert rx_obj["dst_ip"] == ip
+ assert rx_obj["dst_iface_alias"] == "if2"
+
+ @pytest.mark.require_user("root")
+ def test_output6_nhop(self):
+ """Tests UDP output with custom nhop set"""
+ second_vnet = self.vnet_map["vnet2"]
+
+ # Pick target on if2 vnet2's end
+ ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
+ ip_dst = str(ifaddr.ip)
+ # Pick nexthop on if1
+ ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if1"]["prefixes6"][0][1])
+ ip_next = str(ifaddr.ip)
+ sin6_next = SaHelper.ip6_sa(ip_next, 0)
+
+ s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0)
+ s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next)
+
+ # Wait for the child to become ready
+ self.wait_object(second_vnet.pipe)
+ data = bytes("AAAA", "utf-8")
+ s.sendto(data, (ip_dst, self.DEFAULT_PORT))
+
+ # Wait for the received object
+ rx_obj = self.wait_object(second_vnet.pipe)
+ assert rx_obj["dst_ip"] == ip_dst
+ assert rx_obj["dst_iface_alias"] == "if1"
+
+ @pytest.mark.parametrize(
+ "params",
+ [
+ # esrc: src-ip, if: src-interface, esrc: expected-src,
+ # eif: expected-rx-interface
+ pytest.param({"esrc": "2001:db8:b::1", "eif": "if2"}, id="empty"),
+ pytest.param(
+ {"src": "2001:db8:c::1", "esrc": "2001:db8:c::1", "eif": "if2"},
+ id="iponly1",
+ ),
+ pytest.param(
+ {
+ "src": "2001:db8:c::1",
+ "if": "if3",
+ "ex": errno.EHOSTUNREACH,
+ },
+ id="ipandif",
+ ),
+ pytest.param(
+ {
+ "src": "2001:db8:c::aaaa",
+ "ex": errno.EADDRNOTAVAIL,
+ },
+ id="nolocalip",
+ ),
+ pytest.param(
+ {"if": "if2", "src": "2001:db8:b::1", "eif": "if2"}, id="ifsame"
+ ),
+ ],
+ )
+ @pytest.mark.require_user("root")
+ def test_output6_pktinfo(self, params):
+ """Tests simple UDP output"""
+ second_vnet = self.vnet_map["vnet2"]
+ vnet = self.vnet
+
+ # Pick target on if2 vnet2's end
+ ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
+ dst_ip = str(ifaddr.ip)
+
+ src_ip = params.get("src", "")
+ src_ifname = params.get("if", "")
+ expected_ip = params.get("esrc", "")
+ expected_ifname = params.get("eif", "")
+ errno = params.get("ex", 0)
+
+ pktinfo = In6Pktinfo()
+ if src_ip:
+ for i, b in enumerate(socket.inet_pton(socket.AF_INET6, src_ip)):
+ pktinfo.ipi6_addr[i] = b
+ if src_ifname:
+ os_ifname = vnet.iface_alias_map[src_ifname].name
+ pktinfo.ipi6_ifindex = socket.if_nametoindex(os_ifname)
+
+ # Wait for the child to become ready
+ self.wait_object(second_vnet.pipe)
+ data = bytes("AAAA", "utf-8")
+
+ s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0)
+ try:
+ s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, bytes(pktinfo))
+ aux = (socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, bytes(pktinfo))
+ s.sendto(data, (dst_ip, self.DEFAULT_PORT))
+ except OSError as e:
+ if not errno:
+ raise
+ assert e.errno == errno
+ print("Correctly raised {}".format(e))
+ return
+
+ # Wait for the received object
+ rx_obj = self.wait_object(second_vnet.pipe)
+
+ assert rx_obj["dst_ip"] == dst_ip
+ if expected_ip:
+ assert rx_obj["src_ip"] == expected_ip
+ if expected_ifname:
+ assert rx_obj["dst_iface_alias"] == expected_ifname
+
+
+class TestIP6OutputLL(BaseTestIP6Ouput):
+ def vnet2_handler(self, vnet):
+ """Generic listener that sends first received packet with metadata
+ back to the sender via pipw
+ """
+ os_ifname = vnet.iface_alias_map["if2"].name
+ ll_data = ToolsHelper.get_linklocals()
+ ll_ip, _ = ll_data[os_ifname][0]
+ self._vnet2_handler(vnet, ll_ip, os_ifname)
+
+ @pytest.mark.require_user("root")
+ def test_output6_linklocal(self):
+ """Tests simple UDP output"""
+ second_vnet = self.vnet_map["vnet2"]
+
+ # Wait for the child to become ready
+ ll_data = self.wait_object(second_vnet.pipe)
+
+ # Pick LL address on if2 vnet2's end
+ ip, _ = ll_data[second_vnet.iface_alias_map["if2"].name][0]
+ # Get local interface scope
+ os_ifname = self.vnet.iface_alias_map["if2"].name
+ scopeid = socket.if_nametoindex(os_ifname)
+
+ s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+ data = bytes("AAAA", "utf-8")
+ target = (ip, self.DEFAULT_PORT, 0, scopeid)
+ print("## TX packet to {}%{},{}".format(ip, scopeid, target[1]))
+
+ s.sendto(data, target)
+
+ # Wait for the received object
+ rx_obj = self.wait_object(second_vnet.pipe)
+ assert rx_obj["dst_ip"] == ip
+ assert rx_obj["dst_iface_alias"] == "if2"
+
+
+class TestIP6OutputNhopLL(BaseTestIP6Ouput):
+ def vnet2_handler(self, vnet):
+ """Generic listener that sends first received packet with metadata
+ back to the sender via pipw
+ """
+ ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip)
+ self._vnet2_handler(vnet, ip, None)
+
+ @pytest.mark.require_user("root")
+ def test_output6_nhop_linklocal(self):
+ """Tests UDP output with custom link-local nhop set"""
+ second_vnet = self.vnet_map["vnet2"]
+
+ # Wait for the child to become ready
+ ll_data = self.wait_object(second_vnet.pipe)
+
+ # Pick target on if2 vnet2's end
+ ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
+ ip_dst = str(ifaddr.ip)
+ # Pick nexthop on if1
+ ip_next, _ = ll_data[second_vnet.iface_alias_map["if1"].name][0]
+ # Get local interfaces
+ os_ifname = self.vnet.iface_alias_map["if1"].name
+ scopeid = socket.if_nametoindex(os_ifname)
+ sin6_next = SaHelper.ip6_sa(ip_next, scopeid)
+
+ s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0)
+ s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next)
+
+ data = bytes("AAAA", "utf-8")
+ s.sendto(data, (ip_dst, self.DEFAULT_PORT))
+
+ # Wait for the received object
+ rx_obj = self.wait_object(second_vnet.pipe)
+ assert rx_obj["dst_ip"] == ip_dst
+ assert rx_obj["dst_iface_alias"] == "if1"
+
+
+class TestIP6OutputScope(BaseTestIP6Ouput):
+ def vnet2_handler(self, vnet):
+ """Generic listener that sends first received packet with metadata
+ back to the sender via pipw
+ """
+ bind_ip, bind_ifp = self.wait_object(vnet.pipe)
+ if bind_ip is None:
+ os_ifname = vnet.iface_alias_map[bind_ifp].name
+ ll_data = ToolsHelper.get_linklocals()
+ bind_ip, _ = ll_data[os_ifname][0]
+ if bind_ifp is not None:
+ bind_ifp = vnet.iface_alias_map[bind_ifp].name
+ print("## BIND {}%{}".format(bind_ip, bind_ifp))
+ self._vnet2_handler(vnet, bind_ip, bind_ifp)
+
+ @pytest.mark.parametrize(
+ "params",
+ [
+ # sif/dif: source/destination interface (for link-local addr)
+ # sip/dip: source/destination ip (for non-LL addr)
+ # ex: OSError errno that sendto() must raise
+ pytest.param({"sif": "if2", "dif": "if2"}, id="same"),
+ pytest.param(
+ {
+ "sif": "if1",
+ "dif": "if2",
+ "ex": errno.EHOSTUNREACH,
+ },
+ id="ll_differentif1",
+ ),
+ pytest.param(
+ {
+ "sif": "if1",
+ "dip": "2001:db8:b::2",
+ "ex": errno.EHOSTUNREACH,
+ },
+ id="ll_differentif2",
+ ),
+ pytest.param(
+ {
+ "sip": "2001:db8:a::1",
+ "dif": "if2",
+ },
+ id="gu_to_ll",
+ ),
+ ],
+ )
+ @pytest.mark.require_user("root")
+ def test_output6_linklocal_scope(self, params):
+ """Tests simple UDP output"""
+ second_vnet = self.vnet_map["vnet2"]
+
+ src_ifp = params.get("sif")
+ src_ip = params.get("sip")
+ dst_ifp = params.get("dif")
+ dst_ip = params.get("dip")
+ errno = params.get("ex", 0)
+
+ # Sent ifp/IP to bind on
+ second_vnet = self.vnet_map["vnet2"]
+ second_vnet.pipe.send((dst_ip, dst_ifp))
+
+ # Wait for the child to become ready
+ ll_data = self.wait_object(second_vnet.pipe)
+
+ if dst_ip is None:
+ # Pick LL address on dst_ifp vnet2's end
+ dst_ip, _ = ll_data[second_vnet.iface_alias_map[dst_ifp].name][0]
+ # Get local interface scope
+ os_ifname = self.vnet.iface_alias_map[dst_ifp].name
+ scopeid = socket.if_nametoindex(os_ifname)
+ target = (dst_ip, self.DEFAULT_PORT, 0, scopeid)
+ else:
+ target = (dst_ip, self.DEFAULT_PORT, 0, 0)
+
+ # Bind
+ if src_ip is None:
+ ll_data = ToolsHelper.get_linklocals()
+ os_ifname = self.vnet.iface_alias_map[src_ifp].name
+ src_ip, _ = ll_data[os_ifname][0]
+ scopeid = socket.if_nametoindex(os_ifname)
+ src = (src_ip, self.DEFAULT_PORT, 0, scopeid)
+ else:
+ src = (src_ip, self.DEFAULT_PORT, 0, 0)
+
+ s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+ s.bind(src)
+ data = bytes("AAAA", "utf-8")
+ print("## TX packet {} -> {}".format(src, target))
+
+ try:
+ s.sendto(data, target)
+ except OSError as e:
+ if not errno:
+ raise
+ assert e.errno == errno
+ print("Correctly raised {}".format(e))
+ return
+
+ # Wait for the received object
+ rx_obj = self.wait_object(second_vnet.pipe)
+ assert rx_obj["dst_ip"] == dst_ip
+ assert rx_obj["src_ip"] == src_ip
+ # assert rx_obj["dst_iface_alias"] == "if2"
+
+
+class TestIP6OutputMulticast(BaseTestIP6Ouput):
+ def vnet2_handler(self, vnet):
+ group = self.wait_object(vnet.pipe)
+ os_ifname = vnet.iface_alias_map["if2"].name
+ self._vnet2_handler(vnet, group, os_ifname)
+
+ @pytest.mark.parametrize("group_scope", ["ff02", "ff05", "ff08", "ff0e"])
+ @pytest.mark.require_user("root")
+ def test_output6_multicast(self, group_scope):
+ """Tests simple UDP output"""
+ second_vnet = self.vnet_map["vnet2"]
+
+ group = "{}::3456".format(group_scope)
+ second_vnet.pipe.send(group)
+
+ # Pick target on if2 vnet2's end
+ ip = group
+ os_ifname = self.vnet.iface_alias_map["if2"].name
+ ifindex = socket.if_nametoindex(os_ifname)
+ optval = struct.pack("I", ifindex)
+
+ s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+ s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, optval)
+
+ data = bytes("AAAA", "utf-8")
+
+ # Wait for the child to become ready
+ self.wait_object(second_vnet.pipe)
+
+ print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT))
+ s.sendto(data, (ip, self.DEFAULT_PORT))
+
+ # Wait for the received object
+ rx_obj = self.wait_object(second_vnet.pipe)
+ assert rx_obj["dst_ip"] == ip
+ assert rx_obj["dst_iface_alias"] == "if2"
+
+
+class TestIP6OutputLoopback(SingleVnetTestTemplate):
+ IPV6_PREFIXES = ["2001:db8:a::1/64"]
+ DEFAULT_PORT = 45365
+
+ @pytest.mark.parametrize(
+ "source_validation",
+ [
+ pytest.param(0, id="no_sav"),
+ pytest.param(1, id="sav"),
+ ],
+ )
+ @pytest.mark.parametrize("scope", ["gu", "ll", "lo"])
+ def test_output6_self_tcp(self, scope, source_validation):
+ """Tests IPv6 TCP connection to the local IPv6 address"""
+
+ ToolsHelper.set_sysctl(
+ "net.inet6.ip6.source_address_validation", source_validation
+ )
+
+ if scope == "gu":
+ ip = "2001:db8:a::1"
+ addr_tuple = (ip, self.DEFAULT_PORT)
+ elif scope == "ll":
+ os_ifname = self.vnet.iface_alias_map["if1"].name
+ ifindex = socket.if_nametoindex(os_ifname)
+ ll_data = ToolsHelper.get_linklocals()
+ ip, _ = ll_data[os_ifname][0]
+ addr_tuple = (ip, self.DEFAULT_PORT, 0, ifindex)
+ elif scope == "lo":
+ ip = "::1"
+ ToolsHelper.get_output("route add -6 ::1/128 -iface lo0")
+ ifindex = socket.if_nametoindex("lo0")
+ addr_tuple = (ip, self.DEFAULT_PORT)
+ else:
+ assert 0 == 1
+ print("address: {}".format(addr_tuple))
+
+ start = time.perf_counter()
+ ss = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
+ ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
+ ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+ ss.bind(addr_tuple)
+ ss.listen()
+ s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
+ s.settimeout(2.0)
+ s.connect(addr_tuple)
+ conn, from_addr = ss.accept()
+ duration = time.perf_counter() - start
+
+ assert from_addr[0] == ip
+ assert duration < 1.0
+
+ @pytest.mark.parametrize(
+ "source_validation",
+ [
+ pytest.param(0, id="no_sav"),
+ pytest.param(1, id="sav"),
+ ],
+ )
+ @pytest.mark.parametrize("scope", ["gu", "ll", "lo"])
+ def test_output6_self_udp(self, scope, source_validation):
+ """Tests IPv6 UDP connection to the local IPv6 address"""
+
+ ToolsHelper.set_sysctl(
+ "net.inet6.ip6.source_address_validation", source_validation
+ )
+
+ if scope == "gu":
+ ip = "2001:db8:a::1"
+ addr_tuple = (ip, self.DEFAULT_PORT)
+ elif scope == "ll":
+ os_ifname = self.vnet.iface_alias_map["if1"].name
+ ifindex = socket.if_nametoindex(os_ifname)
+ ll_data = ToolsHelper.get_linklocals()
+ ip, _ = ll_data[os_ifname][0]
+ addr_tuple = (ip, self.DEFAULT_PORT, 0, ifindex)
+ elif scope == "lo":
+ ip = "::1"
+ ToolsHelper.get_output("route add -6 ::1/128 -iface lo0")
+ ifindex = socket.if_nametoindex("lo0")
+ addr_tuple = (ip, self.DEFAULT_PORT)
+ else:
+ assert 0 == 1
+ print("address: {}".format(addr_tuple))
+
+ start = time.perf_counter()
+ ss = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
+ ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
+ ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+ ss.bind(addr_tuple)
+ ss.listen()
+ s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
+ s.settimeout(2.0)
+ s.connect(addr_tuple)
+ conn, from_addr = ss.accept()
+ duration = time.perf_counter() - start
+
+ assert from_addr[0] == ip
+ assert duration < 1.0