aboutsummaryrefslogtreecommitdiff
path: root/tests/sys/netlink/test_rtnl_iface.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/sys/netlink/test_rtnl_iface.py')
-rw-r--r--tests/sys/netlink/test_rtnl_iface.py355
1 files changed, 355 insertions, 0 deletions
diff --git a/tests/sys/netlink/test_rtnl_iface.py b/tests/sys/netlink/test_rtnl_iface.py
new file mode 100644
index 000000000000..41cb4d16de94
--- /dev/null
+++ b/tests/sys/netlink/test_rtnl_iface.py
@@ -0,0 +1,355 @@
+import errno
+import socket
+
+import pytest
+from atf_python.sys.netlink.netlink_route import IflattrType
+from atf_python.sys.netlink.netlink_route import IflinkInfo
+from atf_python.sys.netlink.netlink_route import IfLinkInfoDataVlan
+from atf_python.sys.netlink.netlink_route import NetlinkIflaMessage
+from atf_python.sys.netlink.netlink import NetlinkTestTemplate
+from atf_python.sys.netlink.attrs import NlAttrNested
+from atf_python.sys.netlink.attrs import NlAttrStr
+from atf_python.sys.netlink.attrs import NlAttrStrn
+from atf_python.sys.netlink.attrs import NlAttrU16
+from atf_python.sys.netlink.attrs import NlAttrU32
+from atf_python.sys.netlink.utils import NlConst
+from atf_python.sys.netlink.base_headers import NlmBaseFlags
+from atf_python.sys.netlink.base_headers import NlmNewFlags
+from atf_python.sys.netlink.base_headers import NlMsgType
+from atf_python.sys.netlink.netlink_route import NlRtMsgType
+from atf_python.sys.netlink.netlink_route import rtnl_ifla_attrs
+from atf_python.sys.net.vnet import SingleVnetTestTemplate
+from atf_python.sys.net.tools import ToolsHelper
+
+
+class TestRtNlIface(NetlinkTestTemplate, SingleVnetTestTemplate):
+ def setup_method(self, method):
+ super().setup_method(method)
+ self.setup_netlink(NlConst.NETLINK_ROUTE)
+
+ def get_interface_byname(self, ifname):
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
+ self.write_message(msg)
+ while True:
+ rx_msg = self.read_message()
+ if msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq:
+ if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
+ if rx_msg.error_code != 0:
+ raise ValueError("unable to get interface {}".format(ifname))
+ elif rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
+ return rx_msg
+ else:
+ raise ValueError("bad message")
+
+ def test_get_iface_byname_error(self):
+ """Tests error on fetching non-existing interface name"""
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == errno.ENODEV
+
+ def test_get_iface_byindex_error(self):
+ """Tests error on fetching non-existing interface index"""
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.base_hdr.ifi_index = 2147483647
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == errno.ENODEV
+
+ @pytest.mark.require_user("root")
+ def test_create_iface_plain(self):
+ """Tests loopback creation w/o any parameters"""
+ flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
+ msg.add_nla(
+ NlAttrNested(
+ IflattrType.IFLA_LINKINFO,
+ [
+ NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
+ ],
+ )
+ )
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+
+ self.get_interface_byname("lo10")
+
+ @pytest.mark.require_user("root")
+ def test_create_iface_plain_retvals(self):
+ """Tests loopback creation w/o any parameters"""
+ flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
+ msg.add_nla(
+ NlAttrNested(
+ IflattrType.IFLA_LINKINFO,
+ [
+ NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
+ ],
+ )
+ )
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+ assert rx_msg.cookie is not None
+ nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)
+ nla_map = {n.nla_type: n for n in nla_list}
+ assert IflattrType.IFLA_IFNAME.value in nla_map
+ assert nla_map[IflattrType.IFLA_IFNAME.value].text == "lo10"
+ assert IflattrType.IFLA_NEW_IFINDEX.value in nla_map
+ assert nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32 > 0
+
+ lo_msg = self.get_interface_byname("lo10")
+ assert (
+ lo_msg.base_hdr.ifi_index == nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32
+ )
+
+ @pytest.mark.require_user("root")
+ def test_create_iface_attrs(self):
+ """Tests interface creation with additional properties"""
+ flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
+ msg.add_nla(
+ NlAttrNested(
+ IflattrType.IFLA_LINKINFO,
+ [
+ NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
+ ],
+ )
+ )
+
+ # Custom attributes
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
+ msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+
+ iface_msg = self.get_interface_byname("lo10")
+ assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
+ assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024
+
+ @pytest.mark.require_user("root")
+ def test_modify_iface_attrs(self):
+ """Tests interface modifications"""
+ flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
+ msg.add_nla(
+ NlAttrNested(
+ IflattrType.IFLA_LINKINFO,
+ [
+ NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
+ ],
+ )
+ )
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
+
+ # Custom attributes
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
+ msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+
+ iface_msg = self.get_interface_byname("lo10")
+ assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
+ assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024
+
+ @pytest.mark.require_user("root")
+ def test_delete_iface(self):
+ """Tests interface modifications"""
+ flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
+ msg.add_nla(
+ NlAttrNested(
+ IflattrType.IFLA_LINKINFO,
+ [
+ NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
+ ],
+ )
+ )
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+
+ iface_msg = self.get_interface_byname("lo10")
+ iface_idx = iface_msg.base_hdr.ifi_index
+
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_DELLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.base_hdr.ifi_index = iface_idx
+ # msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.base_hdr.ifi_index = 2147483647
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == errno.ENODEV
+
+ @pytest.mark.require_user("root")
+ def test_dump_ifaces_many(self):
+ """Tests if interface dummp is not missing interfaces"""
+
+ ifmap = {}
+ ifmap[socket.if_nametoindex("lo0")] = "lo0"
+
+ for i in range(40):
+ ifname = "lo{}".format(i + 1)
+ flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
+ msg.add_nla(
+ NlAttrNested(
+ IflattrType.IFLA_LINKINFO,
+ [
+ NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
+ ],
+ )
+ )
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)
+ nla_map = {n.nla_type: n for n in nla_list}
+ assert nla_map[IflattrType.IFLA_IFNAME.value].text == ifname
+ ifindex = nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32
+ assert ifindex > 0
+ assert ifindex not in ifmap
+ ifmap[ifindex] = ifname
+
+ # Dump all interfaces and check if the output matches ifmap
+ kernel_ifmap = {}
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ self.write_message(msg)
+ while True:
+ rx_msg = self.read_message()
+ if msg.nl_hdr.nlmsg_seq != rx_msg.nl_hdr.nlmsg_seq:
+ raise ValueError(
+ "unexpected seq {}".format(rx_msg.nl_hdr.nlmsg_seq)
+ )
+ if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
+ raise ValueError("unexpected message {}".format(rx_msg))
+ if rx_msg.is_type(NlMsgType.NLMSG_DONE):
+ break
+ if not rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
+ raise ValueError("unexpected message {}".format(rx_msg))
+
+ ifindex = rx_msg.base_hdr.ifi_index
+ assert ifindex == rx_msg.base_hdr.ifi_index
+ ifname = rx_msg.get_nla(IflattrType.IFLA_IFNAME).text
+ if ifname.startswith("lo"):
+ kernel_ifmap[ifindex] = ifname
+ assert kernel_ifmap == ifmap
+
+ #
+ # *
+ # * {len=76, type=RTM_NEWLINK, flags=NLM_F_REQUEST|NLM_F_ACK|NLM_F_EXCL|NLM_F_CREATE, seq=1662892737, pid=0},
+ # * {ifi_family=AF_UNSPEC, ifi_type=ARPHRD_NETROM, ifi_index=0, ifi_flags=0, ifi_change=0},
+ # * {{nla_len=8, nla_type=IFLA_LINK}, 2},
+ # * {{nla_len=12, nla_type=IFLA_IFNAME}, "xvlan22"},
+ # * {{nla_len=24, nla_type=IFLA_LINKINFO},
+ # * {{nla_len=8, nla_type=IFLA_INFO_KIND}, "vlan"...},
+ # * {{nla_len=12, nla_type=IFLA_INFO_DATA}, "\x06\x00\x01\x00\x16\x00\x00\x00"}
+ # */
+ @pytest.mark.require_user("root")
+ def test_create_vlan_plain(self):
+ """Creates 802.1Q VLAN interface in vlanXX and ifX fashion"""
+ self.require_module("if_vlan")
+ os_ifname = self.vnet.iface_alias_map["if1"].name
+ ifindex = socket.if_nametoindex(os_ifname)
+
+ flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.base_hdr.ifi_index = ifindex
+
+ msg.add_nla(NlAttrU32(IflattrType.IFLA_LINK, ifindex))
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "vlan22"))
+
+ msg.add_nla(
+ NlAttrNested(
+ IflattrType.IFLA_LINKINFO,
+ [
+ NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "vlan"),
+ NlAttrNested(
+ IflinkInfo.IFLA_INFO_DATA,
+ [
+ NlAttrU16(IfLinkInfoDataVlan.IFLA_VLAN_ID, 22),
+ ],
+ ),
+ ],
+ )
+ )
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+
+ ToolsHelper.print_net_debug()
+ self.get_interface_byname("vlan22")
+ # ToolsHelper.print_net_debug()