aboutsummaryrefslogtreecommitdiff
path: root/tests/sys/netpfil/pf/frag6.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/sys/netpfil/pf/frag6.py')
-rw-r--r--tests/sys/netpfil/pf/frag6.py248
1 files changed, 232 insertions, 16 deletions
diff --git a/tests/sys/netpfil/pf/frag6.py b/tests/sys/netpfil/pf/frag6.py
index 28b1829d418c..26ae7af7c90c 100644
--- a/tests/sys/netpfil/pf/frag6.py
+++ b/tests/sys/netpfil/pf/frag6.py
@@ -1,25 +1,13 @@
import pytest
import logging
-import threading
-import time
+import random
logging.getLogger("scapy").setLevel(logging.CRITICAL)
+from utils import DelayedSend
from atf_python.sys.net.tools import ToolsHelper
from atf_python.sys.net.vnet import VnetTestTemplate
-class DelayedSend(threading.Thread):
- def __init__(self, packet):
- threading.Thread.__init__(self)
- self._packet = packet
-
- self.start()
-
- def run(self):
- import scapy.all as sp
- time.sleep(1)
- sp.send(self._packet)
-
class TestFrag6(VnetTestTemplate):
- REQUIRED_MODULES = ["pf"]
+ REQUIRED_MODULES = ["pf", "dummymbuf"]
TOPOLOGY = {
"vnet1": {"ifaces": ["if1"]},
"vnet2": {"ifaces": ["if1"]},
@@ -27,18 +15,22 @@ class TestFrag6(VnetTestTemplate):
}
def vnet2_handler(self, vnet):
+ ifname = vnet.iface_alias_map["if1"].name
ToolsHelper.print_output("/sbin/pfctl -e")
ToolsHelper.pf_rules([
- "scrub fragment reassemble",
+ "scrub fragment reassemble min-ttl 10",
"pass",
"block in inet6 proto icmp6 icmp6-type echoreq",
])
+ ToolsHelper.print_output("/sbin/pfilctl link -i dummymbuf:inet6 inet6")
+ ToolsHelper.print_output("/sbin/sysctl net.dummymbuf.rules=\"inet6 in %s enlarge 3000;\"" % ifname)
def check_ping_reply(self, packet):
print(packet)
return False
@pytest.mark.require_user("root")
+ @pytest.mark.require_progs(["scapy"])
def test_dup_frag_hdr(self):
"Test packets with duplicate fragment headers"
srv_vnet = self.vnet_map["vnet2"]
@@ -58,3 +50,227 @@ class TestFrag6(VnetTestTemplate):
timeout=3)
for p in packets:
assert not p.getlayer(sp.ICMPv6EchoReply)
+
+ @pytest.mark.require_user("root")
+ @pytest.mark.require_progs(["scapy"])
+ def test_overlong(self):
+ "Test overly long fragmented packet"
+
+ # Import in the correct vnet, so at to not confuse Scapy
+ import scapy.all as sp
+
+ curr = 0
+ pkts = []
+
+ frag_id = random.randint(0,0xffffffff)
+ gran = 1200
+
+ i = 0
+ while curr <= 65535:
+ ipv61 = sp.IPv6(src="2001:db8::1", dst="2001:db8::2")
+ more = True
+ g = gran
+ if curr + gran > 65535:
+ more = False
+ g = 65530 - curr
+ if i == 0:
+ pkt = ipv61 / sp.IPv6ExtHdrHopByHop(options=[sp.PadN(optlen=2), sp.Pad1()]) / \
+ sp.IPv6ExtHdrFragment(id = frag_id, offset = curr // 8, m = more) / bytes([i] * g)
+ else:
+ pkt = ipv61 / sp.IPv6ExtHdrFragment(id = frag_id, offset = curr // 8, m = more) / bytes([i] * g)
+ pkts.append(pkt)
+ curr += gran
+ i += 1
+
+ sp.send(pkts, inter = 0.1)
+
+class TestFrag6HopHyHop(VnetTestTemplate):
+ REQUIRED_MODULES = ["pf"]
+ TOPOLOGY = {
+ "vnet1": {"ifaces": ["if1", "if2"]},
+ "vnet2": {"ifaces": ["if1", "if2"]},
+ "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
+ "if2": {"prefixes6": [("2001:db8:666::1/64", "2001:db8:1::2/64")]},
+ }
+
+ def vnet2_handler(self, vnet):
+ ifname = vnet.iface_alias_map["if1"].name
+ ToolsHelper.print_output("/sbin/sysctl net.inet6.ip6.forwarding=1")
+ ToolsHelper.print_output("/usr/sbin/ndp -s 2001:db8:1::1 00:01:02:03:04:05")
+ ToolsHelper.print_output("/sbin/pfctl -e")
+ ToolsHelper.print_output("/sbin/pfctl -x loud")
+ ToolsHelper.pf_rules([
+ "scrub fragment reassemble min-ttl 10",
+ "pass allow-opts",
+ ])
+
+ @pytest.mark.require_user("root")
+ @pytest.mark.require_progs(["scapy"])
+ def test_hop_by_hop(self):
+ "Verify that we reject non-first hop-by-hop headers"
+ if1 = self.vnet.iface_alias_map["if1"].name
+ if2 = self.vnet.iface_alias_map["if2"].name
+ ToolsHelper.print_output("/sbin/route add -6 default 2001:db8::2")
+ ToolsHelper.print_output("/sbin/ping6 -c 1 2001:db8:1::2")
+
+ # Import in the correct vnet, so at to not confuse Scapy
+ import scapy.all as sp
+
+ # A hop-by-hop header is accepted if it's the first header
+ pkt = sp.IPv6(src="2001:db8::1", dst="2001:db8:1::1") \
+ / sp.IPv6ExtHdrHopByHop() \
+ / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * 30))
+ pkt.show()
+
+ # Delay the send so the sniffer is running when we transmit.
+ s = DelayedSend(pkt)
+
+ replies = sp.sniff(iface=if2, timeout=3)
+ found = False
+ for p in replies:
+ p.show()
+ ip6 = p.getlayer(sp.IPv6)
+ hbh = p.getlayer(sp.IPv6ExtHdrHopByHop)
+ icmp6 = p.getlayer(sp.ICMPv6EchoRequest)
+
+ if not ip6 or not icmp6:
+ continue
+ assert ip6.src == "2001:db8::1"
+ assert ip6.dst == "2001:db8:1::1"
+ assert hbh
+ assert icmp6
+ found = True
+ assert found
+
+ # A hop-by-hop header causes the packet to be dropped if it's not the
+ # first extension header
+ pkt = sp.IPv6(src="2001:db8::1", dst="2001:db8:1::1") \
+ / sp.IPv6ExtHdrFragment(offset=0, m=0) \
+ / sp.IPv6ExtHdrHopByHop() \
+ / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * 30))
+ pkt2 = sp.IPv6(src="2001:db8::1", dst="2001:db8:1::1") \
+ / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * 30))
+
+ # Delay the send so the sniffer is running when we transmit.
+ ToolsHelper.print_output("/sbin/ping6 -c 1 2001:db8:1::2")
+
+ s = DelayedSend([ pkt2, pkt ])
+ replies = sp.sniff(iface=if2, timeout=10)
+ found = False
+ for p in replies:
+ # Expect to find the packet without the hop-by-hop header, not the
+ # one with
+ p.show()
+ ip6 = p.getlayer(sp.IPv6)
+ hbh = p.getlayer(sp.IPv6ExtHdrHopByHop)
+ icmp6 = p.getlayer(sp.ICMPv6EchoRequest)
+
+ if not ip6 or not icmp6:
+ continue
+ assert ip6.src == "2001:db8::1"
+ assert ip6.dst == "2001:db8:1::1"
+ assert not hbh
+ assert icmp6
+ found = True
+ assert found
+
+class TestFrag6_Overlap(VnetTestTemplate):
+ REQUIRED_MODULES = ["pf"]
+ TOPOLOGY = {
+ "vnet1": {"ifaces": ["if1"]},
+ "vnet2": {"ifaces": ["if1"]},
+ "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
+ }
+
+ def vnet2_handler(self, vnet):
+ ToolsHelper.print_output("/sbin/pfctl -e")
+ ToolsHelper.print_output("/sbin/pfctl -x loud")
+ ToolsHelper.pf_rules([
+ "scrub fragment reassemble",
+ "pass",
+ ])
+
+ @pytest.mark.require_user("root")
+ @pytest.mark.require_progs(["scapy"])
+ def test_overlap(self):
+ "Ensure we discard packets with overlapping fragments"
+
+ # Import in the correct vnet, so at to not confuse Scapy
+ import scapy.all as sp
+
+ packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \
+ / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 90))
+ frags = sp.fragment6(packet, 128)
+ assert len(frags) == 3
+
+ f = frags[0].getlayer(sp.IPv6ExtHdrFragment)
+ # Fragment with overlap
+ overlap = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \
+ / sp.IPv6ExtHdrFragment(offset = 4, m = 1, id = f.id, nh = f.nh) \
+ / sp.raw(bytes.fromhex('f00f') * 4)
+ frags = [ frags[0], frags[1], overlap, frags[2] ]
+
+ # Delay the send so the sniffer is running when we transmit.
+ s = DelayedSend(frags)
+
+ packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name,
+ timeout=3)
+ for p in packets:
+ p.show()
+ assert not p.getlayer(sp.ICMPv6EchoReply)
+
+class TestFrag6_RouteTo(VnetTestTemplate):
+ REQUIRED_MODULES = ["pf"]
+ TOPOLOGY = {
+ "vnet1": {"ifaces": ["if1"]},
+ "vnet2": {"ifaces": ["if1", "if2"]},
+ "vnet3": {"ifaces": ["if2"]},
+ "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
+ "if2": {"prefixes6": [("2001:db8:1::1/64", "2001:db8:1::2/64")]},
+ }
+
+ def vnet2_handler(self, vnet):
+ if2name = vnet.iface_alias_map["if2"].name
+ ToolsHelper.print_output("/sbin/pfctl -e")
+ ToolsHelper.print_output("/sbin/pfctl -x loud")
+ ToolsHelper.pf_rules([
+ "scrub fragment reassemble",
+ "pass in route-to (%s 2001:db8:1::2) from 2001:db8::1 to 2001:db8:666::1" % if2name,
+ ])
+
+ ToolsHelper.print_output("/sbin/ifconfig %s mtu 1300" % if2name)
+ ToolsHelper.print_output("/sbin/sysctl net.inet6.ip6.forwarding=1")
+
+ def vnet3_handler(self, vnet):
+ pass
+
+ @pytest.mark.require_user("root")
+ @pytest.mark.require_progs(["scapy"])
+ def test_too_big(self):
+ ToolsHelper.print_output("/sbin/route add -6 default 2001:db8::2")
+
+ # Import in the correct vnet, so at to not confuse Scapy
+ import scapy.all as sp
+
+ pkt = sp.IPv6(dst="2001:db8:666::1") \
+ / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * 3000))
+ frags = sp.fragment6(pkt, 1320)
+
+ reply = sp.sr1(frags, timeout=3)
+ if reply:
+ reply.show()
+
+ assert reply
+
+ ip6 = reply.getlayer(sp.IPv6)
+ icmp6 = reply.getlayer(sp.ICMPv6PacketTooBig)
+ err_ip6 = reply.getlayer(sp.IPerror6)
+
+ assert ip6
+ assert ip6.src == "2001:db8::2"
+ assert ip6.dst == "2001:db8::1"
+ assert icmp6
+ assert icmp6.mtu == 1300
+ assert err_ip6
+ assert err_ip6.src == "2001:db8::1"
+ assert err_ip6.dst == "2001:db8:666::1"