aboutsummaryrefslogtreecommitdiff
path: root/sntp/libevent/bufferevent.c
diff options
context:
space:
mode:
Diffstat (limited to 'sntp/libevent/bufferevent.c')
-rw-r--r--sntp/libevent/bufferevent.c149
1 files changed, 98 insertions, 51 deletions
diff --git a/sntp/libevent/bufferevent.c b/sntp/libevent/bufferevent.c
index d298d0b3f013..08c0486c087d 100644
--- a/sntp/libevent/bufferevent.c
+++ b/sntp/libevent/bufferevent.c
@@ -45,7 +45,6 @@
#ifdef _WIN32
#include <winsock2.h>
#endif
-#include <errno.h>
#include "event2/util.h"
#include "event2/buffer.h"
@@ -67,8 +66,7 @@ static void bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_);
void
bufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
- struct bufferevent_private *bufev_private =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
BEV_LOCK(bufev);
if (!bufev_private->read_suspended)
bufev->be_ops->disable(bufev, EV_READ);
@@ -79,8 +77,7 @@ bufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags w
void
bufferevent_unsuspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
- struct bufferevent_private *bufev_private =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
BEV_LOCK(bufev);
bufev_private->read_suspended &= ~what;
if (!bufev_private->read_suspended && (bufev->enabled & EV_READ))
@@ -91,8 +88,7 @@ bufferevent_unsuspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags
void
bufferevent_suspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
- struct bufferevent_private *bufev_private =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
BEV_LOCK(bufev);
if (!bufev_private->write_suspended)
bufev->be_ops->disable(bufev, EV_WRITE);
@@ -103,8 +99,7 @@ bufferevent_suspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags
void
bufferevent_unsuspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
- struct bufferevent_private *bufev_private =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
BEV_LOCK(bufev);
bufev_private->write_suspended &= ~what;
if (!bufev_private->write_suspended && (bufev->enabled & EV_WRITE))
@@ -112,6 +107,28 @@ bufferevent_unsuspend_write_(struct bufferevent *bufev, bufferevent_suspend_flag
BEV_UNLOCK(bufev);
}
+/**
+ * Sometimes bufferevent's implementation can overrun high watermarks
+ * (one of examples is openssl) and in this case if the read callback
+ * will not handle enough data do over condition above the read
+ * callback will never be called again (due to suspend above).
+ *
+ * To avoid this we are scheduling read callback again here, but only
+ * from the user callback to avoid multiple scheduling:
+ * - when the data had been added to it
+ * - when the data had been drained from it (user specified read callback)
+ */
+static void bufferevent_inbuf_wm_check(struct bufferevent *bev)
+{
+ if (!bev->wm_read.high)
+ return;
+ if (!(bev->enabled & EV_READ))
+ return;
+ if (evbuffer_get_length(bev->input) < bev->wm_read.high)
+ return;
+
+ bufferevent_trigger(bev, EV_READ, BEV_OPT_DEFER_CALLBACKS);
+}
/* Callback to implement watermarks on the input buffer. Only enabled
* if the watermark is set. */
@@ -148,6 +165,7 @@ bufferevent_run_deferred_callbacks_locked(struct event_callback *cb, void *arg)
if (bufev_private->readcb_pending && bufev->readcb) {
bufev_private->readcb_pending = 0;
bufev->readcb(bufev, bufev->cbarg);
+ bufferevent_inbuf_wm_check(bufev);
}
if (bufev_private->writecb_pending && bufev->writecb) {
bufev_private->writecb_pending = 0;
@@ -188,6 +206,7 @@ bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg
void *cbarg = bufev->cbarg;
bufev_private->readcb_pending = 0;
UNLOCKED(readcb(bufev, cbarg));
+ bufferevent_inbuf_wm_check(bufev);
}
if (bufev_private->writecb_pending && bufev->writecb) {
bufferevent_data_cb writecb = bufev->writecb;
@@ -222,8 +241,7 @@ void
bufferevent_run_readcb_(struct bufferevent *bufev, int options)
{
/* Requires that we hold the lock and a reference */
- struct bufferevent_private *p =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ struct bufferevent_private *p = BEV_UPCAST(bufev);
if (bufev->readcb == NULL)
return;
if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) {
@@ -231,6 +249,7 @@ bufferevent_run_readcb_(struct bufferevent *bufev, int options)
SCHEDULE_DEFERRED(p);
} else {
bufev->readcb(bufev, bufev->cbarg);
+ bufferevent_inbuf_wm_check(bufev);
}
}
@@ -238,8 +257,7 @@ void
bufferevent_run_writecb_(struct bufferevent *bufev, int options)
{
/* Requires that we hold the lock and a reference */
- struct bufferevent_private *p =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ struct bufferevent_private *p = BEV_UPCAST(bufev);
if (bufev->writecb == NULL)
return;
if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) {
@@ -267,8 +285,7 @@ void
bufferevent_run_eventcb_(struct bufferevent *bufev, short what, int options)
{
/* Requires that we hold the lock and a reference */
- struct bufferevent_private *p =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ struct bufferevent_private *p = BEV_UPCAST(bufev);
if (bufev->errorcb == NULL)
return;
if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) {
@@ -298,14 +315,12 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private,
if (!bufev->input) {
if ((bufev->input = evbuffer_new()) == NULL)
- return -1;
+ goto err;
}
if (!bufev->output) {
- if ((bufev->output = evbuffer_new()) == NULL) {
- evbuffer_free(bufev->input);
- return -1;
- }
+ if ((bufev->output = evbuffer_new()) == NULL)
+ goto err;
}
bufev_private->refcnt = 1;
@@ -317,7 +332,8 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private,
bufev->be_ops = ops;
- bufferevent_ratelim_init_(bufev_private);
+ if (bufferevent_ratelim_init_(bufev_private))
+ goto err;
/*
* Set to EV_WRITE so that using bufferevent_write is going to
@@ -328,20 +344,14 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private,
#ifndef EVENT__DISABLE_THREAD_SUPPORT
if (options & BEV_OPT_THREADSAFE) {
- if (bufferevent_enable_locking_(bufev, NULL) < 0) {
- /* cleanup */
- evbuffer_free(bufev->input);
- evbuffer_free(bufev->output);
- bufev->input = NULL;
- bufev->output = NULL;
- return -1;
- }
+ if (bufferevent_enable_locking_(bufev, NULL) < 0)
+ goto err;
}
#endif
if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
== BEV_OPT_UNLOCK_CALLBACKS) {
event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
- return -1;
+ goto err;
}
if (options & BEV_OPT_UNLOCK_CALLBACKS)
event_deferred_cb_init_(
@@ -362,6 +372,17 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private,
evbuffer_set_parent_(bufev->output, bufev);
return 0;
+
+err:
+ if (bufev->input) {
+ evbuffer_free(bufev->input);
+ bufev->input = NULL;
+ }
+ if (bufev->output) {
+ evbuffer_free(bufev->output);
+ bufev->output = NULL;
+ }
+ return -1;
}
void
@@ -460,8 +481,7 @@ bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf)
int
bufferevent_enable(struct bufferevent *bufev, short event)
{
- struct bufferevent_private *bufev_private =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
short impl_events = event;
int r = 0;
@@ -475,6 +495,8 @@ bufferevent_enable(struct bufferevent *bufev, short event)
if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
r = -1;
+ if (r)
+ event_debug(("%s: cannot enable 0x%hx on %p", __func__, event, bufev));
bufferevent_decref_and_unlock_(bufev);
return r;
@@ -534,8 +556,7 @@ int
bufferevent_disable_hard_(struct bufferevent *bufev, short event)
{
int r = 0;
- struct bufferevent_private *bufev_private =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
BEV_LOCK(bufev);
bufev->enabled &= ~event;
@@ -558,6 +579,8 @@ bufferevent_disable(struct bufferevent *bufev, short event)
if (bufev->be_ops->disable(bufev, event) < 0)
r = -1;
+ if (r)
+ event_debug(("%s: cannot disable 0x%hx on %p", __func__, event, bufev));
BEV_UNLOCK(bufev);
return r;
@@ -571,8 +594,7 @@ void
bufferevent_setwatermark(struct bufferevent *bufev, short events,
size_t lowmark, size_t highmark)
{
- struct bufferevent_private *bufev_private =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
BEV_LOCK(bufev);
if (events & EV_WRITE) {
@@ -657,8 +679,7 @@ bufferevent_flush(struct bufferevent *bufev,
void
bufferevent_incref_and_lock_(struct bufferevent *bufev)
{
- struct bufferevent_private *bufev_private =
- BEV_UPCAST(bufev);
+ struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
BEV_LOCK(bufev);
++bufev_private->refcnt;
}
@@ -684,8 +705,7 @@ bufferevent_transfer_lock_ownership_(struct bufferevent *donor,
int
bufferevent_decref_and_unlock_(struct bufferevent *bufev)
{
- struct bufferevent_private *bufev_private =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
int n_cbs = 0;
#define MAX_CBS 16
struct event_callback *cbs[MAX_CBS];
@@ -728,8 +748,7 @@ bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_)
{
struct bufferevent *bufev = arg_;
struct bufferevent *underlying;
- struct bufferevent_private *bufev_private =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
BEV_LOCK(bufev);
underlying = bufferevent_get_underlying(bufev);
@@ -777,7 +796,7 @@ bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_)
}
int
-bufferevent_decref_(struct bufferevent *bufev)
+bufferevent_decref(struct bufferevent *bufev)
{
BEV_LOCK(bufev);
return bufferevent_decref_and_unlock_(bufev);
@@ -793,11 +812,14 @@ bufferevent_free(struct bufferevent *bufev)
}
void
-bufferevent_incref_(struct bufferevent *bufev)
+bufferevent_incref(struct bufferevent *bufev)
{
- struct bufferevent_private *bufev_private =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
+ /* XXX: now that this function is public, we might want to
+ * - return the count from this function
+ * - create a new function to atomically grab the current refcount
+ */
BEV_LOCK(bufev);
++bufev_private->refcnt;
BEV_UNLOCK(bufev);
@@ -848,6 +870,8 @@ bufferevent_setfd(struct bufferevent *bev, evutil_socket_t fd)
BEV_LOCK(bev);
if (bev->be_ops->ctrl)
res = bev->be_ops->ctrl(bev, BEV_CTRL_SET_FD, &d);
+ if (res)
+ event_debug(("%s: cannot set fd for %p to "EV_SOCK_FMT, __func__, bev, fd));
BEV_UNLOCK(bev);
return res;
}
@@ -861,6 +885,8 @@ bufferevent_getfd(struct bufferevent *bev)
BEV_LOCK(bev);
if (bev->be_ops->ctrl)
res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_FD, &d);
+ if (res)
+ event_debug(("%s: cannot get fd for %p", __func__, bev));
BEV_UNLOCK(bev);
return (res<0) ? -1 : d.fd;
}
@@ -868,8 +894,7 @@ bufferevent_getfd(struct bufferevent *bev)
enum bufferevent_options
bufferevent_get_options_(struct bufferevent *bev)
{
- struct bufferevent_private *bev_p =
- EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
+ struct bufferevent_private *bev_p = BEV_UPCAST(bev);
enum bufferevent_options options;
BEV_LOCK(bev);
@@ -945,8 +970,7 @@ int
bufferevent_generic_adj_timeouts_(struct bufferevent *bev)
{
const short enabled = bev->enabled;
- struct bufferevent_private *bev_p =
- EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
+ struct bufferevent_private *bev_p = BEV_UPCAST(bev);
int r1=0, r2=0;
if ((enabled & EV_READ) && !bev_p->read_suspended &&
evutil_timerisset(&bev->timeout_read))
@@ -966,9 +990,32 @@ bufferevent_generic_adj_timeouts_(struct bufferevent *bev)
}
int
+bufferevent_generic_adj_existing_timeouts_(struct bufferevent *bev)
+{
+ int r = 0;
+ if (event_pending(&bev->ev_read, EV_READ, NULL)) {
+ if (evutil_timerisset(&bev->timeout_read)) {
+ if (bufferevent_add_event_(&bev->ev_read, &bev->timeout_read) < 0)
+ r = -1;
+ } else {
+ event_remove_timer(&bev->ev_read);
+ }
+ }
+ if (event_pending(&bev->ev_write, EV_WRITE, NULL)) {
+ if (evutil_timerisset(&bev->timeout_write)) {
+ if (bufferevent_add_event_(&bev->ev_write, &bev->timeout_write) < 0)
+ r = -1;
+ } else {
+ event_remove_timer(&bev->ev_write);
+ }
+ }
+ return r;
+}
+
+int
bufferevent_add_event_(struct event *ev, const struct timeval *tv)
{
- if (tv->tv_sec == 0 && tv->tv_usec == 0)
+ if (!evutil_timerisset(tv))
return event_add(ev, NULL);
else
return event_add(ev, tv);