aboutsummaryrefslogtreecommitdiff
path: root/tests/sys/netpfil/pf/sctp.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/sys/netpfil/pf/sctp.py')
-rw-r--r--tests/sys/netpfil/pf/sctp.py181
1 files changed, 177 insertions, 4 deletions
diff --git a/tests/sys/netpfil/pf/sctp.py b/tests/sys/netpfil/pf/sctp.py
index 6042badffb64..da42ce527195 100644
--- a/tests/sys/netpfil/pf/sctp.py
+++ b/tests/sys/netpfil/pf/sctp.py
@@ -268,7 +268,8 @@ class TestSCTP(VnetTestTemplate):
ToolsHelper.print_output("/sbin/pfctl -e")
ToolsHelper.pf_rules([
"block proto sctp",
- "pass inet proto sctp to 192.0.2.0/24"])
+ "pass inet proto sctp to 192.0.2.0/24",
+ "pass on lo"])
# Sanity check, we can communicate with the primary address.
client = SCTPClient("192.0.2.3", 1234)
@@ -305,6 +306,7 @@ class TestSCTP(VnetTestTemplate):
ToolsHelper.print_output("/sbin/pfctl -e")
ToolsHelper.pf_rules([
"block proto sctp",
+ "pass on lo",
"pass inet proto sctp from 192.0.2.0/24"])
# Sanity check, we can communicate with the primary address.
@@ -362,7 +364,7 @@ class TestSCTP(VnetTestTemplate):
@pytest.mark.require_user("root")
- def test_permutation(self):
+ def test_permutation_if_bound(self):
# Test that we generate all permutations of src/dst addresses.
# Assign two addresses to each end, and check for the expected states
srv_vnet = self.vnet_map["vnet2"]
@@ -374,6 +376,7 @@ class TestSCTP(VnetTestTemplate):
ToolsHelper.pf_rules([
"set state-policy if-bound",
"block proto sctp",
+ "pass on lo",
"pass inet proto sctp to 192.0.2.0/24"])
# Sanity check, we can communicate with the primary address.
@@ -387,11 +390,146 @@ class TestSCTP(VnetTestTemplate):
# Check that we have a state for 192.0.2.3 and 192.0.2.2 to 192.0.2.1, but also to 192.0.2.4
states = ToolsHelper.get_output("/sbin/pfctl -ss")
print(states)
- assert re.search(r".*sctp 192.0.2.1:.*192.0.2.3:1234", states)
+ assert re.search(r"epair.*sctp 192.0.2.1:.*192.0.2.3:1234", states)
+ assert re.search(r"epair.*sctp 192.0.2.1:.*192.0.2.2:1234", states)
+ assert re.search(r"epair.*sctp 192.0.2.4:.*192.0.2.3:1234", states)
+ assert re.search(r"epair.*sctp 192.0.2.4:.*192.0.2.2:1234", states)
+
+ @pytest.mark.require_user("root")
+ def test_permutation_floating(self):
+ # Test that we generate all permutations of src/dst addresses.
+ # Assign two addresses to each end, and check for the expected states
+ srv_vnet = self.vnet_map["vnet2"]
+
+ ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
+ ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.4/24" % ifname)
+
+ ToolsHelper.print_output("/sbin/pfctl -e")
+ ToolsHelper.pf_rules([
+ "block proto sctp",
+ "pass on lo",
+ "pass inet proto sctp to 192.0.2.0/24"])
+
+ # Sanity check, we can communicate with the primary address.
+ client = SCTPClient("192.0.2.3", 1234)
+ client.send(b"hello", 0)
+ rcvd = self.wait_object(srv_vnet.pipe)
+ print(rcvd)
+ assert rcvd['ppid'] == 0
+ assert rcvd['data'] == "hello"
+
+ # Check that we have a state for 192.0.2.3 and 192.0.2.2 to 192.0.2.1, but also to 192.0.2.4
+ states = ToolsHelper.get_output("/sbin/pfctl -ss")
+ print(states)
+ assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234", states)
assert re.search(r"all sctp 192.0.2.1:.*192.0.2.2:1234", states)
- assert re.search(r".*sctp 192.0.2.4:.*192.0.2.3:1234", states)
+ assert re.search(r"all sctp 192.0.2.4:.*192.0.2.3:1234", states)
assert re.search(r"all sctp 192.0.2.4:.*192.0.2.2:1234", states)
+ @pytest.mark.require_user("root")
+ def test_limit_addresses(self):
+ srv_vnet = self.vnet_map["vnet2"]
+
+ ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
+ for i in range(0, 16):
+ ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.%d/24" % (ifname, 4 + i))
+
+ ToolsHelper.print_output("/sbin/pfctl -e")
+ ToolsHelper.pf_rules([
+ "block proto sctp",
+ "pass on lo",
+ "pass inet proto sctp to 192.0.2.0/24"])
+
+ # Set up a connection, which will try to create states for all addresses
+ # we have assigned
+ client = SCTPClient("192.0.2.3", 1234)
+ client.send(b"hello", 0)
+ rcvd = self.wait_object(srv_vnet.pipe)
+ print(rcvd)
+ assert rcvd['ppid'] == 0
+ assert rcvd['data'] == "hello"
+
+ # But the number should be limited to 9 (original + 8 extra)
+ states = ToolsHelper.get_output("/sbin/pfctl -ss | grep 192.0.2.2")
+ print(states)
+ assert(states.count('\n') <= 9)
+
+ @pytest.mark.require_user("root")
+ def test_disallow_related(self):
+ srv_vnet = self.vnet_map["vnet2"]
+
+ ToolsHelper.print_output("/sbin/pfctl -e")
+ ToolsHelper.pf_rules([
+ "block proto sctp",
+ "pass inet proto sctp to 192.0.2.3",
+ "pass on lo"])
+
+ # Sanity check, we can communicate with the primary address.
+ client = SCTPClient("192.0.2.3", 1234)
+ client.send(b"hello", 0)
+ rcvd = self.wait_object(srv_vnet.pipe)
+ print(rcvd)
+ assert rcvd['ppid'] == 0
+ assert rcvd['data'] == "hello"
+
+ # This shouldn't work
+ success=False
+ try:
+ client.newpeer("192.0.2.2")
+ client.send(b"world", 0)
+ rcvd = self.wait_object(srv_vnet.pipe)
+ print(rcvd)
+ assert rcvd['ppid'] == 0
+ assert rcvd['data'] == "world"
+ success=True
+ except:
+ success=False
+ assert not success
+
+ # Check that we have a state for 192.0.2.3, but not 192.0.2.2 to 192.0.2.1
+ states = ToolsHelper.get_output("/sbin/pfctl -ss")
+ assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234", states)
+ assert not re.search(r"all sctp 192.0.2.1:.*192.0.2.2:1234", states)
+
+ @pytest.mark.require_user("root")
+ def test_allow_related(self):
+ srv_vnet = self.vnet_map["vnet2"]
+
+ ToolsHelper.print_output("/sbin/pfctl -e")
+ ToolsHelper.pf_rules([
+ "set state-policy if-bound",
+ "block proto sctp",
+ "pass inet proto sctp to 192.0.2.3 keep state (allow-related)",
+ "pass on lo"])
+
+ # Sanity check, we can communicate with the primary address.
+ client = SCTPClient("192.0.2.3", 1234)
+ client.send(b"hello", 0)
+ rcvd = self.wait_object(srv_vnet.pipe)
+ print(rcvd)
+ assert rcvd['ppid'] == 0
+ assert rcvd['data'] == "hello"
+
+ success=False
+ try:
+ client.newpeer("192.0.2.2")
+ client.send(b"world", 0)
+ rcvd = self.wait_object(srv_vnet.pipe)
+ print(rcvd)
+ assert rcvd['ppid'] == 0
+ assert rcvd['data'] == "world"
+ success=True
+ finally:
+ # Debug output
+ ToolsHelper.print_output("/sbin/pfctl -ss")
+ ToolsHelper.print_output("/sbin/pfctl -sr -vv")
+ assert success
+
+ # Check that we have a state for 192.0.2.3 and 192.0.2.2 to 192.0.2.1
+ states = ToolsHelper.get_output("/sbin/pfctl -ss")
+ assert re.search(r"epair.*sctp 192.0.2.1:.*192.0.2.3:1234", states)
+ assert re.search(r"epair.*sctp 192.0.2.1:.*192.0.2.2:1234", states)
+
class TestSCTPv6(VnetTestTemplate):
REQUIRED_MODULES = ["sctp", "pf"]
TOPOLOGY = {
@@ -417,6 +555,7 @@ class TestSCTPv6(VnetTestTemplate):
ToolsHelper.print_output("/sbin/pfctl -e")
ToolsHelper.pf_rules([
"block proto sctp",
+ "pass on lo",
"pass inet6 proto sctp to 2001:db8::0/64"])
# Sanity check, we can communicate with the primary address.
@@ -454,6 +593,7 @@ class TestSCTPv6(VnetTestTemplate):
ToolsHelper.print_output("/sbin/pfctl -e")
ToolsHelper.pf_rules([
"block proto sctp",
+ "pass on lo",
"pass inet6 proto sctp from 2001:db8::/64"])
# Sanity check, we can communicate with the primary address.
@@ -520,7 +660,40 @@ class TestSCTPv6(VnetTestTemplate):
ToolsHelper.print_output("/sbin/pfctl -e")
ToolsHelper.pf_rules([
+ "set state-policy if-bound",
+ "block proto sctp",
+ "pass on lo",
+ "pass inet6 proto sctp to 2001:db8::0/64"])
+
+ # Sanity check, we can communicate with the primary address.
+ client = SCTPClient("2001:db8::3", 1234)
+ client.send(b"hello", 0)
+ rcvd = self.wait_object(srv_vnet.pipe)
+ print(rcvd)
+ assert rcvd['ppid'] == 0
+ assert rcvd['data'] == "hello"
+
+ # Check that we have a state for 2001:db8::3 and 2001:db8::2 to 2001:db8::1, but also to 2001:db8::4
+ states = ToolsHelper.get_output("/sbin/pfctl -ss")
+ print(states)
+ assert re.search(r"epair.*sctp 2001:db8::1\[.*2001:db8::2\[1234\]", states)
+ assert re.search(r"epair.*sctp 2001:db8::1\[.*2001:db8::3\[1234\]", states)
+ assert re.search(r"epair.*sctp 2001:db8::4\[.*2001:db8::2\[1234\]", states)
+ assert re.search(r"epair.*sctp 2001:db8::4\[.*2001:db8::3\[1234\]", states)
+
+ @pytest.mark.require_user("root")
+ def test_permutation_floating(self):
+ # Test that we generate all permutations of src/dst addresses.
+ # Assign two addresses to each end, and check for the expected states
+ srv_vnet = self.vnet_map["vnet2"]
+
+ ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
+ ToolsHelper.print_output("/sbin/ifconfig %s inet6 alias 2001:db8::4/64" % ifname)
+
+ ToolsHelper.print_output("/sbin/pfctl -e")
+ ToolsHelper.pf_rules([
"block proto sctp",
+ "pass on lo",
"pass inet6 proto sctp to 2001:db8::0/64"])
# Sanity check, we can communicate with the primary address.