about summary refs log tree commit diff
path: root/tools/tools/netmap/pkt-gen.c
diff options
context:
space:
mode:
author Luigi Rizzo <luigi@FreeBSD.org> 2016-10-16 14:13:32 +0000
committer Luigi Rizzo <luigi@FreeBSD.org> 2016-10-16 14:13:32 +0000
commit 37e3a6d349581b4dd0aebf24be7b1b159a698dcf (patch)
tree 0e61deea141c9733af511b0485cf1fd0f2dd17ed /tools/tools/netmap/pkt-gen.c
parent 63f6b1a75a8e6e33e4f9d65571c6a221444d3b05 (diff)
download src-37e3a6d349581b4dd0aebf24be7b1b159a698dcf.tar.gz
src-37e3a6d349581b4dd0aebf24be7b1b159a698dcf.zip
Notes
Diffstat (limited to 'tools/tools/netmap/pkt-gen.c')
-rw-r--r-- tools/tools/netmap/pkt-gen.c 1114
1 files changed, 854 insertions, 260 deletions
diff --git a/tools/tools/netmap/pkt-gen.c b/tools/tools/netmap/pkt-gen.c
index 6d9bee6de634..168e022cfba9 100644
--- a/tools/tools/netmap/pkt-gen.c
+++ b/tools/tools/netmap/pkt-gen.c
@@ -1,6 +1,6 @@
/*
* Copyright (C) 2011-2014 Matteo Landi, Luigi Rizzo. All rights reserved.
- * Copyright (C) 2013-2014 Universita` di Pisa. All rights reserved.
+ * Copyright (C) 2013-2015 Universita` di Pisa. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@@ -37,8 +37,6 @@
*
*/
-// #define TRASH_VHOST_HDR
-
#define _GNU_SOURCE /* for CPU_SET() */
#include <stdio.h>
#define NETMAP_WITH_LIBS
@@ -49,12 +47,16 @@
#include <unistd.h> // sysconf()
#include <sys/poll.h>
#include <arpa/inet.h> /* ntohs */
+#ifndef _WIN32
#include <sys/sysctl.h> /* sysctl */
+#endif
#include <ifaddrs.h> /* getifaddrs */
#include <net/ethernet.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/udp.h>
+#include <assert.h>
+#include <math.h>
#include <pthread.h>
@@ -62,6 +64,69 @@
#include <pcap/pcap.h>
#endif
+#include "ctrs.h"
+
+#ifdef _WIN32
+#define cpuset_t DWORD_PTR //uint64_t
+static inline void CPU_ZERO(cpuset_t *p)
+{
+ *p = 0;
+}
+
+static inline void CPU_SET(uint32_t i, cpuset_t *p)
+{
+ *p |= 1<< (i & 0x3f);
+}
+
+#define pthread_setaffinity_np(a, b, c) !SetThreadAffinityMask(a, *c) //((void)a, 0)
+#define TAP_CLONEDEV "/dev/tap"
+#define AF_LINK 18 //defined in winsocks.h
+#define CLOCK_REALTIME_PRECISE CLOCK_REALTIME
+#include <net/if_dl.h>
+
+/*
+ * Convert an ASCII representation of an ethernet address to
+ * binary form.
+ */
+struct ether_addr *
+ether_aton(const char *a)
+{
+ int i;
+ static struct ether_addr o;
+ unsigned int o0, o1, o2, o3, o4, o5;
+
+ i = sscanf(a, "%x:%x:%x:%x:%x:%x", &o0, &o1, &o2, &o3, &o4, &o5);
+
+ if (i != 6)
+ return (NULL);
+
+ o.octet[0]=o0;
+ o.octet[1]=o1;
+ o.octet[2]=o2;
+ o.octet[3]=o3;
+ o.octet[4]=o4;
+ o.octet[5]=o5;
+
+ return ((struct ether_addr *)&o);
+}
+
+/*
+ * Convert a binary representation of an ethernet address to
+ * an ASCII string.
+ */
+char *
+ether_ntoa(const struct ether_addr *n)
+{
+ int i;
+ static char a[18];
+
+ i = sprintf(a, "%02x:%02x:%02x:%02x:%02x:%02x",
+ n->octet[0], n->octet[1], n->octet[2],
+ n->octet[3], n->octet[4], n->octet[5]);
+ return (i < 17 ? NULL : (char *)&a);
+}
+#endif /* _WIN32 */
+
#ifdef linux
#define cpuset_t cpu_set_t
@@ -169,10 +234,12 @@ struct glob_arg {
int pkt_size;
int burst;
int forever;
- int npackets; /* total packets to send */
+ uint64_t npackets; /* total packets to send */
int frags; /* fragments per packet */
int nthreads;
- int cpus;
+ int cpus; /* cpus used for running */
+ int system_cpus; /* cpus on the system */
+
int options; /* testing */
#define OPT_PREFETCH 1
#define OPT_ACCESS 2
@@ -181,10 +248,10 @@ struct glob_arg {
#define OPT_TS 16 /* add a timestamp */
#define OPT_INDIRECT 32 /* use indirect buffers, tx only */
#define OPT_DUMP 64 /* dump rx/tx traffic */
-#define OPT_MONITOR_TX 128
-#define OPT_MONITOR_RX 256
+#define OPT_RUBBISH 256 /* send whatever the buffers contain */
#define OPT_RANDOM_SRC 512
#define OPT_RANDOM_DST 1024
+#define OPT_PPS_STATS 2048
int dev_type;
#ifndef NO_PCAP
pcap_t *p;
@@ -198,13 +265,18 @@ struct glob_arg {
struct nm_desc *nmd;
int report_interval; /* milliseconds between prints */
void *(*td_body)(void *);
+ int td_type;
void *mmap_addr;
char ifname[MAX_IFNAMELEN];
char *nmr_config;
int dummy_send;
int virt_header; /* send also the virt_header */
int extra_bufs; /* goes in nr_arg3 */
+ int extra_pipes; /* goes in nr_arg1 */
char *packet_file; /* -P option */
+#define STATS_WIN 15
+ int win_idx;
+ int64_t win[STATS_WIN];
};
enum dev_type { DEV_NONE, DEV_NETMAP, DEV_PCAP, DEV_TAP };
@@ -220,7 +292,11 @@ struct targ {
int cancel;
int fd;
struct nm_desc *nmd;
- volatile uint64_t count;
+ /* these ought to be volatile, but they are
+ * only sampled and errors should not accumulate
+ */
+ struct my_ctrs ctr;
+
struct timespec tic, toc;
int me;
pthread_t thread;
@@ -327,11 +403,10 @@ sigint_h(int sig)
int i;
(void)sig; /* UNUSED */
- D("received control-C on thread %p", pthread_self());
+ D("received control-C on thread %p", (void *)pthread_self());
for (i = 0; i < global_nthreads; i++) {
targs[i].cancel = 1;
}
- signal(SIGINT, SIG_DFL);
}
/* sysctl wrapper to return the number of active CPUs */
@@ -345,6 +420,12 @@ system_ncpus(void)
sysctl(mib, 2, &ncpus, &len, NULL, 0);
#elif defined(linux)
ncpus = sysconf(_SC_NPROCESSORS_ONLN);
+#elif defined(_WIN32)
+ {
+ SYSTEM_INFO sysinfo;
+ GetSystemInfo(&sysinfo);
+ ncpus = sysinfo.dwNumberOfProcessors;
+ }
#else /* others */
ncpus = 1;
#endif /* others */
@@ -518,10 +599,11 @@ wrapsum(u_int32_t sum)
* Look for consecutive ascii representations of the size of the packet.
*/
static void
-dump_payload(char *p, int len, struct netmap_ring *ring, int cur)
+dump_payload(const char *_p, int len, struct netmap_ring *ring, int cur)
{
char buf[128];
int i, j, i0;
+ const unsigned char *p = (const unsigned char *)_p;
/* get the length in ASCII of the length of the packet. */
@@ -629,6 +711,7 @@ initialize_packet(struct targ *targ)
indirect_payload : default_payload;
int i, l0 = strlen(payload);
+#ifndef NO_PCAP
char errbuf[PCAP_ERRBUF_SIZE];
pcap_t *file;
struct pcap_pkthdr *header;
@@ -650,6 +733,7 @@ initialize_packet(struct targ *targ)
pcap_close(file);
return;
}
+#endif
/* create a nice NUL-terminated string */
for (i = 0; i < paylen; i += l0) {
@@ -695,35 +779,49 @@ initialize_packet(struct targ *targ)
eh->ether_type = htons(ETHERTYPE_IP);
bzero(&pkt->vh, sizeof(pkt->vh));
-#ifdef TRASH_VHOST_HDR
- /* set bogus content */
- pkt->vh.fields[0] = 0xff;
- pkt->vh.fields[1] = 0xff;
- pkt->vh.fields[2] = 0xff;
- pkt->vh.fields[3] = 0xff;
- pkt->vh.fields[4] = 0xff;
- pkt->vh.fields[5] = 0xff;
-#endif /* TRASH_VHOST_HDR */
// dump_payload((void *)pkt, targ->g->pkt_size, NULL, 0);
}
static void
-set_vnet_hdr_len(struct targ *t)
+get_vnet_hdr_len(struct glob_arg *g)
{
- int err, l = t->g->virt_header;
+ struct nmreq req;
+ int err;
+
+ memset(&req, 0, sizeof(req));
+ bcopy(g->nmd->req.nr_name, req.nr_name, sizeof(req.nr_name));
+ req.nr_version = NETMAP_API;
+ req.nr_cmd = NETMAP_VNET_HDR_GET;
+ err = ioctl(g->main_fd, NIOCREGIF, &req);
+ if (err) {
+ D("Unable to get virtio-net header length");
+ return;
+ }
+
+ g->virt_header = req.nr_arg1;
+ if (g->virt_header) {
+ D("Port requires virtio-net header, length = %d",
+ g->virt_header);
+ }
+}
+
+static void
+set_vnet_hdr_len(struct glob_arg *g)
+{
+ int err, l = g->virt_header;
struct nmreq req;
if (l == 0)
return;
memset(&req, 0, sizeof(req));
- bcopy(t->nmd->req.nr_name, req.nr_name, sizeof(req.nr_name));
+ bcopy(g->nmd->req.nr_name, req.nr_name, sizeof(req.nr_name));
req.nr_version = NETMAP_API;
req.nr_cmd = NETMAP_BDG_VNET_HDR;
req.nr_arg1 = l;
- err = ioctl(t->fd, NIOCREGIF, &req);
+ err = ioctl(g->main_fd, NIOCREGIF, &req);
if (err) {
- D("Unable to set vnet header length %d", l);
+ D("Unable to set virtio-net header length %d", l);
}
}
@@ -763,12 +861,15 @@ send_packets(struct netmap_ring *ring, struct pkt *pkt, void *frame,
for (fcnt = nfrags, sent = 0; sent < count; sent++) {
struct netmap_slot *slot = &ring->slot[cur];
char *p = NETMAP_BUF(ring, slot->buf_idx);
+ int buf_changed = slot->flags & NS_BUF_CHANGED;
slot->flags = 0;
- if (options & OPT_INDIRECT) {
+ if (options & OPT_RUBBISH) {
+ /* do nothing */
+ } else if (options & OPT_INDIRECT) {
slot->flags |= NS_INDIRECT;
- slot->ptr = (uint64_t)frame;
- } else if (options & OPT_COPY) {
+ slot->ptr = (uint64_t)((uintptr_t)frame);
+ } else if ((options & OPT_COPY) || buf_changed) {
nm_pkt_copy(frame, p, size);
if (fcnt == nfrags)
update_addresses(pkt, g);
@@ -798,6 +899,21 @@ send_packets(struct netmap_ring *ring, struct pkt *pkt, void *frame,
}
/*
+ * Index of the highest bit set in x (returns 0 when x == 0)
+ */
+uint32_t
+msb64(uint64_t x)
+{
+ uint64_t m = 1ULL << 63;
+ int i;
+
+ for (i = 63; i >= 0; i--, m >>=1)
+ if (m & x)
+ return i;
+ return 0;
+}
+
+/*
* Send a packet, and wait for a response.
* The payload (after UDP header, ofs 42) has a 4-byte sequence
* followed by a struct timeval (or bintime?)
@@ -810,25 +926,28 @@ pinger_body(void *data)
struct targ *targ = (struct targ *) data;
struct pollfd pfd = { .fd = targ->fd, .events = POLLIN };
struct netmap_if *nifp = targ->nmd->nifp;
- int i, rx = 0, n = targ->g->npackets;
+ int i, rx = 0;
void *frame;
int size;
- uint32_t sent = 0;
struct timespec ts, now, last_print;
- uint32_t count = 0, min = 1000000000, av = 0;
+ uint64_t sent = 0, n = targ->g->npackets;
+ uint64_t count = 0, t_cur, t_min = ~0, av = 0;
+ uint64_t buckets[64]; /* bins for delays, ns */
frame = &targ->pkt;
frame += sizeof(targ->pkt.vh) - targ->g->virt_header;
size = targ->g->pkt_size + targ->g->virt_header;
+
if (targ->g->nthreads > 1) {
D("can only ping with 1 thread");
return NULL;
}
+ bzero(&buckets, sizeof(buckets));
clock_gettime(CLOCK_REALTIME_PRECISE, &last_print);
now = last_print;
- while (n == 0 || (int)sent < n) {
+ while (!targ->cancel && (n == 0 || sent < n)) {
struct netmap_ring *ring = NETMAP_TXRING(nifp, 0);
struct netmap_slot *slot;
char *p;
@@ -864,6 +983,8 @@ pinger_body(void *data)
while (!nm_ring_empty(ring)) {
uint32_t seq;
struct tstamp *tp;
+ int pos;
+
slot = &ring->slot[ring->cur];
p = NETMAP_BUF(ring, slot->buf_idx);
@@ -878,12 +999,16 @@ pinger_body(void *data)
ts.tv_nsec += 1000000000;
ts.tv_sec--;
}
- if (1) D("seq %d/%d delta %d.%09d", seq, sent,
+ if (0) D("seq %d/%lu delta %d.%09d", seq, sent,
(int)ts.tv_sec, (int)ts.tv_nsec);
- if (ts.tv_nsec < (int)min)
- min = ts.tv_nsec;
+ t_cur = ts.tv_sec * 1000000000UL + ts.tv_nsec;
+ if (t_cur < t_min)
+ t_min = t_cur;
count ++;
- av += ts.tv_nsec;
+ av += t_cur;
+ pos = msb64(t_cur);
+ buckets[pos]++;
+ /* now store it in a bucket */
ring->head = ring->cur = nm_ring_next(ring, ring->cur);
rx++;
}
@@ -897,14 +1022,32 @@ pinger_body(void *data)
ts.tv_sec--;
}
if (ts.tv_sec >= 1) {
- D("count %d min %d av %d",
- count, min, av/count);
+ D("count %d RTT: min %d av %d ns",
+ (int)count, (int)t_min, (int)(av/count));
+ int k, j, kmin;
+ char buf[512];
+
+ for (kmin = 0; kmin < 64; kmin ++)
+ if (buckets[kmin])
+ break;
+ for (k = 63; k >= kmin; k--)
+ if (buckets[k])
+ break;
+ buf[0] = '\0';
+ for (j = kmin; j <= k; j++)
+ sprintf(buf, "%s %5d", buf, (int)buckets[j]);
+ D("k: %d .. %d\n\t%s", 1<<kmin, 1<<k, buf);
+ bzero(&buckets, sizeof(buckets));
count = 0;
av = 0;
- min = 100000000;
+ t_min = ~0;
last_print = now;
}
}
+
+ /* reset the ``used`` flag. */
+ targ->used = 0;
+
return NULL;
}
@@ -919,14 +1062,15 @@ ponger_body(void *data)
struct pollfd pfd = { .fd = targ->fd, .events = POLLIN };
struct netmap_if *nifp = targ->nmd->nifp;
struct netmap_ring *txring, *rxring;
- int i, rx = 0, sent = 0, n = targ->g->npackets;
+ int i, rx = 0;
+ uint64_t sent = 0, n = targ->g->npackets;
if (targ->g->nthreads > 1) {
D("can only reply ping with 1 thread");
return NULL;
}
- D("understood ponger %d but don't know how to do it", n);
- while (n == 0 || sent < n) {
+ D("understood ponger %lu but don't know how to do it", n);
+ while (!targ->cancel && (n == 0 || sent < n)) {
uint32_t txcur, txavail;
//#define BUSYWAIT
#ifdef BUSYWAIT
@@ -975,69 +1119,17 @@ ponger_body(void *data)
}
}
txring->head = txring->cur = txcur;
- targ->count = sent;
+ targ->ctr.pkts = sent;
#ifdef BUSYWAIT
ioctl(pfd.fd, NIOCTXSYNC, NULL);
#endif
//D("tx %d rx %d", sent, rx);
}
- return NULL;
-}
-
-static __inline int
-timespec_ge(const struct timespec *a, const struct timespec *b)
-{
-
- if (a->tv_sec > b->tv_sec)
- return (1);
- if (a->tv_sec < b->tv_sec)
- return (0);
- if (a->tv_nsec >= b->tv_nsec)
- return (1);
- return (0);
-}
-static __inline struct timespec
-timeval2spec(const struct timeval *a)
-{
- struct timespec ts = {
- .tv_sec = a->tv_sec,
- .tv_nsec = a->tv_usec * 1000
- };
- return ts;
-}
-
-static __inline struct timeval
-timespec2val(const struct timespec *a)
-{
- struct timeval tv = {
- .tv_sec = a->tv_sec,
- .tv_usec = a->tv_nsec / 1000
- };
- return tv;
-}
-
-
-static __inline struct timespec
-timespec_add(struct timespec a, struct timespec b)
-{
- struct timespec ret = { a.tv_sec + b.tv_sec, a.tv_nsec + b.tv_nsec };
- if (ret.tv_nsec >= 1000000000) {
- ret.tv_sec++;
- ret.tv_nsec -= 1000000000;
- }
- return ret;
-}
+ /* reset the ``used`` flag. */
+ targ->used = 0;
-static __inline struct timespec
-timespec_sub(struct timespec a, struct timespec b)
-{
- struct timespec ret = { a.tv_sec - b.tv_sec, a.tv_nsec - b.tv_nsec };
- if (ret.tv_nsec < 0) {
- ret.tv_sec--;
- ret.tv_nsec += 1000000000;
- }
- return ret;
+ return NULL;
}
@@ -1065,9 +1157,11 @@ sender_body(void *data)
struct targ *targ = (struct targ *) data;
struct pollfd pfd = { .fd = targ->fd, .events = POLLOUT };
struct netmap_if *nifp;
- struct netmap_ring *txring;
- int i, n = targ->g->npackets / targ->g->nthreads;
- int64_t sent = 0;
+ struct netmap_ring *txring = NULL;
+ int i;
+ uint64_t n = targ->g->npackets / targ->g->nthreads;
+ uint64_t sent = 0;
+ uint64_t event = 0;
int options = targ->g->options | OPT_COPY;
struct timespec nexttime = { 0, 0}; // XXX silence compiler
int rate_limit = targ->g->tx_rate;
@@ -1104,7 +1198,9 @@ sender_body(void *data)
sent++;
update_addresses(pkt, targ->g);
if (i > 10000) {
- targ->count = sent;
+ targ->ctr.pkts = sent;
+ targ->ctr.bytes = sent*size;
+ targ->ctr.events = sent;
i = 0;
}
}
@@ -1117,7 +1213,9 @@ sender_body(void *data)
sent++;
update_addresses(pkt, targ->g);
if (i > 10000) {
- targ->count = sent;
+ targ->ctr.pkts = sent;
+ targ->ctr.bytes = sent*size;
+ targ->ctr.events = sent;
i = 0;
}
}
@@ -1126,7 +1224,7 @@ sender_body(void *data)
int tosend = 0;
int frags = targ->g->frags;
- nifp = targ->nmd->nifp;
+ nifp = targ->nmd->nifp;
while (!targ->cancel && (n == 0 || sent < n)) {
if (rate_limit && tosend <= 0) {
@@ -1138,6 +1236,13 @@ sender_body(void *data)
/*
* wait for available room in the send queue(s)
*/
+#ifdef BUSYWAIT
+ if (ioctl(pfd.fd, NIOCTXSYNC, NULL) < 0) {
+ D("ioctl error on queue %d: %s", targ->me,
+ strerror(errno));
+ goto quit;
+ }
+#else /* !BUSYWAIT */
if (poll(&pfd, 1, 2000) <= 0) {
if (targ->cancel)
break;
@@ -1146,9 +1251,11 @@ sender_body(void *data)
// goto quit;
}
if (pfd.revents & POLLERR) {
- D("poll error");
+ D("poll error on %d ring %d-%d", pfd.fd,
+ targ->nmd->first_tx_ring, targ->nmd->last_tx_ring);
goto quit;
}
+#endif /* !BUSYWAIT */
/*
* scan our queues and send on those with room
*/
@@ -1157,7 +1264,8 @@ sender_body(void *data)
options &= ~OPT_COPY;
}
for (i = targ->nmd->first_tx_ring; i <= targ->nmd->last_tx_ring; i++) {
- int m, limit = rate_limit ? tosend : targ->g->burst;
+ int m;
+ uint64_t limit = rate_limit ? tosend : targ->g->burst;
if (n > 0 && n - sent < limit)
limit = n - sent;
txring = NETMAP_TXRING(nifp, i);
@@ -1171,7 +1279,11 @@ sender_body(void *data)
ND("limit %d tail %d frags %d m %d",
limit, txring->tail, frags, m);
sent += m;
- targ->count = sent;
+ if (m > 0) //XXX-ste: can m be 0?
+ event++;
+ targ->ctr.pkts = sent;
+ targ->ctr.bytes = sent*size;
+ targ->ctr.events = event;
if (rate_limit) {
tosend -= m;
if (tosend <= 0)
@@ -1182,13 +1294,13 @@ sender_body(void *data)
/* flush any remaining packets */
D("flush tail %d head %d on thread %p",
txring->tail, txring->head,
- pthread_self());
+ (void *)pthread_self());
ioctl(pfd.fd, NIOCTXSYNC, NULL);
/* final part: wait all the TX queues to be empty. */
for (i = targ->nmd->first_tx_ring; i <= targ->nmd->last_tx_ring; i++) {
txring = NETMAP_TXRING(nifp, i);
- while (nm_tx_pending(txring)) {
+ while (!targ->cancel && nm_tx_pending(txring)) {
RD(5, "pending tx tail %d head %d on ring %d",
txring->tail, txring->head, i);
ioctl(pfd.fd, NIOCTXSYNC, NULL);
@@ -1199,8 +1311,9 @@ sender_body(void *data)
clock_gettime(CLOCK_REALTIME_PRECISE, &targ->toc);
targ->completed = 1;
- targ->count = sent;
-
+ targ->ctr.pkts = sent;
+ targ->ctr.bytes = sent*size;
+ targ->ctr.events = event;
quit:
/* reset the ``used`` flag. */
targ->used = 0;
@@ -1214,17 +1327,22 @@ static void
receive_pcap(u_char *user, const struct pcap_pkthdr * h,
const u_char * bytes)
{
- int *count = (int *)user;
- (void)h; /* UNUSED */
+ struct my_ctrs *ctr = (struct my_ctrs *)user;
(void)bytes; /* UNUSED */
- (*count)++;
+ ctr->bytes += h->len;
+ ctr->pkts++;
}
#endif /* !NO_PCAP */
+
static int
-receive_packets(struct netmap_ring *ring, u_int limit, int dump)
+receive_packets(struct netmap_ring *ring, u_int limit, int dump, uint64_t *bytes)
{
u_int cur, rx, n;
+ uint64_t b = 0;
+
+ if (bytes == NULL)
+ bytes = &b;
cur = ring->cur;
n = nm_ring_space(ring);
@@ -1234,6 +1352,7 @@ receive_packets(struct netmap_ring *ring, u_int limit, int dump)
struct netmap_slot *slot = &ring->slot[cur];
char *p = NETMAP_BUF(ring, slot->buf_idx);
+ *bytes += slot->len;
if (dump)
dump_payload(p, slot->len, ring, cur);
@@ -1252,7 +1371,10 @@ receiver_body(void *data)
struct netmap_if *nifp;
struct netmap_ring *rxring;
int i;
- uint64_t received = 0;
+ struct my_ctrs cur;
+
+ cur.pkts = cur.bytes = cur.events = cur.min_space = 0;
+ cur.t.tv_usec = cur.t.tv_sec = 0; // unused, just silence the compiler
if (setaffinity(targ->thread, targ->affinity))
goto quit;
@@ -1273,24 +1395,36 @@ receiver_body(void *data)
while (!targ->cancel) {
char buf[MAX_BODYSIZE];
/* XXX should we poll ? */
- if (read(targ->g->main_fd, buf, sizeof(buf)) > 0)
- targ->count++;
+ i = read(targ->g->main_fd, buf, sizeof(buf));
+ if (i > 0) {
+ targ->ctr.pkts++;
+ targ->ctr.bytes += i;
+ targ->ctr.events++;
+ }
}
#ifndef NO_PCAP
} else if (targ->g->dev_type == DEV_PCAP) {
while (!targ->cancel) {
/* XXX should we poll ? */
pcap_dispatch(targ->g->p, targ->g->burst, receive_pcap,
- (u_char *)&targ->count);
+ (u_char *)&targ->ctr);
+ targ->ctr.events++;
}
#endif /* !NO_PCAP */
} else {
int dump = targ->g->options & OPT_DUMP;
- nifp = targ->nmd->nifp;
+ nifp = targ->nmd->nifp;
while (!targ->cancel) {
/* Once we started to receive packets, wait at most 1 seconds
before quitting. */
+#ifdef BUSYWAIT
+ if (ioctl(pfd.fd, NIOCRXSYNC, NULL) < 0) {
+ D("ioctl error on queue %d: %s", targ->me,
+ strerror(errno));
+ goto quit;
+ }
+#else /* !BUSYWAIT */
if (poll(&pfd, 1, 1 * 1000) <= 0 && !targ->g->forever) {
clock_gettime(CLOCK_REALTIME_PRECISE, &targ->toc);
targ->toc.tv_sec -= 1; /* Subtract timeout time. */
@@ -1301,26 +1435,39 @@ receiver_body(void *data)
D("poll err");
goto quit;
}
-
+#endif /* !BUSYWAIT */
+ uint64_t cur_space = 0;
for (i = targ->nmd->first_rx_ring; i <= targ->nmd->last_rx_ring; i++) {
int m;
rxring = NETMAP_RXRING(nifp, i);
+ /* compute free space in the ring */
+ m = rxring->head + rxring->num_slots - rxring->tail;
+ if (m >= (int) rxring->num_slots)
+ m -= rxring->num_slots;
+ cur_space += m;
if (nm_ring_empty(rxring))
continue;
- m = receive_packets(rxring, targ->g->burst, dump);
- received += m;
+ m = receive_packets(rxring, targ->g->burst, dump, &cur.bytes);
+ cur.pkts += m;
+ if (m > 0) //XXX-ste: can m be 0?
+ cur.events++;
}
- targ->count = received;
+ cur.min_space = targ->ctr.min_space;
+ if (cur_space < cur.min_space)
+ cur.min_space = cur_space;
+ targ->ctr = cur;
}
}
clock_gettime(CLOCK_REALTIME_PRECISE, &targ->toc);
+#if !defined(BUSYWAIT)
out:
+#endif
targ->completed = 1;
- targ->count = received;
+ targ->ctr = cur;
quit:
/* reset the ``used`` flag. */
@@ -1329,56 +1476,390 @@ quit:
return (NULL);
}
-/* very crude code to print a number in normalized form.
- * Caller has to make sure that the buffer is large enough.
- */
-static const char *
-norm(char *buf, double val)
+static void *
+txseq_body(void *data)
+{
+ struct targ *targ = (struct targ *) data;
+ struct pollfd pfd = { .fd = targ->fd, .events = POLLOUT };
+ struct netmap_ring *ring;
+ int64_t sent = 0;
+ uint64_t event = 0;
+ int options = targ->g->options | OPT_COPY;
+ struct timespec nexttime = {0, 0};
+ int rate_limit = targ->g->tx_rate;
+ struct pkt *pkt = &targ->pkt;
+ int frags = targ->g->frags;
+ uint32_t sequence = 0;
+ int budget = 0;
+ void *frame;
+ int size;
+
+ if (targ->g->nthreads > 1) {
+ D("can only txseq ping with 1 thread");
+ return NULL;
+ }
+
+ if (targ->g->npackets > 0) {
+ D("Ignoring -n argument");
+ }
+
+ frame = pkt;
+ frame += sizeof(pkt->vh) - targ->g->virt_header;
+ size = targ->g->pkt_size + targ->g->virt_header;
+
+ D("start, fd %d main_fd %d", targ->fd, targ->g->main_fd);
+ if (setaffinity(targ->thread, targ->affinity))
+ goto quit;
+
+ clock_gettime(CLOCK_REALTIME_PRECISE, &targ->tic);
+ if (rate_limit) {
+ targ->tic = timespec_add(targ->tic, (struct timespec){2,0});
+ targ->tic.tv_nsec = 0;
+ wait_time(targ->tic);
+ nexttime = targ->tic;
+ }
+
+ /* Only use the first queue. */
+ ring = NETMAP_TXRING(targ->nmd->nifp, targ->nmd->first_tx_ring);
+
+ while (!targ->cancel) {
+ int64_t limit;
+ unsigned int space;
+ unsigned int head;
+ int fcnt;
+
+ if (!rate_limit) {
+ budget = targ->g->burst;
+
+ } else if (budget <= 0) {
+ budget = targ->g->burst;
+ nexttime = timespec_add(nexttime, targ->g->tx_period);
+ wait_time(nexttime);
+ }
+
+ /* wait for available room in the send queue */
+ if (poll(&pfd, 1, 2000) <= 0) {
+ if (targ->cancel)
+ break;
+ D("poll error/timeout on queue %d: %s", targ->me,
+ strerror(errno));
+ }
+ if (pfd.revents & POLLERR) {
+ D("poll error on %d ring %d-%d", pfd.fd,
+ targ->nmd->first_tx_ring, targ->nmd->last_tx_ring);
+ goto quit;
+ }
+
+ /* If no room poll() again. */
+ space = nm_ring_space(ring);
+ if (!space) {
+ continue;
+ }
+
+ limit = budget;
+
+ if (space < limit) {
+ limit = space;
+ }
+
+ /* Cut off ``limit`` to make sure it is a multiple of ``frags``. */
+ if (frags > 1) {
+ limit = (limit / frags) * frags;
+ }
+
+ limit = sent + limit; /* Convert to absolute. */
+
+ for (fcnt = frags, head = ring->head;
+ sent < limit; sent++, sequence++) {
+ struct netmap_slot *slot = &ring->slot[head];
+ char *p = NETMAP_BUF(ring, slot->buf_idx);
+
+ slot->flags = 0;
+ pkt->body[0] = sequence >> 24;
+ pkt->body[1] = (sequence >> 16) & 0xff;
+ pkt->body[2] = (sequence >> 8) & 0xff;
+ pkt->body[3] = sequence & 0xff;
+ nm_pkt_copy(frame, p, size);
+ if (fcnt == frags) {
+ update_addresses(pkt, targ->g);
+ }
+
+ if (options & OPT_DUMP) {
+ dump_payload(p, size, ring, head);
+ }
+
+ slot->len = size;
+
+ if (--fcnt > 0) {
+ slot->flags |= NS_MOREFRAG;
+ } else {
+ fcnt = frags;
+ }
+
+ if (sent == limit - 1) {
+ /* Make sure we don't push an incomplete
+ * packet. */
+ assert(!(slot->flags & NS_MOREFRAG));
+ slot->flags |= NS_REPORT;
+ }
+
+ head = nm_ring_next(ring, head);
+ if (rate_limit) {
+ budget--;
+ }
+ }
+
+ ring->cur = ring->head = head;
+
+ event ++;
+ targ->ctr.pkts = sent;
+ targ->ctr.bytes = sent * size;
+ targ->ctr.events = event;
+ }
+
+ /* flush any remaining packets */
+ D("flush tail %d head %d on thread %p",
+ ring->tail, ring->head,
+ (void *)pthread_self());
+ ioctl(pfd.fd, NIOCTXSYNC, NULL);
+
+ /* final part: wait for the TX queue to become empty. */
+ while (!targ->cancel && nm_tx_pending(ring)) {
+ RD(5, "pending tx tail %d head %d on ring %d",
+ ring->tail, ring->head, targ->nmd->first_tx_ring);
+ ioctl(pfd.fd, NIOCTXSYNC, NULL);
+ usleep(1); /* wait 1 tick */
+ }
+
+ clock_gettime(CLOCK_REALTIME_PRECISE, &targ->toc);
+ targ->completed = 1;
+ targ->ctr.pkts = sent;
+ targ->ctr.bytes = sent * size;
+ targ->ctr.events = event;
+quit:
+ /* reset the ``used`` flag. */
+ targ->used = 0;
+
+ return (NULL);
+}
+
+
+static char *
+multi_slot_to_string(struct netmap_ring *ring, unsigned int head,
+ unsigned int nfrags, char *strbuf, size_t strbuflen)
{
- char *units[] = { "", "K", "M", "G", "T" };
- u_int i;
+ unsigned int f;
+ char *ret = strbuf;
+
+ for (f = 0; f < nfrags; f++) {
+ struct netmap_slot *slot = &ring->slot[head];
+ int m = snprintf(strbuf, strbuflen, "|%u,%x|", slot->len,
+ slot->flags);
+ if (m >= (int)strbuflen) {
+ break;
+ }
+ strbuf += m;
+ strbuflen -= m;
- for (i = 0; val >=1000 && i < sizeof(units)/sizeof(char *) - 1; i++)
- val /= 1000;
- sprintf(buf, "%.2f %s", val, units[i]);
- return buf;
+ head = nm_ring_next(ring, head);
+ }
+
+ return ret;
}
-static void
-tx_output(uint64_t sent, int size, double delta)
+static void *
+rxseq_body(void *data)
{
- double bw, raw_bw, pps;
- char b1[40], b2[80], b3[80];
+ struct targ *targ = (struct targ *) data;
+ struct pollfd pfd = { .fd = targ->fd, .events = POLLIN };
+ int dump = targ->g->options & OPT_DUMP;
+ struct netmap_ring *ring;
+ unsigned int frags_exp = 1;
+ uint32_t seq_exp = 0;
+ struct my_ctrs cur;
+ unsigned int frags = 0;
+ int first_packet = 1;
+ int first_slot = 1;
+ int i;
- printf("Sent %llu packets, %d bytes each, in %.2f seconds.\n",
- (unsigned long long)sent, size, delta);
- if (delta == 0)
- delta = 1e-6;
- if (size < 60) /* correct for min packet size */
- size = 60;
- pps = sent / delta;
- bw = (8.0 * size * sent) / delta;
- /* raw packets have4 bytes crc + 20 bytes framing */
- raw_bw = (8.0 * (size + 24) * sent) / delta;
+ cur.pkts = cur.bytes = cur.events = cur.min_space = 0;
+ cur.t.tv_usec = cur.t.tv_sec = 0; // unused, just silence the compiler
+
+ if (setaffinity(targ->thread, targ->affinity))
+ goto quit;
+
+ D("reading from %s fd %d main_fd %d",
+ targ->g->ifname, targ->fd, targ->g->main_fd);
+ /* unbounded wait for the first packet. */
+ for (;!targ->cancel;) {
+ i = poll(&pfd, 1, 1000);
+ if (i > 0 && !(pfd.revents & POLLERR))
+ break;
+ RD(1, "waiting for initial packets, poll returns %d %d",
+ i, pfd.revents);
+ }
+
+ clock_gettime(CLOCK_REALTIME_PRECISE, &targ->tic);
+
+ ring = NETMAP_RXRING(targ->nmd->nifp, targ->nmd->first_rx_ring);
+
+ while (!targ->cancel) {
+ unsigned int head;
+ uint32_t seq;
+ int limit;
+
+ /* Once we started to receive packets, wait at most 1 second
+ before quitting. */
+ if (poll(&pfd, 1, 1 * 1000) <= 0 && !targ->g->forever) {
+ clock_gettime(CLOCK_REALTIME_PRECISE, &targ->toc);
+ targ->toc.tv_sec -= 1; /* Subtract timeout time. */
+ goto out;
+ }
+
+ if (pfd.revents & POLLERR) {
+ D("poll err");
+ goto quit;
+ }
+
+ if (nm_ring_empty(ring))
+ continue;
+
+ limit = nm_ring_space(ring);
+ if (limit > targ->g->burst)
+ limit = targ->g->burst;
+
+#if 0
+ /* Enable this if
+ * 1) we remove the early-return optimization from
+ * the netmap poll implementation, or
+ * 2) pipes get NS_MOREFRAG support.
+ * With the current netmap implementation, an experiment like
+ * pkt-gen -i vale:1{1 -f txseq -F 9
+ * pkt-gen -i vale:1}1 -f rxseq
+ * would get stuck as soon as we find nm_ring_space(ring) < 9,
+ * since here limit is rounded to 0 and
+ * pipe rxsync is not called anymore by the poll() of this loop.
+ */
+ if (frags_exp > 1) {
+ int o = limit;
+ /* Cut off to the closest smaller multiple. */
+ limit = (limit / frags_exp) * frags_exp;
+ RD(2, "LIMIT %d --> %d", o, limit);
+ }
+#endif
- printf("Speed: %spps Bandwidth: %sbps (raw %sbps)\n",
- norm(b1, pps), norm(b2, bw), norm(b3, raw_bw) );
+ for (head = ring->head, i = 0; i < limit; i++) {
+ struct netmap_slot *slot = &ring->slot[head];
+ char *p = NETMAP_BUF(ring, slot->buf_idx);
+ int len = slot->len;
+ struct pkt *pkt;
+
+ if (dump) {
+ dump_payload(p, slot->len, ring, head);
+ }
+
+ frags++;
+ if (!(slot->flags & NS_MOREFRAG)) {
+ if (first_packet) {
+ first_packet = 0;
+ } else if (frags != frags_exp) {
+ char prbuf[512];
+ RD(1, "Received packets with %u frags, "
+ "expected %u, '%s'", frags, frags_exp,
+ multi_slot_to_string(ring, head-frags+1, frags,
+ prbuf, sizeof(prbuf)));
+ }
+ first_packet = 0;
+ frags_exp = frags;
+ frags = 0;
+ }
+
+ p -= sizeof(pkt->vh) - targ->g->virt_header;
+ len += sizeof(pkt->vh) - targ->g->virt_header;
+ pkt = (struct pkt *)p;
+
+ if ((char *)pkt + len < ((char *)pkt->body) + sizeof(seq)) {
+ RD(1, "%s: packet too small (len=%u)", __func__,
+ slot->len);
+ } else {
+ seq = (pkt->body[0] << 24) | (pkt->body[1] << 16)
+ | (pkt->body[2] << 8) | pkt->body[3];
+ if (first_slot) {
+ /* Grab the first one, whatever it
+ is. */
+ seq_exp = seq;
+ first_slot = 0;
+ } else if (seq != seq_exp) {
+ uint32_t delta = seq - seq_exp;
+
+ if (delta < (0xFFFFFFFF >> 1)) {
+ RD(2, "Sequence GAP: exp %u found %u",
+ seq_exp, seq);
+ } else {
+ RD(2, "Sequence OUT OF ORDER: "
+ "exp %u found %u", seq_exp, seq);
+ }
+ seq_exp = seq;
+ }
+ seq_exp++;
+ }
+
+ cur.bytes += slot->len;
+ head = nm_ring_next(ring, head);
+ cur.pkts++;
+ }
+
+ ring->cur = ring->head = head;
+
+ cur.events++;
+ targ->ctr = cur;
+ }
+
+ clock_gettime(CLOCK_REALTIME_PRECISE, &targ->toc);
+
+out:
+ targ->completed = 1;
+ targ->ctr = cur;
+
+quit:
+ /* reset the ``used`` flag. */
+ targ->used = 0;
+
+ return (NULL);
}
static void
-rx_output(uint64_t received, double delta)
+tx_output(struct my_ctrs *cur, double delta, const char *msg)
{
- double pps;
- char b1[40];
+ double bw, raw_bw, pps, abs;
+ char b1[40], b2[80], b3[80];
+ int size;
- printf("Received %llu packets, in %.2f seconds.\n",
- (unsigned long long) received, delta);
+ if (cur->pkts == 0) {
+ printf("%s nothing.\n", msg);
+ return;
+ }
+
+ size = (int)(cur->bytes / cur->pkts);
+ printf("%s %llu packets %llu bytes %llu events %d bytes each in %.2f seconds.\n",
+ msg,
+ (unsigned long long)cur->pkts,
+ (unsigned long long)cur->bytes,
+ (unsigned long long)cur->events, size, delta);
if (delta == 0)
delta = 1e-6;
- pps = received / delta;
- printf("Speed: %spps\n", norm(b1, pps));
+ if (size < 60) /* correct for min packet size */
+ size = 60;
+ pps = cur->pkts / delta;
+ bw = (8.0 * cur->bytes) / delta;
+ /* raw packets have 4 bytes crc + 20 bytes framing */
+ raw_bw = (8.0 * (cur->pkts * 24 + cur->bytes)) / delta;
+ abs = cur->pkts / (double)(cur->events);
+
+ printf("Speed: %spps Bandwidth: %sbps (raw %sbps). Average batch: %.2f pkts\n",
+ norm(b1, pps), norm(b2, bw), norm(b3, raw_bw), abs);
}
static void
@@ -1389,9 +1870,9 @@ usage(void)
"Usage:\n"
"%s arguments\n"
"\t-i interface interface name\n"
- "\t-f function tx rx ping pong\n"
+ "\t-f function tx rx ping pong txseq rxseq\n"
"\t-n count number of iterations (can be 0)\n"
- "\t-t pkts_to_send also forces tx mode\n"
+ "\t-t pkts_to_send also forces tx mode\n"
"\t-r pkts_to_receive also forces rx mode\n"
"\t-l pkt_size in bytes excluding CRC\n"
"\t-d dst_ip[:port[-dst_ip:port]] single or range\n"
@@ -1403,20 +1884,29 @@ usage(void)
"\t-c cores cores to use\n"
"\t-p threads processes/threads to use\n"
"\t-T report_ms milliseconds between reports\n"
- "\t-P use libpcap instead of netmap\n"
"\t-w wait_for_link_time in seconds\n"
"\t-R rate in packets per second\n"
"\t-X dump payload\n"
"\t-H len add empty virtio-net-header with size 'len'\n"
+ "\t-E pipes allocate extra space for a number of pipes\n"
+ "\t-r do not touch the buffers (send rubbish)\n"
"\t-P file load packet from pcap file\n"
"\t-z use random IPv4 src address/port\n"
"\t-Z use random IPv4 dst address/port\n"
+ "\t-F num_frags send multi-slot packets\n"
+ "\t-A activate pps stats on receiver\n"
"",
cmd);
exit(0);
}
+enum {
+ TD_TYPE_SENDER = 1,
+ TD_TYPE_RECEIVER,
+ TD_TYPE_OTHER,
+};
+
static void
start_threads(struct glob_arg *g)
{
@@ -1439,33 +1929,32 @@ start_threads(struct glob_arg *g)
uint64_t nmd_flags = 0;
nmd.self = &nmd;
- if (g->nthreads > 1) {
- if (nmd.req.nr_flags != NR_REG_ALL_NIC) {
- D("invalid nthreads mode %d", nmd.req.nr_flags);
+ if (i > 0) {
+ /* the first thread uses the fd opened by the main
+ * thread, the other threads re-open /dev/netmap
+ */
+ if (g->nthreads > 1) {
+ nmd.req.nr_flags =
+ g->nmd->req.nr_flags & ~NR_REG_MASK;
+ nmd.req.nr_flags |= NR_REG_ONE_NIC;
+ nmd.req.nr_ringid = i;
+ }
+ /* Only touch one of the rings (rx is already ok) */
+ if (g->td_type == TD_TYPE_RECEIVER)
+ nmd_flags |= NETMAP_NO_TX_POLL;
+
+ /* register interface. Override ifname and ringid etc. */
+ t->nmd = nm_open(t->g->ifname, NULL, nmd_flags |
+ NM_OPEN_IFNAME | NM_OPEN_NO_MMAP, &nmd);
+ if (t->nmd == NULL) {
+ D("Unable to open %s: %s",
+ t->g->ifname, strerror(errno));
continue;
}
- nmd.req.nr_flags = NR_REG_ONE_NIC;
- nmd.req.nr_ringid = i;
- }
- /* Only touch one of the rings (rx is already ok) */
- if (g->td_body == receiver_body)
- nmd_flags |= NETMAP_NO_TX_POLL;
-
- /* register interface. Override ifname and ringid etc. */
- if (g->options & OPT_MONITOR_TX)
- nmd.req.nr_flags |= NR_MONITOR_TX;
- if (g->options & OPT_MONITOR_RX)
- nmd.req.nr_flags |= NR_MONITOR_RX;
-
- t->nmd = nm_open(t->g->ifname, NULL, nmd_flags |
- NM_OPEN_IFNAME | NM_OPEN_NO_MMAP, &nmd);
- if (t->nmd == NULL) {
- D("Unable to open %s: %s",
- t->g->ifname, strerror(errno));
- continue;
+ } else {
+ t->nmd = g->nmd;
}
t->fd = t->nmd->fd;
- set_vnet_hdr_len(t);
} else {
targs[i].fd = g->main_fd;
@@ -1473,10 +1962,7 @@ start_threads(struct glob_arg *g)
t->used = 1;
t->me = i;
if (g->affinity >= 0) {
- if (g->affinity < g->cpus)
- t->affinity = g->affinity;
- else
- t->affinity = i % g->cpus;
+ t->affinity = (g->affinity + i) % g->system_cpus;
} else {
t->affinity = -1;
}
@@ -1495,45 +1981,89 @@ main_thread(struct glob_arg *g)
{
int i;
- uint64_t prev = 0;
- uint64_t count = 0;
+ struct my_ctrs prev, cur;
double delta_t;
struct timeval tic, toc;
- gettimeofday(&toc, NULL);
+ prev.pkts = prev.bytes = prev.events = 0;
+ gettimeofday(&prev.t, NULL);
for (;;) {
- struct timeval now, delta;
- uint64_t pps, usec, my_count, npkts;
+ char b1[40], b2[40], b3[40], b4[70];
+ uint64_t pps, usec;
+ struct my_ctrs x;
+ double abs;
int done = 0;
- delta.tv_sec = g->report_interval/1000;
- delta.tv_usec = (g->report_interval%1000)*1000;
- select(0, NULL, NULL, NULL, &delta);
- gettimeofday(&now, NULL);
- timersub(&now, &toc, &toc);
- my_count = 0;
+ usec = wait_for_next_report(&prev.t, &cur.t,
+ g->report_interval);
+
+ cur.pkts = cur.bytes = cur.events = 0;
+ cur.min_space = 0;
+ if (usec < 10000) /* too short to be meaningful */
+ continue;
+ /* accumulate counts for all threads */
for (i = 0; i < g->nthreads; i++) {
- my_count += targs[i].count;
+ cur.pkts += targs[i].ctr.pkts;
+ cur.bytes += targs[i].ctr.bytes;
+ cur.events += targs[i].ctr.events;
+ cur.min_space += targs[i].ctr.min_space;
+ targs[i].ctr.min_space = 99999;
if (targs[i].used == 0)
done++;
}
- usec = toc.tv_sec* 1000000 + toc.tv_usec;
- if (usec < 10000)
- continue;
- npkts = my_count - prev;
- pps = (npkts*1000000 + usec/2) / usec;
- D("%llu pps (%llu pkts in %llu usec)",
- (unsigned long long)pps,
- (unsigned long long)npkts,
- (unsigned long long)usec);
- prev = my_count;
- toc = now;
+ x.pkts = cur.pkts - prev.pkts;
+ x.bytes = cur.bytes - prev.bytes;
+ x.events = cur.events - prev.events;
+ pps = (x.pkts*1000000 + usec/2) / usec;
+ abs = (x.events > 0) ? (x.pkts / (double) x.events) : 0;
+
+ if (!(g->options & OPT_PPS_STATS)) {
+ strcpy(b4, "");
+ } else {
+ /* Compute some pps stats using a sliding window. */
+ double ppsavg = 0.0, ppsdev = 0.0;
+ int nsamples = 0;
+
+ g->win[g->win_idx] = pps;
+ g->win_idx = (g->win_idx + 1) % STATS_WIN;
+
+ for (i = 0; i < STATS_WIN; i++) {
+ ppsavg += g->win[i];
+ if (g->win[i]) {
+ nsamples ++;
+ }
+ }
+ ppsavg /= nsamples;
+
+ for (i = 0; i < STATS_WIN; i++) {
+ if (g->win[i] == 0) {
+ continue;
+ }
+ ppsdev += (g->win[i] - ppsavg) * (g->win[i] - ppsavg);
+ }
+ ppsdev /= nsamples;
+ ppsdev = sqrt(ppsdev);
+
+ snprintf(b4, sizeof(b4), "[avg/std %s/%s pps]",
+ norm(b1, ppsavg), norm(b2, ppsdev));
+ }
+
+ D("%spps %s(%spkts %sbps in %llu usec) %.2f avg_batch %d min_space",
+ norm(b1, pps), b4,
+ norm(b2, (double)x.pkts),
+ norm(b3, (double)x.bytes*8),
+ (unsigned long long)usec,
+ abs, (int)cur.min_space);
+ prev = cur;
+
if (done == g->nthreads)
break;
}
timerclear(&tic);
timerclear(&toc);
+ cur.pkts = cur.bytes = cur.events = 0;
+ /* final round */
for (i = 0; i < g->nthreads; i++) {
struct timespec t_tic, t_toc;
/*
@@ -1541,8 +2071,13 @@ main_thread(struct glob_arg *g)
* file descriptors.
*/
if (targs[i].used)
- pthread_join(targs[i].thread, NULL);
- close(targs[i].fd);
+ pthread_join(targs[i].thread, NULL); /* blocking */
+ if (g->dev_type == DEV_NETMAP) {
+ nm_close(targs[i].nmd);
+ targs[i].nmd = NULL;
+ } else {
+ close(targs[i].fd);
+ }
if (targs[i].completed == 0)
D("ouch, thread %d exited with error", i);
@@ -1551,7 +2086,13 @@ main_thread(struct glob_arg *g)
* Collect threads output and extract information about
* how long it took to send all the packets.
*/
- count += targs[i].count;
+ cur.pkts += targs[i].ctr.pkts;
+ cur.bytes += targs[i].ctr.bytes;
+ cur.events += targs[i].ctr.events;
+ /* collect the largest start (tic) and end (toc) times,
+ * XXX maybe we should do the earliest tic, or do a weighted
+ * average ?
+ */
t_tic = timeval2spec(&tic);
t_toc = timeval2spec(&toc);
if (!timerisset(&tic) || timespec_ge(&targs[i].tic, &t_tic))
@@ -1563,29 +2104,26 @@ main_thread(struct glob_arg *g)
/* print output. */
timersub(&toc, &tic, &toc);
delta_t = toc.tv_sec + 1e-6* toc.tv_usec;
- if (g->td_body == sender_body)
- tx_output(count, g->pkt_size, delta_t);
+ if (g->td_type == TD_TYPE_SENDER)
+ tx_output(&cur, delta_t, "Sent");
else
- rx_output(count, delta_t);
-
- if (g->dev_type == DEV_NETMAP) {
- munmap(g->nmd->mem, g->nmd->req.nr_memsize);
- close(g->main_fd);
- }
+ tx_output(&cur, delta_t, "Received");
}
-
-struct sf {
+struct td_desc { /* maps a -f function name to its thread body and role */
+ int ty; /* TD_TYPE_* role, copied into g.td_type when -f matches */
char *key;
void *f;
};
-static struct sf func[] = {
- { "tx", sender_body },
- { "rx", receiver_body },
- { "ping", pinger_body },
- { "pong", ponger_body },
- { NULL, NULL }
+static struct td_desc func[] = { /* searched by key when parsing -f */
+ { TD_TYPE_SENDER, "tx", sender_body },
+ { TD_TYPE_RECEIVER, "rx", receiver_body },
+ { TD_TYPE_OTHER, "ping", pinger_body },
+ { TD_TYPE_OTHER, "pong", ponger_body },
+ { TD_TYPE_SENDER, "txseq", txseq_body },
+ { TD_TYPE_RECEIVER, "rxseq", rxseq_body },
+ { 0, NULL, NULL } /* end of table: a NULL key means "function not found" */
};
static int
@@ -1654,6 +2192,8 @@ int
main(int arc, char **argv)
{
int i;
+ struct sigaction sa;
+ sigset_t ss;
struct glob_arg g;
@@ -1665,6 +2205,7 @@ main(int arc, char **argv)
g.main_fd = -1;
g.td_body = receiver_body;
+ g.td_type = TD_TYPE_RECEIVER;
g.report_interval = 1000; /* report interval */
g.affinity = -1;
/* ip addresses can also be a range x.x.x.x-x.x.x.y */
@@ -1675,7 +2216,7 @@ main(int arc, char **argv)
g.pkt_size = 60;
g.burst = 512; // default
g.nthreads = 1;
- g.cpus = 1;
+ g.cpus = 1; // default
g.forever = 1;
g.tx_rate = 0;
g.frags = 1;
@@ -1683,8 +2224,8 @@ main(int arc, char **argv)
g.virt_header = 0;
while ( (ch = getopt(arc, argv,
- "a:f:F:n:i:Il:d:s:D:S:b:c:o:p:T:w:WvR:XC:H:e:m:P:zZ")) != -1) {
- struct sf *fn;
+ "a:f:F:n:i:Il:d:s:D:S:b:c:o:p:T:w:WvR:XC:H:e:E:m:rP:zZA")) != -1) {
+ struct td_desc *fn;
switch(ch) {
default:
@@ -1693,7 +2234,7 @@ main(int arc, char **argv)
break;
case 'n':
- g.npackets = atoi(optarg);
+ g.npackets = strtoull(optarg, NULL, 10);
break;
case 'F':
@@ -1710,10 +2251,12 @@ main(int arc, char **argv)
if (!strcmp(fn->key, optarg))
break;
}
- if (fn->key)
+ if (fn->key) {
g.td_body = fn->f;
- else
+ g.td_type = fn->ty;
+ } else {
D("unrecognised function %s", optarg);
+ }
break;
case 'o': /* data generation options */
@@ -1817,24 +2360,27 @@ main(int arc, char **argv)
case 'e': /* extra bufs */
g.extra_bufs = atoi(optarg);
break;
- case 'm':
- if (strcmp(optarg, "tx") == 0) {
- g.options |= OPT_MONITOR_TX;
- } else if (strcmp(optarg, "rx") == 0) {
- g.options |= OPT_MONITOR_RX;
- } else {
- D("unrecognized monitor mode %s", optarg);
- }
+ case 'E':
+ g.extra_pipes = atoi(optarg);
break;
case 'P':
g.packet_file = strdup(optarg);
break;
+ case 'm':
+ /* ignored */
+ break;
+ case 'r':
+ g.options |= OPT_RUBBISH;
+ break;
case 'z':
g.options |= OPT_RANDOM_SRC;
break;
case 'Z':
g.options |= OPT_RANDOM_DST;
break;
+ case 'A':
+ g.options |= OPT_PPS_STATS;
+ break;
}
}
@@ -1843,11 +2389,12 @@ main(int arc, char **argv)
usage();
}
- i = system_ncpus();
+ g.system_cpus = i = system_ncpus();
if (g.cpus < 0 || g.cpus > i) {
D("%d cpus is too high, have only %d cpus", g.cpus, i);
usage();
}
+D("running on %d cpus (have %d)", g.cpus, i);
if (g.cpus == 0)
g.cpus = i;
@@ -1914,6 +2461,11 @@ main(int arc, char **argv)
if (g.extra_bufs) {
base_nmd.nr_arg3 = g.extra_bufs;
}
+ if (g.extra_pipes) {
+ base_nmd.nr_arg1 = g.extra_pipes;
+ }
+
+ base_nmd.nr_flags |= NR_ACCEPT_VNET_HDR;
/*
* Open the netmap device using nm_open().
@@ -1927,13 +2479,38 @@ main(int arc, char **argv)
D("Unable to open %s: %s", g.ifname, strerror(errno));
goto out;
}
+
+ if (g.nthreads > 1) {
+ struct nm_desc saved_desc = *g.nmd;
+ saved_desc.self = &saved_desc;
+ saved_desc.mem = NULL;
+ nm_close(g.nmd);
+ saved_desc.req.nr_flags &= ~NR_REG_MASK;
+ saved_desc.req.nr_flags |= NR_REG_ONE_NIC;
+ saved_desc.req.nr_ringid = 0;
+ g.nmd = nm_open(g.ifname, &base_nmd, NM_OPEN_IFNAME, &saved_desc);
+ if (g.nmd == NULL) {
+ D("Unable to open %s: %s", g.ifname, strerror(errno));
+ goto out;
+ }
+ }
g.main_fd = g.nmd->fd;
D("mapped %dKB at %p", g.nmd->req.nr_memsize>>10, g.nmd->mem);
- /* get num of queues in tx or rx */
- if (g.td_body == sender_body)
+ if (g.virt_header) {
+ /* Set the virtio-net header length, since the user asked
+ * for it explicitly. */
+ set_vnet_hdr_len(&g);
+ } else {
+ /* Check whether the netmap port we opened requires us to send
+ * and receive frames with virtio-net header. */
+ get_vnet_hdr_len(&g);
+ }
+
+ /* get num of queues in tx or rx */
+ if (g.td_type == TD_TYPE_SENDER)
devqueues = g.nmd->req.nr_tx_rings;
- else
+ else
devqueues = g.nmd->req.nr_rx_rings;
/* validate provided nthreads. */
@@ -1951,25 +2528,27 @@ main(int arc, char **argv)
req->nr_arg2);
for (i = 0; i <= req->nr_tx_rings; i++) {
struct netmap_ring *ring = NETMAP_TXRING(nifp, i);
- D(" TX%d at 0x%lx slots %d", i,
- (char *)ring - (char *)nifp, ring->num_slots);
+ D(" TX%d at 0x%p slots %d", i,
+ (void *)((char *)ring - (char *)nifp), ring->num_slots);
}
for (i = 0; i <= req->nr_rx_rings; i++) {
struct netmap_ring *ring = NETMAP_RXRING(nifp, i);
- D(" RX%d at 0x%lx slots %d", i,
- (char *)ring - (char *)nifp, ring->num_slots);
+ D(" RX%d at 0x%p slots %d", i,
+ (void *)((char *)ring - (char *)nifp), ring->num_slots);
}
}
/* Print some debug information. */
fprintf(stdout,
"%s %s: %d queues, %d threads and %d cpus.\n",
- (g.td_body == sender_body) ? "Sending on" : "Receiving from",
+ (g.td_type == TD_TYPE_SENDER) ? "Sending on" :
+ ((g.td_type == TD_TYPE_RECEIVER) ? "Receiving from" :
+ "Working on"),
g.ifname,
devqueues,
g.nthreads,
g.cpus);
- if (g.td_body == sender_body) {
+ if (g.td_type == TD_TYPE_SENDER) {
fprintf(stdout, "%s -> %s (%s -> %s)\n",
g.src_ip.name, g.dst_ip.name,
g.src_mac.name, g.dst_mac.name);
@@ -1985,12 +2564,13 @@ out:
if (g.options) {
- D("--- SPECIAL OPTIONS:%s%s%s%s%s\n",
+ D("--- SPECIAL OPTIONS:%s%s%s%s%s%s\n",
g.options & OPT_PREFETCH ? " prefetch" : "",
g.options & OPT_ACCESS ? " access" : "",
g.options & OPT_MEMCPY ? " memcpy" : "",
g.options & OPT_INDIRECT ? " indirect" : "",
- g.options & OPT_COPY ? " copy" : "");
+ g.options & OPT_COPY ? " copy" : "",
+ g.options & OPT_RUBBISH ? " rubbish " : "");
}
g.tx_period.tv_sec = g.tx_period.tv_nsec = 0;
@@ -2010,7 +2590,7 @@ out:
g.tx_period.tv_sec = g.tx_period.tv_nsec / 1000000000;
g.tx_period.tv_nsec = g.tx_period.tv_nsec % 1000000000;
}
- if (g.td_body == sender_body)
+ if (g.td_type == TD_TYPE_SENDER)
D("Sending %d packets every %ld.%09ld s",
g.burst, g.tx_period.tv_sec, g.tx_period.tv_nsec);
/* Wait for PHY reset. */
@@ -2020,10 +2600,24 @@ out:
/* Install ^C handler. */
global_nthreads = g.nthreads;
- signal(SIGINT, sigint_h);
-
+ sigemptyset(&ss);
+ sigaddset(&ss, SIGINT);
+ /* block SIGINT now, so that all created threads will inherit the mask */
+ if (pthread_sigmask(SIG_BLOCK, &ss, NULL) < 0) {
+ D("failed to block SIGINT: %s", strerror(errno));
+ }
start_threads(&g);
+ /* Install the handler and re-enable SIGINT for the main thread */
+ sa.sa_handler = sigint_h;
+ if (sigaction(SIGINT, &sa, NULL) < 0) {
+ D("failed to install ^C handler: %s", strerror(errno));
+ }
+
+ if (pthread_sigmask(SIG_UNBLOCK, &ss, NULL) < 0) {
+ D("failed to re-enable SIGINT: %s", strerror(errno));
+ }
main_thread(&g);
+ free(targs);
return 0;
}