diff options
author | Luigi Rizzo <luigi@FreeBSD.org> | 2016-10-16 14:13:32 +0000 |
---|---|---|
committer | Luigi Rizzo <luigi@FreeBSD.org> | 2016-10-16 14:13:32 +0000 |
commit | 37e3a6d349581b4dd0aebf24be7b1b159a698dcf (patch) | |
tree | 0e61deea141c9733af511b0485cf1fd0f2dd17ed /tools/tools/netmap/pkt-gen.c | |
parent | 63f6b1a75a8e6e33e4f9d65571c6a221444d3b05 (diff) | |
download | src-37e3a6d349581b4dd0aebf24be7b1b159a698dcf.tar.gz src-37e3a6d349581b4dd0aebf24be7b1b159a698dcf.zip |
Notes
Diffstat (limited to 'tools/tools/netmap/pkt-gen.c')
-rw-r--r-- | tools/tools/netmap/pkt-gen.c | 1114 |
1 files changed, 854 insertions, 260 deletions
diff --git a/tools/tools/netmap/pkt-gen.c b/tools/tools/netmap/pkt-gen.c index 6d9bee6de634..168e022cfba9 100644 --- a/tools/tools/netmap/pkt-gen.c +++ b/tools/tools/netmap/pkt-gen.c @@ -1,6 +1,6 @@ /* * Copyright (C) 2011-2014 Matteo Landi, Luigi Rizzo. All rights reserved. - * Copyright (C) 2013-2014 Universita` di Pisa. All rights reserved. + * Copyright (C) 2013-2015 Universita` di Pisa. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -37,8 +37,6 @@ * */ -// #define TRASH_VHOST_HDR - #define _GNU_SOURCE /* for CPU_SET() */ #include <stdio.h> #define NETMAP_WITH_LIBS @@ -49,12 +47,16 @@ #include <unistd.h> // sysconf() #include <sys/poll.h> #include <arpa/inet.h> /* ntohs */ +#ifndef _WIN32 #include <sys/sysctl.h> /* sysctl */ +#endif #include <ifaddrs.h> /* getifaddrs */ #include <net/ethernet.h> #include <netinet/in.h> #include <netinet/ip.h> #include <netinet/udp.h> +#include <assert.h> +#include <math.h> #include <pthread.h> @@ -62,6 +64,69 @@ #include <pcap/pcap.h> #endif +#include "ctrs.h" + +#ifdef _WIN32 +#define cpuset_t DWORD_PTR //uint64_t +static inline void CPU_ZERO(cpuset_t *p) +{ + *p = 0; +} + +static inline void CPU_SET(uint32_t i, cpuset_t *p) +{ + *p |= 1<< (i & 0x3f); +} + +#define pthread_setaffinity_np(a, b, c) !SetThreadAffinityMask(a, *c) //((void)a, 0) +#define TAP_CLONEDEV "/dev/tap" +#define AF_LINK 18 //defined in winsocks.h +#define CLOCK_REALTIME_PRECISE CLOCK_REALTIME +#include <net/if_dl.h> + +/* + * Convert an ASCII representation of an ethernet address to + * binary form. + */ +struct ether_addr * +ether_aton(const char *a) +{ + int i; + static struct ether_addr o; + unsigned int o0, o1, o2, o3, o4, o5; + + i = sscanf(a, "%x:%x:%x:%x:%x:%x", &o0, &o1, &o2, &o3, &o4, &o5); + + if (i != 6) + return (NULL); + + o.octet[0]=o0; + o.octet[1]=o1; + o.octet[2]=o2; + o.octet[3]=o3; + o.octet[4]=o4; + o.octet[5]=o5; + + return ((struct ether_addr *)&o); +} + +/* + * Convert a binary representation of an ethernet address to + * an ASCII string. + */ +char * +ether_ntoa(const struct ether_addr *n) +{ + int i; + static char a[18]; + + i = sprintf(a, "%02x:%02x:%02x:%02x:%02x:%02x", + n->octet[0], n->octet[1], n->octet[2], + n->octet[3], n->octet[4], n->octet[5]); + return (i < 17 ? NULL : (char *)&a); +} +#endif /* _WIN32 */ + #ifdef linux #define cpuset_t cpu_set_t @@ -169,10 +234,12 @@ struct glob_arg { int pkt_size; int burst; int forever; - int npackets; /* total packets to send */ + uint64_t npackets; /* total packets to send */ int frags; /* fragments per packet */ int nthreads; - int cpus; + int cpus; /* cpus used for running */ + int system_cpus; /* cpus on the system */ + int options; /* testing */ #define OPT_PREFETCH 1 #define OPT_ACCESS 2 @@ -181,10 +248,10 @@ struct glob_arg { #define OPT_TS 16 /* add a timestamp */ #define OPT_INDIRECT 32 /* use indirect buffers, tx only */ #define OPT_DUMP 64 /* dump rx/tx traffic */ -#define OPT_MONITOR_TX 128 -#define OPT_MONITOR_RX 256 +#define OPT_RUBBISH 256 /* send wathever the buffers contain */ #define OPT_RANDOM_SRC 512 #define OPT_RANDOM_DST 1024 +#define OPT_PPS_STATS 2048 int dev_type; #ifndef NO_PCAP pcap_t *p; @@ -198,13 +265,18 @@ struct glob_arg { struct nm_desc *nmd; int report_interval; /* milliseconds between prints */ void *(*td_body)(void *); + int td_type; void *mmap_addr; char ifname[MAX_IFNAMELEN]; char *nmr_config; int dummy_send; int virt_header; /* send also the virt_header */ int extra_bufs; /* goes in nr_arg3 */ + int extra_pipes; /* goes in nr_arg1 */ char *packet_file; /* -P option */ +#define STATS_WIN 15 + int win_idx; + int64_t win[STATS_WIN]; }; enum dev_type { DEV_NONE, DEV_NETMAP, DEV_PCAP, DEV_TAP }; @@ -220,7 +292,11 @@ struct targ { int cancel; int fd; struct nm_desc *nmd; - volatile uint64_t count; + /* these ought to be volatile, but they are + * only sampled and errors should not accumulate + */ + struct my_ctrs ctr; + struct timespec tic, toc; int me; pthread_t thread; @@ -327,11 +403,10 @@ sigint_h(int sig) int i; (void)sig; /* UNUSED */ - D("received control-C on thread %p", pthread_self()); + D("received control-C on thread %p", (void *)pthread_self()); for (i = 0; i < global_nthreads; i++) { targs[i].cancel = 1; } - signal(SIGINT, SIG_DFL); } /* sysctl wrapper to return the number of active CPUs */ @@ -345,6 +420,12 @@ system_ncpus(void) sysctl(mib, 2, &ncpus, &len, NULL, 0); #elif defined(linux) ncpus = sysconf(_SC_NPROCESSORS_ONLN); +#elif defined(_WIN32) + { + SYSTEM_INFO sysinfo; + GetSystemInfo(&sysinfo); + ncpus = sysinfo.dwNumberOfProcessors; + } #else /* others */ ncpus = 1; #endif /* others */ @@ -518,10 +599,11 @@ wrapsum(u_int32_t sum) * Look for consecutive ascii representations of the size of the packet. */ static void -dump_payload(char *p, int len, struct netmap_ring *ring, int cur) +dump_payload(const char *_p, int len, struct netmap_ring *ring, int cur) { char buf[128]; int i, j, i0; + const unsigned char *p = (const unsigned char *)_p; /* get the length in ASCII of the length of the packet. */ @@ -629,6 +711,7 @@ initialize_packet(struct targ *targ) indirect_payload : default_payload; int i, l0 = strlen(payload); +#ifndef NO_PCAP char errbuf[PCAP_ERRBUF_SIZE]; pcap_t *file; struct pcap_pkthdr *header; @@ -650,6 +733,7 @@ initialize_packet(struct targ *targ) pcap_close(file); return; } +#endif /* create a nice NUL-terminated string */ for (i = 0; i < paylen; i += l0) { @@ -695,35 +779,49 @@ initialize_packet(struct targ *targ) eh->ether_type = htons(ETHERTYPE_IP); bzero(&pkt->vh, sizeof(pkt->vh)); -#ifdef TRASH_VHOST_HDR - /* set bogus content */ - pkt->vh.fields[0] = 0xff; - pkt->vh.fields[1] = 0xff; - pkt->vh.fields[2] = 0xff; - pkt->vh.fields[3] = 0xff; - pkt->vh.fields[4] = 0xff; - pkt->vh.fields[5] = 0xff; -#endif /* TRASH_VHOST_HDR */ // dump_payload((void *)pkt, targ->g->pkt_size, NULL, 0); } static void -set_vnet_hdr_len(struct targ *t) +get_vnet_hdr_len(struct glob_arg *g) { - int err, l = t->g->virt_header; + struct nmreq req; + int err; + + memset(&req, 0, sizeof(req)); + bcopy(g->nmd->req.nr_name, req.nr_name, sizeof(req.nr_name)); + req.nr_version = NETMAP_API; + req.nr_cmd = NETMAP_VNET_HDR_GET; + err = ioctl(g->main_fd, NIOCREGIF, &req); + if (err) { + D("Unable to get virtio-net header length"); + return; + } + + g->virt_header = req.nr_arg1; + if (g->virt_header) { + D("Port requires virtio-net header, length = %d", + g->virt_header); + } +} + +static void +set_vnet_hdr_len(struct glob_arg *g) +{ + int err, l = g->virt_header; struct nmreq req; if (l == 0) return; memset(&req, 0, sizeof(req)); - bcopy(t->nmd->req.nr_name, req.nr_name, sizeof(req.nr_name)); + bcopy(g->nmd->req.nr_name, req.nr_name, sizeof(req.nr_name)); req.nr_version = NETMAP_API; req.nr_cmd = NETMAP_BDG_VNET_HDR; req.nr_arg1 = l; - err = ioctl(t->fd, NIOCREGIF, &req); + err = ioctl(g->main_fd, NIOCREGIF, &req); if (err) { - D("Unable to set vnet header length %d", l); + D("Unable to set virtio-net header length %d", l); } } @@ -763,12 +861,15 @@ send_packets(struct netmap_ring *ring, struct pkt *pkt, void *frame, for (fcnt = nfrags, sent = 0; sent < count; sent++) { struct netmap_slot *slot = &ring->slot[cur]; char *p = NETMAP_BUF(ring, slot->buf_idx); + int buf_changed = slot->flags & NS_BUF_CHANGED; slot->flags = 0; - if (options & OPT_INDIRECT) { + if (options & OPT_RUBBISH) { + /* do nothing */ + } else if (options & OPT_INDIRECT) { slot->flags |= NS_INDIRECT; - slot->ptr = (uint64_t)frame; - } else if (options & OPT_COPY) { + slot->ptr = (uint64_t)((uintptr_t)frame); + } else if ((options & OPT_COPY) || buf_changed) { nm_pkt_copy(frame, p, size); if (fcnt == nfrags) update_addresses(pkt, g); @@ -798,6 +899,21 @@ send_packets(struct netmap_ring *ring, struct pkt *pkt, void *frame, } /* + * Index of the highest bit set + */ +uint32_t +msb64(uint64_t x) +{ + uint64_t m = 1ULL << 63; + int i; + + for (i = 63; i >= 0; i--, m >>=1) + if (m & x) + return i; + return 0; +} + +/* * Send a packet, and wait for a response. * The payload (after UDP header, ofs 42) has a 4-byte sequence * followed by a struct timeval (or bintime?) @@ -810,25 +926,28 @@ pinger_body(void *data) struct targ *targ = (struct targ *) data; struct pollfd pfd = { .fd = targ->fd, .events = POLLIN }; struct netmap_if *nifp = targ->nmd->nifp; - int i, rx = 0, n = targ->g->npackets; + int i, rx = 0; void *frame; int size; - uint32_t sent = 0; struct timespec ts, now, last_print; - uint32_t count = 0, min = 1000000000, av = 0; + uint64_t sent = 0, n = targ->g->npackets; + uint64_t count = 0, t_cur, t_min = ~0, av = 0; + uint64_t buckets[64]; /* bins for delays, ns */ frame = &targ->pkt; frame += sizeof(targ->pkt.vh) - targ->g->virt_header; size = targ->g->pkt_size + targ->g->virt_header; + if (targ->g->nthreads > 1) { D("can only ping with 1 thread"); return NULL; } + bzero(&buckets, sizeof(buckets)); clock_gettime(CLOCK_REALTIME_PRECISE, &last_print); now = last_print; - while (n == 0 || (int)sent < n) { + while (!targ->cancel && (n == 0 || sent < n)) { struct netmap_ring *ring = NETMAP_TXRING(nifp, 0); struct netmap_slot *slot; char *p; @@ -864,6 +983,8 @@ pinger_body(void *data) while (!nm_ring_empty(ring)) { uint32_t seq; struct tstamp *tp; + int pos; + slot = &ring->slot[ring->cur]; p = NETMAP_BUF(ring, slot->buf_idx); @@ -878,12 +999,16 @@ pinger_body(void *data) ts.tv_nsec += 1000000000; ts.tv_sec--; } - if (1) D("seq %d/%d delta %d.%09d", seq, sent, + if (0) D("seq %d/%lu delta %d.%09d", seq, sent, (int)ts.tv_sec, (int)ts.tv_nsec); - if (ts.tv_nsec < (int)min) - min = ts.tv_nsec; + t_cur = ts.tv_sec * 1000000000UL + ts.tv_nsec; + if (t_cur < t_min) + t_min = t_cur; count ++; - av += ts.tv_nsec; + av += t_cur; + pos = msb64(t_cur); + buckets[pos]++; + /* now store it in a bucket */ ring->head = ring->cur = nm_ring_next(ring, ring->cur); rx++; } @@ -897,14 +1022,32 @@ pinger_body(void *data) ts.tv_sec--; } if (ts.tv_sec >= 1) { - D("count %d min %d av %d", - count, min, av/count); + D("count %d RTT: min %d av %d ns", + (int)count, (int)t_min, (int)(av/count)); + int k, j, kmin; + char buf[512]; + + for (kmin = 0; kmin < 64; kmin ++) + if (buckets[kmin]) + break; + for (k = 63; k >= kmin; k--) + if (buckets[k]) + break; + buf[0] = '\0'; + for (j = kmin; j <= k; j++) + sprintf(buf, "%s %5d", buf, (int)buckets[j]); + D("k: %d .. %d\n\t%s", 1<<kmin, 1<<k, buf); + bzero(&buckets, sizeof(buckets)); count = 0; av = 0; - min = 100000000; + t_min = ~0; last_print = now; } } + + /* reset the ``used`` flag. */ + targ->used = 0; + return NULL; } @@ -919,14 +1062,15 @@ ponger_body(void *data) struct pollfd pfd = { .fd = targ->fd, .events = POLLIN }; struct netmap_if *nifp = targ->nmd->nifp; struct netmap_ring *txring, *rxring; - int i, rx = 0, sent = 0, n = targ->g->npackets; + int i, rx = 0; + uint64_t sent = 0, n = targ->g->npackets; if (targ->g->nthreads > 1) { D("can only reply ping with 1 thread"); return NULL; } - D("understood ponger %d but don't know how to do it", n); - while (n == 0 || sent < n) { + D("understood ponger %lu but don't know how to do it", n); + while (!targ->cancel && (n == 0 || sent < n)) { uint32_t txcur, txavail; //#define BUSYWAIT #ifdef BUSYWAIT @@ -975,69 +1119,17 @@ ponger_body(void *data) } } txring->head = txring->cur = txcur; - targ->count = sent; + targ->ctr.pkts = sent; #ifdef BUSYWAIT ioctl(pfd.fd, NIOCTXSYNC, NULL); #endif //D("tx %d rx %d", sent, rx); } - return NULL; -} - -static __inline int -timespec_ge(const struct timespec *a, const struct timespec *b) -{ - - if (a->tv_sec > b->tv_sec) - return (1); - if (a->tv_sec < b->tv_sec) - return (0); - if (a->tv_nsec >= b->tv_nsec) - return (1); - return (0); -} -static __inline struct timespec -timeval2spec(const struct timeval *a) -{ - struct timespec ts = { - .tv_sec = a->tv_sec, - .tv_nsec = a->tv_usec * 1000 - }; - return ts; -} - -static __inline struct timeval -timespec2val(const struct timespec *a) -{ - struct timeval tv = { - .tv_sec = a->tv_sec, - .tv_usec = a->tv_nsec / 1000 - }; - return tv; -} - - -static __inline struct timespec -timespec_add(struct timespec a, struct timespec b) -{ - struct timespec ret = { a.tv_sec + b.tv_sec, a.tv_nsec + b.tv_nsec }; - if (ret.tv_nsec >= 1000000000) { - ret.tv_sec++; - ret.tv_nsec -= 1000000000; - } - return ret; -} + /* reset the ``used`` flag. */ + targ->used = 0; -static __inline struct timespec -timespec_sub(struct timespec a, struct timespec b) -{ - struct timespec ret = { a.tv_sec - b.tv_sec, a.tv_nsec - b.tv_nsec }; - if (ret.tv_nsec < 0) { - ret.tv_sec--; - ret.tv_nsec += 1000000000; - } - return ret; + return NULL; } @@ -1065,9 +1157,11 @@ sender_body(void *data) struct targ *targ = (struct targ *) data; struct pollfd pfd = { .fd = targ->fd, .events = POLLOUT }; struct netmap_if *nifp; - struct netmap_ring *txring; - int i, n = targ->g->npackets / targ->g->nthreads; - int64_t sent = 0; + struct netmap_ring *txring = NULL; + int i; + uint64_t n = targ->g->npackets / targ->g->nthreads; + uint64_t sent = 0; + uint64_t event = 0; int options = targ->g->options | OPT_COPY; struct timespec nexttime = { 0, 0}; // XXX silence compiler int rate_limit = targ->g->tx_rate; @@ -1104,7 +1198,9 @@ sender_body(void *data) sent++; update_addresses(pkt, targ->g); if (i > 10000) { - targ->count = sent; + targ->ctr.pkts = sent; + targ->ctr.bytes = sent*size; + targ->ctr.events = sent; i = 0; } } @@ -1117,7 +1213,9 @@ sender_body(void *data) sent++; update_addresses(pkt, targ->g); if (i > 10000) { - targ->count = sent; + targ->ctr.pkts = sent; + targ->ctr.bytes = sent*size; + targ->ctr.events = sent; i = 0; } } @@ -1126,7 +1224,7 @@ sender_body(void *data) int tosend = 0; int frags = targ->g->frags; - nifp = targ->nmd->nifp; + nifp = targ->nmd->nifp; while (!targ->cancel && (n == 0 || sent < n)) { if (rate_limit && tosend <= 0) { @@ -1138,6 +1236,13 @@ sender_body(void *data) /* * wait for available room in the send queue(s) */ +#ifdef BUSYWAIT + if (ioctl(pfd.fd, NIOCTXSYNC, NULL) < 0) { + D("ioctl error on queue %d: %s", targ->me, + strerror(errno)); + goto quit; + } +#else /* !BUSYWAIT */ if (poll(&pfd, 1, 2000) <= 0) { if (targ->cancel) break; @@ -1146,9 +1251,11 @@ sender_body(void *data) // goto quit; } if (pfd.revents & POLLERR) { - D("poll error"); + D("poll error on %d ring %d-%d", pfd.fd, + targ->nmd->first_tx_ring, targ->nmd->last_tx_ring); goto quit; } +#endif /* !BUSYWAIT */ /* * scan our queues and send on those with room */ @@ -1157,7 +1264,8 @@ sender_body(void *data) options &= ~OPT_COPY; } for (i = targ->nmd->first_tx_ring; i <= targ->nmd->last_tx_ring; i++) { - int m, limit = rate_limit ? tosend : targ->g->burst; + int m; + uint64_t limit = rate_limit ? tosend : targ->g->burst; if (n > 0 && n - sent < limit) limit = n - sent; txring = NETMAP_TXRING(nifp, i); @@ -1171,7 +1279,11 @@ sender_body(void *data) ND("limit %d tail %d frags %d m %d", limit, txring->tail, frags, m); sent += m; - targ->count = sent; + if (m > 0) //XXX-ste: can m be 0? + event++; + targ->ctr.pkts = sent; + targ->ctr.bytes = sent*size; + targ->ctr.events = event; if (rate_limit) { tosend -= m; if (tosend <= 0) @@ -1182,13 +1294,13 @@ sender_body(void *data) /* flush any remaining packets */ D("flush tail %d head %d on thread %p", txring->tail, txring->head, - pthread_self()); + (void *)pthread_self()); ioctl(pfd.fd, NIOCTXSYNC, NULL); /* final part: wait all the TX queues to be empty. */ for (i = targ->nmd->first_tx_ring; i <= targ->nmd->last_tx_ring; i++) { txring = NETMAP_TXRING(nifp, i); - while (nm_tx_pending(txring)) { + while (!targ->cancel && nm_tx_pending(txring)) { RD(5, "pending tx tail %d head %d on ring %d", txring->tail, txring->head, i); ioctl(pfd.fd, NIOCTXSYNC, NULL); @@ -1199,8 +1311,9 @@ sender_body(void *data) clock_gettime(CLOCK_REALTIME_PRECISE, &targ->toc); targ->completed = 1; - targ->count = sent; - + targ->ctr.pkts = sent; + targ->ctr.bytes = sent*size; + targ->ctr.events = event; quit: /* reset the ``used`` flag. */ targ->used = 0; @@ -1214,17 +1327,22 @@ static void receive_pcap(u_char *user, const struct pcap_pkthdr * h, const u_char * bytes) { - int *count = (int *)user; - (void)h; /* UNUSED */ + struct my_ctrs *ctr = (struct my_ctrs *)user; (void)bytes; /* UNUSED */ - (*count)++; + ctr->bytes += h->len; + ctr->pkts++; } #endif /* !NO_PCAP */ + static int -receive_packets(struct netmap_ring *ring, u_int limit, int dump) +receive_packets(struct netmap_ring *ring, u_int limit, int dump, uint64_t *bytes) { u_int cur, rx, n; + uint64_t b = 0; + + if (bytes == NULL) + bytes = &b; cur = ring->cur; n = nm_ring_space(ring); @@ -1234,6 +1352,7 @@ receive_packets(struct netmap_ring *ring, u_int limit, int dump) struct netmap_slot *slot = &ring->slot[cur]; char *p = NETMAP_BUF(ring, slot->buf_idx); + *bytes += slot->len; if (dump) dump_payload(p, slot->len, ring, cur); @@ -1252,7 +1371,10 @@ receiver_body(void *data) struct netmap_if *nifp; struct netmap_ring *rxring; int i; - uint64_t received = 0; + struct my_ctrs cur; + + cur.pkts = cur.bytes = cur.events = cur.min_space = 0; + cur.t.tv_usec = cur.t.tv_sec = 0; // unused, just silence the compiler if (setaffinity(targ->thread, targ->affinity)) goto quit; @@ -1273,24 +1395,36 @@ receiver_body(void *data) while (!targ->cancel) { char buf[MAX_BODYSIZE]; /* XXX should we poll ? */ - if (read(targ->g->main_fd, buf, sizeof(buf)) > 0) - targ->count++; + i = read(targ->g->main_fd, buf, sizeof(buf)); + if (i > 0) { + targ->ctr.pkts++; + targ->ctr.bytes += i; + targ->ctr.events++; + } } #ifndef NO_PCAP } else if (targ->g->dev_type == DEV_PCAP) { while (!targ->cancel) { /* XXX should we poll ? */ pcap_dispatch(targ->g->p, targ->g->burst, receive_pcap, - (u_char *)&targ->count); + (u_char *)&targ->ctr); + targ->ctr.events++; } #endif /* !NO_PCAP */ } else { int dump = targ->g->options & OPT_DUMP; - nifp = targ->nmd->nifp; + nifp = targ->nmd->nifp; while (!targ->cancel) { /* Once we started to receive packets, wait at most 1 seconds before quitting. */ +#ifdef BUSYWAIT + if (ioctl(pfd.fd, NIOCRXSYNC, NULL) < 0) { + D("ioctl error on queue %d: %s", targ->me, + strerror(errno)); + goto quit; + } +#else /* !BUSYWAIT */ if (poll(&pfd, 1, 1 * 1000) <= 0 && !targ->g->forever) { clock_gettime(CLOCK_REALTIME_PRECISE, &targ->toc); targ->toc.tv_sec -= 1; /* Subtract timeout time. */ @@ -1301,26 +1435,39 @@ receiver_body(void *data) D("poll err"); goto quit; } - +#endif /* !BUSYWAIT */ + uint64_t cur_space = 0; for (i = targ->nmd->first_rx_ring; i <= targ->nmd->last_rx_ring; i++) { int m; rxring = NETMAP_RXRING(nifp, i); + /* compute free space in the ring */ + m = rxring->head + rxring->num_slots - rxring->tail; + if (m >= (int) rxring->num_slots) + m -= rxring->num_slots; + cur_space += m; if (nm_ring_empty(rxring)) continue; - m = receive_packets(rxring, targ->g->burst, dump); - received += m; + m = receive_packets(rxring, targ->g->burst, dump, &cur.bytes); + cur.pkts += m; + if (m > 0) //XXX-ste: can m be 0? + cur.events++; } - targ->count = received; + cur.min_space = targ->ctr.min_space; + if (cur_space < cur.min_space) + cur.min_space = cur_space; + targ->ctr = cur; } } clock_gettime(CLOCK_REALTIME_PRECISE, &targ->toc); +#if !defined(BUSYWAIT) out: +#endif targ->completed = 1; - targ->count = received; + targ->ctr = cur; quit: /* reset the ``used`` flag. */ @@ -1329,56 +1476,390 @@ quit: return (NULL); } -/* very crude code to print a number in normalized form. - * Caller has to make sure that the buffer is large enough. - */ -static const char * -norm(char *buf, double val) +static void * +txseq_body(void *data) +{ + struct targ *targ = (struct targ *) data; + struct pollfd pfd = { .fd = targ->fd, .events = POLLOUT }; + struct netmap_ring *ring; + int64_t sent = 0; + uint64_t event = 0; + int options = targ->g->options | OPT_COPY; + struct timespec nexttime = {0, 0}; + int rate_limit = targ->g->tx_rate; + struct pkt *pkt = &targ->pkt; + int frags = targ->g->frags; + uint32_t sequence = 0; + int budget = 0; + void *frame; + int size; + + if (targ->g->nthreads > 1) { + D("can only txseq ping with 1 thread"); + return NULL; + } + + if (targ->g->npackets > 0) { + D("Ignoring -n argument"); + } + + frame = pkt; + frame += sizeof(pkt->vh) - targ->g->virt_header; + size = targ->g->pkt_size + targ->g->virt_header; + + D("start, fd %d main_fd %d", targ->fd, targ->g->main_fd); + if (setaffinity(targ->thread, targ->affinity)) + goto quit; + + clock_gettime(CLOCK_REALTIME_PRECISE, &targ->tic); + if (rate_limit) { + targ->tic = timespec_add(targ->tic, (struct timespec){2,0}); + targ->tic.tv_nsec = 0; + wait_time(targ->tic); + nexttime = targ->tic; + } + + /* Only use the first queue. */ + ring = NETMAP_TXRING(targ->nmd->nifp, targ->nmd->first_tx_ring); + + while (!targ->cancel) { + int64_t limit; + unsigned int space; + unsigned int head; + int fcnt; + + if (!rate_limit) { + budget = targ->g->burst; + + } else if (budget <= 0) { + budget = targ->g->burst; + nexttime = timespec_add(nexttime, targ->g->tx_period); + wait_time(nexttime); + } + + /* wait for available room in the send queue */ + if (poll(&pfd, 1, 2000) <= 0) { + if (targ->cancel) + break; + D("poll error/timeout on queue %d: %s", targ->me, + strerror(errno)); + } + if (pfd.revents & POLLERR) { + D("poll error on %d ring %d-%d", pfd.fd, + targ->nmd->first_tx_ring, targ->nmd->last_tx_ring); + goto quit; + } + + /* If no room poll() again. */ + space = nm_ring_space(ring); + if (!space) { + continue; + } + + limit = budget; + + if (space < limit) { + limit = space; + } + + /* Cut off ``limit`` to make sure is multiple of ``frags``. */ + if (frags > 1) { + limit = (limit / frags) * frags; + } + + limit = sent + limit; /* Convert to absolute. */ + + for (fcnt = frags, head = ring->head; + sent < limit; sent++, sequence++) { + struct netmap_slot *slot = &ring->slot[head]; + char *p = NETMAP_BUF(ring, slot->buf_idx); + + slot->flags = 0; + pkt->body[0] = sequence >> 24; + pkt->body[1] = (sequence >> 16) & 0xff; + pkt->body[2] = (sequence >> 8) & 0xff; + pkt->body[3] = sequence & 0xff; + nm_pkt_copy(frame, p, size); + if (fcnt == frags) { + update_addresses(pkt, targ->g); + } + + if (options & OPT_DUMP) { + dump_payload(p, size, ring, head); + } + + slot->len = size; + + if (--fcnt > 0) { + slot->flags |= NS_MOREFRAG; + } else { + fcnt = frags; + } + + if (sent == limit - 1) { + /* Make sure we don't push an incomplete + * packet. */ + assert(!(slot->flags & NS_MOREFRAG)); + slot->flags |= NS_REPORT; + } + + head = nm_ring_next(ring, head); + if (rate_limit) { + budget--; + } + } + + ring->cur = ring->head = head; + + event ++; + targ->ctr.pkts = sent; + targ->ctr.bytes = sent * size; + targ->ctr.events = event; + } + + /* flush any remaining packets */ + D("flush tail %d head %d on thread %p", + ring->tail, ring->head, + (void *)pthread_self()); + ioctl(pfd.fd, NIOCTXSYNC, NULL); + + /* final part: wait the TX queues to become empty. */ + while (!targ->cancel && nm_tx_pending(ring)) { + RD(5, "pending tx tail %d head %d on ring %d", + ring->tail, ring->head, targ->nmd->first_tx_ring); + ioctl(pfd.fd, NIOCTXSYNC, NULL); + usleep(1); /* wait 1 tick */ + } + + clock_gettime(CLOCK_REALTIME_PRECISE, &targ->toc); + targ->completed = 1; + targ->ctr.pkts = sent; + targ->ctr.bytes = sent * size; + targ->ctr.events = event; +quit: + /* reset the ``used`` flag. */ + targ->used = 0; + + return (NULL); +} + + +static char * +multi_slot_to_string(struct netmap_ring *ring, unsigned int head, + unsigned int nfrags, char *strbuf, size_t strbuflen) { - char *units[] = { "", "K", "M", "G", "T" }; - u_int i; + unsigned int f; + char *ret = strbuf; + + for (f = 0; f < nfrags; f++) { + struct netmap_slot *slot = &ring->slot[head]; + int m = snprintf(strbuf, strbuflen, "|%u,%x|", slot->len, + slot->flags); + if (m >= (int)strbuflen) { + break; + } + strbuf += m; + strbuflen -= m; - for (i = 0; val >=1000 && i < sizeof(units)/sizeof(char *) - 1; i++) - val /= 1000; - sprintf(buf, "%.2f %s", val, units[i]); - return buf; + head = nm_ring_next(ring, head); + } + + return ret; } -static void -tx_output(uint64_t sent, int size, double delta) +static void * +rxseq_body(void *data) { - double bw, raw_bw, pps; - char b1[40], b2[80], b3[80]; + struct targ *targ = (struct targ *) data; + struct pollfd pfd = { .fd = targ->fd, .events = POLLIN }; + int dump = targ->g->options & OPT_DUMP; + struct netmap_ring *ring; + unsigned int frags_exp = 1; + uint32_t seq_exp = 0; + struct my_ctrs cur; + unsigned int frags = 0; + int first_packet = 1; + int first_slot = 1; + int i; - printf("Sent %llu packets, %d bytes each, in %.2f seconds.\n", - (unsigned long long)sent, size, delta); - if (delta == 0) - delta = 1e-6; - if (size < 60) /* correct for min packet size */ - size = 60; - pps = sent / delta; - bw = (8.0 * size * sent) / delta; - /* raw packets have4 bytes crc + 20 bytes framing */ - raw_bw = (8.0 * (size + 24) * sent) / delta; + cur.pkts = cur.bytes = cur.events = cur.min_space = 0; + cur.t.tv_usec = cur.t.tv_sec = 0; // unused, just silence the compiler + + if (setaffinity(targ->thread, targ->affinity)) + goto quit; + + D("reading from %s fd %d main_fd %d", + targ->g->ifname, targ->fd, targ->g->main_fd); + /* unbounded wait for the first packet. */ + for (;!targ->cancel;) { + i = poll(&pfd, 1, 1000); + if (i > 0 && !(pfd.revents & POLLERR)) + break; + RD(1, "waiting for initial packets, poll returns %d %d", + i, pfd.revents); + } + + clock_gettime(CLOCK_REALTIME_PRECISE, &targ->tic); + + ring = NETMAP_RXRING(targ->nmd->nifp, targ->nmd->first_rx_ring); + + while (!targ->cancel) { + unsigned int head; + uint32_t seq; + int limit; + + /* Once we started to receive packets, wait at most 1 seconds + before quitting. */ + if (poll(&pfd, 1, 1 * 1000) <= 0 && !targ->g->forever) { + clock_gettime(CLOCK_REALTIME_PRECISE, &targ->toc); + targ->toc.tv_sec -= 1; /* Subtract timeout time. */ + goto out; + } + + if (pfd.revents & POLLERR) { + D("poll err"); + goto quit; + } + + if (nm_ring_empty(ring)) + continue; + + limit = nm_ring_space(ring); + if (limit > targ->g->burst) + limit = targ->g->burst; + +#if 0 + /* Enable this if + * 1) we remove the early-return optimization from + * the netmap poll implementation, or + * 2) pipes get NS_MOREFRAG support. + * With the current netmap implementation, an experiment like + * pkt-gen -i vale:1{1 -f txseq -F 9 + * pkt-gen -i vale:1}1 -f rxseq + * would get stuck as soon as we find nm_ring_space(ring) < 9, + * since here limit is rounded to 0 and + * pipe rxsync is not called anymore by the poll() of this loop. + */ + if (frags_exp > 1) { + int o = limit; + /* Cut off to the closest smaller multiple. */ + limit = (limit / frags_exp) * frags_exp; + RD(2, "LIMIT %d --> %d", o, limit); + } +#endif - printf("Speed: %spps Bandwidth: %sbps (raw %sbps)\n", - norm(b1, pps), norm(b2, bw), norm(b3, raw_bw) ); + for (head = ring->head, i = 0; i < limit; i++) { + struct netmap_slot *slot = &ring->slot[head]; + char *p = NETMAP_BUF(ring, slot->buf_idx); + int len = slot->len; + struct pkt *pkt; + + if (dump) { + dump_payload(p, slot->len, ring, head); + } + + frags++; + if (!(slot->flags & NS_MOREFRAG)) { + if (first_packet) { + first_packet = 0; + } else if (frags != frags_exp) { + char prbuf[512]; + RD(1, "Received packets with %u frags, " + "expected %u, '%s'", frags, frags_exp, + multi_slot_to_string(ring, head-frags+1, frags, + prbuf, sizeof(prbuf))); + } + first_packet = 0; + frags_exp = frags; + frags = 0; + } + + p -= sizeof(pkt->vh) - targ->g->virt_header; + len += sizeof(pkt->vh) - targ->g->virt_header; + pkt = (struct pkt *)p; + + if ((char *)pkt + len < ((char *)pkt->body) + sizeof(seq)) { + RD(1, "%s: packet too small (len=%u)", __func__, + slot->len); + } else { + seq = (pkt->body[0] << 24) | (pkt->body[1] << 16) + | (pkt->body[2] << 8) | pkt->body[3]; + if (first_slot) { + /* Grab the first one, whatever it + is. */ + seq_exp = seq; + first_slot = 0; + } else if (seq != seq_exp) { + uint32_t delta = seq - seq_exp; + + if (delta < (0xFFFFFFFF >> 1)) { + RD(2, "Sequence GAP: exp %u found %u", + seq_exp, seq); + } else { + RD(2, "Sequence OUT OF ORDER: " + "exp %u found %u", seq_exp, seq); + } + seq_exp = seq; + } + seq_exp++; + } + + cur.bytes += slot->len; + head = nm_ring_next(ring, head); + cur.pkts++; + } + + ring->cur = ring->head = head; + + cur.events++; + targ->ctr = cur; + } + + clock_gettime(CLOCK_REALTIME_PRECISE, &targ->toc); + +out: + targ->completed = 1; + targ->ctr = cur; + +quit: + /* reset the ``used`` flag. */ + targ->used = 0; + + return (NULL); } static void -rx_output(uint64_t received, double delta) +tx_output(struct my_ctrs *cur, double delta, const char *msg) { - double pps; - char b1[40]; + double bw, raw_bw, pps, abs; + char b1[40], b2[80], b3[80]; + int size; - printf("Received %llu packets, in %.2f seconds.\n", - (unsigned long long) received, delta); + if (cur->pkts == 0) { + printf("%s nothing.\n", msg); + return; + } + + size = (int)(cur->bytes / cur->pkts); + printf("%s %llu packets %llu bytes %llu events %d bytes each in %.2f seconds.\n", + msg, + (unsigned long long)cur->pkts, + (unsigned long long)cur->bytes, + (unsigned long long)cur->events, size, delta); if (delta == 0) delta = 1e-6; - pps = received / delta; - printf("Speed: %spps\n", norm(b1, pps)); + if (size < 60) /* correct for min packet size */ + size = 60; + pps = cur->pkts / delta; + bw = (8.0 * cur->bytes) / delta; + /* raw packets have4 bytes crc + 20 bytes framing */ + raw_bw = (8.0 * (cur->pkts * 24 + cur->bytes)) / delta; + abs = cur->pkts / (double)(cur->events); + + printf("Speed: %spps Bandwidth: %sbps (raw %sbps). Average batch: %.2f pkts\n", + norm(b1, pps), norm(b2, bw), norm(b3, raw_bw), abs); } static void @@ -1389,9 +1870,9 @@ usage(void) "Usage:\n" "%s arguments\n" "\t-i interface interface name\n" - "\t-f function tx rx ping pong\n" + "\t-f function tx rx ping pong txseq rxseq\n" "\t-n count number of iterations (can be 0)\n" - "\t-t pkts_to_send also forces tx mode\n" + "\t-t pkts_to_send also forces tx mode\n" "\t-r pkts_to_receive also forces rx mode\n" "\t-l pkt_size in bytes excluding CRC\n" "\t-d dst_ip[:port[-dst_ip:port]] single or range\n" @@ -1403,20 +1884,29 @@ usage(void) "\t-c cores cores to use\n" "\t-p threads processes/threads to use\n" "\t-T report_ms milliseconds between reports\n" - "\t-P use libpcap instead of netmap\n" "\t-w wait_for_link_time in seconds\n" "\t-R rate in packets per second\n" "\t-X dump payload\n" "\t-H len add empty virtio-net-header with size 'len'\n" + "\t-E pipes allocate extra space for a number of pipes\n" + "\t-r do not touch the buffers (send rubbish)\n" "\t-P file load packet from pcap file\n" "\t-z use random IPv4 src address/port\n" "\t-Z use random IPv4 dst address/port\n" + "\t-F num_frags send multi-slot packets\n" + "\t-A activate pps stats on receiver\n" "", cmd); exit(0); } +enum { + TD_TYPE_SENDER = 1, + TD_TYPE_RECEIVER, + TD_TYPE_OTHER, +}; + static void start_threads(struct glob_arg *g) { @@ -1439,33 +1929,32 @@ start_threads(struct glob_arg *g) uint64_t nmd_flags = 0; nmd.self = &nmd; - if (g->nthreads > 1) { - if (nmd.req.nr_flags != NR_REG_ALL_NIC) { - D("invalid nthreads mode %d", nmd.req.nr_flags); + if (i > 0) { + /* the first thread uses the fd opened by the main + * thread, the other threads re-open /dev/netmap + */ + if (g->nthreads > 1) { + nmd.req.nr_flags = + g->nmd->req.nr_flags & ~NR_REG_MASK; + nmd.req.nr_flags |= NR_REG_ONE_NIC; + nmd.req.nr_ringid = i; + } + /* Only touch one of the rings (rx is already ok) */ + if (g->td_type == TD_TYPE_RECEIVER) + nmd_flags |= NETMAP_NO_TX_POLL; + + /* register interface. Override ifname and ringid etc. */ + t->nmd = nm_open(t->g->ifname, NULL, nmd_flags | + NM_OPEN_IFNAME | NM_OPEN_NO_MMAP, &nmd); + if (t->nmd == NULL) { + D("Unable to open %s: %s", + t->g->ifname, strerror(errno)); continue; } - nmd.req.nr_flags = NR_REG_ONE_NIC; - nmd.req.nr_ringid = i; - } - /* Only touch one of the rings (rx is already ok) */ - if (g->td_body == receiver_body) - nmd_flags |= NETMAP_NO_TX_POLL; - - /* register interface. Override ifname and ringid etc. */ - if (g->options & OPT_MONITOR_TX) - nmd.req.nr_flags |= NR_MONITOR_TX; - if (g->options & OPT_MONITOR_RX) - nmd.req.nr_flags |= NR_MONITOR_RX; - - t->nmd = nm_open(t->g->ifname, NULL, nmd_flags | - NM_OPEN_IFNAME | NM_OPEN_NO_MMAP, &nmd); - if (t->nmd == NULL) { - D("Unable to open %s: %s", - t->g->ifname, strerror(errno)); - continue; + } else { + t->nmd = g->nmd; } t->fd = t->nmd->fd; - set_vnet_hdr_len(t); } else { targs[i].fd = g->main_fd; @@ -1473,10 +1962,7 @@ start_threads(struct glob_arg *g) t->used = 1; t->me = i; if (g->affinity >= 0) { - if (g->affinity < g->cpus) - t->affinity = g->affinity; - else - t->affinity = i % g->cpus; + t->affinity = (g->affinity + i) % g->system_cpus; } else { t->affinity = -1; } @@ -1495,45 +1981,89 @@ main_thread(struct glob_arg *g) { int i; - uint64_t prev = 0; - uint64_t count = 0; + struct my_ctrs prev, cur; double delta_t; struct timeval tic, toc; - gettimeofday(&toc, NULL); + prev.pkts = prev.bytes = prev.events = 0; + gettimeofday(&prev.t, NULL); for (;;) { - struct timeval now, delta; - uint64_t pps, usec, my_count, npkts; + char b1[40], b2[40], b3[40], b4[70]; + uint64_t pps, usec; + struct my_ctrs x; + double abs; int done = 0; - delta.tv_sec = g->report_interval/1000; - delta.tv_usec = (g->report_interval%1000)*1000; - select(0, NULL, NULL, NULL, &delta); - gettimeofday(&now, NULL); - timersub(&now, &toc, &toc); - my_count = 0; + usec = wait_for_next_report(&prev.t, &cur.t, + g->report_interval); + + cur.pkts = cur.bytes = cur.events = 0; + cur.min_space = 0; + if (usec < 10000) /* too short to be meaningful */ + continue; + /* accumulate counts for all threads */ for (i = 0; i < g->nthreads; i++) { - my_count += targs[i].count; + cur.pkts += targs[i].ctr.pkts; + cur.bytes += targs[i].ctr.bytes; + cur.events += targs[i].ctr.events; + cur.min_space += targs[i].ctr.min_space; + targs[i].ctr.min_space = 99999; if (targs[i].used == 0) done++; } - usec = toc.tv_sec* 1000000 + toc.tv_usec; - if (usec < 10000) - continue; - npkts = my_count - prev; - pps = (npkts*1000000 + usec/2) / usec; - D("%llu pps (%llu pkts in %llu usec)", - (unsigned long long)pps, - (unsigned long long)npkts, - (unsigned long long)usec); - prev = my_count; - toc = now; + x.pkts = cur.pkts - prev.pkts; + x.bytes = cur.bytes - prev.bytes; + x.events = cur.events - prev.events; + pps = (x.pkts*1000000 + usec/2) / usec; + abs = (x.events > 0) ? (x.pkts / (double) x.events) : 0; + + if (!(g->options & OPT_PPS_STATS)) { + strcpy(b4, ""); + } else { + /* Compute some pps stats using a sliding window. */ + double ppsavg = 0.0, ppsdev = 0.0; + int nsamples = 0; + + g->win[g->win_idx] = pps; + g->win_idx = (g->win_idx + 1) % STATS_WIN; + + for (i = 0; i < STATS_WIN; i++) { + ppsavg += g->win[i]; + if (g->win[i]) { + nsamples ++; + } + } + ppsavg /= nsamples; + + for (i = 0; i < STATS_WIN; i++) { + if (g->win[i] == 0) { + continue; + } + ppsdev += (g->win[i] - ppsavg) * (g->win[i] - ppsavg); + } + ppsdev /= nsamples; + ppsdev = sqrt(ppsdev); + + snprintf(b4, sizeof(b4), "[avg/std %s/%s pps]", + norm(b1, ppsavg), norm(b2, ppsdev)); + } + + D("%spps %s(%spkts %sbps in %llu usec) %.2f avg_batch %d min_space", + norm(b1, pps), b4, + norm(b2, (double)x.pkts), + norm(b3, (double)x.bytes*8), + (unsigned long long)usec, + abs, (int)cur.min_space); + prev = cur; + if (done == g->nthreads) break; } timerclear(&tic); timerclear(&toc); + cur.pkts = cur.bytes = cur.events = 0; + /* final round */ for (i = 0; i < g->nthreads; i++) { struct timespec t_tic, t_toc; /* @@ -1541,8 +2071,13 @@ main_thread(struct glob_arg *g) * file descriptors. */ if (targs[i].used) - pthread_join(targs[i].thread, NULL); - close(targs[i].fd); + pthread_join(targs[i].thread, NULL); /* blocking */ + if (g->dev_type == DEV_NETMAP) { + nm_close(targs[i].nmd); + targs[i].nmd = NULL; + } else { + close(targs[i].fd); + } if (targs[i].completed == 0) D("ouch, thread %d exited with error", i); @@ -1551,7 +2086,13 @@ main_thread(struct glob_arg *g) * Collect threads output and extract information about * how long it took to send all the packets. */ - count += targs[i].count; + cur.pkts += targs[i].ctr.pkts; + cur.bytes += targs[i].ctr.bytes; + cur.events += targs[i].ctr.events; + /* collect the largest start (tic) and end (toc) times, + * XXX maybe we should do the earliest tic, or do a weighted + * average ? + */ t_tic = timeval2spec(&tic); t_toc = timeval2spec(&toc); if (!timerisset(&tic) || timespec_ge(&targs[i].tic, &t_tic)) @@ -1563,29 +2104,26 @@ main_thread(struct glob_arg *g) /* print output. */ timersub(&toc, &tic, &toc); delta_t = toc.tv_sec + 1e-6* toc.tv_usec; - if (g->td_body == sender_body) - tx_output(count, g->pkt_size, delta_t); + if (g->td_type == TD_TYPE_SENDER) + tx_output(&cur, delta_t, "Sent"); else - rx_output(count, delta_t); - - if (g->dev_type == DEV_NETMAP) { - munmap(g->nmd->mem, g->nmd->req.nr_memsize); - close(g->main_fd); - } + tx_output(&cur, delta_t, "Received"); } - -struct sf { +struct td_desc { + int ty; char *key; void *f; }; -static struct sf func[] = { - { "tx", sender_body }, - { "rx", receiver_body }, - { "ping", pinger_body }, - { "pong", ponger_body }, - { NULL, NULL } +static struct td_desc func[] = { + { TD_TYPE_SENDER, "tx", sender_body }, + { TD_TYPE_RECEIVER, "rx", receiver_body }, + { TD_TYPE_OTHER, "ping", pinger_body }, + { TD_TYPE_OTHER, "pong", ponger_body }, + { TD_TYPE_SENDER, "txseq", txseq_body }, + { TD_TYPE_RECEIVER, "rxseq", rxseq_body }, + { 0, NULL, NULL } }; static int @@ -1654,6 +2192,8 @@ int main(int arc, char **argv) { int i; + struct sigaction sa; + sigset_t ss; struct glob_arg g; @@ -1665,6 +2205,7 @@ main(int arc, char **argv) g.main_fd = -1; g.td_body = receiver_body; + g.td_type = TD_TYPE_RECEIVER; g.report_interval = 1000; /* report interval */ g.affinity = -1; /* ip addresses can also be a range x.x.x.x-x.x.x.y */ @@ -1675,7 +2216,7 @@ main(int arc, char **argv) g.pkt_size = 60; g.burst = 512; // default g.nthreads = 1; - g.cpus = 1; + g.cpus = 1; // default g.forever = 1; g.tx_rate = 0; g.frags = 1; @@ -1683,8 +2224,8 @@ main(int arc, char **argv) g.virt_header = 0; while ( (ch = getopt(arc, argv, - "a:f:F:n:i:Il:d:s:D:S:b:c:o:p:T:w:WvR:XC:H:e:m:P:zZ")) != -1) { - struct sf *fn; + "a:f:F:n:i:Il:d:s:D:S:b:c:o:p:T:w:WvR:XC:H:e:E:m:rP:zZA")) != -1) { + struct td_desc *fn; switch(ch) { default: @@ -1693,7 +2234,7 @@ main(int arc, char **argv) break; case 'n': - g.npackets = atoi(optarg); + g.npackets = strtoull(optarg, NULL, 10); break; case 'F': @@ -1710,10 +2251,12 @@ main(int arc, char **argv) if (!strcmp(fn->key, optarg)) break; } - if (fn->key) + if (fn->key) { g.td_body = fn->f; - else + g.td_type = fn->ty; + } else { D("unrecognised function %s", optarg); + } break; case 'o': /* data generation options */ @@ -1817,24 +2360,27 @@ main(int arc, char **argv) case 'e': /* extra bufs */ g.extra_bufs = atoi(optarg); break; - case 'm': - if (strcmp(optarg, "tx") == 0) { - g.options |= OPT_MONITOR_TX; - } else if (strcmp(optarg, "rx") == 0) { - g.options |= OPT_MONITOR_RX; - } else { - D("unrecognized monitor mode %s", optarg); - } + case 'E': + g.extra_pipes = atoi(optarg); break; case 'P': g.packet_file = strdup(optarg); break; + case 'm': + /* ignored */ + break; + case 'r': + g.options |= OPT_RUBBISH; + break; case 'z': g.options |= OPT_RANDOM_SRC; break; case 'Z': g.options |= OPT_RANDOM_DST; break; + case 'A': + g.options |= OPT_PPS_STATS; + break; } } @@ -1843,11 +2389,12 @@ main(int arc, char **argv) usage(); } - i = system_ncpus(); + g.system_cpus = i = system_ncpus(); if (g.cpus < 0 || g.cpus > i) { D("%d cpus is too high, have only %d cpus", g.cpus, i); usage(); } +D("running on %d cpus (have %d)", g.cpus, i); if (g.cpus == 0) g.cpus = i; @@ -1914,6 +2461,11 @@ main(int arc, char **argv) if (g.extra_bufs) { base_nmd.nr_arg3 = g.extra_bufs; } + if (g.extra_pipes) { + base_nmd.nr_arg1 = g.extra_pipes; + } + + base_nmd.nr_flags |= NR_ACCEPT_VNET_HDR; /* * Open the netmap device using nm_open(). @@ -1927,13 +2479,38 @@ main(int arc, char **argv) D("Unable to open %s: %s", g.ifname, strerror(errno)); goto out; } + + if (g.nthreads > 1) { + struct nm_desc saved_desc = *g.nmd; + saved_desc.self = &saved_desc; + saved_desc.mem = NULL; + nm_close(g.nmd); + saved_desc.req.nr_flags &= ~NR_REG_MASK; + saved_desc.req.nr_flags |= NR_REG_ONE_NIC; + saved_desc.req.nr_ringid = 0; + g.nmd = nm_open(g.ifname, &base_nmd, NM_OPEN_IFNAME, &saved_desc); + if (g.nmd == NULL) { + D("Unable to open %s: %s", g.ifname, strerror(errno)); + goto out; + } + } g.main_fd = g.nmd->fd; D("mapped %dKB at %p", g.nmd->req.nr_memsize>>10, g.nmd->mem); - /* get num of queues in tx or rx */ - if (g.td_body == sender_body) + if (g.virt_header) { + /* Set the virtio-net header length, since the user asked + * for it explicitely. */ + set_vnet_hdr_len(&g); + } else { + /* Check whether the netmap port we opened requires us to send + * and receive frames with virtio-net header. */ + get_vnet_hdr_len(&g); + } + + /* get num of queues in tx or rx */ + if (g.td_type == TD_TYPE_SENDER) devqueues = g.nmd->req.nr_tx_rings; - else + else devqueues = g.nmd->req.nr_rx_rings; /* validate provided nthreads. */ @@ -1951,25 +2528,27 @@ main(int arc, char **argv) req->nr_arg2); for (i = 0; i <= req->nr_tx_rings; i++) { struct netmap_ring *ring = NETMAP_TXRING(nifp, i); - D(" TX%d at 0x%lx slots %d", i, - (char *)ring - (char *)nifp, ring->num_slots); + D(" TX%d at 0x%p slots %d", i, + (void *)((char *)ring - (char *)nifp), ring->num_slots); } for (i = 0; i <= req->nr_rx_rings; i++) { struct netmap_ring *ring = NETMAP_RXRING(nifp, i); - D(" RX%d at 0x%lx slots %d", i, - (char *)ring - (char *)nifp, ring->num_slots); + D(" RX%d at 0x%p slots %d", i, + (void *)((char *)ring - (char *)nifp), ring->num_slots); } } /* Print some debug information. */ fprintf(stdout, "%s %s: %d queues, %d threads and %d cpus.\n", - (g.td_body == sender_body) ? "Sending on" : "Receiving from", + (g.td_type == TD_TYPE_SENDER) ? "Sending on" : + ((g.td_type == TD_TYPE_RECEIVER) ? "Receiving from" : + "Working on"), g.ifname, devqueues, g.nthreads, g.cpus); - if (g.td_body == sender_body) { + if (g.td_type == TD_TYPE_SENDER) { fprintf(stdout, "%s -> %s (%s -> %s)\n", g.src_ip.name, g.dst_ip.name, g.src_mac.name, g.dst_mac.name); @@ -1985,12 +2564,13 @@ out: if (g.options) { - D("--- SPECIAL OPTIONS:%s%s%s%s%s\n", + D("--- SPECIAL OPTIONS:%s%s%s%s%s%s\n", g.options & OPT_PREFETCH ? " prefetch" : "", g.options & OPT_ACCESS ? " access" : "", g.options & OPT_MEMCPY ? " memcpy" : "", g.options & OPT_INDIRECT ? " indirect" : "", - g.options & OPT_COPY ? " copy" : ""); + g.options & OPT_COPY ? " copy" : "", + g.options & OPT_RUBBISH ? " rubbish " : ""); } g.tx_period.tv_sec = g.tx_period.tv_nsec = 0; @@ -2010,7 +2590,7 @@ out: g.tx_period.tv_sec = g.tx_period.tv_nsec / 1000000000; g.tx_period.tv_nsec = g.tx_period.tv_nsec % 1000000000; } - if (g.td_body == sender_body) + if (g.td_type == TD_TYPE_SENDER) D("Sending %d packets every %ld.%09ld s", g.burst, g.tx_period.tv_sec, g.tx_period.tv_nsec); /* Wait for PHY reset. */ @@ -2020,10 +2600,24 @@ out: /* Install ^C handler. */ global_nthreads = g.nthreads; - signal(SIGINT, sigint_h); - + sigemptyset(&ss); + sigaddset(&ss, SIGINT); + /* block SIGINT now, so that all created threads will inherit the mask */ + if (pthread_sigmask(SIG_BLOCK, &ss, NULL) < 0) { + D("failed to block SIGINT: %s", strerror(errno)); + } start_threads(&g); + /* Install the handler and re-enable SIGINT for the main thread */ + sa.sa_handler = sigint_h; + if (sigaction(SIGINT, &sa, NULL) < 0) { + D("failed to install ^C handler: %s", strerror(errno)); + } + + if (pthread_sigmask(SIG_UNBLOCK, &ss, NULL) < 0) { + D("failed to re-enable SIGINT: %s", strerror(errno)); + } main_thread(&g); + free(targs); return 0; } |