aboutsummaryrefslogtreecommitdiff
path: root/tests/sys/netlink
diff options
context:
space:
mode:
Diffstat (limited to 'tests/sys/netlink')
-rw-r--r--tests/sys/netlink/Makefile17
-rw-r--r--tests/sys/netlink/netlink_socket.c368
-rw-r--r--tests/sys/netlink/test_netlink_message_writer.py39
-rw-r--r--tests/sys/netlink/test_nl_core.py33
-rw-r--r--tests/sys/netlink/test_rtnl_iface.py355
-rw-r--r--tests/sys/netlink/test_rtnl_ifaddr.py762
-rw-r--r--tests/sys/netlink/test_rtnl_neigh.py53
-rw-r--r--tests/sys/netlink/test_rtnl_route.py142
-rw-r--r--tests/sys/netlink/test_snl.c240
-rw-r--r--tests/sys/netlink/test_snl_generic.c110
10 files changed, 2119 insertions, 0 deletions
diff --git a/tests/sys/netlink/Makefile b/tests/sys/netlink/Makefile
new file mode 100644
index 000000000000..c07ef8663867
--- /dev/null
+++ b/tests/sys/netlink/Makefile
@@ -0,0 +1,17 @@
+PACKAGE= tests
+WARNS?= 1
+
+TESTSDIR= ${TESTSBASE}/sys/netlink
+
+ATF_TESTS_C+= netlink_socket
+ATF_TESTS_C+= test_snl test_snl_generic
+ATF_TESTS_PYTEST += test_nl_core.py
+ATF_TESTS_PYTEST += test_rtnl_iface.py
+ATF_TESTS_PYTEST += test_rtnl_ifaddr.py
+ATF_TESTS_PYTEST += test_rtnl_neigh.py
+ATF_TESTS_PYTEST += test_rtnl_route.py
+ATF_TESTS_PYTEST += test_netlink_message_writer.py
+
+CFLAGS+= -I${.CURDIR:H:H:H}
+
+.include <bsd.test.mk>
diff --git a/tests/sys/netlink/netlink_socket.c b/tests/sys/netlink/netlink_socket.c
new file mode 100644
index 000000000000..cee864bb9bab
--- /dev/null
+++ b/tests/sys/netlink/netlink_socket.c
@@ -0,0 +1,368 @@
+/*-
+ * SPDX-License-Identifier: BSD-2-Clause
+ *
+ * Copyright (c) 2023 Gleb Smirnoff <glebius@FreeBSD.org>
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include <sys/param.h>
+#include <sys/ioctl.h>
+#include <sys/time.h>
+#include <sys/socket.h>
+#include <sys/module.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include <netlink/netlink.h>
+#include <netlink/netlink_route.h>
+
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include <atf-c.h>
+
+static struct itimerval itv = {
+ .it_interval = { 0, 0 },
+ .it_value = { 1, 0 }, /* one second */
+};
+static sig_atomic_t timer_done = 0;
+static void
+sigalarm(int sig __unused)
+{
+
+ timer_done = 1;
+}
+
+static struct sigaction sigact = {
+ .sa_handler = sigalarm,
+};
+
+static struct nlmsghdr hdr = (struct nlmsghdr) {
+ .nlmsg_type = RTM_GETLINK,
+ .nlmsg_flags = NLM_F_REQUEST | NLM_F_ACK,
+ .nlmsg_len = sizeof(struct nlmsghdr),
+};
+
+#define BUFLEN 1000
+
+static int
+fullsocket(void)
+{
+ char buf[BUFLEN];
+ socklen_t slen = sizeof(int);
+ int fd, sendspace, recvspace, sendavail, recvavail, rsize;
+ u_int cnt = 0;
+
+ ATF_REQUIRE((fd = socket(PF_NETLINK, SOCK_RAW, NETLINK_ROUTE)) != -1);
+ ATF_REQUIRE(getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sendspace,
+ &slen) == 0);
+ ATF_REQUIRE(getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &recvspace,
+ &slen) == 0);
+
+ /* Check the expected size of reply on a single RTM_GETLINK. */
+ ATF_REQUIRE(send(fd, &hdr, sizeof(hdr), 0) == sizeof(hdr));
+ ATF_REQUIRE(recv(fd, buf, sizeof(hdr), MSG_WAITALL | MSG_PEEK) ==
+ sizeof(hdr));
+ ATF_REQUIRE(ioctl(fd, FIONREAD, &rsize) != -1);
+
+
+ /*
+ * Flood the socket with requests, without reading out the replies.
+ * While we are flooding, the kernel tries to process the requests.
+ * Kernel takes off requests from the send buffer and puts replies
+ * on receive buffer. Once the receive buffer is full it stops working
+ * on queue in the send buffer. At this point we must get a solid
+ * failure. However, if we flood faster than kernel taskqueue runs,
+ * we may get intermittent failures.
+ */
+ do {
+ ssize_t rv;
+
+ rv = send(fd, &hdr, sizeof(hdr), MSG_DONTWAIT);
+ if (__predict_true(rv == sizeof(hdr)))
+ cnt++;
+ else {
+ ATF_REQUIRE(errno == EAGAIN);
+ ATF_REQUIRE(sizeof(hdr) * cnt > sendspace);
+ }
+ ATF_REQUIRE(ioctl(fd, FIONREAD, &recvavail) != -1);
+ ATF_REQUIRE(ioctl(fd, FIONWRITE, &sendavail) != -1);
+ } while (recvavail <= recvspace - rsize ||
+ sendavail <= sendspace - sizeof(hdr));
+
+ return (fd);
+}
+
+ATF_TC(overflow);
+ATF_TC_HEAD(overflow, tc)
+{
+ atf_tc_set_md_var(tc, "require.kmods", "netlink");
+}
+ATF_TC_BODY(overflow, tc)
+{
+ char buf[BUFLEN];
+ int fd;
+
+ fd = fullsocket();
+
+ /* Both buffers full: block. */
+ timer_done = 0;
+ ATF_REQUIRE(sigaction(SIGALRM, &sigact, NULL) == 0);
+ ATF_REQUIRE(setitimer(ITIMER_REAL, &itv, NULL) == 0);
+ ATF_REQUIRE(send(fd, &hdr, sizeof(hdr), 0) == -1);
+ ATF_REQUIRE(errno == EINTR);
+ ATF_REQUIRE(timer_done == 1);
+
+ /*
+ * Now, reading something from the receive buffer should wake up the
+ * taskqueue and send buffer should start getting drained.
+ */
+ ATF_REQUIRE(recv(fd, buf, BUFLEN, 0) > sizeof(hdr));
+ timer_done = 0;
+ ATF_REQUIRE(setitimer(ITIMER_REAL, &itv, NULL) == 0);
+ ATF_REQUIRE(send(fd, &hdr, sizeof(hdr), 0) == sizeof(hdr));
+ ATF_REQUIRE(timer_done == 0);
+}
+
+ATF_TC(peek);
+ATF_TC_HEAD(peek, tc)
+{
+ atf_tc_set_md_var(tc, "require.kmods", "netlink");
+}
+ATF_TC_BODY(peek, tc)
+{
+ char *buf;
+ ssize_t ss, ss1;
+ int fd;
+
+ ATF_REQUIRE((fd = socket(PF_NETLINK, SOCK_RAW, NETLINK_ROUTE)) != -1);
+
+ ATF_REQUIRE(send(fd, &hdr, sizeof(hdr), 0) == sizeof(hdr));
+ ss = recv(fd, buf, 0, MSG_WAITALL | MSG_PEEK | MSG_TRUNC);
+ ATF_REQUIRE((buf = malloc(ss)) != NULL);
+ ATF_REQUIRE(recv(fd, buf, ss, MSG_WAITALL) == ss);
+}
+
+struct nl_control {
+ struct nlattr nla;
+ uint32_t val;
+};
+
+static void
+cmsg_check(struct msghdr *msg)
+{
+ static pid_t pid = 0;
+ struct cmsghdr *cmsg;
+ struct nl_control *nlc;
+
+ ATF_REQUIRE((cmsg = CMSG_FIRSTHDR(msg)) != NULL);
+ ATF_REQUIRE(cmsg->cmsg_level == SOL_NETLINK);
+ ATF_REQUIRE(cmsg->cmsg_type == NETLINK_MSG_INFO);
+ nlc = (struct nl_control *)CMSG_DATA(cmsg);
+ ATF_REQUIRE(nlc[0].nla.nla_type == NLMSGINFO_ATTR_PROCESS_ID);
+ if (pid == 0)
+ pid = getpid();
+ ATF_REQUIRE(nlc[0].val == pid);
+ ATF_REQUIRE(nlc[1].nla.nla_type == NLMSGINFO_ATTR_PORT_ID);
+ /* XXX need another test to test port id */
+ ATF_REQUIRE(nlc[1].val == 0);
+ ATF_REQUIRE(CMSG_NXTHDR(msg, cmsg) == NULL);
+ ATF_REQUIRE((msg->msg_flags & MSG_CTRUNC) == 0);
+}
+
+ATF_TC(sizes);
+ATF_TC_HEAD(sizes, tc)
+{
+ atf_tc_set_md_var(tc, "require.kmods", "netlink");
+}
+ATF_TC_BODY(sizes, tc)
+{
+#define NLMSG_LARGE 2048 /* XXX: match kernel nl_buf */
+ char buf[NLMSG_LARGE * 10];
+ char cbuf[CMSG_SPACE(sizeof(struct nl_control) * 2)];
+ struct iovec iov;
+ struct msghdr msg = {
+ .msg_iov = &iov,
+ .msg_iovlen = 1,
+ .msg_control = cbuf,
+ .msg_controllen = sizeof(cbuf),
+ };
+ ssize_t ss;
+ int fd, size, msize, rsize;
+
+ /*
+ * Create a socket with NMSGS messages in the receive buffer.
+ */
+#define NMSGS 5
+ ATF_REQUIRE((fd = socket(PF_NETLINK, SOCK_RAW, NETLINK_ROUTE)) != -1);
+ ATF_REQUIRE(send(fd, &hdr, sizeof(hdr), 0) == sizeof(hdr));
+ ATF_REQUIRE(recv(fd, buf, sizeof(hdr), MSG_WAITALL | MSG_PEEK) ==
+ sizeof(hdr));
+ ATF_REQUIRE(ioctl(fd, FIONREAD, &msize) != -1);
+ for (u_int i = 0; i < NMSGS; i++)
+ ATF_REQUIRE(send(fd, &hdr, sizeof(hdr), 0) == sizeof(hdr));
+ do {
+ ATF_REQUIRE(ioctl(fd, FIONREAD, &rsize) != -1);
+ } while (rsize < msize * (NMSGS + 1));
+
+ /*
+ * Set NETLINK_MSG_INFO, so that later cmsg_check will check that any
+ * read is accompanied with control data.
+ */
+ ATF_REQUIRE(setsockopt(fd, SOL_NETLINK, NETLINK_MSG_INFO,
+ &(int){1}, sizeof(int)) == 0);
+
+ iov = (struct iovec ){
+ .iov_base = &hdr,
+ .iov_len = sizeof(hdr),
+ };
+ /* Obtain size of the first message in the socket. */
+ ss = recvmsg(fd, &msg, MSG_WAITALL | MSG_PEEK | MSG_TRUNC);
+ ATF_REQUIRE(ss == hdr.nlmsg_len);
+ /* And overall amount of data in the socket. */
+ ATF_REQUIRE(ioctl(fd, FIONREAD, &rsize) != -1);
+ cmsg_check(&msg);
+
+ /* Zero-sized read should not affect state of the socket buffer. */
+ ATF_REQUIRE(recv(fd, buf, 0, 0) == 0);
+ ATF_REQUIRE(ioctl(fd, FIONREAD, &size) != -1);
+ ATF_REQUIRE(size == rsize);
+
+ /*
+ * Undersized read should lose a message. This isn't exactly
+ * pronounced in the Netlink RFC, but it always says that Netlink
+ * socket is an analog of the BSD routing socket, and this is how
+ * a route(4) socket deals with undersized read.
+ */
+ iov = (struct iovec ){
+ .iov_base = buf,
+ .iov_len = sizeof(hdr),
+ };
+ ATF_REQUIRE(recvmsg(fd, &msg, 0) == sizeof(hdr));
+ ATF_REQUIRE(msg.msg_flags & MSG_TRUNC);
+ ATF_REQUIRE(hdr.nlmsg_len > sizeof(hdr));
+ size = rsize - hdr.nlmsg_len;
+ ATF_REQUIRE(ioctl(fd, FIONREAD, &rsize) != -1);
+ ATF_REQUIRE(size == rsize);
+ cmsg_check(&msg);
+
+ /*
+ * Large read should span several nl_bufs, seeing no boundaries.
+ */
+ iov = (struct iovec ){
+ .iov_base = buf,
+ .iov_len = sizeof(buf) < rsize ? sizeof(buf) : rsize,
+ };
+ ss = recvmsg(fd, &msg, 0);
+ ATF_REQUIRE(ss > hdr.nlmsg_len);
+ ATF_REQUIRE(ss > NLMSG_LARGE * 9 || ss == rsize);
+ cmsg_check(&msg);
+}
+
+static struct nlattr *
+nla_RTA_DST(struct nlattr *start, ssize_t len)
+{
+ struct nlattr *nla;
+
+ for (nla = start; (char *)nla < (char *)start + len;
+ nla = (struct nlattr *)((char *)nla + NLA_ALIGN(nla->nla_len))) {
+ if (nla->nla_type == RTA_DST)
+ return (nla);
+ }
+
+ return (NULL);
+}
+/*
+ * Check that NETLINK_ADD_MEMBERSHIP subscribes us. Add & delete a temporary
+ * route and check if announcements came in.
+ */
+ATF_TC(membership);
+ATF_TC_HEAD(membership, tc)
+{
+ atf_tc_set_md_var(tc, "require.kmods", "netlink");
+}
+ATF_TC_BODY(membership, tc)
+{
+ struct {
+ struct nlmsghdr hdr;
+ struct rtmsg rtm;
+ struct nlattr rta_dst;
+ struct in_addr dst;
+ struct nlattr rta_oif;
+ uint32_t oif;
+ } reply, msg = {
+ .hdr.nlmsg_type = RTM_NEWROUTE,
+ .hdr.nlmsg_flags = NLM_F_REQUEST | NLM_F_CREATE | NLM_F_EXCL,
+ .hdr.nlmsg_len = sizeof(msg),
+ .rtm.rtm_family = AF_INET,
+ .rtm.rtm_protocol = RTPROT_STATIC,
+ .rtm.rtm_type = RTN_UNICAST,
+ .rtm.rtm_dst_len = 32,
+ .rta_dst.nla_type = RTA_DST,
+ .rta_dst.nla_len = sizeof(struct in_addr) +
+ sizeof(struct nlattr),
+ .dst.s_addr = inet_addr("127.0.0.127"),
+ .rta_oif.nla_type = RTA_OIF,
+ .rta_oif.nla_len = sizeof(uint32_t) + sizeof(struct nlattr),
+ .oif = 1,
+ };
+ struct nlattr *nla;
+ int fd;
+
+ ATF_REQUIRE((fd = socket(PF_NETLINK, SOCK_RAW, NETLINK_ROUTE)) != -1);
+ ATF_REQUIRE(setsockopt(fd, SOL_NETLINK, NETLINK_ADD_MEMBERSHIP,
+ &(int){RTNLGRP_IPV4_ROUTE}, sizeof(int)) == 0);
+
+ ATF_REQUIRE(send(fd, &msg, sizeof(msg), 0) == sizeof(msg));
+ ATF_REQUIRE(recv(fd, &reply, sizeof(reply), 0) == sizeof(reply));
+ ATF_REQUIRE(reply.hdr.nlmsg_type == msg.hdr.nlmsg_type);
+ ATF_REQUIRE(reply.rtm.rtm_type == msg.rtm.rtm_type);
+ ATF_REQUIRE(reply.rtm.rtm_dst_len == msg.rtm.rtm_dst_len);
+ ATF_REQUIRE(nla = nla_RTA_DST(&reply.rta_dst, sizeof(reply)));
+ ATF_REQUIRE(memcmp(&msg.dst, (char *)nla + sizeof(struct nlattr),
+ sizeof(struct in_addr)) == 0);
+
+ msg.hdr.nlmsg_type = RTM_DELROUTE;
+ msg.hdr.nlmsg_len -= sizeof(struct nlattr) + sizeof(uint32_t);
+ ATF_REQUIRE(send(fd, &msg, msg.hdr.nlmsg_len, 0) == msg.hdr.nlmsg_len);
+ ATF_REQUIRE(recv(fd, &reply, sizeof(reply), 0) == sizeof(reply));
+ ATF_REQUIRE(reply.hdr.nlmsg_type == msg.hdr.nlmsg_type);
+ ATF_REQUIRE(reply.rtm.rtm_type == msg.rtm.rtm_type);
+ ATF_REQUIRE(reply.rtm.rtm_dst_len == msg.rtm.rtm_dst_len);
+ ATF_REQUIRE(nla = nla_RTA_DST(&reply.rta_dst, sizeof(reply)));
+ ATF_REQUIRE(memcmp(&msg.dst, (char *)nla + sizeof(struct nlattr),
+ sizeof(struct in_addr)) == 0);
+}
+
+ATF_TP_ADD_TCS(tp)
+{
+ ATF_TP_ADD_TC(tp, overflow);
+ ATF_TP_ADD_TC(tp, peek);
+ ATF_TP_ADD_TC(tp, sizes);
+ ATF_TP_ADD_TC(tp, membership);
+
+ return (atf_no_error());
+}
diff --git a/tests/sys/netlink/test_netlink_message_writer.py b/tests/sys/netlink/test_netlink_message_writer.py
new file mode 100644
index 000000000000..5f854b14ca45
--- /dev/null
+++ b/tests/sys/netlink/test_netlink_message_writer.py
@@ -0,0 +1,39 @@
+import mmap
+import pytest
+
+from atf_python.ktest import BaseKernelTest
+from atf_python.sys.netlink.attrs import NlAttrU32
+
+M_NOWAIT = 1
+M_WAITOK = 2
+
+NLMSG_SMALL = 128
+NLMSG_LARGE = 2048
+
+class TestNetlinkMessageWriter(BaseKernelTest):
+ KTEST_MODULE_NAME = "ktest_netlink_message_writer"
+
+ @pytest.mark.parametrize(
+ "malloc_flags",
+ [
+ pytest.param(M_NOWAIT, id="NOWAIT"),
+ pytest.param(M_WAITOK, id="WAITOK"),
+ ],
+ )
+ @pytest.mark.parametrize(
+ "sz",
+ [
+ pytest.param([NLMSG_SMALL, NLMSG_SMALL], id="NLMSG_SMALL"),
+ pytest.param([NLMSG_LARGE, NLMSG_LARGE], id="NLMSG_LARGE"),
+ pytest.param([NLMSG_LARGE + 256, NLMSG_LARGE + 256], id="NLMSG_LARGE+256"),
+ ],
+ )
+ def test_nlbuf_writer_allocation(self, sz, malloc_flags):
+ """override to parametrize"""
+
+ test_meta = [
+ NlAttrU32(1, sz[0]), # size
+ NlAttrU32(2, sz[1]), # expected_avail
+ NlAttrU32(3, malloc_flags),
+ ]
+ self.runtest(test_meta)
diff --git a/tests/sys/netlink/test_nl_core.py b/tests/sys/netlink/test_nl_core.py
new file mode 100644
index 000000000000..5b4306cbb932
--- /dev/null
+++ b/tests/sys/netlink/test_nl_core.py
@@ -0,0 +1,33 @@
+import errno
+import socket
+
+import pytest
+from atf_python.sys.net.vnet import SingleVnetTestTemplate
+from atf_python.sys.netlink.netlink import NetlinkTestTemplate
+from atf_python.sys.netlink.utils import NlConst
+
+
+class TestNlCore(NetlinkTestTemplate, SingleVnetTestTemplate):
+ @pytest.mark.parametrize(
+ "params",
+ [
+ pytest.param({"type": socket.SOCK_RAW}, id="SOCK_RAW"),
+ pytest.param({"type": socket.SOCK_DGRAM}, id="SOCK_DGRAM"),
+ ],
+ )
+ def test_socket_type(self, params):
+ s = socket.socket(NlConst.AF_NETLINK, params["type"], NlConst.NETLINK_ROUTE)
+ s.close()
+
+ @pytest.mark.parametrize(
+ "params",
+ [
+ pytest.param({"type": socket.SOCK_STREAM}, id="SOCK_STREAM"),
+ pytest.param({"type": socket.SOCK_RDM}, id="SOCK_RDM"),
+ pytest.param({"type": socket.SOCK_SEQPACKET}, id="SOCK_SEQPACKET"),
+ ],
+ )
+ def test_socket_type_unsup(self, params):
+ with pytest.raises(OSError) as exc_info:
+ socket.socket(NlConst.AF_NETLINK, params["type"], NlConst.NETLINK_ROUTE)
+ assert exc_info.value.errno == errno.EPROTOTYPE
diff --git a/tests/sys/netlink/test_rtnl_iface.py b/tests/sys/netlink/test_rtnl_iface.py
new file mode 100644
index 000000000000..41cb4d16de94
--- /dev/null
+++ b/tests/sys/netlink/test_rtnl_iface.py
@@ -0,0 +1,355 @@
+import errno
+import socket
+
+import pytest
+from atf_python.sys.netlink.netlink_route import IflattrType
+from atf_python.sys.netlink.netlink_route import IflinkInfo
+from atf_python.sys.netlink.netlink_route import IfLinkInfoDataVlan
+from atf_python.sys.netlink.netlink_route import NetlinkIflaMessage
+from atf_python.sys.netlink.netlink import NetlinkTestTemplate
+from atf_python.sys.netlink.attrs import NlAttrNested
+from atf_python.sys.netlink.attrs import NlAttrStr
+from atf_python.sys.netlink.attrs import NlAttrStrn
+from atf_python.sys.netlink.attrs import NlAttrU16
+from atf_python.sys.netlink.attrs import NlAttrU32
+from atf_python.sys.netlink.utils import NlConst
+from atf_python.sys.netlink.base_headers import NlmBaseFlags
+from atf_python.sys.netlink.base_headers import NlmNewFlags
+from atf_python.sys.netlink.base_headers import NlMsgType
+from atf_python.sys.netlink.netlink_route import NlRtMsgType
+from atf_python.sys.netlink.netlink_route import rtnl_ifla_attrs
+from atf_python.sys.net.vnet import SingleVnetTestTemplate
+from atf_python.sys.net.tools import ToolsHelper
+
+
+class TestRtNlIface(NetlinkTestTemplate, SingleVnetTestTemplate):
+ def setup_method(self, method):
+ super().setup_method(method)
+ self.setup_netlink(NlConst.NETLINK_ROUTE)
+
+ def get_interface_byname(self, ifname):
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
+ self.write_message(msg)
+ while True:
+ rx_msg = self.read_message()
+ if msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq:
+ if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
+ if rx_msg.error_code != 0:
+ raise ValueError("unable to get interface {}".format(ifname))
+ elif rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
+ return rx_msg
+ else:
+ raise ValueError("bad message")
+
+ def test_get_iface_byname_error(self):
+ """Tests error on fetching non-existing interface name"""
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == errno.ENODEV
+
+ def test_get_iface_byindex_error(self):
+ """Tests error on fetching non-existing interface index"""
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.base_hdr.ifi_index = 2147483647
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == errno.ENODEV
+
+ @pytest.mark.require_user("root")
+ def test_create_iface_plain(self):
+ """Tests loopback creation w/o any parameters"""
+ flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
+ msg.add_nla(
+ NlAttrNested(
+ IflattrType.IFLA_LINKINFO,
+ [
+ NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
+ ],
+ )
+ )
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+
+ self.get_interface_byname("lo10")
+
+ @pytest.mark.require_user("root")
+ def test_create_iface_plain_retvals(self):
+ """Tests loopback creation w/o any parameters"""
+ flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
+ msg.add_nla(
+ NlAttrNested(
+ IflattrType.IFLA_LINKINFO,
+ [
+ NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
+ ],
+ )
+ )
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+ assert rx_msg.cookie is not None
+ nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)
+ nla_map = {n.nla_type: n for n in nla_list}
+ assert IflattrType.IFLA_IFNAME.value in nla_map
+ assert nla_map[IflattrType.IFLA_IFNAME.value].text == "lo10"
+ assert IflattrType.IFLA_NEW_IFINDEX.value in nla_map
+ assert nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32 > 0
+
+ lo_msg = self.get_interface_byname("lo10")
+ assert (
+ lo_msg.base_hdr.ifi_index == nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32
+ )
+
+ @pytest.mark.require_user("root")
+ def test_create_iface_attrs(self):
+ """Tests interface creation with additional properties"""
+ flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
+ msg.add_nla(
+ NlAttrNested(
+ IflattrType.IFLA_LINKINFO,
+ [
+ NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
+ ],
+ )
+ )
+
+ # Custom attributes
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
+ msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+
+ iface_msg = self.get_interface_byname("lo10")
+ assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
+ assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024
+
+ @pytest.mark.require_user("root")
+ def test_modify_iface_attrs(self):
+ """Tests interface modifications"""
+ flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
+ msg.add_nla(
+ NlAttrNested(
+ IflattrType.IFLA_LINKINFO,
+ [
+ NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
+ ],
+ )
+ )
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
+
+ # Custom attributes
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
+ msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+
+ iface_msg = self.get_interface_byname("lo10")
+ assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
+ assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024
+
+ @pytest.mark.require_user("root")
+ def test_delete_iface(self):
+ """Tests interface modifications"""
+ flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
+ msg.add_nla(
+ NlAttrNested(
+ IflattrType.IFLA_LINKINFO,
+ [
+ NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
+ ],
+ )
+ )
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+
+ iface_msg = self.get_interface_byname("lo10")
+ iface_idx = iface_msg.base_hdr.ifi_index
+
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_DELLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.base_hdr.ifi_index = iface_idx
+ # msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.base_hdr.ifi_index = 2147483647
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == errno.ENODEV
+
+ @pytest.mark.require_user("root")
+ def test_dump_ifaces_many(self):
+ """Tests if interface dummp is not missing interfaces"""
+
+ ifmap = {}
+ ifmap[socket.if_nametoindex("lo0")] = "lo0"
+
+ for i in range(40):
+ ifname = "lo{}".format(i + 1)
+ flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
+ msg.add_nla(
+ NlAttrNested(
+ IflattrType.IFLA_LINKINFO,
+ [
+ NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
+ ],
+ )
+ )
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)
+ nla_map = {n.nla_type: n for n in nla_list}
+ assert nla_map[IflattrType.IFLA_IFNAME.value].text == ifname
+ ifindex = nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32
+ assert ifindex > 0
+ assert ifindex not in ifmap
+ ifmap[ifindex] = ifname
+
+ # Dump all interfaces and check if the output matches ifmap
+ kernel_ifmap = {}
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ self.write_message(msg)
+ while True:
+ rx_msg = self.read_message()
+ if msg.nl_hdr.nlmsg_seq != rx_msg.nl_hdr.nlmsg_seq:
+ raise ValueError(
+ "unexpected seq {}".format(rx_msg.nl_hdr.nlmsg_seq)
+ )
+ if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
+ raise ValueError("unexpected message {}".format(rx_msg))
+ if rx_msg.is_type(NlMsgType.NLMSG_DONE):
+ break
+ if not rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
+ raise ValueError("unexpected message {}".format(rx_msg))
+
+ ifindex = rx_msg.base_hdr.ifi_index
+ assert ifindex == rx_msg.base_hdr.ifi_index
+ ifname = rx_msg.get_nla(IflattrType.IFLA_IFNAME).text
+ if ifname.startswith("lo"):
+ kernel_ifmap[ifindex] = ifname
+ assert kernel_ifmap == ifmap
+
+ #
+ # *
+ # * {len=76, type=RTM_NEWLINK, flags=NLM_F_REQUEST|NLM_F_ACK|NLM_F_EXCL|NLM_F_CREATE, seq=1662892737, pid=0},
+ # * {ifi_family=AF_UNSPEC, ifi_type=ARPHRD_NETROM, ifi_index=0, ifi_flags=0, ifi_change=0},
+ # * {{nla_len=8, nla_type=IFLA_LINK}, 2},
+ # * {{nla_len=12, nla_type=IFLA_IFNAME}, "xvlan22"},
+ # * {{nla_len=24, nla_type=IFLA_LINKINFO},
+ # * {{nla_len=8, nla_type=IFLA_INFO_KIND}, "vlan"...},
+ # * {{nla_len=12, nla_type=IFLA_INFO_DATA}, "\x06\x00\x01\x00\x16\x00\x00\x00"}
+ # */
+ @pytest.mark.require_user("root")
+ def test_create_vlan_plain(self):
+ """Creates 802.1Q VLAN interface in vlanXX and ifX fashion"""
+ self.require_module("if_vlan")
+ os_ifname = self.vnet.iface_alias_map["if1"].name
+ ifindex = socket.if_nametoindex(os_ifname)
+
+ flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
+ msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
+ msg.nl_hdr.nlmsg_flags = (
+ flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
+ )
+ msg.base_hdr.ifi_index = ifindex
+
+ msg.add_nla(NlAttrU32(IflattrType.IFLA_LINK, ifindex))
+ msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "vlan22"))
+
+ msg.add_nla(
+ NlAttrNested(
+ IflattrType.IFLA_LINKINFO,
+ [
+ NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "vlan"),
+ NlAttrNested(
+ IflinkInfo.IFLA_INFO_DATA,
+ [
+ NlAttrU16(IfLinkInfoDataVlan.IFLA_VLAN_ID, 22),
+ ],
+ ),
+ ],
+ )
+ )
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+
+ ToolsHelper.print_net_debug()
+ self.get_interface_byname("vlan22")
+ # ToolsHelper.print_net_debug()
diff --git a/tests/sys/netlink/test_rtnl_ifaddr.py b/tests/sys/netlink/test_rtnl_ifaddr.py
new file mode 100644
index 000000000000..768bf38153ff
--- /dev/null
+++ b/tests/sys/netlink/test_rtnl_ifaddr.py
@@ -0,0 +1,762 @@
+import ipaddress
+import socket
+import struct
+
+import pytest
+from atf_python.sys.net.vnet import SingleVnetTestTemplate
+from atf_python.sys.netlink.attrs import NlAttr
+from atf_python.sys.netlink.attrs import NlAttrIp
+from atf_python.sys.netlink.attrs import NlAttrNested
+from atf_python.sys.netlink.attrs import NlAttrU32
+from atf_python.sys.netlink.base_headers import NlmBaseFlags
+from atf_python.sys.netlink.base_headers import NlmNewFlags
+from atf_python.sys.netlink.base_headers import Nlmsghdr
+from atf_python.sys.netlink.message import NlMsgType
+from atf_python.sys.netlink.netlink import NetlinkTestTemplate
+from atf_python.sys.netlink.netlink import Nlsock
+from atf_python.sys.netlink.netlink_generic import CarpAttrType
+from atf_python.sys.netlink.netlink_generic import CarpGenMessage
+from atf_python.sys.netlink.netlink_generic import CarpMsgType
+from atf_python.sys.netlink.netlink_route import IfaAttrType
+from atf_python.sys.netlink.netlink_route import IfaCacheInfo
+from atf_python.sys.netlink.netlink_route import IfafAttrType
+from atf_python.sys.netlink.netlink_route import IfafFlags6
+from atf_python.sys.netlink.netlink_route import IfaFlags
+from atf_python.sys.netlink.netlink_route import NetlinkIfaMessage
+from atf_python.sys.netlink.netlink_route import NlRtMsgType
+from atf_python.sys.netlink.netlink_route import RtScope
+from atf_python.sys.netlink.utils import enum_or_int
+from atf_python.sys.netlink.utils import NlConst
+
+
+class TestRtNlIfaddrList(NetlinkTestTemplate, SingleVnetTestTemplate):
+ def setup_method(self, method):
+ method_name = method.__name__
+ if "4" in method_name:
+ if "nofilter" in method_name:
+ self.IPV4_PREFIXES = ["192.0.2.1/24", "169.254.169.254/16"]
+ else:
+ self.IPV4_PREFIXES = ["192.0.2.1/24"]
+ if "6" in method_name:
+ self.IPV6_PREFIXES = ["2001:db8::1/64"]
+ super().setup_method(method)
+ self.setup_netlink(NlConst.NETLINK_ROUTE)
+
+ def test_46_nofilter(self):
+ """Tests that listing outputs both IPv4/IPv6 and interfaces"""
+ msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
+ msg.set_request()
+ self.write_message(msg)
+
+ ret = []
+ for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
+ ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
+ family = rx_msg.base_hdr.ifa_family
+ scope = rx_msg.base_hdr.ifa_scope
+ ret.append((ifname, family, scope))
+
+ ifname = "lo0"
+ assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET and r[2] == RtScope.RT_SCOPE_HOST.value]) == 1
+ assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6 and r[2] == RtScope.RT_SCOPE_HOST.value]) == 1
+ assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6 and r[2] == RtScope.RT_SCOPE_LINK.value]) == 1
+ assert len([r for r in ret if r[0] == ifname]) == 3
+
+ ifname = self.vnet.iface_alias_map["if1"].name
+ assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET and r[2] == RtScope.RT_SCOPE_LINK.value]) == 1
+ assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET and r[2] == RtScope.RT_SCOPE_UNIVERSE.value]) == 1
+ assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6 and r[2] == RtScope.RT_SCOPE_LINK.value]) == 1
+ assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6 and r[2] == RtScope.RT_SCOPE_UNIVERSE.value]) == 1
+ assert len([r for r in ret if r[0] == ifname]) == 4
+
+ def test_46_filter_iface(self):
+ """Tests that listing outputs both IPv4/IPv6 for the specific interface"""
+ epair_ifname = self.vnet.iface_alias_map["if1"].name
+
+ msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
+ msg.set_request()
+ msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
+ self.write_message(msg)
+
+ ret = []
+ for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
+ ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
+ family = rx_msg.base_hdr.ifa_family
+ ret.append((ifname, family, rx_msg))
+
+ ifname = epair_ifname
+ assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET]) == 1
+ assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6]) == 2
+ assert len(ret) == 3
+
+ def test_46_filter_family_compat(self):
+ """Tests that family filtering works with the stripped header"""
+
+ hdr = Nlmsghdr(
+ nlmsg_len=17,
+ nlmsg_type=NlRtMsgType.RTM_GETADDR.value,
+ nlmsg_flags=NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value,
+ nlmsg_seq=self.helper.get_seq(),
+ )
+ data = bytes(hdr) + struct.pack("@B", socket.AF_INET)
+ self.nlsock.write_data(data)
+
+ ret = []
+ for rx_msg in self.read_msg_list(hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
+ ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
+ family = rx_msg.base_hdr.ifa_family
+ ret.append((ifname, family, rx_msg))
+ assert len(ret) == 2
+
+ def filter_iface_family(self, family, num_items):
+ """Tests that listing outputs IPv4 for the specific interface"""
+ epair_ifname = self.vnet.iface_alias_map["if1"].name
+
+ msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
+ msg.set_request()
+ msg.base_hdr.ifa_family = family
+ msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
+ self.write_message(msg)
+
+ ret = []
+ for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
+ assert family == rx_msg.base_hdr.ifa_family
+ assert epair_ifname == socket.if_indextoname(rx_msg.base_hdr.ifa_index)
+ ret.append(rx_msg)
+ assert len(ret) == num_items
+ return ret
+
+ def test_4_broadcast(self):
+ """Tests header/attr output for listing IPv4 ifas on broadcast iface"""
+ ret = self.filter_iface_family(socket.AF_INET, 1)
+ # Should be 192.0.2.1/24
+ msg = ret[0]
+ # Family and ifindex has been checked already
+ assert msg.base_hdr.ifa_prefixlen == 24
+ # Ignore IFA_FLAGS for now
+ assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
+
+ assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "192.0.2.1"
+ assert msg.get_nla(IfaAttrType.IFA_LOCAL).addr == "192.0.2.1"
+ assert msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == "192.0.2.255"
+
+ epair_ifname = self.vnet.iface_alias_map["if1"].name
+ assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname
+
+ def test_6_broadcast(self):
+ """Tests header/attr output for listing IPv6 ifas on broadcast iface"""
+ ret = self.filter_iface_family(socket.AF_INET6, 2)
+ # Should be 192.0.2.1/24
+ if ret[0].base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value:
+ (gmsg, lmsg) = ret
+ else:
+ (lmsg, gmsg) = ret
+ # Start with global ( 2001:db8::1/64 )
+ msg = gmsg
+ # Family and ifindex has been checked already
+ assert msg.base_hdr.ifa_prefixlen == 64
+ # Ignore IFA_FLAGS for now
+ assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
+
+ assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "2001:db8::1"
+ assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None
+ assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None
+
+ epair_ifname = self.vnet.iface_alias_map["if1"].name
+ assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname
+
+ # Local: fe80::/64
+ msg = lmsg
+ assert msg.base_hdr.ifa_prefixlen == 64
+ # Ignore IFA_FLAGS for now
+ assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_LINK.value
+
+ addr = ipaddress.ip_address(msg.get_nla(IfaAttrType.IFA_ADDRESS).addr)
+ assert addr.is_link_local
+ # Verify that ifindex is not emmbedded
+ assert struct.unpack("!H", addr.packed[2:4])[0] == 0
+ assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None
+ assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None
+
+ epair_ifname = self.vnet.iface_alias_map["if1"].name
+ assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname
+
+
+class RtnlIfaOps(NetlinkTestTemplate, SingleVnetTestTemplate):
+ def setup_method(self, method):
+ super().setup_method(method)
+ self.setup_netlink(NlConst.NETLINK_ROUTE)
+
+ def send_check_success(self, msg):
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+
+ @staticmethod
+ def get_family_from_ip(ip):
+ if ip.version == 4:
+ return socket.AF_INET
+ return socket.AF_INET6
+
+ def create_msg(self, ifa):
+ iface = self.vnet.iface_alias_map["if1"]
+
+ msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value)
+ msg.set_request()
+ msg.nl_hdr.nlmsg_flags |= (
+ NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
+ )
+ msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
+ msg.base_hdr.ifa_index = iface.ifindex
+ msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen
+ return msg
+
+ def get_ifa_list(self, ifindex=0, family=0):
+ msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
+ msg.set_request()
+ msg.base_hdr.ifa_family = family
+ msg.base_hdr.ifa_index = ifindex
+ self.write_message(msg)
+ return self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR)
+
+ def find_msg_by_ifa(self, msg_list, ip):
+ for msg in msg_list:
+ if msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ip):
+ return msg
+ return None
+
+ def setup_dummy_carp(self, ifindex: int, vhid: int):
+ self.require_module("carp")
+
+ nlsock = Nlsock(NlConst.NETLINK_GENERIC, self.helper)
+ family_id = nlsock.get_genl_family_id("carp")
+
+ msg = CarpGenMessage(self.helper, family_id, CarpMsgType.CARP_NL_CMD_SET)
+ msg.set_request()
+ msg.add_nla(NlAttrU32(CarpAttrType.CARP_NL_VHID, vhid))
+ msg.add_nla(NlAttrU32(CarpAttrType.CARP_NL_IFINDEX, ifindex))
+ rx_msg = nlsock.get_reply(msg)
+
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+
+
+class TestRtNlIfaddrOpsBroadcast(RtnlIfaOps):
+ def test_add_4(self):
+ """Tests IPv4 address addition to the standard broadcast interface"""
+ ifa = ipaddress.ip_interface("192.0.2.1/24")
+ ifa_brd = ifa.network.broadcast_address
+ iface = self.vnet.iface_alias_map["if1"]
+
+ msg = self.create_msg(ifa)
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
+
+ self.send_check_success(msg)
+
+ lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
+ assert len(lst) == 1
+ rx_msg = lst[0]
+
+ assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
+ assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
+
+ assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
+ assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
+ assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)
+
+ @pytest.mark.parametrize(
+ "brd",
+ [
+ pytest.param((32, True, "192.0.2.1"), id="auto_32"),
+ pytest.param((31, True, "255.255.255.255"), id="auto_31"),
+ pytest.param((30, True, "192.0.2.3"), id="auto_30"),
+ pytest.param((30, False, "192.0.2.2"), id="custom_30"),
+ pytest.param((24, False, "192.0.2.7"), id="custom_24"),
+ ],
+ )
+ def test_add_4_brd(self, brd):
+ """Tests proper broadcast setup when adding IPv4 ifa"""
+ plen, auto_brd, ifa_brd_str = brd
+ ifa = ipaddress.ip_interface("192.0.2.1/{}".format(plen))
+ iface = self.vnet.iface_alias_map["if1"]
+ ifa_brd = ipaddress.ip_address(ifa_brd_str)
+
+ msg = self.create_msg(ifa)
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
+ if not auto_brd:
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
+
+ self.send_check_success(msg)
+
+ lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
+ assert len(lst) == 1
+ rx_msg = lst[0]
+
+ assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
+ assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
+
+ assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
+ assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
+ assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)
+
+ def test_add_6(self):
+ ifa = ipaddress.ip_interface("2001:db8::1/64")
+ iface = self.vnet.iface_alias_map["if1"]
+
+ msg = self.create_msg(ifa)
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
+
+ self.send_check_success(msg)
+
+ lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
+ assert len(lst) == 2
+ rx_msg_gu = self.find_msg_by_ifa(lst, ifa.ip)
+ assert rx_msg_gu is not None
+
+ assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
+ assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
+ assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
+
+ def test_add_4_carp(self):
+ ifa = ipaddress.ip_interface("192.0.2.1/24")
+ ifa_brd = ifa.network.broadcast_address
+ iface = self.vnet.iface_alias_map["if1"]
+ vhid = 77
+
+ self.setup_dummy_carp(iface.ifindex, vhid)
+
+ msg = self.create_msg(ifa)
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
+ attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)]
+ msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))
+
+ self.send_check_success(msg)
+
+ lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
+ assert len(lst) == 1
+ rx_msg = lst[0]
+
+ assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
+ assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
+
+ assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
+ assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
+ assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)
+ ifa_bsd = rx_msg.get_nla(IfaAttrType.IFA_FREEBSD)
+ assert ifa_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid
+
+ def test_add_6_carp(self):
+ ifa = ipaddress.ip_interface("2001:db8::1/64")
+ iface = self.vnet.iface_alias_map["if1"]
+ vhid = 77
+
+ self.setup_dummy_carp(iface.ifindex, vhid)
+
+ msg = self.create_msg(ifa)
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
+ attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)]
+ msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))
+
+ self.send_check_success(msg)
+
+ lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
+ assert len(lst) == 2
+ rx_msg_gu = self.find_msg_by_ifa(lst, ifa.ip)
+ assert rx_msg_gu is not None
+
+ assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
+ assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
+ assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
+ ifa_bsd = rx_msg_gu.get_nla(IfaAttrType.IFA_FREEBSD)
+ assert ifa_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid
+
+ def test_add_6_lifetime(self):
+ ifa = ipaddress.ip_interface("2001:db8::1/64")
+ iface = self.vnet.iface_alias_map["if1"]
+ pref_time = 43200
+ valid_time = 86400
+
+ ci = IfaCacheInfo(ifa_prefered=pref_time, ifa_valid=valid_time)
+
+ msg = self.create_msg(ifa)
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
+ msg.add_nla(NlAttr(IfaAttrType.IFA_CACHEINFO, bytes(ci)))
+
+ self.send_check_success(msg)
+
+ lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
+ assert len(lst) == 2
+ rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
+ assert rx_msg is not None
+
+ ci = rx_msg.get_nla(IfaAttrType.IFA_CACHEINFO).ci
+ assert pref_time - 5 <= ci.ifa_prefered <= pref_time
+ assert valid_time - 5 <= ci.ifa_valid <= valid_time
+ assert ci.cstamp > 0
+ assert ci.tstamp > 0
+ assert ci.tstamp >= ci.cstamp
+
+ @pytest.mark.parametrize(
+ "flags_str",
+ [
+ "autoconf",
+ "deprecated",
+ "autoconf,deprecated",
+ "prefer_source",
+ ],
+ )
+ def test_add_6_flags(self, flags_str):
+ ifa = ipaddress.ip_interface("2001:db8::1/64")
+ iface = self.vnet.iface_alias_map["if1"]
+
+ flags_map = {
+ "autoconf": {"nl": 0, "f": IfafFlags6.IN6_IFF_AUTOCONF},
+ "deprecated": {
+ "nl": IfaFlags.IFA_F_DEPRECATED,
+ "f": IfafFlags6.IN6_IFF_DEPRECATED,
+ },
+ "prefer_source": {"nl": 0, "f": IfafFlags6.IN6_IFF_PREFER_SOURCE},
+ }
+ nl_flags = 0
+ f_flags = 0
+
+ for flag_str in flags_str.split(","):
+ d = flags_map.get(flag_str, {})
+ nl_flags |= enum_or_int(d.get("nl", 0))
+ f_flags |= enum_or_int(d.get("f", 0))
+
+ msg = self.create_msg(ifa)
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
+ msg.add_nla(NlAttrU32(IfaAttrType.IFA_FLAGS, nl_flags))
+ attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_FLAGS, f_flags)]
+ msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))
+
+ self.send_check_success(msg)
+
+ lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
+ assert len(lst) == 2
+ rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
+ assert rx_msg is not None
+
+ assert rx_msg.get_nla(IfaAttrType.IFA_FLAGS).u32 == nl_flags
+ ifa_bsd = rx_msg.get_nla(IfaAttrType.IFA_FREEBSD)
+ assert ifa_bsd.get_nla(IfafAttrType.IFAF_FLAGS).u32 == f_flags
+
+ def test_add_4_empty_message(self):
+ """Tests correct failure w/ empty message"""
+ msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value)
+ msg.set_request()
+ msg.nl_hdr.nlmsg_flags |= (
+ NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
+ )
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code != 0
+
+ def test_add_4_empty_ifindex(self):
+ """Tests correct failure w/ empty ifindex"""
+ ifa = ipaddress.ip_interface("192.0.2.1/24")
+ ifa_brd = ifa.network.broadcast_address
+
+ msg = self.create_msg(ifa)
+ msg.base_hdr.ifa_index = 0
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code != 0
+
+ def test_add_4_empty_addr(self):
+ """Tests correct failure w/ empty address"""
+ ifa = ipaddress.ip_interface("192.0.2.1/24")
+ ifa_brd = ifa.network.broadcast_address
+
+ msg = self.create_msg(ifa)
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code != 0
+
+ @pytest.mark.parametrize(
+ "ifa_str",
+ [
+ pytest.param("192.0.2.1/32", id="ipv4_host"),
+ pytest.param("192.0.2.1/24", id="ipv4_prefix"),
+ pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"),
+ pytest.param("2001:db8::1/128", id="ipv6_gu_host"),
+ ],
+ )
+ @pytest.mark.parametrize(
+ "tlv",
+ [
+ pytest.param("local", id="ifa_local"),
+ pytest.param("address", id="ifa_address"),
+ ],
+ )
+ def test_del(self, tlv, ifa_str):
+ """Tests address deletion from the standard broadcast interface"""
+ ifa = ipaddress.ip_interface(ifa_str)
+ ifa_brd = ifa.network.broadcast_address
+ iface = self.vnet.iface_alias_map["if1"]
+
+ msg = self.create_msg(ifa)
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
+
+ self.send_check_success(msg)
+ lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
+ rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
+ assert rx_msg is not None
+
+ msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
+ msg.set_request()
+ msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
+ msg.base_hdr.ifa_index = iface.ifindex
+ msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen
+
+ if tlv == "local":
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
+ if tlv == "address":
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))
+
+ self.send_check_success(msg)
+ lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
+ rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
+ assert rx_msg is None
+
+
+class TestRtNlIfaddrOpsP2p(RtnlIfaOps):
+ IFTYPE = "gif"
+
+ @pytest.mark.parametrize(
+ "ifa_pair",
+ [
+ pytest.param(["192.0.2.1/24", "192.0.2.2"], id="dst_inside_24"),
+ pytest.param(["192.0.2.1/30", "192.0.2.2"], id="dst_inside_30"),
+ pytest.param(["192.0.2.1/31", "192.0.2.2"], id="dst_inside_31"),
+ pytest.param(["192.0.2.1/32", "192.0.2.2"], id="dst_outside_32"),
+ pytest.param(["192.0.2.1/30", "192.0.2.100"], id="dst_outside_30"),
+ ],
+ )
+ def test_add_4(self, ifa_pair):
+ """Tests IPv4 address addition to the p2p interface"""
+ ifa = ipaddress.ip_interface(ifa_pair[0])
+ peer_ip = ipaddress.ip_address(ifa_pair[1])
+ iface = self.vnet.iface_alias_map["if1"]
+
+ msg = self.create_msg(ifa)
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))
+
+ self.send_check_success(msg)
+
+ lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
+ assert len(lst) == 1
+ rx_msg = lst[0]
+
+ assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
+ assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
+
+ assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
+ assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip)
+
+ @pytest.mark.parametrize(
+ "ifa_pair",
+ [
+ pytest.param(
+ ["2001:db8::1/64", "2001:db8::2"],
+ id="dst_inside_64",
+ marks=pytest.mark.xfail(reason="currently fails"),
+ ),
+ pytest.param(
+ ["2001:db8::1/127", "2001:db8::2"],
+ id="dst_inside_127",
+ marks=pytest.mark.xfail(reason="currently fails"),
+ ),
+ pytest.param(["2001:db8::1/128", "2001:db8::2"], id="dst_outside_128"),
+ pytest.param(
+ ["2001:db8::1/64", "2001:db8:2::2"],
+ id="dst_outside_64",
+ marks=pytest.mark.xfail(reason="currently fails"),
+ ),
+ ],
+ )
+ def test_add_6(self, ifa_pair):
+ """Tests IPv6 address addition to the p2p interface"""
+ ifa = ipaddress.ip_interface(ifa_pair[0])
+ peer_ip = ipaddress.ip_address(ifa_pair[1])
+ iface = self.vnet.iface_alias_map["if1"]
+
+ msg = self.create_msg(ifa)
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))
+
+ self.send_check_success(msg)
+
+ lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
+ assert len(lst) == 2
+ rx_msg_gu = self.find_msg_by_ifa(lst, peer_ip)
+ assert rx_msg_gu is not None
+
+ assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
+ assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
+ assert rx_msg_gu.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
+ assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip)
+
+ @pytest.mark.parametrize(
+ "ifa_pair",
+ [
+ pytest.param(["192.0.2.1/30", "192.0.2.2"], id="ipv4_dst_inside_30"),
+ pytest.param(["192.0.2.1/32", "192.0.2.2"], id="ipv4_dst_outside_32"),
+ pytest.param(["2001:db8::1/128", "2001:db8::2"], id="ip6_dst_outside_128"),
+ ],
+ )
+ @pytest.mark.parametrize(
+ "tlv_pair",
+ [
+ pytest.param(["a", ""], id="ifa_addr=addr"),
+ pytest.param(["", "a"], id="ifa_local=addr"),
+ pytest.param(["a", "a"], id="ifa_addr=addr,ifa_local=addr"),
+ ],
+ )
+ def test_del(self, tlv_pair, ifa_pair):
+ """Tests address deletion from the P2P interface"""
+ ifa = ipaddress.ip_interface(ifa_pair[0])
+ peer_ip = ipaddress.ip_address(ifa_pair[1])
+ iface = self.vnet.iface_alias_map["if1"]
+ ifa_addr_str, ifa_local_str = tlv_pair
+
+ msg = self.create_msg(ifa)
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))
+
+ self.send_check_success(msg)
+ lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
+ rx_msg = self.find_msg_by_ifa(lst, peer_ip)
+ assert rx_msg is not None
+
+ msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
+ msg.set_request()
+ msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
+ msg.base_hdr.ifa_index = iface.ifindex
+ msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen
+
+ if "a" in ifa_addr_str:
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))
+ if "p" in ifa_addr_str:
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))
+ if "a" in ifa_local_str:
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
+ if "p" in ifa_local_str:
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(peer_ip)))
+
+ self.send_check_success(msg)
+ lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
+ rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
+ assert rx_msg is None
+
+
+class TestRtNlAddIfaddrLo(RtnlIfaOps):
+ IFTYPE = "lo"
+
+ @pytest.mark.parametrize(
+ "ifa_str",
+ [
+ pytest.param("192.0.2.1/24", id="prefix"),
+ pytest.param("192.0.2.1/32", id="host"),
+ ],
+ )
+ def test_add_4(self, ifa_str):
+ """Tests IPv4 address addition to the loopback interface"""
+ ifa = ipaddress.ip_interface(ifa_str)
+ iface = self.vnet.iface_alias_map["if1"]
+
+ msg = self.create_msg(ifa)
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
+
+ self.send_check_success(msg)
+
+ lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
+ assert len(lst) == 1
+ rx_msg = lst[0]
+
+ assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
+ assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
+
+ assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
+ assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
+
+ @pytest.mark.parametrize(
+ "ifa_str",
+ [
+ pytest.param("2001:db8::1/64", id="gu_prefix"),
+ pytest.param("2001:db8::1/128", id="gu_host"),
+ ],
+ )
+ def test_add_6(self, ifa_str):
+ """Tests IPv6 address addition to the loopback interface"""
+ ifa = ipaddress.ip_interface(ifa_str)
+ iface = self.vnet.iface_alias_map["if1"]
+
+ msg = self.create_msg(ifa)
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
+
+ self.send_check_success(msg)
+
+ lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
+ assert len(lst) == 2 # link-local should be auto-created as well
+ rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
+ assert rx_msg is not None
+
+ assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
+ assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
+ assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
+
+ @pytest.mark.parametrize(
+ "ifa_str",
+ [
+ pytest.param("192.0.2.1/32", id="ipv4_host"),
+ pytest.param("192.0.2.1/24", id="ipv4_prefix"),
+ pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"),
+ pytest.param("2001:db8::1/128", id="ipv6_gu_host"),
+ ],
+ )
+ @pytest.mark.parametrize(
+ "tlv",
+ [
+ pytest.param("local", id="ifa_local"),
+ pytest.param("address", id="ifa_address"),
+ ],
+ )
+ def test_del(self, tlv, ifa_str):
+ """Tests address deletion from the loopback interface"""
+ ifa = ipaddress.ip_interface(ifa_str)
+ iface = self.vnet.iface_alias_map["if1"]
+
+ msg = self.create_msg(ifa)
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
+
+ self.send_check_success(msg)
+ lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
+ rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
+ assert rx_msg is not None
+
+ msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
+ msg.set_request()
+ msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
+ msg.base_hdr.ifa_index = iface.ifindex
+ msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen
+
+ if tlv == "local":
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
+ if tlv == "address":
+ msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))
+
+ self.send_check_success(msg)
+ lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
+ rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
+ assert rx_msg is None
diff --git a/tests/sys/netlink/test_rtnl_neigh.py b/tests/sys/netlink/test_rtnl_neigh.py
new file mode 100644
index 000000000000..aab682d1d8a4
--- /dev/null
+++ b/tests/sys/netlink/test_rtnl_neigh.py
@@ -0,0 +1,53 @@
+import socket
+
+import pytest
+from atf_python.sys.net.vnet import SingleVnetTestTemplate
+from atf_python.sys.netlink.netlink import NetlinkTestTemplate
+from atf_python.sys.netlink.netlink_route import NdAttrType
+from atf_python.sys.netlink.netlink_route import NetlinkNdMessage
+from atf_python.sys.netlink.netlink_route import NlRtMsgType
+from atf_python.sys.netlink.utils import NlConst
+
+
+class TestRtNlNeigh(NetlinkTestTemplate, SingleVnetTestTemplate):
+ def setup_method(self, method):
+ method_name = method.__name__
+ if "4" in method_name:
+ self.IPV4_PREFIXES = ["192.0.2.1/24"]
+ if "6" in method_name:
+ self.IPV6_PREFIXES = ["2001:db8::1/64"]
+ super().setup_method(method)
+ self.setup_netlink(NlConst.NETLINK_ROUTE)
+
+ def filter_iface(self, family, num_items):
+ epair_ifname = self.vnet.iface_alias_map["if1"].name
+ epair_ifindex = socket.if_nametoindex(epair_ifname)
+
+ msg = NetlinkNdMessage(self.helper, NlRtMsgType.RTM_GETNEIGH)
+ msg.set_request()
+ msg.base_hdr.ndm_family = family
+ msg.base_hdr.ndm_ifindex = epair_ifindex
+ self.write_message(msg)
+
+ ret = []
+ for rx_msg in self.read_msg_list(
+ msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWNEIGH
+ ):
+ ifname = socket.if_indextoname(rx_msg.base_hdr.ndm_ifindex)
+ family = rx_msg.base_hdr.ndm_family
+ assert ifname == epair_ifname
+ assert family == family
+ assert rx_msg.get_nla(NdAttrType.NDA_DST) is not None
+ assert rx_msg.get_nla(NdAttrType.NDA_LLADDR) is not None
+ ret.append(rx_msg)
+ assert len(ret) == num_items
+
+ @pytest.mark.timeout(5)
+ def test_6_filter_iface(self):
+ """Tests that listing outputs all nd6 records"""
+ return self.filter_iface(socket.AF_INET6, 2)
+
+ @pytest.mark.timeout(5)
+ def test_4_filter_iface(self):
+ """Tests that listing outputs all arp records"""
+ return self.filter_iface(socket.AF_INET, 1)
diff --git a/tests/sys/netlink/test_rtnl_route.py b/tests/sys/netlink/test_rtnl_route.py
new file mode 100644
index 000000000000..370c8a74a2de
--- /dev/null
+++ b/tests/sys/netlink/test_rtnl_route.py
@@ -0,0 +1,142 @@
+import ipaddress
+import socket
+
+import pytest
+from atf_python.sys.net.tools import ToolsHelper
+from atf_python.sys.net.vnet import IfaceFactory
+from atf_python.sys.net.vnet import SingleVnetTestTemplate
+from atf_python.sys.netlink.attrs import NlAttrIp
+from atf_python.sys.netlink.attrs import NlAttrU32
+from atf_python.sys.netlink.base_headers import NlmBaseFlags
+from atf_python.sys.netlink.base_headers import NlmGetFlags
+from atf_python.sys.netlink.base_headers import NlmNewFlags
+from atf_python.sys.netlink.base_headers import NlMsgType
+from atf_python.sys.netlink.netlink import NetlinkTestTemplate
+from atf_python.sys.netlink.netlink_route import NetlinkRtMessage
+from atf_python.sys.netlink.netlink_route import NlRtMsgType
+from atf_python.sys.netlink.netlink_route import RtattrType
+from atf_python.sys.netlink.utils import NlConst
+
+
+class TestRtNlRoute(NetlinkTestTemplate, SingleVnetTestTemplate):
+ IPV6_PREFIXES = ["2001:db8::1/64"]
+
+ def setup_method(self, method):
+ super().setup_method(method)
+ self.setup_netlink(NlConst.NETLINK_ROUTE)
+
+ @pytest.mark.timeout(5)
+ def test_add_route6_ll_gw(self):
+ epair_ifname = self.vnet.iface_alias_map["if1"].name
+ epair_ifindex = socket.if_nametoindex(epair_ifname)
+
+ msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE)
+ msg.set_request()
+ msg.add_nlflags([NlmNewFlags.NLM_F_CREATE])
+ msg.base_hdr.rtm_family = socket.AF_INET6
+ msg.base_hdr.rtm_dst_len = 64
+ msg.add_nla(NlAttrIp(RtattrType.RTA_DST, "2001:db8:2::"))
+ msg.add_nla(NlAttrIp(RtattrType.RTA_GATEWAY, "fe80::1"))
+ msg.add_nla(NlAttrU32(RtattrType.RTA_OIF, epair_ifindex))
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+
+ ToolsHelper.print_net_debug()
+ ToolsHelper.print_output("netstat -6onW")
+
+ @pytest.mark.timeout(5)
+ def test_add_route6_ll_if_gw(self):
+ self.require_module("if_tun")
+ tun_ifname = IfaceFactory().create_iface("", "tun")[0].name
+ tun_ifindex = socket.if_nametoindex(tun_ifname)
+
+ msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE)
+ msg.set_request()
+ msg.add_nlflags([NlmNewFlags.NLM_F_CREATE])
+ msg.base_hdr.rtm_family = socket.AF_INET6
+ msg.base_hdr.rtm_dst_len = 64
+ msg.add_nla(NlAttrIp(RtattrType.RTA_DST, "2001:db8:2::"))
+ msg.add_nla(NlAttrU32(RtattrType.RTA_OIF, tun_ifindex))
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+
+ ToolsHelper.print_net_debug()
+ ToolsHelper.print_output("netstat -6onW")
+
+ @pytest.mark.timeout(5)
+ def test_add_route4_ll_if_gw(self):
+ self.require_module("if_tun")
+ tun_ifname = IfaceFactory().create_iface("", "tun")[0].name
+ tun_ifindex = socket.if_nametoindex(tun_ifname)
+
+ msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE)
+ msg.set_request()
+ msg.add_nlflags([NlmNewFlags.NLM_F_CREATE])
+ msg.base_hdr.rtm_family = socket.AF_INET
+ msg.base_hdr.rtm_dst_len = 32
+ msg.add_nla(NlAttrIp(RtattrType.RTA_DST, "192.0.2.1"))
+ msg.add_nla(NlAttrU32(RtattrType.RTA_OIF, tun_ifindex))
+
+ rx_msg = self.get_reply(msg)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert rx_msg.error_code == 0
+
+ ToolsHelper.print_net_debug()
+ ToolsHelper.print_output("netstat -4onW")
+
+ @pytest.mark.timeout(20)
+ def test_buffer_override(self):
+ msg_flags = (
+ NlmBaseFlags.NLM_F_ACK.value
+ | NlmBaseFlags.NLM_F_REQUEST.value
+ | NlmNewFlags.NLM_F_CREATE.value
+ )
+
+ num_routes = 1000
+ base_address = bytearray(ipaddress.ip_address("2001:db8:ffff::").packed)
+ for i in range(num_routes):
+ base_address[7] = i % 256
+ base_address[6] = i // 256
+ prefix_address = ipaddress.IPv6Address(bytes(base_address))
+
+ msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE.value)
+ msg.nl_hdr.nlmsg_flags = msg_flags
+ msg.base_hdr.rtm_family = socket.AF_INET6
+ msg.base_hdr.rtm_dst_len = 65
+ msg.add_nla(NlAttrIp(RtattrType.RTA_DST, str(prefix_address)))
+ msg.add_nla(NlAttrIp(RtattrType.RTA_GATEWAY, "2001:db8::2"))
+
+ self.write_message(msg, silent=True)
+ rx_msg = self.read_message(silent=True)
+ assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
+ assert msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq
+ assert rx_msg.error_code == 0
+ # Now, dump
+ msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_GETROUTE.value)
+ msg.nl_hdr.nlmsg_flags = (
+ NlmBaseFlags.NLM_F_ACK.value
+ | NlmBaseFlags.NLM_F_REQUEST.value
+ | NlmGetFlags.NLM_F_ROOT.value
+ | NlmGetFlags.NLM_F_MATCH.value
+ )
+ msg.base_hdr.rtm_family = socket.AF_INET6
+ self.write_message(msg)
+ num_received = 0
+ while True:
+ rx_msg = self.read_message(silent=True)
+ if msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq:
+ if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
+ if rx_msg.error_code != 0:
+ raise ValueError(
+ "unable to dump routes: error {}".format(rx_msg.error_code)
+ )
+ if rx_msg.is_type(NlMsgType.NLMSG_DONE):
+ break
+ if rx_msg.is_type(NlRtMsgType.RTM_NEWROUTE):
+ if rx_msg.base_hdr.rtm_dst_len == 65:
+ num_received += 1
+ assert num_routes == num_received
diff --git a/tests/sys/netlink/test_snl.c b/tests/sys/netlink/test_snl.c
new file mode 100644
index 000000000000..3990aa0b075d
--- /dev/null
+++ b/tests/sys/netlink/test_snl.c
@@ -0,0 +1,240 @@
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+
+#include <sys/param.h>
+#include <sys/module.h>
+
+#include <netlink/netlink.h>
+#include <netlink/netlink_route.h>
+#include "netlink/netlink_snl.h"
+#include "netlink/netlink_snl_route.h"
+#include "netlink/netlink_snl_route_parsers.h"
+
+#include <atf-c.h>
+
+static const struct snl_hdr_parser *snl_all_core_parsers[] = {
+ &snl_errmsg_parser, &snl_donemsg_parser,
+ &_nla_bit_parser, &_nla_bitset_parser,
+};
+
+static const struct snl_hdr_parser *snl_all_route_parsers[] = {
+ &_metrics_mp_nh_parser, &_mpath_nh_parser, &_metrics_parser, &snl_rtm_route_parser,
+ &_link_fbsd_parser, &snl_rtm_link_parser, &snl_rtm_link_parser_simple,
+ &_neigh_fbsd_parser, &snl_rtm_neigh_parser,
+ &_addr_fbsd_parser, &snl_rtm_addr_parser, &_nh_fbsd_parser, &snl_nhmsg_parser,
+};
+
+ATF_TC(snl_verify_core_parsers);
+ATF_TC_HEAD(snl_verify_core_parsers, tc)
+{
+ atf_tc_set_md_var(tc, "descr", "Tests snl(3) core nlmsg parsers are correct");
+}
+
+ATF_TC_BODY(snl_verify_core_parsers, tc)
+{
+ SNL_VERIFY_PARSERS(snl_all_core_parsers);
+
+}
+
+ATF_TC(snl_verify_route_parsers);
+ATF_TC_HEAD(snl_verify_route_parsers, tc)
+{
+ atf_tc_set_md_var(tc, "descr", "Tests snl(3) route parsers are correct");
+}
+
+ATF_TC_BODY(snl_verify_route_parsers, tc)
+{
+ SNL_VERIFY_PARSERS(snl_all_route_parsers);
+
+}
+
+ATF_TC(snl_parse_errmsg_capped);
+ATF_TC_HEAD(snl_parse_errmsg_capped, tc)
+{
+ atf_tc_set_md_var(tc, "descr", "Tests snl(3) correctly parsing capped errors");
+ atf_tc_set_md_var(tc, "require.kmods", "netlink");
+}
+
+ATF_TC_BODY(snl_parse_errmsg_capped, tc)
+{
+ struct snl_state ss;
+ struct snl_writer nw;
+
+ if (!snl_init(&ss, NETLINK_ROUTE))
+ atf_tc_fail("snl_init() failed");
+
+ atf_tc_skip("does not work");
+
+ int optval = 1;
+ ATF_CHECK(setsockopt(ss.fd, SOL_NETLINK, NETLINK_CAP_ACK, &optval, sizeof(optval)) == 0);
+
+ snl_init_writer(&ss, &nw);
+
+ struct nlmsghdr *hdr = snl_create_msg_request(&nw, 255);
+ ATF_CHECK(hdr != NULL);
+ ATF_CHECK(snl_reserve_msg_object(&nw, struct ifinfomsg) != NULL);
+ snl_add_msg_attr_string(&nw, 143, "some random string");
+ ATF_CHECK(snl_finalize_msg(&nw) != NULL);
+
+ ATF_CHECK(snl_send_message(&ss, hdr));
+
+ struct nlmsghdr *rx_hdr = snl_read_reply(&ss, hdr->nlmsg_seq);
+ ATF_CHECK(rx_hdr != NULL);
+ ATF_CHECK(rx_hdr->nlmsg_type == NLMSG_ERROR);
+
+ struct snl_errmsg_data e = {};
+ ATF_CHECK(rx_hdr->nlmsg_len == sizeof(struct nlmsghdr) + sizeof(struct nlmsgerr));
+ ATF_CHECK(snl_parse_errmsg(&ss, rx_hdr, &e));
+ ATF_CHECK(e.error != 0);
+ ATF_CHECK(!memcmp(hdr, e.orig_hdr, sizeof(struct nlmsghdr)));
+}
+
+ATF_TC(snl_parse_errmsg_capped_extack);
+ATF_TC_HEAD(snl_parse_errmsg_capped_extack, tc)
+{
+ atf_tc_set_md_var(tc, "descr", "Tests snl(3) correctly parsing capped errors with extack");
+ atf_tc_set_md_var(tc, "require.kmods", "netlink");
+}
+
+ATF_TC_BODY(snl_parse_errmsg_capped_extack, tc)
+{
+ struct snl_state ss;
+ struct snl_writer nw;
+
+ if (!snl_init(&ss, NETLINK_ROUTE))
+ atf_tc_fail("snl_init() failed");
+
+ int optval = 1;
+ ATF_CHECK(setsockopt(ss.fd, SOL_NETLINK, NETLINK_CAP_ACK, &optval, sizeof(optval)) == 0);
+ optval = 1;
+ ATF_CHECK(setsockopt(ss.fd, SOL_NETLINK, NETLINK_EXT_ACK, &optval, sizeof(optval)) == 0);
+
+ snl_init_writer(&ss, &nw);
+
+ struct nlmsghdr *hdr = snl_create_msg_request(&nw, 255);
+ ATF_CHECK(hdr != NULL);
+ ATF_CHECK(snl_reserve_msg_object(&nw, struct ifinfomsg) != NULL);
+ snl_add_msg_attr_string(&nw, 143, "some random string");
+ ATF_CHECK(snl_finalize_msg(&nw) != NULL);
+
+ ATF_CHECK(snl_send_message(&ss, hdr));
+
+ struct nlmsghdr *rx_hdr = snl_read_reply(&ss, hdr->nlmsg_seq);
+ ATF_CHECK(rx_hdr != NULL);
+ ATF_CHECK(rx_hdr->nlmsg_type == NLMSG_ERROR);
+
+ struct snl_errmsg_data e = {};
+ ATF_CHECK(snl_parse_errmsg(&ss, rx_hdr, &e));
+ ATF_CHECK(e.error != 0);
+ ATF_CHECK(!memcmp(hdr, e.orig_hdr, sizeof(struct nlmsghdr)));
+
+ ATF_CHECK(e.error_str != NULL);
+}
+
+ATF_TC(snl_parse_errmsg_uncapped_extack);
+ATF_TC_HEAD(snl_parse_errmsg_uncapped_extack, tc)
+{
+ atf_tc_set_md_var(tc, "descr", "Tests snl(3) correctly parsing errors with extack");
+ atf_tc_set_md_var(tc, "require.kmods", "netlink");
+}
+
+ATF_TC_BODY(snl_parse_errmsg_uncapped_extack, tc)
+{
+ struct snl_state ss;
+ struct snl_writer nw;
+
+ ATF_CHECK(snl_init(&ss, NETLINK_ROUTE));
+
+ int optval = 1;
+ ATF_CHECK(setsockopt(ss.fd, SOL_NETLINK, NETLINK_EXT_ACK, &optval, sizeof(optval)) == 0);
+
+ snl_init_writer(&ss, &nw);
+
+ struct nlmsghdr *hdr = snl_create_msg_request(&nw, 255);
+ ATF_CHECK(hdr != NULL);
+ ATF_CHECK(snl_reserve_msg_object(&nw, struct ifinfomsg) != NULL);
+ snl_add_msg_attr_string(&nw, 143, "some random string");
+ ATF_CHECK(snl_finalize_msg(&nw) != NULL);
+
+ ATF_CHECK(snl_send_message(&ss, hdr));
+
+ struct nlmsghdr *rx_hdr = snl_read_reply(&ss, hdr->nlmsg_seq);
+ ATF_CHECK(rx_hdr != NULL);
+ ATF_CHECK(rx_hdr->nlmsg_type == NLMSG_ERROR);
+
+ struct snl_errmsg_data e = {};
+ ATF_CHECK(snl_parse_errmsg(&ss, rx_hdr, &e));
+ ATF_CHECK(e.error != 0);
+ ATF_CHECK(!memcmp(hdr, e.orig_hdr, hdr->nlmsg_len));
+
+ ATF_CHECK(e.error_str != NULL);
+}
+
+ATF_TC(snl_list_ifaces);
+ATF_TC_HEAD(snl_list_ifaces, tc)
+{
+ atf_tc_set_md_var(tc, "descr", "Tests snl(3) listing interfaces");
+ atf_tc_set_md_var(tc, "require.kmods", "netlink");
+}
+
+struct nl_parsed_link {
+ uint32_t ifi_index;
+ uint32_t ifla_mtu;
+ char *ifla_ifname;
+};
+
+#define _IN(_field) offsetof(struct ifinfomsg, _field)
+#define _OUT(_field) offsetof(struct nl_parsed_link, _field)
+static struct snl_attr_parser ap_link[] = {
+ { .type = IFLA_IFNAME, .off = _OUT(ifla_ifname), .cb = snl_attr_get_string },
+ { .type = IFLA_MTU, .off = _OUT(ifla_mtu), .cb = snl_attr_get_uint32 },
+};
+static struct snl_field_parser fp_link[] = {
+ {.off_in = _IN(ifi_index), .off_out = _OUT(ifi_index), .cb = snl_field_get_uint32 },
+};
+#undef _IN
+#undef _OUT
+SNL_DECLARE_PARSER(link_parser, struct ifinfomsg, fp_link, ap_link);
+
+
+ATF_TC_BODY(snl_list_ifaces, tc)
+{
+ struct snl_state ss;
+ struct snl_writer nw;
+
+ if (!snl_init(&ss, NETLINK_ROUTE))
+ atf_tc_fail("snl_init() failed");
+
+ snl_init_writer(&ss, &nw);
+
+ struct nlmsghdr *hdr = snl_create_msg_request(&nw, RTM_GETLINK);
+ ATF_CHECK(hdr != NULL);
+ ATF_CHECK(snl_reserve_msg_object(&nw, struct ifinfomsg) != NULL);
+ ATF_CHECK(snl_finalize_msg(&nw) != NULL);
+ uint32_t seq_id = hdr->nlmsg_seq;
+
+ ATF_CHECK(snl_send_message(&ss, hdr));
+
+ struct snl_errmsg_data e = {};
+ int count = 0;
+
+ while ((hdr = snl_read_reply_multi(&ss, seq_id, &e)) != NULL) {
+ count++;
+ }
+ ATF_REQUIRE(e.error == 0);
+
+ ATF_REQUIRE_MSG(count > 0, "Empty interface list");
+}
+
+ATF_TP_ADD_TCS(tp)
+{
+ ATF_TP_ADD_TC(tp, snl_verify_core_parsers);
+ ATF_TP_ADD_TC(tp, snl_verify_route_parsers);
+ ATF_TP_ADD_TC(tp, snl_parse_errmsg_capped);
+ ATF_TP_ADD_TC(tp, snl_parse_errmsg_capped_extack);
+ ATF_TP_ADD_TC(tp, snl_parse_errmsg_uncapped_extack);
+ ATF_TP_ADD_TC(tp, snl_list_ifaces);
+
+ return (atf_no_error());
+}
diff --git a/tests/sys/netlink/test_snl_generic.c b/tests/sys/netlink/test_snl_generic.c
new file mode 100644
index 000000000000..8613bf04a45c
--- /dev/null
+++ b/tests/sys/netlink/test_snl_generic.c
@@ -0,0 +1,110 @@
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+
+#include <sys/param.h>
+#include <sys/module.h>
+
+#include <netlink/netlink.h>
+#include "netlink/netlink_snl.h"
+#include "netlink/netlink_snl_generic.h"
+
+#include <atf-c.h>
+
+static const struct snl_hdr_parser *snl_all_genl_parsers[] = {
+ &_genl_ctrl_getfam_parser, &_genl_ctrl_mc_parser,
+};
+
+ATF_TC(snl_verify_genl_parsers);
+ATF_TC_HEAD(snl_verify_genl_parsers, tc)
+{
+ atf_tc_set_md_var(tc, "descr", "Tests snl(3) generic parsers are correct");
+}
+
+ATF_TC_BODY(snl_verify_genl_parsers, tc)
+{
+ SNL_VERIFY_PARSERS(snl_all_genl_parsers);
+
+}
+
+ATF_TC(test_snl_get_genl_family_success);
+ATF_TC_HEAD(test_snl_get_genl_family_success, tc)
+{
+ atf_tc_set_md_var(tc, "descr", "Tests successfull resolution of the 'nlctrl' family");
+ atf_tc_set_md_var(tc, "require.kmods", "netlink");
+}
+
+ATF_TC_BODY(test_snl_get_genl_family_success, tc)
+{
+ struct snl_state ss;
+
+ if (!snl_init(&ss, NETLINK_GENERIC))
+ atf_tc_fail("snl_init() failed");
+
+ ATF_CHECK_EQ(snl_get_genl_family(&ss, "nlctrl"), GENL_ID_CTRL);
+}
+
+ATF_TC(test_snl_get_genl_family_failure);
+ATF_TC_HEAD(test_snl_get_genl_family_failure, tc)
+{
+ atf_tc_set_md_var(tc, "descr", "Tests unsuccessfull resolution of 'no-such-family' family");
+ atf_tc_set_md_var(tc, "require.kmods", "netlink");
+}
+
+ATF_TC_BODY(test_snl_get_genl_family_failure, tc)
+{
+ struct snl_state ss;
+
+ if (!snl_init(&ss, NETLINK_GENERIC))
+ atf_tc_fail("snl_init() failed");
+
+ ATF_CHECK_EQ(snl_get_genl_family(&ss, "no-such-family"), 0);
+}
+
+ATF_TC(test_snl_get_genl_family_groups);
+ATF_TC_HEAD(test_snl_get_genl_family_groups, tc)
+{
+ atf_tc_set_md_var(tc, "descr", "Tests getting 'nlctrl' groups");
+ atf_tc_set_md_var(tc, "require.kmods", "netlink");
+}
+
+ATF_TC_BODY(test_snl_get_genl_family_groups, tc)
+{
+ struct snl_state ss;
+ struct snl_writer nw;
+ struct nlmsghdr *hdr;
+
+ if (!snl_init(&ss, NETLINK_GENERIC))
+ atf_tc_fail("snl_init() failed");
+
+ snl_init_writer(&ss, &nw);
+ hdr = snl_create_genl_msg_request(&nw, GENL_ID_CTRL, CTRL_CMD_GETFAMILY);
+ snl_add_msg_attr_string(&nw, CTRL_ATTR_FAMILY_NAME, "nlctrl");
+ hdr = snl_finalize_msg(&nw);
+ snl_send_message(&ss, hdr);
+
+ hdr = snl_read_reply(&ss, hdr->nlmsg_seq);
+ ATF_CHECK(hdr != NULL);
+ ATF_CHECK(hdr->nlmsg_type != NLMSG_ERROR);
+
+ struct _getfamily_attrs attrs = {};
+
+ ATF_CHECK(snl_parse_nlmsg(&ss, hdr, &_genl_ctrl_getfam_parser, &attrs));
+ ATF_CHECK_EQ(attrs.mcast_groups.num_groups, 1);
+
+ struct _snl_genl_ctrl_mcast_group *group = attrs.mcast_groups.groups[0];
+
+ ATF_CHECK(group->mcast_grp_id > 0);
+ ATF_CHECK(!strcmp(group->mcast_grp_name, "notify"));
+}
+
+ATF_TP_ADD_TCS(tp)
+{
+ ATF_TP_ADD_TC(tp, snl_verify_genl_parsers);
+ ATF_TP_ADD_TC(tp, test_snl_get_genl_family_success);
+ ATF_TP_ADD_TC(tp, test_snl_get_genl_family_failure);
+ ATF_TP_ADD_TC(tp, test_snl_get_genl_family_groups);
+
+ return (atf_no_error());
+}
+