diff options
Diffstat (limited to 'tests/sys/netlink/test_rtnl_ifaddr.py')
-rw-r--r-- | tests/sys/netlink/test_rtnl_ifaddr.py | 762 |
1 files changed, 762 insertions, 0 deletions
diff --git a/tests/sys/netlink/test_rtnl_ifaddr.py b/tests/sys/netlink/test_rtnl_ifaddr.py new file mode 100644 index 000000000000..768bf38153ff --- /dev/null +++ b/tests/sys/netlink/test_rtnl_ifaddr.py @@ -0,0 +1,762 @@ +import ipaddress +import socket +import struct + +import pytest +from atf_python.sys.net.vnet import SingleVnetTestTemplate +from atf_python.sys.netlink.attrs import NlAttr +from atf_python.sys.netlink.attrs import NlAttrIp +from atf_python.sys.netlink.attrs import NlAttrNested +from atf_python.sys.netlink.attrs import NlAttrU32 +from atf_python.sys.netlink.base_headers import NlmBaseFlags +from atf_python.sys.netlink.base_headers import NlmNewFlags +from atf_python.sys.netlink.base_headers import Nlmsghdr +from atf_python.sys.netlink.message import NlMsgType +from atf_python.sys.netlink.netlink import NetlinkTestTemplate +from atf_python.sys.netlink.netlink import Nlsock +from atf_python.sys.netlink.netlink_generic import CarpAttrType +from atf_python.sys.netlink.netlink_generic import CarpGenMessage +from atf_python.sys.netlink.netlink_generic import CarpMsgType +from atf_python.sys.netlink.netlink_route import IfaAttrType +from atf_python.sys.netlink.netlink_route import IfaCacheInfo +from atf_python.sys.netlink.netlink_route import IfafAttrType +from atf_python.sys.netlink.netlink_route import IfafFlags6 +from atf_python.sys.netlink.netlink_route import IfaFlags +from atf_python.sys.netlink.netlink_route import NetlinkIfaMessage +from atf_python.sys.netlink.netlink_route import NlRtMsgType +from atf_python.sys.netlink.netlink_route import RtScope +from atf_python.sys.netlink.utils import enum_or_int +from atf_python.sys.netlink.utils import NlConst + + +class TestRtNlIfaddrList(NetlinkTestTemplate, SingleVnetTestTemplate): + def setup_method(self, method): + method_name = method.__name__ + if "4" in method_name: + if "nofilter" in method_name: + self.IPV4_PREFIXES = ["192.0.2.1/24", "169.254.169.254/16"] + else: + self.IPV4_PREFIXES = ["192.0.2.1/24"] + if "6" in method_name: + self.IPV6_PREFIXES = ["2001:db8::1/64"] + super().setup_method(method) + self.setup_netlink(NlConst.NETLINK_ROUTE) + + def test_46_nofilter(self): + """Tests that listing outputs both IPv4/IPv6 and interfaces""" + msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value) + msg.set_request() + self.write_message(msg) + + ret = [] + for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR): + ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index) + family = rx_msg.base_hdr.ifa_family + scope = rx_msg.base_hdr.ifa_scope + ret.append((ifname, family, scope)) + + ifname = "lo0" + assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET and r[2] == RtScope.RT_SCOPE_HOST.value]) == 1 + assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6 and r[2] == RtScope.RT_SCOPE_HOST.value]) == 1 + assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6 and r[2] == RtScope.RT_SCOPE_LINK.value]) == 1 + assert len([r for r in ret if r[0] == ifname]) == 3 + + ifname = self.vnet.iface_alias_map["if1"].name + assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET and r[2] == RtScope.RT_SCOPE_LINK.value]) == 1 + assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET and r[2] == RtScope.RT_SCOPE_UNIVERSE.value]) == 1 + assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6 and r[2] == RtScope.RT_SCOPE_LINK.value]) == 1 + assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6 and r[2] == RtScope.RT_SCOPE_UNIVERSE.value]) == 1 + assert len([r for r in ret if r[0] == ifname]) == 4 + + def test_46_filter_iface(self): + """Tests that listing outputs both IPv4/IPv6 for the specific interface""" + epair_ifname = self.vnet.iface_alias_map["if1"].name + + msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value) + msg.set_request() + msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname) + self.write_message(msg) + + ret = [] + for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR): + ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index) + family = rx_msg.base_hdr.ifa_family + ret.append((ifname, family, rx_msg)) + + ifname = epair_ifname + assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET]) == 1 + assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6]) == 2 + assert len(ret) == 3 + + def test_46_filter_family_compat(self): + """Tests that family filtering works with the stripped header""" + + hdr = Nlmsghdr( + nlmsg_len=17, + nlmsg_type=NlRtMsgType.RTM_GETADDR.value, + nlmsg_flags=NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value, + nlmsg_seq=self.helper.get_seq(), + ) + data = bytes(hdr) + struct.pack("@B", socket.AF_INET) + self.nlsock.write_data(data) + + ret = [] + for rx_msg in self.read_msg_list(hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR): + ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index) + family = rx_msg.base_hdr.ifa_family + ret.append((ifname, family, rx_msg)) + assert len(ret) == 2 + + def filter_iface_family(self, family, num_items): + """Tests that listing outputs IPv4 for the specific interface""" + epair_ifname = self.vnet.iface_alias_map["if1"].name + + msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value) + msg.set_request() + msg.base_hdr.ifa_family = family + msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname) + self.write_message(msg) + + ret = [] + for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR): + assert family == rx_msg.base_hdr.ifa_family + assert epair_ifname == socket.if_indextoname(rx_msg.base_hdr.ifa_index) + ret.append(rx_msg) + assert len(ret) == num_items + return ret + + def test_4_broadcast(self): + """Tests header/attr output for listing IPv4 ifas on broadcast iface""" + ret = self.filter_iface_family(socket.AF_INET, 1) + # Should be 192.0.2.1/24 + msg = ret[0] + # Family and ifindex has been checked already + assert msg.base_hdr.ifa_prefixlen == 24 + # Ignore IFA_FLAGS for now + assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value + + assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "192.0.2.1" + assert msg.get_nla(IfaAttrType.IFA_LOCAL).addr == "192.0.2.1" + assert msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == "192.0.2.255" + + epair_ifname = self.vnet.iface_alias_map["if1"].name + assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname + + def test_6_broadcast(self): + """Tests header/attr output for listing IPv6 ifas on broadcast iface""" + ret = self.filter_iface_family(socket.AF_INET6, 2) + # Should be 192.0.2.1/24 + if ret[0].base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value: + (gmsg, lmsg) = ret + else: + (lmsg, gmsg) = ret + # Start with global ( 2001:db8::1/64 ) + msg = gmsg + # Family and ifindex has been checked already + assert msg.base_hdr.ifa_prefixlen == 64 + # Ignore IFA_FLAGS for now + assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value + + assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "2001:db8::1" + assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None + assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None + + epair_ifname = self.vnet.iface_alias_map["if1"].name + assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname + + # Local: fe80::/64 + msg = lmsg + assert msg.base_hdr.ifa_prefixlen == 64 + # Ignore IFA_FLAGS for now + assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_LINK.value + + addr = ipaddress.ip_address(msg.get_nla(IfaAttrType.IFA_ADDRESS).addr) + assert addr.is_link_local + # Verify that ifindex is not emmbedded + assert struct.unpack("!H", addr.packed[2:4])[0] == 0 + assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None + assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None + + epair_ifname = self.vnet.iface_alias_map["if1"].name + assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname + + +class RtnlIfaOps(NetlinkTestTemplate, SingleVnetTestTemplate): + def setup_method(self, method): + super().setup_method(method) + self.setup_netlink(NlConst.NETLINK_ROUTE) + + def send_check_success(self, msg): + rx_msg = self.get_reply(msg) + assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) + assert rx_msg.error_code == 0 + + @staticmethod + def get_family_from_ip(ip): + if ip.version == 4: + return socket.AF_INET + return socket.AF_INET6 + + def create_msg(self, ifa): + iface = self.vnet.iface_alias_map["if1"] + + msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value) + msg.set_request() + msg.nl_hdr.nlmsg_flags |= ( + NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value + ) + msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip) + msg.base_hdr.ifa_index = iface.ifindex + msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen + return msg + + def get_ifa_list(self, ifindex=0, family=0): + msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value) + msg.set_request() + msg.base_hdr.ifa_family = family + msg.base_hdr.ifa_index = ifindex + self.write_message(msg) + return self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR) + + def find_msg_by_ifa(self, msg_list, ip): + for msg in msg_list: + if msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ip): + return msg + return None + + def setup_dummy_carp(self, ifindex: int, vhid: int): + self.require_module("carp") + + nlsock = Nlsock(NlConst.NETLINK_GENERIC, self.helper) + family_id = nlsock.get_genl_family_id("carp") + + msg = CarpGenMessage(self.helper, family_id, CarpMsgType.CARP_NL_CMD_SET) + msg.set_request() + msg.add_nla(NlAttrU32(CarpAttrType.CARP_NL_VHID, vhid)) + msg.add_nla(NlAttrU32(CarpAttrType.CARP_NL_IFINDEX, ifindex)) + rx_msg = nlsock.get_reply(msg) + + assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) + assert rx_msg.error_code == 0 + + +class TestRtNlIfaddrOpsBroadcast(RtnlIfaOps): + def test_add_4(self): + """Tests IPv4 address addition to the standard broadcast interface""" + ifa = ipaddress.ip_interface("192.0.2.1/24") + ifa_brd = ifa.network.broadcast_address + iface = self.vnet.iface_alias_map["if1"] + + msg = self.create_msg(ifa) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd))) + + self.send_check_success(msg) + + lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) + assert len(lst) == 1 + rx_msg = lst[0] + + assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen + assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value + + assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) + assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip) + assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd) + + @pytest.mark.parametrize( + "brd", + [ + pytest.param((32, True, "192.0.2.1"), id="auto_32"), + pytest.param((31, True, "255.255.255.255"), id="auto_31"), + pytest.param((30, True, "192.0.2.3"), id="auto_30"), + pytest.param((30, False, "192.0.2.2"), id="custom_30"), + pytest.param((24, False, "192.0.2.7"), id="custom_24"), + ], + ) + def test_add_4_brd(self, brd): + """Tests proper broadcast setup when adding IPv4 ifa""" + plen, auto_brd, ifa_brd_str = brd + ifa = ipaddress.ip_interface("192.0.2.1/{}".format(plen)) + iface = self.vnet.iface_alias_map["if1"] + ifa_brd = ipaddress.ip_address(ifa_brd_str) + + msg = self.create_msg(ifa) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) + if not auto_brd: + msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd))) + + self.send_check_success(msg) + + lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) + assert len(lst) == 1 + rx_msg = lst[0] + + assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen + assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value + + assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) + assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip) + assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd) + + def test_add_6(self): + ifa = ipaddress.ip_interface("2001:db8::1/64") + iface = self.vnet.iface_alias_map["if1"] + + msg = self.create_msg(ifa) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) + + self.send_check_success(msg) + + lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) + assert len(lst) == 2 + rx_msg_gu = self.find_msg_by_ifa(lst, ifa.ip) + assert rx_msg_gu is not None + + assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen + assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value + assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) + + def test_add_4_carp(self): + ifa = ipaddress.ip_interface("192.0.2.1/24") + ifa_brd = ifa.network.broadcast_address + iface = self.vnet.iface_alias_map["if1"] + vhid = 77 + + self.setup_dummy_carp(iface.ifindex, vhid) + + msg = self.create_msg(ifa) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd))) + attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)] + msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd)) + + self.send_check_success(msg) + + lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) + assert len(lst) == 1 + rx_msg = lst[0] + + assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen + assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value + + assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) + assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip) + assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd) + ifa_bsd = rx_msg.get_nla(IfaAttrType.IFA_FREEBSD) + assert ifa_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid + + def test_add_6_carp(self): + ifa = ipaddress.ip_interface("2001:db8::1/64") + iface = self.vnet.iface_alias_map["if1"] + vhid = 77 + + self.setup_dummy_carp(iface.ifindex, vhid) + + msg = self.create_msg(ifa) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) + attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)] + msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd)) + + self.send_check_success(msg) + + lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) + assert len(lst) == 2 + rx_msg_gu = self.find_msg_by_ifa(lst, ifa.ip) + assert rx_msg_gu is not None + + assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen + assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value + assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) + ifa_bsd = rx_msg_gu.get_nla(IfaAttrType.IFA_FREEBSD) + assert ifa_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid + + def test_add_6_lifetime(self): + ifa = ipaddress.ip_interface("2001:db8::1/64") + iface = self.vnet.iface_alias_map["if1"] + pref_time = 43200 + valid_time = 86400 + + ci = IfaCacheInfo(ifa_prefered=pref_time, ifa_valid=valid_time) + + msg = self.create_msg(ifa) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) + msg.add_nla(NlAttr(IfaAttrType.IFA_CACHEINFO, bytes(ci))) + + self.send_check_success(msg) + + lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) + assert len(lst) == 2 + rx_msg = self.find_msg_by_ifa(lst, ifa.ip) + assert rx_msg is not None + + ci = rx_msg.get_nla(IfaAttrType.IFA_CACHEINFO).ci + assert pref_time - 5 <= ci.ifa_prefered <= pref_time + assert valid_time - 5 <= ci.ifa_valid <= valid_time + assert ci.cstamp > 0 + assert ci.tstamp > 0 + assert ci.tstamp >= ci.cstamp + + @pytest.mark.parametrize( + "flags_str", + [ + "autoconf", + "deprecated", + "autoconf,deprecated", + "prefer_source", + ], + ) + def test_add_6_flags(self, flags_str): + ifa = ipaddress.ip_interface("2001:db8::1/64") + iface = self.vnet.iface_alias_map["if1"] + + flags_map = { + "autoconf": {"nl": 0, "f": IfafFlags6.IN6_IFF_AUTOCONF}, + "deprecated": { + "nl": IfaFlags.IFA_F_DEPRECATED, + "f": IfafFlags6.IN6_IFF_DEPRECATED, + }, + "prefer_source": {"nl": 0, "f": IfafFlags6.IN6_IFF_PREFER_SOURCE}, + } + nl_flags = 0 + f_flags = 0 + + for flag_str in flags_str.split(","): + d = flags_map.get(flag_str, {}) + nl_flags |= enum_or_int(d.get("nl", 0)) + f_flags |= enum_or_int(d.get("f", 0)) + + msg = self.create_msg(ifa) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) + msg.add_nla(NlAttrU32(IfaAttrType.IFA_FLAGS, nl_flags)) + attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_FLAGS, f_flags)] + msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd)) + + self.send_check_success(msg) + + lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) + assert len(lst) == 2 + rx_msg = self.find_msg_by_ifa(lst, ifa.ip) + assert rx_msg is not None + + assert rx_msg.get_nla(IfaAttrType.IFA_FLAGS).u32 == nl_flags + ifa_bsd = rx_msg.get_nla(IfaAttrType.IFA_FREEBSD) + assert ifa_bsd.get_nla(IfafAttrType.IFAF_FLAGS).u32 == f_flags + + def test_add_4_empty_message(self): + """Tests correct failure w/ empty message""" + msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value) + msg.set_request() + msg.nl_hdr.nlmsg_flags |= ( + NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value + ) + + rx_msg = self.get_reply(msg) + assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) + assert rx_msg.error_code != 0 + + def test_add_4_empty_ifindex(self): + """Tests correct failure w/ empty ifindex""" + ifa = ipaddress.ip_interface("192.0.2.1/24") + ifa_brd = ifa.network.broadcast_address + + msg = self.create_msg(ifa) + msg.base_hdr.ifa_index = 0 + msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd))) + + rx_msg = self.get_reply(msg) + assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) + assert rx_msg.error_code != 0 + + def test_add_4_empty_addr(self): + """Tests correct failure w/ empty address""" + ifa = ipaddress.ip_interface("192.0.2.1/24") + ifa_brd = ifa.network.broadcast_address + + msg = self.create_msg(ifa) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd))) + + rx_msg = self.get_reply(msg) + assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) + assert rx_msg.error_code != 0 + + @pytest.mark.parametrize( + "ifa_str", + [ + pytest.param("192.0.2.1/32", id="ipv4_host"), + pytest.param("192.0.2.1/24", id="ipv4_prefix"), + pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"), + pytest.param("2001:db8::1/128", id="ipv6_gu_host"), + ], + ) + @pytest.mark.parametrize( + "tlv", + [ + pytest.param("local", id="ifa_local"), + pytest.param("address", id="ifa_address"), + ], + ) + def test_del(self, tlv, ifa_str): + """Tests address deletion from the standard broadcast interface""" + ifa = ipaddress.ip_interface(ifa_str) + ifa_brd = ifa.network.broadcast_address + iface = self.vnet.iface_alias_map["if1"] + + msg = self.create_msg(ifa) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd))) + + self.send_check_success(msg) + lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) + rx_msg = self.find_msg_by_ifa(lst, ifa.ip) + assert rx_msg is not None + + msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value) + msg.set_request() + msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip) + msg.base_hdr.ifa_index = iface.ifindex + msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen + + if tlv == "local": + msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) + if tlv == "address": + msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip))) + + self.send_check_success(msg) + lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) + rx_msg = self.find_msg_by_ifa(lst, ifa.ip) + assert rx_msg is None + + +class TestRtNlIfaddrOpsP2p(RtnlIfaOps): + IFTYPE = "gif" + + @pytest.mark.parametrize( + "ifa_pair", + [ + pytest.param(["192.0.2.1/24", "192.0.2.2"], id="dst_inside_24"), + pytest.param(["192.0.2.1/30", "192.0.2.2"], id="dst_inside_30"), + pytest.param(["192.0.2.1/31", "192.0.2.2"], id="dst_inside_31"), + pytest.param(["192.0.2.1/32", "192.0.2.2"], id="dst_outside_32"), + pytest.param(["192.0.2.1/30", "192.0.2.100"], id="dst_outside_30"), + ], + ) + def test_add_4(self, ifa_pair): + """Tests IPv4 address addition to the p2p interface""" + ifa = ipaddress.ip_interface(ifa_pair[0]) + peer_ip = ipaddress.ip_address(ifa_pair[1]) + iface = self.vnet.iface_alias_map["if1"] + + msg = self.create_msg(ifa) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip))) + + self.send_check_success(msg) + + lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) + assert len(lst) == 1 + rx_msg = lst[0] + + assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen + assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value + + assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip) + assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip) + + @pytest.mark.parametrize( + "ifa_pair", + [ + pytest.param( + ["2001:db8::1/64", "2001:db8::2"], + id="dst_inside_64", + marks=pytest.mark.xfail(reason="currently fails"), + ), + pytest.param( + ["2001:db8::1/127", "2001:db8::2"], + id="dst_inside_127", + marks=pytest.mark.xfail(reason="currently fails"), + ), + pytest.param(["2001:db8::1/128", "2001:db8::2"], id="dst_outside_128"), + pytest.param( + ["2001:db8::1/64", "2001:db8:2::2"], + id="dst_outside_64", + marks=pytest.mark.xfail(reason="currently fails"), + ), + ], + ) + def test_add_6(self, ifa_pair): + """Tests IPv6 address addition to the p2p interface""" + ifa = ipaddress.ip_interface(ifa_pair[0]) + peer_ip = ipaddress.ip_address(ifa_pair[1]) + iface = self.vnet.iface_alias_map["if1"] + + msg = self.create_msg(ifa) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip))) + + self.send_check_success(msg) + + lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) + assert len(lst) == 2 + rx_msg_gu = self.find_msg_by_ifa(lst, peer_ip) + assert rx_msg_gu is not None + + assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen + assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value + assert rx_msg_gu.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip) + assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip) + + @pytest.mark.parametrize( + "ifa_pair", + [ + pytest.param(["192.0.2.1/30", "192.0.2.2"], id="ipv4_dst_inside_30"), + pytest.param(["192.0.2.1/32", "192.0.2.2"], id="ipv4_dst_outside_32"), + pytest.param(["2001:db8::1/128", "2001:db8::2"], id="ip6_dst_outside_128"), + ], + ) + @pytest.mark.parametrize( + "tlv_pair", + [ + pytest.param(["a", ""], id="ifa_addr=addr"), + pytest.param(["", "a"], id="ifa_local=addr"), + pytest.param(["a", "a"], id="ifa_addr=addr,ifa_local=addr"), + ], + ) + def test_del(self, tlv_pair, ifa_pair): + """Tests address deletion from the P2P interface""" + ifa = ipaddress.ip_interface(ifa_pair[0]) + peer_ip = ipaddress.ip_address(ifa_pair[1]) + iface = self.vnet.iface_alias_map["if1"] + ifa_addr_str, ifa_local_str = tlv_pair + + msg = self.create_msg(ifa) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip))) + + self.send_check_success(msg) + lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) + rx_msg = self.find_msg_by_ifa(lst, peer_ip) + assert rx_msg is not None + + msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value) + msg.set_request() + msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip) + msg.base_hdr.ifa_index = iface.ifindex + msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen + + if "a" in ifa_addr_str: + msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip))) + if "p" in ifa_addr_str: + msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip))) + if "a" in ifa_local_str: + msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) + if "p" in ifa_local_str: + msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(peer_ip))) + + self.send_check_success(msg) + lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) + rx_msg = self.find_msg_by_ifa(lst, ifa.ip) + assert rx_msg is None + + +class TestRtNlAddIfaddrLo(RtnlIfaOps): + IFTYPE = "lo" + + @pytest.mark.parametrize( + "ifa_str", + [ + pytest.param("192.0.2.1/24", id="prefix"), + pytest.param("192.0.2.1/32", id="host"), + ], + ) + def test_add_4(self, ifa_str): + """Tests IPv4 address addition to the loopback interface""" + ifa = ipaddress.ip_interface(ifa_str) + iface = self.vnet.iface_alias_map["if1"] + + msg = self.create_msg(ifa) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) + + self.send_check_success(msg) + + lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) + assert len(lst) == 1 + rx_msg = lst[0] + + assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen + assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value + + assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) + assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip) + + @pytest.mark.parametrize( + "ifa_str", + [ + pytest.param("2001:db8::1/64", id="gu_prefix"), + pytest.param("2001:db8::1/128", id="gu_host"), + ], + ) + def test_add_6(self, ifa_str): + """Tests IPv6 address addition to the loopback interface""" + ifa = ipaddress.ip_interface(ifa_str) + iface = self.vnet.iface_alias_map["if1"] + + msg = self.create_msg(ifa) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) + + self.send_check_success(msg) + + lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) + assert len(lst) == 2 # link-local should be auto-created as well + rx_msg = self.find_msg_by_ifa(lst, ifa.ip) + assert rx_msg is not None + + assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen + assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value + assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip) + + @pytest.mark.parametrize( + "ifa_str", + [ + pytest.param("192.0.2.1/32", id="ipv4_host"), + pytest.param("192.0.2.1/24", id="ipv4_prefix"), + pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"), + pytest.param("2001:db8::1/128", id="ipv6_gu_host"), + ], + ) + @pytest.mark.parametrize( + "tlv", + [ + pytest.param("local", id="ifa_local"), + pytest.param("address", id="ifa_address"), + ], + ) + def test_del(self, tlv, ifa_str): + """Tests address deletion from the loopback interface""" + ifa = ipaddress.ip_interface(ifa_str) + iface = self.vnet.iface_alias_map["if1"] + + msg = self.create_msg(ifa) + msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) + + self.send_check_success(msg) + lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) + rx_msg = self.find_msg_by_ifa(lst, ifa.ip) + assert rx_msg is not None + + msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value) + msg.set_request() + msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip) + msg.base_hdr.ifa_index = iface.ifindex + msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen + + if tlv == "local": + msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip))) + if tlv == "address": + msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip))) + + self.send_check_success(msg) + lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip)) + rx_msg = self.find_msg_by_ifa(lst, ifa.ip) + assert rx_msg is None |