diff options
Diffstat (limited to 'tests/sys/netpfil/pf/nat64.py')
| -rw-r--r-- | tests/sys/netpfil/pf/nat64.py | 358 | 
1 files changed, 358 insertions, 0 deletions
| diff --git a/tests/sys/netpfil/pf/nat64.py b/tests/sys/netpfil/pf/nat64.py new file mode 100644 index 000000000000..705de72f5bc4 --- /dev/null +++ b/tests/sys/netpfil/pf/nat64.py @@ -0,0 +1,358 @@ +# +# SPDX-License-Identifier: BSD-2-Clause +# +# Copyright (c) 2024 Rubicon Communications, LLC (Netgate) +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# 1. Redistributions of source code must retain the above copyright +#    notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +#    notice, this list of conditions and the following disclaimer in the +#    documentation and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. + +import pytest +import selectors +import socket +import sys +from utils import DelayedSend +from atf_python.sys.net.tools import ToolsHelper +from atf_python.sys.net.vnet import VnetTestTemplate + +class TestNAT64(VnetTestTemplate): +    REQUIRED_MODULES = [ "pf", "pflog" ] +    TOPOLOGY = { +        "vnet1": {"ifaces": ["if1"]}, +        "vnet2": {"ifaces": ["if1", "if2"]}, +        "vnet3": {"ifaces": ["if2", "if3"]}, +        "vnet4": {"ifaces": ["if3"]}, +        "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")]}, +        "if2": {"prefixes4": [("192.0.2.1/24", "192.0.2.2/24")]}, +        "if3": {"prefixes4": [("198.51.100.1/24", "198.51.100.2/24")]} +    } + +    def vnet4_handler(self, vnet): +        ToolsHelper.print_output("/sbin/route add default 198.51.100.1") + +    def vnet3_handler(self, vnet): +        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.forwarding=1") +        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.ttl=62") +        ToolsHelper.print_output("/sbin/sysctl net.inet.udp.checksum=0") + +        sel = selectors.DefaultSelector() +        t = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +        t.bind(("0.0.0.0", 1234)) +        t.setblocking(False) +        t.listen() +        sel.register(t, selectors.EVENT_READ, data=None) + +        u = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +        u.bind(("0.0.0.0", 4444)) +        u.setblocking(False) +        sel.register(u, selectors.EVENT_READ, data="UDP") + +        while True: +            events = sel.select(timeout=20) +            for key, mask in events: +                sock = key.fileobj +                if key.data is None: +                    conn, addr = sock.accept() +                    print(f"Accepted connection from {addr}") +                    data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"") +                    events = selectors.EVENT_READ | selectors.EVENT_WRITE +                    sel.register(conn, events, data=data) +                elif key.data == "UDP": +                    recv_data, addr = sock.recvfrom(1024) +                    print(f"Received UDP {recv_data} from {addr}") +                    sock.sendto(b"foo", addr) +                else: +                    if mask & selectors.EVENT_READ: +                        recv_data = sock.recv(1024) +                        print(f"Received TCP {recv_data}") +                        sock.send(b"foo") +                    else: +                        print("Unknown event?") +                        t.close() +                        u.close() +                        return + +    def vnet2_handler(self, vnet): +        ifname = vnet.iface_alias_map["if1"].name + +        ToolsHelper.print_output("/sbin/sysctl net.inet6.ip6.forwarding=1") +        ToolsHelper.print_output("/sbin/route add default 192.0.2.2") +        ToolsHelper.print_output("/sbin/pfctl -e") +        ToolsHelper.pf_rules([ +            "block", +            "pass inet6 proto icmp6 icmp6-type { neighbrsol, neighbradv }", +            "pass in on %s inet6 af-to inet from 192.0.2.1" % ifname, +        ]) + +        vnet.pipe.send(socket.if_nametoindex("pflog0")) + +    @pytest.mark.require_user("root") +    @pytest.mark.require_progs(["scapy"]) +    def test_tcp_rst(self): +        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") + +        import scapy.all as sp + +        # Send a SYN +        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \ +            / sp.TCP(dport=1222, flags="S") + +        # Get a reply +        reply = sp.sr1(packet) + +        # We expect to get a RST here. +        tcp = reply.getlayer(sp.TCP) +        assert tcp +        assert "R" in tcp.flags + +        # Now try to SYN to an open port +        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \ +            / sp.TCP(dport=1234, flags="S") +        reply = sp.sr1(packet) + +        tcp = reply.getlayer(sp.TCP) +        assert tcp + +        # We don't get RST +        assert "R" not in tcp.flags + +        # We do get SYN|ACK +        assert "S" in tcp.flags +        assert "A" in tcp.flags + +    @pytest.mark.require_user("root") +    @pytest.mark.require_progs(["scapy"]) +    def test_udp_port_closed(self): +        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") + +        import scapy.all as sp + +        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \ +            / sp.UDP(dport=1222) / sp.Raw("bar") +        reply = sp.sr1(packet, timeout=3) +        print(reply.show()) + +        # We expect an ICMPv6 error, not a UDP reply +        assert not reply.getlayer(sp.UDP) +        icmp = reply.getlayer(sp.ICMPv6DestUnreach) +        assert icmp +        assert icmp.type == 1 +        assert icmp.code == 4 +        udp = reply.getlayer(sp.UDPerror) +        assert udp +        assert udp.dport == 1222 + +    @pytest.mark.require_user("root") +    @pytest.mark.require_progs(["scapy"]) +    def test_address_unreachable(self): +        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") + +        import scapy.all as sp + +        packet = sp.IPv6(dst="64:ff9b::203.0.113.2") \ +            / sp.UDP(dport=1222) / sp.Raw("bar") +        reply = sp.sr1(packet, timeout=3) +        print(reply.show()) + +        # We expect an ICMPv6 error, not a UDP reply +        assert not reply.getlayer(sp.UDP) +        icmp = reply.getlayer(sp.ICMPv6DestUnreach) +        assert icmp +        assert icmp.type == 1 +        assert icmp.code == 0 +        udp = reply.getlayer(sp.UDPerror) +        assert udp +        assert udp.dport == 1222 + +        # Check the hop limit +        ip6 = reply.getlayer(sp.IPv6) +        assert ip6.hlim == 61 + +    @pytest.mark.require_user("root") +    @pytest.mark.require_progs(["scapy"]) +    def test_udp_checksum(self): +        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") + +        import scapy.all as sp + +        # Send an outbound UDP packet to establish state +        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \ +            / sp.UDP(sport=3333, dport=4444) / sp.Raw("foo") + +        # Get a reply +        # We'll send the reply without UDP checksum on the IPv4 side +        # but that's not valid for IPv6, so expect pf to update the checksum. +        reply = sp.sr1(packet, timeout=5) + +        udp = reply.getlayer(sp.UDP) +        assert udp +        assert udp.chksum != 0 + +    def common_test_source_addr(self, packet): +        vnet = self.vnet_map["vnet1"] +        sendif = vnet.iface_alias_map["if1"].name + +        import scapy.all as sp + +        print("Outbound:\n") +        packet.show() + +        s = DelayedSend(packet) + +        # We expect an ICMPv6 error here, where we'll verify the source address of +        # the outer packet +        packets = sp.sniff(iface=sendif, timeout=5) + +        for reply in packets: +            print("Reply:\n") +            reply.show() +            icmp = reply.getlayer(sp.ICMPv6TimeExceeded) +            if not icmp: +                continue + +            ip = reply.getlayer(sp.IPv6) +            assert icmp +            assert ip.src == "64:ff9b::c000:202" +            return reply + +        # If we don't find the packet we expect to see +        assert False + +    @pytest.mark.require_user("root") +    @pytest.mark.require_progs(["scapy"]) +    def test_source_addr_tcp(self): +        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") +        import scapy.all as sp + +        packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=2) \ +            / sp.TCP(sport=1111, dport=2222, flags="S") +        self.common_test_source_addr(packet) + +    @pytest.mark.require_user("root") +    @pytest.mark.require_progs(["scapy"]) +    def test_source_addr_udp(self): +        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") +        import scapy.all as sp + +        packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=2) \ +            / sp.UDP(sport=1111, dport=2222) / sp.Raw("foo") +        self.common_test_source_addr(packet) + +    @pytest.mark.require_user("root") +    @pytest.mark.require_progs(["scapy"]) +    def test_source_addr_sctp(self): +        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") +        import scapy.all as sp + +        packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=2) \ +            / sp.SCTP(sport=1111, dport=2222) \ +            / sp.SCTPChunkInit(init_tag=1, n_in_streams=1, n_out_streams=1, a_rwnd=1500) +        self.common_test_source_addr(packet) + +    @pytest.mark.require_user("root") +    @pytest.mark.require_progs(["scapy"]) +    def test_source_addr_icmp(self): +        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") +        import scapy.all as sp + +        packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=2) \ +            / sp.ICMPv6EchoRequest() / sp.Raw("foo") +        reply = self.common_test_source_addr(packet) +        icmp = reply.getlayer(sp.ICMPv6EchoRequest) +        assert icmp + +    @pytest.mark.require_user("root") +    @pytest.mark.require_progs(["scapy"]) +    def test_bad_len(self): +        """ +            PR 288224: we can panic if the IPv6 plen is longer than the packet length. +        """ +        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") +        import scapy.all as sp + +        packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=2, plen=512) \ +            / sp.ICMPv6EchoRequest() / sp.Raw("foo") +        reply = sp.sr1(packet, timeout=3) +        # We don't expect a reply to a corrupted packet +        assert not reply + +    @pytest.mark.require_user("root") +    @pytest.mark.require_progs(["scapy"]) +    def test_noip6(self): +        """ +            PR 288263: link-local target address in icmp6 ADVERT can cause NULL deref +        """ +        ifname = self.vnet.iface_alias_map["if1"].name +        gw_mac = self.vnet.iface_alias_map["if1"].epairb.ether +        scopeid = self.wait_object(self.vnet_map["vnet2"].pipe) +        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") + +        import scapy.all as sp + +        pkt = sp.Ether(dst=gw_mac) \ +            / sp.IPv6(dst="64:ff9b::203.0.113.2") \ +            / sp.ICMPv6ND_NA(tgt="FFA2:%x:2821:125F:1D27:B3B2:3F6F:C43C" % scopeid) +        pkt.show() +        sp.hexdump(pkt) +        s = DelayedSend(pkt, sendif=ifname) + +        packets = sp.sniff(iface=ifname, timeout=5) +        for r in packets: +            r.show() + +        # Try scope id that likely doesn't have an interface at all +        pkt = sp.Ether(dst=gw_mac) \ +            / sp.IPv6(dst="64:ff9b::203.0.113.2") \ +            / sp.ICMPv6ND_NA(tgt="FFA2:%x:2821:125F:1D27:B3B2:3F6F:C43C" % 255) +        pkt.show() +        sp.hexdump(pkt) +        s = DelayedSend(pkt, sendif=ifname) + +        packets = sp.sniff(iface=ifname, timeout=5) +        for r in packets: +            r.show() + +    @pytest.mark.require_user("root") +    @pytest.mark.require_progs(["scapy"]) +    def test_ttl_zero(self): +        """ +            PR 288274: we can use an mbuf after free on TTL = 0 +        """ +        ifname = self.vnet.iface_alias_map["if1"].name +        gw_mac = self.vnet.iface_alias_map["if1"].epairb.ether +        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") + +        import scapy.all as sp + +        pkt = sp.Ether(dst=gw_mac) \ +            / sp.IPv6(dst="64:ff9b::192.0.2.2", hlim=0) \ +            / sp.SCTP(sport=1111, dport=2222) \ +            / sp.SCTPChunkInit(init_tag=1, n_in_streams=1, n_out_streams=1, \ +                a_rwnd=1500, params=[ \ +                    sp.SCTPChunkParamIPv4Addr() \ +                ]) +        pkt.show() +        sp.hexdump(pkt) +        s = DelayedSend(pkt, sendif=ifname) + +        packets = sp.sniff(iface=ifname, timeout=5) +        for r in packets: +            r.show() + | 
