diff options
Diffstat (limited to 'dnstap')
| -rw-r--r-- | dnstap/dnstap.c | 14 | ||||
| -rw-r--r-- | dnstap/dnstap.h | 3 | ||||
| -rw-r--r-- | dnstap/dtstream.c | 152 | ||||
| -rw-r--r-- | dnstap/dtstream.h | 15 | ||||
| -rw-r--r-- | dnstap/unbound-dnstap-socket.c | 70 |
5 files changed, 145 insertions, 109 deletions
diff --git a/dnstap/dnstap.c b/dnstap/dnstap.c index cc5449dff4a1..0c8c6c4d462a 100644 --- a/dnstap/dnstap.c +++ b/dnstap/dnstap.c @@ -134,9 +134,15 @@ dt_create(struct config_file* cfg) if(cfg->dnstap && cfg->dnstap_socket_path && cfg->dnstap_socket_path[0] && (cfg->dnstap_ip==NULL || cfg->dnstap_ip[0]==0)) { + char* p = fname_after_chroot(cfg->dnstap_socket_path, cfg, 1); + if(!p) { + log_err("malloc failure"); + return NULL; + } verbose(VERB_OPS, "attempting to connect to dnstap socket %s", - cfg->dnstap_socket_path); - check_socket_file(cfg->dnstap_socket_path); + p); + check_socket_file(p); + free(p); } env = (struct dt_env *) calloc(1, sizeof(struct dt_env)); @@ -240,9 +246,9 @@ dt_apply_cfg(struct dt_env *env, struct config_file *cfg) } int -dt_init(struct dt_env *env) +dt_init(struct dt_env *env, struct comm_base* base) { - env->msgqueue = dt_msg_queue_create(); + env->msgqueue = dt_msg_queue_create(base); if(!env->msgqueue) { log_err("malloc failure"); return 0; diff --git a/dnstap/dnstap.h b/dnstap/dnstap.h index cfef6fc420b9..783b8c51430a 100644 --- a/dnstap/dnstap.h +++ b/dnstap/dnstap.h @@ -101,10 +101,11 @@ dt_apply_cfg(struct dt_env *env, struct config_file *cfg); /** * Initialize per-worker state in dnstap environment object. * @param env: dnstap environment object to initialize, created with dt_create(). + * @param base: event base for wakeup timer. * @return: true on success, false on failure. */ int -dt_init(struct dt_env *env); +dt_init(struct dt_env *env, struct comm_base* base); /** * Deletes the per-worker state created by dt_init diff --git a/dnstap/dtstream.c b/dnstap/dtstream.c index dda3ef1ff485..b0918c52cc63 100644 --- a/dnstap/dtstream.c +++ b/dnstap/dtstream.c @@ -68,6 +68,8 @@ #define DTIO_RECONNECT_TIMEOUT_MAX 1000 /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */ #define DTIO_RECONNECT_TIMEOUT_SLOW 1000 +/** number of messages before wakeup of thread */ +#define DTIO_MSG_FOR_WAKEUP 32 /** maximum length of received frame */ #define DTIO_RECV_FRAME_MAX_LEN 1000 @@ -99,13 +101,18 @@ static int dtio_enable_brief_write(struct dt_io_thread* dtio); #endif struct dt_msg_queue* -dt_msg_queue_create(void) +dt_msg_queue_create(struct comm_base* base) { struct dt_msg_queue* mq = calloc(1, sizeof(*mq)); if(!mq) return NULL; mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker, about 1 M should contain 64K messages with some overhead, or a whole bunch smaller ones */ + mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq); + if(!mq->wakeup_timer) { + free(mq); + return NULL; + } lock_basic_init(&mq->lock); lock_protect(&mq->lock, mq, sizeof(*mq)); return mq; @@ -125,6 +132,7 @@ dt_msg_queue_clear(struct dt_msg_queue* mq) mq->first = NULL; mq->last = NULL; mq->cursize = 0; + mq->msgcount = 0; } void @@ -133,6 +141,7 @@ dt_msg_queue_delete(struct dt_msg_queue* mq) if(!mq) return; lock_basic_destroy(&mq->lock); dt_msg_queue_clear(mq); + comm_timer_delete(mq->wakeup_timer); free(mq); } @@ -149,15 +158,14 @@ static void dtio_wakeup(struct dt_io_thread* dtio) #ifndef USE_WINSOCK if(errno == EINTR || errno == EAGAIN) continue; - log_err("dnstap io wakeup: write: %s", strerror(errno)); #else if(WSAGetLastError() == WSAEINPROGRESS) continue; if(WSAGetLastError() == WSAEWOULDBLOCK) continue; - log_err("dnstap io stop: write: %s", - wsa_strerror(WSAGetLastError())); #endif + log_err("dnstap io wakeup: write: %s", + sock_strerror(errno)); break; } break; @@ -165,9 +173,56 @@ static void dtio_wakeup(struct dt_io_thread* dtio) } void +mq_wakeup_cb(void* arg) +{ + struct dt_msg_queue* mq = (struct dt_msg_queue*)arg; + /* even if the dtio is already active, because perhaps much + * traffic suddenly, we leave the timer running to save on + * managing it, the once a second timer is less work then + * starting and stopping the timer frequently */ + lock_basic_lock(&mq->dtio->wakeup_timer_lock); + mq->dtio->wakeup_timer_enabled = 0; + lock_basic_unlock(&mq->dtio->wakeup_timer_lock); + dtio_wakeup(mq->dtio); +} + +/** start timer to wakeup dtio because there is content in the queue */ +static void +dt_msg_queue_start_timer(struct dt_msg_queue* mq) +{ + struct timeval tv; + /* Start a timer to process messages to be logged. + * If we woke up the dtio thread for every message, the wakeup + * messages take up too much processing power. If the queue + * fills up the wakeup happens immediately. The timer wakes it up + * if there are infrequent messages to log. */ + + /* we cannot start a timer in dtio thread, because it is a different + * thread and its event base is in use by the other thread, it would + * give race conditions if we tried to modify its event base, + * and locks would wait until it woke up, and this is what we do. */ + + /* do not start the timer if a timer already exists, perhaps + * in another worker. So this variable is protected by a lock in + * dtio */ + lock_basic_lock(&mq->dtio->wakeup_timer_lock); + if(mq->dtio->wakeup_timer_enabled) { + lock_basic_unlock(&mq->dtio->wakeup_timer_lock); + return; + } + mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */ + lock_basic_unlock(&mq->dtio->wakeup_timer_lock); + + /* start the timer, in mq, in the event base of our worker */ + tv.tv_sec = 1; + tv.tv_usec = 0; + comm_timer_set(mq->wakeup_timer, &tv); +} + +void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) { - int wakeup = 0; + int wakeupnow = 0, wakeupstarttimer = 0; struct dt_msg_entry* entry; /* check conditions */ @@ -198,9 +253,15 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) /* aqcuire lock */ lock_basic_lock(&mq->lock); - /* list was empty, wakeup dtio */ + /* if list was empty, start timer for (eventual) wakeup */ if(mq->first == NULL) - wakeup = 1; + wakeupstarttimer = 1; + /* if list contains more than wakeupnum elements, wakeup now, + * or if list is (going to be) almost full */ + if(mq->msgcount == DTIO_MSG_FOR_WAKEUP || + (mq->cursize < mq->maxsize * 9 / 10 && + mq->cursize+len >= mq->maxsize * 9 / 10)) + wakeupnow = 1; /* see if it is going to fit */ if(mq->cursize + len > mq->maxsize) { /* buffer full, or congested. */ @@ -211,6 +272,7 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) return; } mq->cursize += len; + mq->msgcount ++; /* append to list */ if(mq->last) { mq->last->next = entry; @@ -221,13 +283,19 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) /* release lock */ lock_basic_unlock(&mq->lock); - if(wakeup) + if(wakeupnow) { dtio_wakeup(mq->dtio); + } else if(wakeupstarttimer) { + dt_msg_queue_start_timer(mq); + } } struct dt_io_thread* dt_io_thread_create(void) { struct dt_io_thread* dtio = calloc(1, sizeof(*dtio)); + lock_basic_init(&dtio->wakeup_timer_lock); + lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled, + sizeof(dtio->wakeup_timer_enabled)); return dtio; } @@ -235,6 +303,7 @@ void dt_io_thread_delete(struct dt_io_thread* dtio) { struct dt_io_list_item* item, *nextitem; if(!dtio) return; + lock_basic_destroy(&dtio->wakeup_timer_lock); item=dtio->io_list; while(item) { nextitem = item->next; @@ -279,7 +348,8 @@ int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg) return 0; } free(dtio->socket_path); - dtio->socket_path = strdup(cfg->dnstap_socket_path); + dtio->socket_path = fname_after_chroot(cfg->dnstap_socket_path, + cfg, 1); if(!dtio->socket_path) { log_err("dnstap setup: malloc failure"); return 0; @@ -416,6 +486,7 @@ static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf, mq->first = entry->next; if(!entry->next) mq->last = NULL; mq->cursize -= entry->len; + mq->msgcount --; lock_basic_unlock(&mq->lock); *buf = entry->buf; @@ -587,11 +658,7 @@ static void dtio_del_output_event(struct dt_io_thread* dtio) /** close dtio socket and set it to -1 */ static void dtio_close_fd(struct dt_io_thread* dtio) { -#ifndef USE_WINSOCK - close(dtio->fd); -#else - closesocket(dtio->fd); -#endif + sock_close(dtio->fd); dtio->fd = -1; } @@ -659,13 +726,8 @@ static int dtio_check_nb_connect(struct dt_io_thread* dtio) char* to = dtio->socket_path; if(!to) to = dtio->ip_str; if(!to) to = ""; -#ifndef USE_WINSOCK log_err("dnstap io: failed to connect to \"%s\": %s", - to, strerror(error)); -#else - log_err("dnstap io: failed to connect to \"%s\": %s", - to, wsa_strerror(error)); -#endif + to, sock_strerror(error)); return -1; /* error, close it */ } @@ -742,7 +804,6 @@ static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf, #ifndef USE_WINSOCK if(errno == EINTR || errno == EAGAIN) return 0; - log_err("dnstap io: failed send: %s", strerror(errno)); #else if(WSAGetLastError() == WSAEINPROGRESS) return 0; @@ -752,9 +813,8 @@ static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf, UB_EV_WRITE); return 0; } - log_err("dnstap io: failed send: %s", - wsa_strerror(WSAGetLastError())); #endif + log_err("dnstap io: failed send: %s", sock_strerror(errno)); return -1; } return ret; @@ -778,7 +838,6 @@ static int dtio_write_with_writev(struct dt_io_thread* dtio) #ifndef USE_WINSOCK if(errno == EINTR || errno == EAGAIN) return 0; - log_err("dnstap io: failed writev: %s", strerror(errno)); #else if(WSAGetLastError() == WSAEINPROGRESS) return 0; @@ -788,9 +847,8 @@ static int dtio_write_with_writev(struct dt_io_thread* dtio) UB_EV_WRITE); return 0; } - log_err("dnstap io: failed writev: %s", - wsa_strerror(WSAGetLastError())); #endif + log_err("dnstap io: failed writev: %s", sock_strerror(errno)); /* close the channel */ dtio_del_output_event(dtio); dtio_close_output(dtio); @@ -1115,6 +1173,8 @@ static int dtio_read_accept_frame(struct dt_io_thread* dtio) goto close_connection; } dtio->accept_frame_received = 1; + if(!dtio_add_output_event_write(dtio)) + goto close_connection; return 1; } else { /* unknow content type */ @@ -1482,15 +1542,13 @@ void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg) #ifndef USE_WINSOCK if(errno == EINTR || errno == EAGAIN) return; /* ignore this */ - log_err("dnstap io: failed to read: %s", strerror(errno)); #else if(WSAGetLastError() == WSAEINPROGRESS) return; if(WSAGetLastError() == WSAEWOULDBLOCK) return; - log_err("dnstap io: failed to read: %s", - wsa_strerror(WSAGetLastError())); #endif + log_err("dnstap io: failed to read: %s", sock_strerror(errno)); /* and then fall through to quit the thread */ } else if(r == 0) { verbose(VERB_ALGO, "dnstap io: cmd channel closed"); @@ -1852,13 +1910,8 @@ static int dtio_open_output_local(struct dt_io_thread* dtio) struct sockaddr_un s; dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0); if(dtio->fd == -1) { -#ifndef USE_WINSOCK - log_err("dnstap io: failed to create socket: %s", - strerror(errno)); -#else log_err("dnstap io: failed to create socket: %s", - wsa_strerror(WSAGetLastError())); -#endif + sock_strerror(errno)); return 0; } memset(&s, 0, sizeof(s)); @@ -1873,13 +1926,13 @@ static int dtio_open_output_local(struct dt_io_thread* dtio) if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s)) == -1) { char* to = dtio->socket_path; -#ifndef USE_WINSOCK - log_err("dnstap io: failed to connect to \"%s\": %s", - to, strerror(errno)); -#else + if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && + verbosity < 4) { + dtio_close_fd(dtio); + return 0; /* no log retries on low verbosity */ + } log_err("dnstap io: failed to connect to \"%s\": %s", - to, wsa_strerror(WSAGetLastError())); -#endif + to, sock_strerror(errno)); dtio_close_fd(dtio); return 0; } @@ -1904,18 +1957,18 @@ static int dtio_open_output_tcp(struct dt_io_thread* dtio) } dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0); if(dtio->fd == -1) { -#ifndef USE_WINSOCK - log_err("can't create socket: %s", strerror(errno)); -#else - log_err("can't create socket: %s", - wsa_strerror(WSAGetLastError())); -#endif + log_err("can't create socket: %s", sock_strerror(errno)); return 0; } fd_set_nonblock(dtio->fd); if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) { if(errno == EINPROGRESS) return 1; /* wait until connect done*/ + if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && + verbosity < 4) { + dtio_close_fd(dtio); + return 0; /* no log retries on low verbosity */ + } #ifndef USE_WINSOCK if(tcp_connect_errno_needs_log( (struct sockaddr *)&addr, addrlen)) { @@ -2097,15 +2150,14 @@ void dt_io_thread_stop(struct dt_io_thread* dtio) #ifndef USE_WINSOCK if(errno == EINTR || errno == EAGAIN) continue; - log_err("dnstap io stop: write: %s", strerror(errno)); #else if(WSAGetLastError() == WSAEINPROGRESS) continue; if(WSAGetLastError() == WSAEWOULDBLOCK) continue; - log_err("dnstap io stop: write: %s", - wsa_strerror(WSAGetLastError())); #endif + log_err("dnstap io stop: write: %s", + sock_strerror(errno)); break; } break; diff --git a/dnstap/dtstream.h b/dnstap/dtstream.h index ede491f30d3e..f87d6dc8d386 100644 --- a/dnstap/dtstream.h +++ b/dnstap/dtstream.h @@ -49,6 +49,7 @@ struct dt_msg_entry; struct dt_io_list_item; struct dt_io_thread; struct config_file; +struct comm_base; /** * A message buffer with dnstap messages queued up. It is per-worker. @@ -68,11 +69,15 @@ struct dt_msg_queue { /** current size of the buffer, in bytes. data bytes of messages. * If a new message make it more than maxsize, the buffer is full */ size_t cursize; + /** number of messages in the queue */ + int msgcount; /** list of messages. The messages are added to the back and taken * out from the front. */ struct dt_msg_entry* first, *last; /** reference to the io thread to wakeup */ struct dt_io_thread* dtio; + /** the wakeup timer for dtio, on worker event base */ + struct comm_timer* wakeup_timer; }; /** @@ -166,6 +171,10 @@ struct dt_io_thread { * for the current message length that precedes the frame */ size_t cur_msg_len_done; + /** lock on wakeup_timer_enabled */ + lock_basic_type wakeup_timer_lock; + /** if wakeup timer is enabled in some thread */ + int wakeup_timer_enabled; /** command pipe that stops the pipe if closed. Used to quit * the program. [0] is read, [1] is written to. */ int commandpipe[2]; @@ -233,9 +242,10 @@ struct dt_io_list_item { /** * Create new (empty) worker message queue. Limit set to default on max. + * @param base: event base for wakeup timer. * @return NULL on malloc failure or a new queue (not locked). */ -struct dt_msg_queue* dt_msg_queue_create(void); +struct dt_msg_queue* dt_msg_queue_create(struct comm_base* base); /** * Delete a worker message queue. It has to be unlinked from access, @@ -258,6 +268,9 @@ void dt_msg_queue_delete(struct dt_msg_queue* mq); */ void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len); +/** timer callback to wakeup dtio thread to process messages */ +void mq_wakeup_cb(void* arg); + /** * Create IO thread. * @return new io thread object. not yet started. or NULL malloc failure. diff --git a/dnstap/unbound-dnstap-socket.c b/dnstap/unbound-dnstap-socket.c index 44a0eda95994..3ebe2b4e4124 100644 --- a/dnstap/unbound-dnstap-socket.c +++ b/dnstap/unbound-dnstap-socket.c @@ -278,57 +278,31 @@ static int make_tcp_accept(char* ip) } if((s = socket(addr.ss_family, SOCK_STREAM, 0)) == -1) { -#ifndef USE_WINSOCK - log_err("can't create socket: %s", strerror(errno)); -#else - log_err("can't create socket: %s", - wsa_strerror(WSAGetLastError())); -#endif + log_err("can't create socket: %s", sock_strerror(errno)); return -1; } #ifdef SO_REUSEADDR if(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (void*)&on, (socklen_t)sizeof(on)) < 0) { -#ifndef USE_WINSOCK log_err("setsockopt(.. SO_REUSEADDR ..) failed: %s", - strerror(errno)); - close(s); -#else - log_err("setsockopt(.. SO_REUSEADDR ..) failed: %s", - wsa_strerror(WSAGetLastError())); - closesocket(s); -#endif + sock_strerror(errno)); + sock_close(s); return -1; } #endif /* SO_REUSEADDR */ if(bind(s, (struct sockaddr*)&addr, len) != 0) { -#ifndef USE_WINSOCK - log_err_addr("can't bind socket", strerror(errno), + log_err_addr("can't bind socket", sock_strerror(errno), &addr, len); - close(s); -#else - log_err_addr("can't bind socket", - wsa_strerror(WSAGetLastError()), &addr, len); - closesocket(s); -#endif + sock_close(s); return -1; } if(!fd_set_nonblock(s)) { -#ifndef USE_WINSOCK - close(s); -#else - closesocket(s); -#endif + sock_close(s); return -1; } if(listen(s, LISTEN_BACKLOG) == -1) { -#ifndef USE_WINSOCK - log_err("can't listen: %s", strerror(errno)); - close(s); -#else - log_err("can't listen: %s", wsa_strerror(WSAGetLastError())); - closesocket(s); -#endif + log_err("can't listen: %s", sock_strerror(errno)); + sock_close(s); return -1; } return s; @@ -654,7 +628,6 @@ static ssize_t receive_bytes(struct tap_data* data, int fd, void* buf, #ifndef USE_WINSOCK if(errno == EINTR || errno == EAGAIN) return -1; - log_err("could not recv: %s", strerror(errno)); #else /* USE_WINSOCK */ if(WSAGetLastError() == WSAEINPROGRESS) return -1; @@ -662,9 +635,8 @@ static ssize_t receive_bytes(struct tap_data* data, int fd, void* buf, ub_winsock_tcp_wouldblock(data->ev, UB_EV_READ); return -1; } - log_err("could not recv: %s", - wsa_strerror(WSAGetLastError())); #endif + log_err("could not recv: %s", sock_strerror(errno)); if(verbosity) log_info("dnstap client stream closed from %s", (data->id?data->id:"")); return 0; @@ -796,12 +768,7 @@ static int reply_with_accept(struct tap_data* data) } } else { if(send(data->fd, acceptframe, len, 0) == -1) { -#ifndef USE_WINSOCK - log_err("send failed: %s", strerror(errno)); -#else - log_err("send failed: %s", - wsa_strerror(WSAGetLastError())); -#endif + log_err("send failed: %s", sock_strerror(errno)); fd_set_nonblock(data->fd); free(acceptframe); return 0; @@ -834,11 +801,7 @@ static int reply_with_finish(int fd) fd_set_block(fd); if(send(fd, finishframe, len, 0) == -1) { -#ifndef USE_WINSOCK - log_err("send failed: %s", strerror(errno)); -#else - log_err("send failed: %s", wsa_strerror(WSAGetLastError())); -#endif + log_err("send failed: %s", sock_strerror(errno)); fd_set_nonblock(fd); free(finishframe); return 0; @@ -1094,7 +1057,6 @@ void dtio_mainfdcallback(int fd, short ATTR_UNUSED(bits), void* arg) #endif /* EPROTO */ ) return; - log_err_addr("accept failed", strerror(errno), &addr, addrlen); #else /* USE_WINSOCK */ if(WSAGetLastError() == WSAEINPROGRESS || WSAGetLastError() == WSAECONNRESET) @@ -1103,9 +1065,9 @@ void dtio_mainfdcallback(int fd, short ATTR_UNUSED(bits), void* arg) ub_winsock_tcp_wouldblock(maindata->ev, UB_EV_READ); return; } - log_err_addr("accept failed", wsa_strerror(WSAGetLastError()), - &addr, addrlen); #endif + log_err_addr("accept failed", sock_strerror(errno), &addr, + addrlen); return; } fd_set_nonblock(s); @@ -1205,8 +1167,10 @@ int sig_quit = 0; static RETSIGTYPE main_sigh(int sig) { verbose(VERB_ALGO, "exit on signal %d\n", sig); - if(sig_base) + if(sig_base) { ub_event_base_loopexit(sig_base); + sig_base = NULL; + } sig_quit = 1; } @@ -1247,9 +1211,9 @@ setup_and_run(struct config_strlist_head* local_list, if(verbosity) log_info("start of service"); ub_event_base_dispatch(base); + sig_base = NULL; if(verbosity) log_info("end of service"); - sig_base = NULL; tap_socket_list_delete(maindata->acceptlist); ub_event_base_free(base); free(maindata); |
