Diffstat (limited to 'services/outside_network.c')
-rw-r--r--	services/outside_network.c | 1089
1 file changed, 984 insertions(+), 105 deletions(-)
diff --git a/services/outside_network.c b/services/outside_network.c
index 41a1d83f1454..11951adea7bc 100644
--- a/services/outside_network.c
+++ b/services/outside_network.c
@@ -132,6 +132,52 @@ serviced_cmp(const void* key1, const void* key2)
return sockaddr_cmp(&q1->addr, q1->addrlen, &q2->addr, q2->addrlen);
}
+/** compare if the reuse element has the same address, port and the same
+ * ssl-is-used-for-it characteristic */
+static int
+reuse_cmp_addrportssl(const void* key1, const void* key2)
+{
+ struct reuse_tcp* r1 = (struct reuse_tcp*)key1;
+ struct reuse_tcp* r2 = (struct reuse_tcp*)key2;
+ int r;
+ /* compare address and port */
+ r = sockaddr_cmp(&r1->addr, r1->addrlen, &r2->addr, r2->addrlen);
+ if(r != 0)
+ return r;
+
+ /* compare if SSL-enabled */
+ if(r1->is_ssl && !r2->is_ssl)
+ return 1;
+ if(!r1->is_ssl && r2->is_ssl)
+ return -1;
+ return 0;
+}
+
+int
+reuse_cmp(const void* key1, const void* key2)
+{
+ int r;
+ r = reuse_cmp_addrportssl(key1, key2);
+ if(r != 0)
+ return r;
+
+ /* compare ptr value */
+ if(key1 < key2) return -1;
+ if(key1 > key2) return 1;
+ return 0;
+}
+
+int reuse_id_cmp(const void* key1, const void* key2)
+{
+ struct waiting_tcp* w1 = (struct waiting_tcp*)key1;
+ struct waiting_tcp* w2 = (struct waiting_tcp*)key2;
+ if(w1->id < w2->id)
+ return -1;
+ if(w1->id > w2->id)
+ return 1;
+ return 0;
+}
+
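The two comparators added here define the orderings used below: reuse_cmp keys the per-outside_network tcp_reuse tree on destination address, port and TLS use, falling back to the pointer value so several open streams to the same destination can coexist as distinct nodes, while reuse_id_cmp keys each stream's tree_by_id on the 16-bit query ID. A minimal standalone sketch of the same tie-breaking idea, using toy types rather than Unbound's own structures:

#include <stdio.h>
#include <string.h>

/* toy stand-in for struct reuse_tcp: only the fields the comparator needs */
struct toy_reuse {
	char addr[64];  /* printable destination address, for simplicity */
	int port;
	int is_ssl;
};

/* same shape as reuse_cmp: key fields first, pointer value as tie-breaker */
static int toy_reuse_cmp(const void* key1, const void* key2)
{
	const struct toy_reuse* a = (const struct toy_reuse*)key1;
	const struct toy_reuse* b = (const struct toy_reuse*)key2;
	int r = strcmp(a->addr, b->addr);
	if(r != 0) return r;
	if(a->port != b->port) return a->port < b->port ? -1 : 1;
	if(a->is_ssl != b->is_ssl) return a->is_ssl ? 1 : -1;
	/* equal keys: distinct objects still get a stable, distinct order */
	if(key1 < key2) return -1;
	if(key1 > key2) return 1;
	return 0;
}

int main(void)
{
	struct toy_reuse a = { "192.0.2.1", 853, 1 };
	struct toy_reuse b = { "192.0.2.1", 853, 1 };
	/* same address, port and ssl flag, but different objects: nonzero */
	printf("%d\n", toy_reuse_cmp(&a, &b) != 0);
	return 0;
}

The pointer fall-back is what lets reuse_tcp_find further down treat all entries with equal (address, port, ssl) as a run of adjacent tree nodes and scan along that run.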
/** delete waiting_tcp entry. Does not unlink from waiting list.
* @param w: to delete.
*/
@@ -280,15 +326,234 @@ outnet_tcp_connect(int s, struct sockaddr_storage* addr, socklen_t addrlen)
return 1;
}
+/** log reuse item addr and ptr with message */
+static void
+log_reuse_tcp(enum verbosity_value v, const char* msg, struct reuse_tcp* reuse)
+{
+ uint16_t port;
+ char addrbuf[128];
+ if(verbosity < v) return;
+ addr_to_str(&reuse->addr, reuse->addrlen, addrbuf, sizeof(addrbuf));
+ port = ntohs(((struct sockaddr_in*)&reuse->addr)->sin_port);
+ verbose(v, "%s %s#%u fd %d", msg, addrbuf, (unsigned)port,
+ reuse->pending->c->fd);
+}
+
+/** pop the first element from the writewait list */
+static struct waiting_tcp* reuse_write_wait_pop(struct reuse_tcp* reuse)
+{
+ struct waiting_tcp* w = reuse->write_wait_first;
+ if(!w)
+ return NULL;
+ log_assert(w->write_wait_queued);
+ log_assert(!w->write_wait_prev);
+ reuse->write_wait_first = w->write_wait_next;
+ if(w->write_wait_next)
+ w->write_wait_next->write_wait_prev = NULL;
+ else reuse->write_wait_last = NULL;
+ w->write_wait_queued = 0;
+ return w;
+}
+
+/** remove the element from the writewait list */
+static void reuse_write_wait_remove(struct reuse_tcp* reuse,
+ struct waiting_tcp* w)
+{
+ if(!w)
+ return;
+ if(!w->write_wait_queued)
+ return;
+ if(w->write_wait_prev)
+ w->write_wait_prev->write_wait_next = w->write_wait_next;
+ else reuse->write_wait_first = w->write_wait_next;
+ if(w->write_wait_next)
+ w->write_wait_next->write_wait_prev = w->write_wait_prev;
+ else reuse->write_wait_last = w->write_wait_prev;
+ w->write_wait_queued = 0;
+}
+
+/** push the element after the last on the writewait list */
+static void reuse_write_wait_push_back(struct reuse_tcp* reuse,
+ struct waiting_tcp* w)
+{
+ if(!w) return;
+ log_assert(!w->write_wait_queued);
+ if(reuse->write_wait_last) {
+ reuse->write_wait_last->write_wait_next = w;
+ w->write_wait_prev = reuse->write_wait_last;
+ } else {
+ reuse->write_wait_first = w;
+ }
+ reuse->write_wait_last = w;
+ w->write_wait_queued = 1;
+}
+
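reuse_write_wait_pop, reuse_write_wait_remove and reuse_write_wait_push_back maintain a per-stream doubly linked FIFO of queries that still have to be written: new queries go on the back, the writer takes the front, and a cancelled query can be unlinked from anywhere in the middle. A reduced, self-contained version of the same list discipline (toy node type, not the real waiting_tcp):

#include <assert.h>
#include <stddef.h>

struct wnode { struct wnode *prev, *next; int queued; };
struct wlist { struct wnode *first, *last; };

/* append at the back: the query waits until earlier queries are written */
static void wlist_push_back(struct wlist* l, struct wnode* n)
{
	assert(!n->queued);
	n->next = NULL;
	n->prev = l->last;
	if(l->last) l->last->next = n;
	else l->first = n;
	l->last = n;
	n->queued = 1;
}

/* take the front element: the next query to write on the stream */
static struct wnode* wlist_pop_front(struct wlist* l)
{
	struct wnode* n = l->first;
	if(!n) return NULL;
	l->first = n->next;
	if(n->next) n->next->prev = NULL;
	else l->last = NULL;
	n->queued = 0;
	return n;
}

/* unlink from anywhere in the list: a cancelled query */
static void wlist_remove(struct wlist* l, struct wnode* n)
{
	if(!n->queued) return;
	if(n->prev) n->prev->next = n->next;
	else l->first = n->next;
	if(n->next) n->next->prev = n->prev;
	else l->last = n->prev;
	n->queued = 0;
}

int main(void)
{
	struct wlist l = { NULL, NULL };
	struct wnode a = {0}, b = {0}, c = {0};
	wlist_push_back(&l, &a);
	wlist_push_back(&l, &b);
	wlist_push_back(&l, &c);
	wlist_remove(&l, &b);              /* cancel the middle one */
	assert(wlist_pop_front(&l) == &a); /* FIFO order for the rest */
	assert(wlist_pop_front(&l) == &c);
	assert(wlist_pop_front(&l) == NULL);
	return 0;
}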
+/** insert element in tree by id */
+void
+reuse_tree_by_id_insert(struct reuse_tcp* reuse, struct waiting_tcp* w)
+{
+ log_assert(w->id_node.key == NULL);
+ w->id_node.key = w;
+ rbtree_insert(&reuse->tree_by_id, &w->id_node);
+}
+
+/** find element in tree by id */
+struct waiting_tcp*
+reuse_tcp_by_id_find(struct reuse_tcp* reuse, uint16_t id)
+{
+ struct waiting_tcp key_w;
+ rbnode_type* n;
+ memset(&key_w, 0, sizeof(key_w));
+ key_w.id_node.key = &key_w;
+ key_w.id = id;
+ n = rbtree_search(&reuse->tree_by_id, &key_w);
+ if(!n) return NULL;
+ return (struct waiting_tcp*)n->key;
+}
+
+/** return ID value of rbnode in tree_by_id */
+static uint16_t
+tree_by_id_get_id(rbnode_type* node)
+{
+ struct waiting_tcp* w = (struct waiting_tcp*)node->key;
+ return w->id;
+}
+
+/** insert into reuse tcp tree and LRU, false on failure (duplicate) */
+static int
+reuse_tcp_insert(struct outside_network* outnet, struct pending_tcp* pend_tcp)
+{
+ log_reuse_tcp(VERB_CLIENT, "reuse_tcp_insert", &pend_tcp->reuse);
+ if(pend_tcp->reuse.item_on_lru_list)
+ return 1;
+ pend_tcp->reuse.node.key = &pend_tcp->reuse;
+ pend_tcp->reuse.pending = pend_tcp;
+ if(!rbtree_insert(&outnet->tcp_reuse, &pend_tcp->reuse.node)) {
+ /* this is a duplicate connection, close this one */
+ verbose(VERB_CLIENT, "reuse_tcp_insert: duplicate connection");
+ pend_tcp->reuse.node.key = NULL;
+ return 0;
+ }
+ /* insert into LRU, first is newest */
+ pend_tcp->reuse.lru_prev = NULL;
+ if(outnet->tcp_reuse_first) {
+ pend_tcp->reuse.lru_next = outnet->tcp_reuse_first;
+ outnet->tcp_reuse_first->lru_prev = &pend_tcp->reuse;
+ } else {
+ pend_tcp->reuse.lru_next = NULL;
+ outnet->tcp_reuse_last = &pend_tcp->reuse;
+ }
+ outnet->tcp_reuse_first = &pend_tcp->reuse;
+ pend_tcp->reuse.item_on_lru_list = 1;
+ return 1;
+}
+
+/** find reuse tcp stream to destination for query, or NULL if none */
+static struct reuse_tcp*
+reuse_tcp_find(struct outside_network* outnet, struct sockaddr_storage* addr,
+ socklen_t addrlen, int use_ssl)
+{
+ struct waiting_tcp key_w;
+ struct pending_tcp key_p;
+ struct comm_point c;
+ rbnode_type* result = NULL, *prev;
+ verbose(VERB_CLIENT, "reuse_tcp_find");
+ memset(&key_w, 0, sizeof(key_w));
+ memset(&key_p, 0, sizeof(key_p));
+ memset(&c, 0, sizeof(c));
+ key_p.query = &key_w;
+ key_p.c = &c;
+ key_p.reuse.pending = &key_p;
+ key_p.reuse.node.key = &key_p.reuse;
+ if(use_ssl)
+ key_p.reuse.is_ssl = 1;
+ if(addrlen > (socklen_t)sizeof(key_p.reuse.addr))
+ return NULL;
+ memmove(&key_p.reuse.addr, addr, addrlen);
+ key_p.reuse.addrlen = addrlen;
+
+ verbose(VERB_CLIENT, "reuse_tcp_find: num reuse streams %u",
+ (unsigned)outnet->tcp_reuse.count);
+ if(outnet->tcp_reuse.root == NULL ||
+ outnet->tcp_reuse.root == RBTREE_NULL)
+ return NULL;
+ if(rbtree_find_less_equal(&outnet->tcp_reuse, &key_p.reuse.node,
+ &result)) {
+		/* exact match: impossible, because the key is on the
+		 * stack and the comparator falls back to pointer compare */
+ log_assert(&key_p.reuse != (struct reuse_tcp*)result);
+ log_assert(&key_p != ((struct reuse_tcp*)result)->pending);
+ }
+ /* not found, return null */
+ if(!result || result == RBTREE_NULL)
+ return NULL;
+ verbose(VERB_CLIENT, "reuse_tcp_find check inexact match");
+	/* inexact match; find one of possibly several connections to the
+	 * same destination address, with the correct port and ssl setting,
+	 * and with fewer than the max number of open queries. If none
+	 * qualifies, return NULL so that a new connection gets opened */
+ /* rewind to start of sequence of same address,port,ssl */
+ prev = rbtree_previous(result);
+ while(prev && prev != RBTREE_NULL &&
+ reuse_cmp_addrportssl(prev->key, &key_p.reuse) == 0) {
+ result = prev;
+ prev = rbtree_previous(result);
+ }
+
+ /* loop to find first one that has correct characteristics */
+ while(result && result != RBTREE_NULL &&
+ reuse_cmp_addrportssl(result->key, &key_p.reuse) == 0) {
+ if(((struct reuse_tcp*)result)->tree_by_id.count <
+ MAX_REUSE_TCP_QUERIES) {
+ /* same address, port, ssl-yes-or-no, and has
+ * space for another query */
+ return (struct reuse_tcp*)result;
+ }
+ result = rbtree_next(result);
+ }
+ return NULL;
+}
+
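reuse_tcp_find builds a key on the stack, does a less-or-equal search in the tcp_reuse tree, rewinds to the first entry with the same (address, port, ssl) triple, and then walks forward until it finds a stream with room for another query; if none has room it returns NULL and the caller opens a fresh connection. The same rewind-then-scan idea over a plain sorted array, as a small illustration (toy types; TOY_MAX_QUERIES is only a stand-in for the real MAX_REUSE_TCP_QUERIES limit):

#include <stddef.h>

/* key = the (addr,port,ssl) triple collapsed to one int for the toy */
struct toy_stream { int key; int num_queries; };
enum { TOY_MAX_QUERIES = 200 };  /* toy limit, not the real constant */

/* arr is sorted by key; hit is any index whose key equals the wanted key,
 * i.e. where a less-or-equal tree search could land inside the run */
static struct toy_stream* toy_find_with_room(struct toy_stream* arr, int n,
	int hit, int key)
{
	int i = hit;
	if(i < 0 || i >= n || arr[i].key != key)
		return NULL;
	while(i > 0 && arr[i-1].key == key)
		i--;                            /* rewind to start of the run */
	for(; i < n && arr[i].key == key; i++)
		if(arr[i].num_queries < TOY_MAX_QUERIES)
			return &arr[i];         /* stream with space for a query */
	return NULL;                            /* all streams to this target full */
}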
+/** use the buffer to setup writing the query */
+static void
+outnet_tcp_take_query_setup(int s, struct pending_tcp* pend,
+ struct waiting_tcp* w)
+{
+ struct timeval tv;
+ verbose(VERB_CLIENT, "outnet_tcp_take_query_setup: setup packet to write "
+ "len %d timeout %d msec",
+ (int)w->pkt_len, w->timeout);
+ pend->c->tcp_write_pkt = w->pkt;
+ pend->c->tcp_write_pkt_len = w->pkt_len;
+ pend->c->tcp_write_and_read = 1;
+ pend->c->tcp_write_byte_count = 0;
+ pend->c->tcp_is_reading = 0;
+ comm_point_start_listening(pend->c, s, -1);
+	/* set the timer on the waiting_tcp entry; this is the write timeout
+	 * for the packet being written. The timer on pend->c is used when
+	 * there is no packet being written and we wait on read timeouts */
+#ifndef S_SPLINT_S
+ tv.tv_sec = w->timeout/1000;
+ tv.tv_usec = (w->timeout%1000)*1000;
+#endif
+ /* if the waiting_tcp was previously waiting for a buffer in the
+ * outside_network.tcpwaitlist, then the timer is reset now that
+ * we start writing it */
+ comm_timer_set(w->timer, &tv);
+}
+
/** use next free buffer to service a tcp query */
static int
-outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt, size_t pkt_len)
+outnet_tcp_take_into_use(struct waiting_tcp* w)
{
struct pending_tcp* pend = w->outnet->tcp_free;
int s;
log_assert(pend);
- log_assert(pkt);
+ log_assert(w->pkt);
+ log_assert(w->pkt_len > 0);
log_assert(w->addrlen > 0);
+ pend->c->tcp_do_toggle_rw = 0;
+ pend->c->tcp_do_close = 0;
/* open socket */
s = outnet_get_tcp_fd(&w->addr, w->addrlen, w->outnet->tcp_mss, w->outnet->ip_dscp);
@@ -383,24 +648,65 @@ outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt, size_t pkt_len)
return 0;
}
}
- w->pkt = NULL;
w->next_waiting = (void*)pend;
- pend->id = LDNS_ID_WIRE(pkt);
w->outnet->num_tcp_outgoing++;
w->outnet->tcp_free = pend->next_free;
pend->next_free = NULL;
pend->query = w;
+ pend->reuse.outnet = w->outnet;
pend->c->repinfo.addrlen = w->addrlen;
+ pend->c->tcp_more_read_again = &pend->reuse.cp_more_read_again;
+ pend->c->tcp_more_write_again = &pend->reuse.cp_more_write_again;
+ pend->reuse.cp_more_read_again = 0;
+ pend->reuse.cp_more_write_again = 0;
memcpy(&pend->c->repinfo.addr, &w->addr, w->addrlen);
- sldns_buffer_clear(pend->c->buffer);
- sldns_buffer_write(pend->c->buffer, pkt, pkt_len);
- sldns_buffer_flip(pend->c->buffer);
- pend->c->tcp_is_reading = 0;
- pend->c->tcp_byte_count = 0;
- comm_point_start_listening(pend->c, s, -1);
+ pend->reuse.pending = pend;
+ if(pend->c->ssl)
+ pend->reuse.is_ssl = 1;
+ else pend->reuse.is_ssl = 0;
+ /* insert in reuse by address tree if not already inserted there */
+ (void)reuse_tcp_insert(w->outnet, pend);
+ reuse_tree_by_id_insert(&pend->reuse, w);
+ outnet_tcp_take_query_setup(s, pend, w);
return 1;
}
+/** Touch the lru of a reuse_tcp element, it is in use.
+ * This moves it to the front of the list, where it is not likely to
+ * be closed. Items at the back of the list are closed to make space. */
+static void
+reuse_tcp_lru_touch(struct outside_network* outnet, struct reuse_tcp* reuse)
+{
+ if(!reuse->item_on_lru_list)
+ return; /* not on the list, no lru to modify */
+ if(!reuse->lru_prev)
+ return; /* already first in the list */
+ /* remove at current position */
+ /* since it is not first, there is a previous element */
+ reuse->lru_prev->lru_next = reuse->lru_next;
+ if(reuse->lru_next)
+ reuse->lru_next->lru_prev = reuse->lru_prev;
+ else outnet->tcp_reuse_last = reuse->lru_prev;
+ /* insert at the front */
+ reuse->lru_prev = NULL;
+ reuse->lru_next = outnet->tcp_reuse_first;
+ /* since it is not first, it is not the only element and
+ * lru_next is thus not NULL and thus reuse is now not the last in
+ * the list, so outnet->tcp_reuse_last does not need to be modified */
+ outnet->tcp_reuse_first = reuse;
+}
+
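reuse_tcp_lru_touch is a classic move-to-front on a doubly linked LRU list: the stream is unlinked from its current position and re-linked at the head, so that reuse_tcp_close_oldest (further down) can evict from the tail. A self-contained miniature of the same operation:

#include <stddef.h>

struct lru_item { struct lru_item *prev, *next; };
struct lru { struct lru_item *first, *last; };   /* first = most recent */

/* move an item that is already on the list to the front */
static void lru_touch(struct lru* l, struct lru_item* it)
{
	if(!it->prev)
		return;                       /* already the first element */
	/* unlink at the current position; a previous element exists */
	it->prev->next = it->next;
	if(it->next) it->next->prev = it->prev;
	else l->last = it->prev;
	/* relink at the front; the old first element still exists */
	it->prev = NULL;
	it->next = l->first;
	l->first->prev = it;
	l->first = it;
}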
+/** call callback on waiting_tcp, if not NULL */
+static void
+waiting_tcp_callback(struct waiting_tcp* w, struct comm_point* c, int error,
+ struct comm_reply* reply_info)
+{
+ if(w->cb) {
+ fptr_ok(fptr_whitelist_pending_tcp(w->cb));
+ (void)(*w->cb)(c, w->cb_arg, error, reply_info);
+ }
+}
+
/** see if buffers can be used to service TCP queries */
static void
use_free_buffer(struct outside_network* outnet)
@@ -408,25 +714,198 @@ use_free_buffer(struct outside_network* outnet)
struct waiting_tcp* w;
while(outnet->tcp_free && outnet->tcp_wait_first
&& !outnet->want_to_quit) {
+ struct reuse_tcp* reuse = NULL;
w = outnet->tcp_wait_first;
outnet->tcp_wait_first = w->next_waiting;
if(outnet->tcp_wait_last == w)
outnet->tcp_wait_last = NULL;
- if(!outnet_tcp_take_into_use(w, w->pkt, w->pkt_len)) {
- comm_point_callback_type* cb = w->cb;
- void* cb_arg = w->cb_arg;
- waiting_tcp_delete(w);
- fptr_ok(fptr_whitelist_pending_tcp(cb));
- (void)(*cb)(NULL, cb_arg, NETEVENT_CLOSED, NULL);
+ w->on_tcp_waiting_list = 0;
+ reuse = reuse_tcp_find(outnet, &w->addr, w->addrlen,
+ w->ssl_upstream);
+ if(reuse) {
+ log_reuse_tcp(VERB_CLIENT, "use free buffer for waiting tcp: "
+ "found reuse", reuse);
+ reuse_tcp_lru_touch(outnet, reuse);
+ comm_timer_disable(w->timer);
+ w->next_waiting = (void*)reuse->pending;
+ reuse_tree_by_id_insert(reuse, w);
+ if(reuse->pending->query) {
+ /* on the write wait list */
+ reuse_write_wait_push_back(reuse, w);
+ } else {
+ /* write straight away */
+ /* stop the timer on read of the fd */
+ comm_point_stop_listening(reuse->pending->c);
+ reuse->pending->query = w;
+ outnet_tcp_take_query_setup(
+ reuse->pending->c->fd, reuse->pending,
+ w);
+ }
+ } else {
+ struct pending_tcp* pend = w->outnet->tcp_free;
+ rbtree_init(&pend->reuse.tree_by_id, reuse_id_cmp);
+ pend->reuse.pending = pend;
+ memcpy(&pend->reuse.addr, &w->addr, w->addrlen);
+ pend->reuse.addrlen = w->addrlen;
+ if(!outnet_tcp_take_into_use(w)) {
+ waiting_tcp_callback(w, NULL, NETEVENT_CLOSED,
+ NULL);
+ waiting_tcp_delete(w);
+ }
}
}
}
+/** add waiting_tcp element to the outnet tcp waiting list */
+static void
+outnet_add_tcp_waiting(struct outside_network* outnet, struct waiting_tcp* w)
+{
+ struct timeval tv;
+ if(w->on_tcp_waiting_list)
+ return;
+ w->next_waiting = NULL;
+ if(outnet->tcp_wait_last)
+ outnet->tcp_wait_last->next_waiting = w;
+ else outnet->tcp_wait_first = w;
+ outnet->tcp_wait_last = w;
+ w->on_tcp_waiting_list = 1;
+#ifndef S_SPLINT_S
+ tv.tv_sec = w->timeout/1000;
+ tv.tv_usec = (w->timeout%1000)*1000;
+#endif
+ comm_timer_set(w->timer, &tv);
+}
+
+/** delete element from tree by id */
+static void
+reuse_tree_by_id_delete(struct reuse_tcp* reuse, struct waiting_tcp* w)
+{
+ log_assert(w->id_node.key != NULL);
+ rbtree_delete(&reuse->tree_by_id, w);
+ w->id_node.key = NULL;
+}
+
+/** move the writewait list queries to wait for another connection. */
+static void
+reuse_move_writewait_away(struct outside_network* outnet,
+ struct pending_tcp* pend)
+{
+	/* the writewait list has not been written yet, so if the
+	 * stream was closed, those queries have not actually failed;
+	 * only the queries that were already written have. The other
+	 * queries can get written to another stream. For upstreams that
+	 * do not support multiple queries and answers, the stream can get
+	 * closed, and then the queries can get written on a new socket */
+ struct waiting_tcp* w;
+ if(pend->query && pend->query->error_count == 0 &&
+ pend->c->tcp_write_pkt == pend->query->pkt &&
+ pend->c->tcp_write_pkt_len == pend->query->pkt_len) {
+ /* since the current query is not written, it can also
+ * move to a free buffer */
+ if(verbosity >= VERB_CLIENT && pend->query->pkt_len > 12+2+2 &&
+ LDNS_QDCOUNT(pend->query->pkt) > 0 &&
+ dname_valid(pend->query->pkt+12, pend->query->pkt_len-12)) {
+ char buf[LDNS_MAX_DOMAINLEN+1];
+ dname_str(pend->query->pkt+12, buf);
+ verbose(VERB_CLIENT, "reuse_move_writewait_away current %s %d bytes were written",
+ buf, (int)pend->c->tcp_write_byte_count);
+ }
+ pend->c->tcp_write_pkt = NULL;
+ pend->c->tcp_write_pkt_len = 0;
+ pend->c->tcp_write_and_read = 0;
+ pend->reuse.cp_more_read_again = 0;
+ pend->reuse.cp_more_write_again = 0;
+ pend->c->tcp_is_reading = 1;
+ w = pend->query;
+ pend->query = NULL;
+ /* increase error count, so that if the next socket fails too
+ * the server selection is run again with this query failed
+ * and it can select a different server (if possible), or
+ * fail the query */
+ w->error_count ++;
+ reuse_tree_by_id_delete(&pend->reuse, w);
+ outnet_add_tcp_waiting(outnet, w);
+ }
+ while((w = reuse_write_wait_pop(&pend->reuse)) != NULL) {
+ if(verbosity >= VERB_CLIENT && w->pkt_len > 12+2+2 &&
+ LDNS_QDCOUNT(w->pkt) > 0 &&
+ dname_valid(w->pkt+12, w->pkt_len-12)) {
+ char buf[LDNS_MAX_DOMAINLEN+1];
+ dname_str(w->pkt+12, buf);
+ verbose(VERB_CLIENT, "reuse_move_writewait_away item %s", buf);
+ }
+ reuse_tree_by_id_delete(&pend->reuse, w);
+ outnet_add_tcp_waiting(outnet, w);
+ }
+}
+
+/** remove reused element from tree and lru list */
+static void
+reuse_tcp_remove_tree_list(struct outside_network* outnet,
+ struct reuse_tcp* reuse)
+{
+ verbose(VERB_CLIENT, "reuse_tcp_remove_tree_list");
+ if(reuse->node.key) {
+ /* delete it from reuse tree */
+ (void)rbtree_delete(&outnet->tcp_reuse, &reuse->node);
+ reuse->node.key = NULL;
+ }
+ /* delete from reuse list */
+ if(reuse->item_on_lru_list) {
+ if(reuse->lru_prev) {
+ /* assert that members of the lru list are waiting
+ * and thus have a pending pointer to the struct */
+ log_assert(reuse->lru_prev->pending);
+ reuse->lru_prev->lru_next = reuse->lru_next;
+ } else {
+ log_assert(!reuse->lru_next || reuse->lru_next->pending);
+ outnet->tcp_reuse_first = reuse->lru_next;
+ }
+ if(reuse->lru_next) {
+ /* assert that members of the lru list are waiting
+ * and thus have a pending pointer to the struct */
+ log_assert(reuse->lru_next->pending);
+ reuse->lru_next->lru_prev = reuse->lru_prev;
+ } else {
+ log_assert(!reuse->lru_prev || reuse->lru_prev->pending);
+ outnet->tcp_reuse_last = reuse->lru_prev;
+ }
+ reuse->item_on_lru_list = 0;
+ }
+}
+
+/** helper function that deletes an element from the tree of readwait
+ * elements in tcp reuse structure */
+static void reuse_del_readwait_elem(rbnode_type* node, void* ATTR_UNUSED(arg))
+{
+ struct waiting_tcp* w = (struct waiting_tcp*)node->key;
+ waiting_tcp_delete(w);
+}
+
+/** delete readwait waiting_tcp elements, deletes the elements in the list */
+void reuse_del_readwait(rbtree_type* tree_by_id)
+{
+ if(tree_by_id->root == NULL ||
+ tree_by_id->root == RBTREE_NULL)
+ return;
+ traverse_postorder(tree_by_id, &reuse_del_readwait_elem, NULL);
+ rbtree_init(tree_by_id, reuse_id_cmp);
+}
+
/** decommission a tcp buffer, closes commpoint and frees waiting_tcp entry */
static void
decommission_pending_tcp(struct outside_network* outnet,
struct pending_tcp* pend)
{
+ verbose(VERB_CLIENT, "decommission_pending_tcp");
+ pend->next_free = outnet->tcp_free;
+ outnet->tcp_free = pend;
+ if(pend->reuse.node.key) {
+ /* needs unlink from the reuse tree to get deleted */
+ reuse_tcp_remove_tree_list(outnet, &pend->reuse);
+ }
+ /* free SSL structure after remove from outnet tcp reuse tree,
+ * because the c->ssl null or not is used for sorting in the tree */
if(pend->c->ssl) {
#ifdef HAVE_SSL
SSL_shutdown(pend->c->ssl);
@@ -435,11 +914,68 @@ decommission_pending_tcp(struct outside_network* outnet,
#endif
}
comm_point_close(pend->c);
- pend->next_free = outnet->tcp_free;
- outnet->tcp_free = pend;
- waiting_tcp_delete(pend->query);
+ pend->reuse.cp_more_read_again = 0;
+ pend->reuse.cp_more_write_again = 0;
+	/* unlink the query and the writewait list; they are part of the
+	 * tree_by_id nodes and get deleted there */
pend->query = NULL;
- use_free_buffer(outnet);
+ pend->reuse.write_wait_first = NULL;
+ pend->reuse.write_wait_last = NULL;
+ reuse_del_readwait(&pend->reuse.tree_by_id);
+}
+
+/** perform failure callbacks for waiting queries in reuse read rbtree */
+static void reuse_cb_readwait_for_failure(rbtree_type* tree_by_id, int err)
+{
+ rbnode_type* node;
+ if(tree_by_id->root == NULL ||
+ tree_by_id->root == RBTREE_NULL)
+ return;
+ node = rbtree_first(tree_by_id);
+ while(node && node != RBTREE_NULL) {
+ struct waiting_tcp* w = (struct waiting_tcp*)node->key;
+ waiting_tcp_callback(w, NULL, err, NULL);
+ node = rbtree_next(node);
+ }
+}
+
+/** perform callbacks for failure and also decommission pending tcp.
+ * the callbacks remove references in sq->pending to the waiting_tcp
+ * members of the tree_by_id in the pending tcp. The pending_tcp is
+ * removed before the callbacks, so that the callbacks do not modify
+ * the pending_tcp due to its reference in the outside_network reuse tree */
+static void reuse_cb_and_decommission(struct outside_network* outnet,
+ struct pending_tcp* pend, int error)
+{
+ rbtree_type store;
+ store = pend->reuse.tree_by_id;
+ pend->query = NULL;
+ rbtree_init(&pend->reuse.tree_by_id, reuse_id_cmp);
+ pend->reuse.write_wait_first = NULL;
+ pend->reuse.write_wait_last = NULL;
+ decommission_pending_tcp(outnet, pend);
+ reuse_cb_readwait_for_failure(&store, error);
+ reuse_del_readwait(&store);
+}
+
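reuse_cb_and_decommission takes care to move tree_by_id into a local variable and re-initialize the member before any callback runs, so a callback that re-enters the outside network code (for instance by issuing a new query to the same destination) finds the pending_tcp already empty and cannot disturb the tree being iterated. The same detach-before-callback pattern in miniature, with a simple singly linked list of callbacks standing in for the rbtree:

#include <stddef.h>

struct cb_node { struct cb_node* next; void (*cb)(void* arg); void* arg; };
struct cb_list { struct cb_node* head; };

/* run all callbacks with the list already detached from its owner, so a
 * callback that re-enters and inspects the owner sees an empty list */
static void run_callbacks_detached(struct cb_list* owner)
{
	struct cb_node* head = owner->head;     /* take the snapshot */
	owner->head = NULL;                     /* detach before any callback */
	while(head) {
		struct cb_node* next = head->next;  /* read next before the cb */
		(*head->cb)(head->arg);
		head = next;
	}
}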
+/** set timeout on tcp fd and setup read event to catch incoming dns msgs */
+static void
+reuse_tcp_setup_timeout(struct pending_tcp* pend_tcp)
+{
+ log_reuse_tcp(VERB_CLIENT, "reuse_tcp_setup_timeout", &pend_tcp->reuse);
+ comm_point_start_listening(pend_tcp->c, -1, REUSE_TIMEOUT);
+}
+
+/** set timeout on tcp fd, clear the buffer and set up the read event to
+ * catch incoming dns msgs */
+static void
+reuse_tcp_setup_read_and_timeout(struct pending_tcp* pend_tcp)
+{
+ log_reuse_tcp(VERB_CLIENT, "reuse_tcp_setup_readtimeout", &pend_tcp->reuse);
+ sldns_buffer_clear(pend_tcp->c->buffer);
+ pend_tcp->c->tcp_is_reading = 1;
+ pend_tcp->c->tcp_byte_count = 0;
+ comm_point_stop_listening(pend_tcp->c);
+ comm_point_start_listening(pend_tcp->c, -1, REUSE_TIMEOUT);
}
int
@@ -447,24 +983,116 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
struct comm_reply *reply_info)
{
struct pending_tcp* pend = (struct pending_tcp*)arg;
- struct outside_network* outnet = pend->query->outnet;
+ struct outside_network* outnet = pend->reuse.outnet;
+ struct waiting_tcp* w = NULL;
verbose(VERB_ALGO, "outnettcp cb");
- if(error != NETEVENT_NOERROR) {
+ if(error == NETEVENT_TIMEOUT) {
+ if(pend->c->tcp_write_and_read) {
+ verbose(VERB_QUERY, "outnettcp got tcp timeout "
+ "for read, ignored because write underway");
+			/* if we are writing, ignore the read timer and wait
+			 * for the write timer, or until the write is done */
+ return 0;
+ } else {
+ verbose(VERB_QUERY, "outnettcp got tcp timeout %s",
+ (pend->reuse.tree_by_id.count?"for reading pkt":
+ "for keepalive for reuse"));
+ }
+ /* must be timeout for reading or keepalive reuse,
+ * close it. */
+ reuse_tcp_remove_tree_list(outnet, &pend->reuse);
+ } else if(error == NETEVENT_PKT_WRITTEN) {
+ /* the packet we want to write has been written. */
+ verbose(VERB_ALGO, "outnet tcp pkt was written event");
+ log_assert(c == pend->c);
+ log_assert(pend->query->pkt == pend->c->tcp_write_pkt);
+ log_assert(pend->query->pkt_len == pend->c->tcp_write_pkt_len);
+ pend->c->tcp_write_pkt = NULL;
+ pend->c->tcp_write_pkt_len = 0;
+		/* pend->query is already in tree_by_id */
+ log_assert(pend->query->id_node.key);
+ pend->query = NULL;
+ /* setup to write next packet or setup read timeout */
+ if(pend->reuse.write_wait_first) {
+ verbose(VERB_ALGO, "outnet tcp setup next pkt");
+			/* we can perhaps write it straight away; set the flag
+			 * because this callback is called after a tcp write
+			 * succeeded, so more buffer space is likely available
+			 * and we can write some more. */
+ pend->reuse.cp_more_write_again = 1;
+ pend->query = reuse_write_wait_pop(&pend->reuse);
+ comm_point_stop_listening(pend->c);
+ outnet_tcp_take_query_setup(pend->c->fd, pend,
+ pend->query);
+ } else {
+ verbose(VERB_ALGO, "outnet tcp writes done, wait");
+ pend->c->tcp_write_and_read = 0;
+ pend->reuse.cp_more_read_again = 0;
+ pend->reuse.cp_more_write_again = 0;
+ pend->c->tcp_is_reading = 1;
+ comm_point_stop_listening(pend->c);
+ reuse_tcp_setup_timeout(pend);
+ }
+ return 0;
+ } else if(error != NETEVENT_NOERROR) {
verbose(VERB_QUERY, "outnettcp got tcp error %d", error);
+ reuse_move_writewait_away(outnet, pend);
/* pass error below and exit */
} else {
/* check ID */
- if(sldns_buffer_limit(c->buffer) < sizeof(uint16_t) ||
- LDNS_ID_WIRE(sldns_buffer_begin(c->buffer))!=pend->id) {
+ if(sldns_buffer_limit(c->buffer) < sizeof(uint16_t)) {
log_addr(VERB_QUERY,
- "outnettcp: bad ID in reply, from:",
- &pend->query->addr, pend->query->addrlen);
+ "outnettcp: bad ID in reply, too short, from:",
+ &pend->reuse.addr, pend->reuse.addrlen);
error = NETEVENT_CLOSED;
- }
+ } else {
+ uint16_t id = LDNS_ID_WIRE(sldns_buffer_begin(
+ c->buffer));
+ /* find the query the reply is for */
+ w = reuse_tcp_by_id_find(&pend->reuse, id);
+ }
+ }
+ if(error == NETEVENT_NOERROR && !w) {
+ /* no struct waiting found in tree, no reply to call */
+ log_addr(VERB_QUERY, "outnettcp: bad ID in reply, from:",
+ &pend->reuse.addr, pend->reuse.addrlen);
+ error = NETEVENT_CLOSED;
+ }
+ if(error == NETEVENT_NOERROR) {
+		/* if this was not a failure, add the stream to the reuse
+		 * tree so it can be reused, for when the state machine wants
+		 * to make another tcp query to the same destination. */
+ if(outnet->tcp_reuse.count < outnet->tcp_reuse_max) {
+ (void)reuse_tcp_insert(outnet, pend);
+ }
+ }
+ if(w) {
+ reuse_tree_by_id_delete(&pend->reuse, w);
+ verbose(VERB_CLIENT, "outnet tcp callback query err %d buflen %d",
+ error, (int)sldns_buffer_limit(c->buffer));
+ waiting_tcp_callback(w, c, error, reply_info);
+ waiting_tcp_delete(w);
+ }
+ verbose(VERB_CLIENT, "outnet_tcp_cb reuse after cb");
+ if(error == NETEVENT_NOERROR && pend->reuse.node.key) {
+ verbose(VERB_CLIENT, "outnet_tcp_cb reuse after cb: keep it");
+ /* it is in the reuse_tcp tree, with other queries, or
+ * on the empty list. do not decommission it */
+		/* if there are more outstanding queries, we could try to
+		 * read again to see if more data is on the input, because
+		 * this callback is called after a successful read and there
+		 * could be more bytes to read on the input */
+ if(pend->reuse.tree_by_id.count != 0)
+ pend->reuse.cp_more_read_again = 1;
+ reuse_tcp_setup_read_and_timeout(pend);
+ return 0;
}
- fptr_ok(fptr_whitelist_pending_tcp(pend->query->cb));
- (void)(*pend->query->cb)(c, pend->query->cb_arg, error, reply_info);
- decommission_pending_tcp(outnet, pend);
+ verbose(VERB_CLIENT, "outnet_tcp_cb reuse after cb: decommission it");
+	/* no queries on it and no space to keep it, or it timed out or
+	 * was closed due to an error. Close it */
+ reuse_cb_and_decommission(outnet, pend, (error==NETEVENT_TIMEOUT?
+ NETEVENT_TIMEOUT:NETEVENT_CLOSED));
+ use_free_buffer(outnet);
return 0;
}
@@ -723,7 +1351,8 @@ outside_network_create(struct comm_base *base, size_t bufsize,
struct ub_randstate* rnd, int use_caps_for_id, int* availports,
int numavailports, size_t unwanted_threshold, int tcp_mss,
void (*unwanted_action)(void*), void* unwanted_param, int do_udp,
- void* sslctx, int delayclose, int tls_use_sni, struct dt_env* dtenv)
+ void* sslctx, int delayclose, int tls_use_sni, struct dt_env* dtenv,
+ int udp_connect)
{
struct outside_network* outnet = (struct outside_network*)
calloc(1, sizeof(struct outside_network));
@@ -761,6 +1390,9 @@ outside_network_create(struct comm_base *base, size_t bufsize,
outnet->delay_tv.tv_usec = (delayclose%1000)*1000;
}
#endif
+ if(udp_connect) {
+ outnet->udp_connect = 1;
+ }
if(numavailports == 0 || num_ports == 0) {
log_err("no outgoing ports available");
outside_network_delete(outnet);
@@ -795,6 +1427,8 @@ outside_network_create(struct comm_base *base, size_t bufsize,
outside_network_delete(outnet);
return NULL;
}
+ rbtree_init(&outnet->tcp_reuse, reuse_cmp);
+ outnet->tcp_reuse_max = num_tcp;
/* allocate commpoints */
for(k=0; k<num_ports; k++) {
@@ -958,6 +1592,17 @@ outside_network_delete(struct outside_network* outnet)
size_t i;
for(i=0; i<outnet->num_tcp; i++)
if(outnet->tcp_conns[i]) {
+ if(outnet->tcp_conns[i]->query &&
+ !outnet->tcp_conns[i]->query->
+ on_tcp_waiting_list) {
+ /* delete waiting_tcp elements that
+ * the tcp conn is working on */
+ struct pending_tcp* pend =
+ (struct pending_tcp*)outnet->
+ tcp_conns[i]->query->
+ next_waiting;
+ decommission_pending_tcp(outnet, pend);
+ }
comm_point_delete(outnet->tcp_conns[i]->c);
waiting_tcp_delete(outnet->tcp_conns[i]->query);
free(outnet->tcp_conns[i]);
@@ -972,6 +1617,10 @@ outside_network_delete(struct outside_network* outnet)
p = np;
}
}
+	/* the reuse elements are part of the struct pending_tcp entries
+	 * that were deleted above */
+ rbtree_init(&outnet->tcp_reuse, reuse_cmp);
+ outnet->tcp_reuse_first = NULL;
+ outnet->tcp_reuse_last = NULL;
if(outnet->udp_wait_first) {
struct pending* p = outnet->udp_wait_first, *np;
while(p) {
@@ -1115,13 +1764,26 @@ select_ifport(struct outside_network* outnet, struct pending* pend,
my_if = ub_random_max(outnet->rnd, num_if);
pif = &ifs[my_if];
#ifndef DISABLE_EXPLICIT_PORT_RANDOMISATION
- my_port = ub_random_max(outnet->rnd, pif->avail_total);
- if(my_port < pif->inuse) {
- /* port already open */
- pend->pc = pif->out[my_port];
- verbose(VERB_ALGO, "using UDP if=%d port=%d",
- my_if, pend->pc->number);
- break;
+ if(outnet->udp_connect) {
+ /* if we connect() we cannot reuse fds for a port */
+ if(pif->inuse >= pif->avail_total) {
+ tries++;
+ if(tries < MAX_PORT_RETRY)
+ continue;
+ log_err("failed to find an open port, drop msg");
+ return 0;
+ }
+ my_port = pif->inuse + ub_random_max(outnet->rnd,
+ pif->avail_total - pif->inuse);
+ } else {
+ my_port = ub_random_max(outnet->rnd, pif->avail_total);
+ if(my_port < pif->inuse) {
+ /* port already open */
+ pend->pc = pif->out[my_port];
+ verbose(VERB_ALGO, "using UDP if=%d port=%d",
+ my_if, pend->pc->number);
+ break;
+ }
}
/* try to open new port, if fails, loop to try again */
log_assert(pif->inuse < pif->maxout);
@@ -1138,6 +1800,17 @@ select_ifport(struct outside_network* outnet, struct pending* pend,
if(fd != -1) {
verbose(VERB_ALGO, "opened UDP if=%d port=%d",
my_if, portno);
+ if(outnet->udp_connect) {
+ /* connect() to the destination */
+ if(connect(fd, (struct sockaddr*)&pend->addr,
+ pend->addrlen) < 0) {
+ log_err_addr("udp connect failed",
+ strerror(errno), &pend->addr,
+ pend->addrlen);
+ sock_close(fd);
+ return 0;
+ }
+ }
/* grab fd */
pend->pc = outnet->unused_fds;
outnet->unused_fds = pend->pc->next;
@@ -1197,10 +1870,17 @@ randomize_and_send_udp(struct pending* pend, sldns_buffer* packet, int timeout)
log_assert(pend->pc && pend->pc->cp);
/* send it over the commlink */
- if(!comm_point_send_udp_msg(pend->pc->cp, packet,
- (struct sockaddr*)&pend->addr, pend->addrlen)) {
- portcomm_loweruse(outnet, pend->pc);
- return 0;
+ if(outnet->udp_connect) {
+ if(!comm_point_send_udp_msg(pend->pc->cp, packet, NULL, 0)) {
+ portcomm_loweruse(outnet, pend->pc);
+ return 0;
+ }
+ } else {
+ if(!comm_point_send_udp_msg(pend->pc->cp, packet,
+ (struct sockaddr*)&pend->addr, pend->addrlen)) {
+ portcomm_loweruse(outnet, pend->pc);
+ return 0;
+ }
}
/* system calls to set timeout after sending UDP to make roundtrip
@@ -1273,45 +1953,152 @@ outnet_tcptimer(void* arg)
{
struct waiting_tcp* w = (struct waiting_tcp*)arg;
struct outside_network* outnet = w->outnet;
- comm_point_callback_type* cb;
- void* cb_arg;
- if(w->pkt) {
+ verbose(VERB_CLIENT, "outnet_tcptimer");
+ if(w->on_tcp_waiting_list) {
/* it is on the waiting list */
waiting_list_remove(outnet, w);
+ waiting_tcp_callback(w, NULL, NETEVENT_TIMEOUT, NULL);
+ waiting_tcp_delete(w);
} else {
/* it was in use */
struct pending_tcp* pend=(struct pending_tcp*)w->next_waiting;
- if(pend->c->ssl) {
-#ifdef HAVE_SSL
- SSL_shutdown(pend->c->ssl);
- SSL_free(pend->c->ssl);
- pend->c->ssl = NULL;
-#endif
- }
- comm_point_close(pend->c);
- pend->query = NULL;
- pend->next_free = outnet->tcp_free;
- outnet->tcp_free = pend;
+ reuse_cb_and_decommission(outnet, pend, NETEVENT_TIMEOUT);
}
- cb = w->cb;
- cb_arg = w->cb_arg;
- waiting_tcp_delete(w);
- fptr_ok(fptr_whitelist_pending_tcp(cb));
- (void)(*cb)(NULL, cb_arg, NETEVENT_TIMEOUT, NULL);
use_free_buffer(outnet);
}
+/** close the oldest reuse_tcp connection to make a fd and pending_tcp
+ * buffer available for a new stream connection */
+static void
+reuse_tcp_close_oldest(struct outside_network* outnet)
+{
+ struct pending_tcp* pend;
+ verbose(VERB_CLIENT, "reuse_tcp_close_oldest");
+ if(!outnet->tcp_reuse_last) return;
+ pend = outnet->tcp_reuse_last->pending;
+
+ /* snip off of LRU */
+ log_assert(pend->reuse.lru_next == NULL);
+ if(pend->reuse.lru_prev) {
+ outnet->tcp_reuse_last = pend->reuse.lru_prev;
+ pend->reuse.lru_prev->lru_next = NULL;
+ } else {
+ outnet->tcp_reuse_last = NULL;
+ outnet->tcp_reuse_first = NULL;
+ }
+ pend->reuse.item_on_lru_list = 0;
+
+ /* free up */
+ reuse_cb_and_decommission(outnet, pend, NETEVENT_CLOSED);
+}
+
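reuse_tcp_close_oldest is the eviction side of the LRU sketched after reuse_tcp_lru_touch above: snip the tail element off the list and decommission it so its file descriptor and pending_tcp buffer become available again. The matching toy operation, with the same toy types restated for completeness:

#include <stddef.h>

struct lru_item { struct lru_item *prev, *next; };
struct lru { struct lru_item *first, *last; };   /* as in the earlier sketch */

/* unlink and return the least recently used item (the tail), or NULL */
static struct lru_item* lru_pop_back(struct lru* l)
{
	struct lru_item* it = l->last;
	if(!it) return NULL;
	if(it->prev) {
		l->last = it->prev;
		it->prev->next = NULL;
	} else {
		l->first = NULL;          /* it was the only element */
		l->last = NULL;
	}
	it->prev = NULL;
	it->next = NULL;
	return it;
}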
+/** find a spare ID value for the reuse tcp stream. The value is random and
+ * does not collide with an existing query ID that is in use or waiting */
+uint16_t
+reuse_tcp_select_id(struct reuse_tcp* reuse, struct outside_network* outnet)
+{
+ uint16_t id = 0, curid, nextid;
+ const int try_random = 2000;
+ int i;
+ unsigned select, count, space;
+ rbnode_type* node;
+
+ /* make really sure the tree is not empty */
+ if(reuse->tree_by_id.count == 0) {
+ id = ((unsigned)ub_random(outnet->rnd)>>8) & 0xffff;
+ return id;
+ }
+
+ /* try to find random empty spots by picking them */
+ for(i = 0; i<try_random; i++) {
+ id = ((unsigned)ub_random(outnet->rnd)>>8) & 0xffff;
+ if(!reuse_tcp_by_id_find(reuse, id)) {
+ return id;
+ }
+ }
+
+	/* otherwise pick an unused value uniformly at random: pick the
+	 * n-th unused number, then loop over the empty spaces in the
+	 * tree and find it */
+ log_assert(reuse->tree_by_id.count < 0xffff);
+ select = ub_random_max(outnet->rnd, 0xffff - reuse->tree_by_id.count);
+ /* select value now in 0 .. num free - 1 */
+
+ count = 0; /* number of free spaces passed by */
+ node = rbtree_first(&reuse->tree_by_id);
+ log_assert(node && node != RBTREE_NULL); /* tree not empty */
+ /* see if select is before first node */
+ if(select < tree_by_id_get_id(node))
+ return select;
+ count += tree_by_id_get_id(node);
+ /* perhaps select is between nodes */
+ while(node && node != RBTREE_NULL) {
+ rbnode_type* next = rbtree_next(node);
+ if(next && next != RBTREE_NULL) {
+ curid = tree_by_id_get_id(node);
+ nextid = tree_by_id_get_id(next);
+ log_assert(curid < nextid);
+ if(curid != 0xffff && curid + 1 < nextid) {
+ /* space between nodes */
+ space = nextid - curid - 1;
+ log_assert(select >= count);
+ if(select < count + space) {
+ /* here it is */
+ return curid + 1 + (select - count);
+ }
+ count += space;
+ }
+ }
+ node = next;
+ }
+
+ /* select is after the last node */
+	/* count is the number of free positions before and between the
+	 * nodes in the tree */
+ node = rbtree_last(&reuse->tree_by_id);
+ log_assert(node && node != RBTREE_NULL); /* tree not empty */
+ curid = tree_by_id_get_id(node);
+ log_assert(count + (0xffff-curid) + reuse->tree_by_id.count == 0xffff);
+ return curid + 1 + (select - count);
+}
+
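When random probing fails to find a free ID, reuse_tcp_select_id falls back to picking the select-th unused value and walking the gaps between the used IDs in sorted order to locate it. The same arithmetic over a plain sorted array of used IDs, with a worked case: for used = {3, 7}, select values 0..2 map to IDs 0..2, 3..5 map to 4..6, and 6 and up map to 8 and beyond:

#include <assert.h>
#include <stdint.h>

/* pick the select-th unused 16-bit value, given a sorted array of used IDs;
 * mirrors the gap-walking arithmetic, not Unbound's rbtree version.
 * precondition: select < 0x10000 - n, the number of free values */
static uint16_t pick_free_id(const uint16_t* used, int n, unsigned select)
{
	unsigned count;  /* number of free values passed so far */
	int i;
	if(n == 0) return (uint16_t)select;
	if(select < used[0]) return (uint16_t)select; /* before first used ID */
	count = used[0];
	for(i = 0; i+1 < n; i++) {
		unsigned space = used[i+1] - used[i] - 1; /* free slots in gap */
		if(select < count + space)
			return (uint16_t)(used[i] + 1 + (select - count));
		count += space;
	}
	/* select falls after the last used ID */
	return (uint16_t)(used[n-1] + 1 + (select - count));
}

int main(void)
{
	uint16_t used[] = { 3, 7 };
	assert(pick_free_id(used, 2, 0) == 0);
	assert(pick_free_id(used, 2, 3) == 4);
	assert(pick_free_id(used, 2, 6) == 8);
	return 0;
}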
struct waiting_tcp*
pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
int timeout, comm_point_callback_type* callback, void* callback_arg)
{
struct pending_tcp* pend = sq->outnet->tcp_free;
+ struct reuse_tcp* reuse = NULL;
struct waiting_tcp* w;
- struct timeval tv;
- uint16_t id;
- /* if no buffer is free allocate space to store query */
+
+ verbose(VERB_CLIENT, "pending_tcp_query");
+ if(sldns_buffer_limit(packet) < sizeof(uint16_t)) {
+ verbose(VERB_ALGO, "pending tcp query with too short buffer < 2");
+ return NULL;
+ }
+
+ /* find out if a reused stream to the target exists */
+ /* if so, take it into use */
+ reuse = reuse_tcp_find(sq->outnet, &sq->addr, sq->addrlen,
+ sq->ssl_upstream);
+ if(reuse) {
+ log_reuse_tcp(VERB_CLIENT, "pending_tcp_query: found reuse", reuse);
+ log_assert(reuse->pending);
+ pend = reuse->pending;
+ reuse_tcp_lru_touch(sq->outnet, reuse);
+ }
+
+	/* if there is no free buffer but there are reuse streams, close a
+	 * reuse stream to be able to open a new one to this target; there
+	 * is no use waiting to reuse a file descriptor while another query
+	 * needs that buffer and file descriptor now. */
+ if(!pend) {
+ reuse_tcp_close_oldest(sq->outnet);
+ pend = sq->outnet->tcp_free;
+ }
+
+ /* allocate space to store query */
w = (struct waiting_tcp*)malloc(sizeof(struct waiting_tcp)
- + (pend?0:sldns_buffer_limit(packet)));
+ + sldns_buffer_limit(packet));
if(!w) {
return NULL;
}
@@ -1319,47 +2106,76 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
free(w);
return NULL;
}
- w->pkt = NULL;
- w->pkt_len = 0;
- id = ((unsigned)ub_random(sq->outnet->rnd)>>8) & 0xffff;
- LDNS_ID_SET(sldns_buffer_begin(packet), id);
+ w->pkt = (uint8_t*)w + sizeof(struct waiting_tcp);
+ w->pkt_len = sldns_buffer_limit(packet);
+ memmove(w->pkt, sldns_buffer_begin(packet), w->pkt_len);
+ if(reuse)
+ w->id = reuse_tcp_select_id(reuse, sq->outnet);
+ else w->id = ((unsigned)ub_random(sq->outnet->rnd)>>8) & 0xffff;
+ LDNS_ID_SET(w->pkt, w->id);
memcpy(&w->addr, &sq->addr, sq->addrlen);
w->addrlen = sq->addrlen;
w->outnet = sq->outnet;
+ w->on_tcp_waiting_list = 0;
+ w->next_waiting = NULL;
w->cb = callback;
w->cb_arg = callback_arg;
w->ssl_upstream = sq->ssl_upstream;
w->tls_auth_name = sq->tls_auth_name;
-#ifndef S_SPLINT_S
- tv.tv_sec = timeout/1000;
- tv.tv_usec = (timeout%1000)*1000;
-#endif
- comm_timer_set(w->timer, &tv);
+ w->timeout = timeout;
+ w->id_node.key = NULL;
+ w->write_wait_prev = NULL;
+ w->write_wait_next = NULL;
+ w->write_wait_queued = 0;
+ w->error_count = 0;
if(pend) {
/* we have a buffer available right now */
- if(!outnet_tcp_take_into_use(w, sldns_buffer_begin(packet),
- sldns_buffer_limit(packet))) {
- waiting_tcp_delete(w);
- return NULL;
+ if(reuse) {
+ /* reuse existing fd, write query and continue */
+ /* store query in tree by id */
+ verbose(VERB_CLIENT, "pending_tcp_query: reuse, store");
+ w->next_waiting = (void*)pend;
+ reuse_tree_by_id_insert(&pend->reuse, w);
+ /* can we write right now? */
+ if(pend->query == NULL) {
+ /* write straight away */
+ /* stop the timer on read of the fd */
+ comm_point_stop_listening(pend->c);
+ pend->query = w;
+ outnet_tcp_take_query_setup(pend->c->fd, pend,
+ w);
+ } else {
+ /* put it in the waiting list for
+ * this stream */
+ reuse_write_wait_push_back(&pend->reuse, w);
+ }
+ } else {
+ /* create new fd and connect to addr, setup to
+ * write query */
+ verbose(VERB_CLIENT, "pending_tcp_query: new fd, connect");
+ rbtree_init(&pend->reuse.tree_by_id, reuse_id_cmp);
+ pend->reuse.pending = pend;
+ memcpy(&pend->reuse.addr, &sq->addr, sq->addrlen);
+ pend->reuse.addrlen = sq->addrlen;
+ if(!outnet_tcp_take_into_use(w)) {
+ waiting_tcp_delete(w);
+ return NULL;
+ }
}
-#ifdef USE_DNSTAP
- if(sq->outnet->dtenv &&
- (sq->outnet->dtenv->log_resolver_query_messages ||
- sq->outnet->dtenv->log_forwarder_query_messages))
- dt_msg_send_outside_query(sq->outnet->dtenv, &sq->addr,
- comm_tcp, sq->zone, sq->zonelen, packet);
-#endif
} else {
/* queue up */
- w->pkt = (uint8_t*)w + sizeof(struct waiting_tcp);
- w->pkt_len = sldns_buffer_limit(packet);
- memmove(w->pkt, sldns_buffer_begin(packet), w->pkt_len);
- w->next_waiting = NULL;
- if(sq->outnet->tcp_wait_last)
- sq->outnet->tcp_wait_last->next_waiting = w;
- else sq->outnet->tcp_wait_first = w;
- sq->outnet->tcp_wait_last = w;
+ /* waiting for a buffer on the outside network buffer wait
+ * list */
+ verbose(VERB_CLIENT, "pending_tcp_query: queue to wait");
+ outnet_add_tcp_waiting(sq->outnet, w);
}
+#ifdef USE_DNSTAP
+ if(sq->outnet->dtenv &&
+ (sq->outnet->dtenv->log_resolver_query_messages ||
+ sq->outnet->dtenv->log_forwarder_query_messages))
+ dt_msg_send_outside_query(sq->outnet->dtenv, &sq->addr,
+ comm_tcp, sq->zone, sq->zonelen, packet);
+#endif
return w;
}
@@ -1477,6 +2293,7 @@ static void
waiting_list_remove(struct outside_network* outnet, struct waiting_tcp* w)
{
struct waiting_tcp* p = outnet->tcp_wait_first, *prev = NULL;
+ w->on_tcp_waiting_list = 0;
while(p) {
if(p == w) {
/* remove w */
@@ -1492,10 +2309,53 @@ waiting_list_remove(struct outside_network* outnet, struct waiting_tcp* w)
}
}
+/** reuse tcp stream, remove serviced query from stream,
+ * return true if the stream is kept, false if it is to be closed */
+static int
+reuse_tcp_remove_serviced_keep(struct waiting_tcp* w,
+ struct serviced_query* sq)
+{
+ struct pending_tcp* pend_tcp = (struct pending_tcp*)w->next_waiting;
+ verbose(VERB_CLIENT, "reuse_tcp_remove_serviced_keep");
+	/* remove the callback. Let the query continue to write, so the
+	 * stream itself is not cancelled. Also keep it as an entry in the
+	 * tree_by_id, in case the (now unwanted) answer returns, so that
+	 * the same ID number cannot be picked in the meantime */
+ w->cb = NULL;
+	/* see if it can be entered in the reuse tree;
+	 * for that the FD has to be not -1 */
+ if(pend_tcp->c->fd == -1) {
+ verbose(VERB_CLIENT, "reuse_tcp_remove_serviced_keep: -1 fd");
+ return 0;
+ }
+ /* if in tree and used by other queries */
+ if(pend_tcp->reuse.node.key) {
+ verbose(VERB_CLIENT, "reuse_tcp_remove_serviced_keep: in use by other queries");
+		/* do not reset the keepalive timer; for that we would need
+		 * traffic, and this is where the serviced query is removed
+		 * for state machine internal reasons, e.g. the iterator is
+		 * no longer interested in this query */
+ return 1;
+ }
+ /* if still open and want to keep it open */
+ if(pend_tcp->c->fd != -1 && sq->outnet->tcp_reuse.count <
+ sq->outnet->tcp_reuse_max) {
+ verbose(VERB_CLIENT, "reuse_tcp_remove_serviced_keep: keep open");
+ /* set a keepalive timer on it */
+ if(!reuse_tcp_insert(sq->outnet, pend_tcp)) {
+ return 0;
+ }
+ reuse_tcp_setup_timeout(pend_tcp);
+ return 1;
+ }
+ return 0;
+}
+
/** cleanup serviced query entry */
static void
serviced_delete(struct serviced_query* sq)
{
+ verbose(VERB_CLIENT, "serviced_delete");
if(sq->pending) {
/* clear up the pending query */
if(sq->status == serviced_query_UDP_EDNS ||
@@ -1503,6 +2363,7 @@ serviced_delete(struct serviced_query* sq)
sq->status == serviced_query_UDP_EDNS_FRAG ||
sq->status == serviced_query_UDP_EDNS_fallback) {
struct pending* p = (struct pending*)sq->pending;
+ verbose(VERB_CLIENT, "serviced_delete: UDP");
if(p->pc)
portcomm_loweruse(sq->outnet, p->pc);
pending_delete(sq->outnet, p);
@@ -1510,14 +2371,32 @@ serviced_delete(struct serviced_query* sq)
* mesh */
outnet_send_wait_udp(sq->outnet);
} else {
- struct waiting_tcp* p = (struct waiting_tcp*)
+ struct waiting_tcp* w = (struct waiting_tcp*)
sq->pending;
- if(p->pkt == NULL) {
- decommission_pending_tcp(sq->outnet,
- (struct pending_tcp*)p->next_waiting);
+ verbose(VERB_CLIENT, "serviced_delete: TCP");
+ /* if on stream-write-waiting list then
+ * remove from waiting list and waiting_tcp_delete */
+ if(w->write_wait_queued) {
+ struct pending_tcp* pend =
+ (struct pending_tcp*)w->next_waiting;
+ verbose(VERB_CLIENT, "serviced_delete: writewait");
+ reuse_tree_by_id_delete(&pend->reuse, w);
+ reuse_write_wait_remove(&pend->reuse, w);
+ waiting_tcp_delete(w);
+ } else if(!w->on_tcp_waiting_list) {
+ struct pending_tcp* pend =
+ (struct pending_tcp*)w->next_waiting;
+ verbose(VERB_CLIENT, "serviced_delete: tcpreusekeep");
+ if(!reuse_tcp_remove_serviced_keep(w, sq)) {
+ reuse_cb_and_decommission(sq->outnet,
+ pend, NETEVENT_CLOSED);
+ use_free_buffer(sq->outnet);
+ }
+ sq->pending = NULL;
} else {
- waiting_list_remove(sq->outnet, p);
- waiting_tcp_delete(p);
+ verbose(VERB_CLIENT, "serviced_delete: tcpwait");
+ waiting_list_remove(sq->outnet, w);
+ waiting_tcp_delete(w);
}
}
}
@@ -2097,18 +2976,18 @@ outnet_serviced_query(struct outside_network* outnet,
{
struct serviced_query* sq;
struct service_callback* cb;
- struct edns_tag_addr* client_tag_addr;
+ struct edns_string_addr* client_string_addr;
if(!inplace_cb_query_call(env, qinfo, flags, addr, addrlen, zone, zonelen,
qstate, qstate->region))
return NULL;
- if((client_tag_addr = edns_tag_addr_lookup(&env->edns_tags->client_tags,
- addr, addrlen))) {
- uint16_t client_tag = htons(client_tag_addr->tag_data);
+ if((client_string_addr = edns_string_addr_lookup(
+ &env->edns_strings->client_strings, addr, addrlen))) {
edns_opt_list_append(&qstate->edns_opts_back_out,
- env->edns_tags->client_tag_opcode, 2,
- (uint8_t*)&client_tag, qstate->region);
+ env->edns_strings->client_string_opcode,
+ client_string_addr->string_len,
+ client_string_addr->string, qstate->region);
}
serviced_gen_query(buff, qinfo->qname, qinfo->qname_len, qinfo->qtype,