path: root/ports/winnt/ntpd/ntp_iocompletionport.c
Diffstat (limited to 'ports/winnt/ntpd/ntp_iocompletionport.c')
-rw-r--r--	ports/winnt/ntpd/ntp_iocompletionport.c	1670
1 file changed, 877 insertions, 793 deletions
diff --git a/ports/winnt/ntpd/ntp_iocompletionport.c b/ports/winnt/ntpd/ntp_iocompletionport.c
index a14b081add53d..6512d75488fad 100644
--- a/ports/winnt/ntpd/ntp_iocompletionport.c
+++ b/ports/winnt/ntpd/ntp_iocompletionport.c
@@ -16,20 +16,20 @@ Some notes on the implementation:
makes using more than one thread useless, as they would compete for
the same core and create contention.
-+ Some IO operations need a possibly lengthy postprocessing. Emulating
++ Some IO operations need a possibly lengthy post-processing. Emulating
the UN*X line discipline is currently the only but prominent example.
To avoid the processing in the time-critical IOCPL thread, longer
   processing is offloaded to the worker thread pool.
+ A fact that seems not as well-known as it should be is that all
- ressources passed to an overlapped IO operation must be considered
+ resources passed to an overlapped IO operation must be considered
owned by the OS until the result has been fetched/dequeued. This
includes all overlapped structures and buffers involved, so cleaning
up on shutdown must be carefully constructed. (This includes closing
all the IO handles and waiting for the results to be dequeued.
   'CancelIo()' cannot be used since it's broken beyond repair.)
- If this is not possible, then all ressources should be dropped into
+ If this is not possible, then all resources should be dropped into
oblivion -- otherwise "bad things (tm)" are bound to happen.
Using a private heap that is silently dropped but not deleted is a
@@ -59,124 +59,35 @@ Juergen Perlinger (perlinger@ntp.org) Feb 2012
#include <stdio.h>
#include <process.h>
#include <syslog.h>
-#include <limits.h>
#include "ntpd.h"
-#include "ntp_machine.h"
-#include "ntp_iocompletionport.h"
#include "ntp_request.h"
-#include "ntp_assert.h"
-#include "ntp_io.h"
-#include "ntp_lists.h"
+
+#include "ntp_iocompletionport.h"
+#include "ntp_iocplmem.h"
+#include "ntp_iocpltypes.h"
#define CONTAINEROF(p, type, member) \
((type *)((char *)(p) - offsetof(type, member)))
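The CONTAINEROF() macro is what lets the completion thread map the bare OVERLAPPED pointer returned by GetQueuedCompletionStatus() back onto the extended IO context that embeds it. A minimal sketch of that round trip, with made-up names (DemoCtx, tag, demo_dequeue) used purely for illustration and not taken from this patch:

#include <windows.h>
#include <stddef.h>

typedef struct DemoCtx {
	OVERLAPPED	ol;	/* embedded 'kernel' part, as in IoCtx */
	int		tag;	/* arbitrary per-operation payload */
} DemoCtx;

static void
demo_dequeue(HANDLE iocpl)
{
	DWORD		octets;
	ULONG_PTR	key;
	LPOVERLAPPED	pol;
	BOOL		ok;

	ok = GetQueuedCompletionStatus(iocpl, &octets, &key, &pol, INFINITE);
	if (pol != NULL) {	/* something was dequeued, ok or failed IO */
		DemoCtx *ctx = CONTAINEROF(pol, DemoCtx, ol);
		(void)ok;
		(void)ctx;	/* ctx->tag is owned by us again here */
	}
}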
-#ifdef _MSC_VER
-# pragma warning(push)
-# pragma warning(disable: 201) /* nonstd extension nameless union */
-#endif
-
-/*
- * ---------------------------------------------------------------------
- * storage type for PPS data (DCD change counts & times)
- * ---------------------------------------------------------------------
- */
-struct PpsData {
- u_long cc_assert;
- u_long cc_clear;
- l_fp ts_assert;
- l_fp ts_clear;
-};
-typedef struct PpsData PPSData_t;
-
-struct PpsDataEx {
- u_long cov_count;
- PPSData_t data;
-};
-typedef volatile struct PpsDataEx PPSDataEx_t;
-
-/*
- * ---------------------------------------------------------------------
- * device context; uses reference counting to avoid nasty surprises.
- * Currently this stores only the PPS time stamps, but it could be
- * easily extended.
- * ---------------------------------------------------------------------
- */
-#define PPS_QUEUE_LEN 8u /* must be power of two! */
-#define PPS_QUEUE_MSK (PPS_QUEUE_LEN-1) /* mask for easy MOD ops */
-
-struct DeviceContext {
- volatile long ref_count;
- volatile u_long cov_count;
- PPSData_t pps_data;
- PPSDataEx_t pps_buff[PPS_QUEUE_LEN];
-};
-
-typedef struct DeviceContext DevCtx_t;
-
-/*
- * ---------------------------------------------------------------------
- * I/O context structure
- *
- * This is an extended overlapped structure. Some fields are only used
- * for serial I/O, others are used for all operations. The serial I/O is
- * more interesting since the same context object is used for waiting,
- * actual I/O and possibly offload processing in a worker thread until
- * a complete operation cycle is done.
- *
- * In this case the I/O context is used to gather all the bits that are
- * finally needed for the processing of the buffer.
- * ---------------------------------------------------------------------
- */
-//struct IoCtx;
-typedef struct IoCtx IoCtx_t;
-typedef struct refclockio RIO_t;
-
-typedef void (*IoCompleteFunc)(ULONG_PTR, IoCtx_t *);
-
-struct IoCtx {
- OVERLAPPED ol; /* 'kernel' part of the context */
- union {
- recvbuf_t * recv_buf; /* incoming -> buffer structure */
- void * trans_buf; /* outgoing -> char array */
- PPSData_t * pps_buf; /* for reading PPS seq/stamps */
- HANDLE ppswake; /* pps wakeup for attach */
- };
- IoCompleteFunc onIoDone; /* HL callback to execute */
- RIO_t * rio; /* RIO backlink (for offload) */
- DevCtx_t * devCtx;
- l_fp DCDSTime; /* PPS-hack: time of DCD ON */
- l_fp FlagTime; /* timestamp of flag/event char */
- l_fp RecvTime; /* timestamp of callback */
- DWORD errCode; /* error code of last I/O */
- DWORD byteCount; /* byte count " */
- DWORD com_events; /* buffer for COM events */
- unsigned int flRawMem : 1; /* buffer is raw memory -> free */
- unsigned int flTsDCDS : 1; /* DCDSTime valid? */
- unsigned int flTsFlag : 1; /* FlagTime valid? */
-};
-
-#ifdef _MSC_VER
-# pragma warning(pop)
-#endif
/*
* local function definitions
*/
-static void ntpd_addremove_semaphore(HANDLE, int);
-static inline void set_serial_recv_time (recvbuf_t *, IoCtx_t *);
+static void ntpd_addremove_semaphore(HANDLE, int);
+static void set_serial_recv_time (recvbuf_t *, IoCtx_t *);
/* Initiate/Request async IO operations */
-static BOOL QueueSerialWait (RIO_t *, recvbuf_t *, IoCtx_t *);
-static BOOL QueueSerialRead (RIO_t *, recvbuf_t *, IoCtx_t *);
-static BOOL QueueRawSerialRead(RIO_t *, recvbuf_t *, IoCtx_t *);
-static BOOL QueueSocketRecv (SOCKET , recvbuf_t *, IoCtx_t *);
+static BOOL __fastcall QueueSerialWait (IoCtx_t *, recvbuf_t *);
+static BOOL __fastcall QueueSerialRead(IoCtx_t *, recvbuf_t *);
+static BOOL __fastcall QueueRawSerialRead(IoCtx_t *, recvbuf_t *);
+static BOOL __fastcall QueueSocketRecv(IoCtx_t *, recvbuf_t *);
/* High-level IO callback functions */
static void OnSocketRecv (ULONG_PTR, IoCtx_t *);
+static void OnSocketSend (ULONG_PTR, IoCtx_t *);
static void OnSerialWaitComplete (ULONG_PTR, IoCtx_t *);
static void OnSerialReadComplete (ULONG_PTR, IoCtx_t *);
static void OnRawSerialReadComplete(ULONG_PTR, IoCtx_t *);
@@ -192,198 +103,29 @@ static void free_io_completion_port_mem(void);
#endif
- HANDLE WaitableExitEventHandle;
- HANDLE WaitableIoEventHandle;
-static HANDLE hIoCompletionPort;
+ HANDLE WaitableExitEventHandle;
+ HANDLE WaitableIoEventHandle;
+static HANDLE hndIOCPLPort;
+static HANDLE hMainThread;
DWORD ActiveWaitHandles;
HANDLE WaitHandles[16];
-/*
- * -------------------------------------------------------------------
- * We make a pool of our own for IO context objects -- the are owned by
- * the system until a completion result is pulled from the queue, and
- * they seriously go into the way of memory tracking until we can safely
- * cancel an IO request.
- * -------------------------------------------------------------------
- */
-static HANDLE hHeapHandle;
-
-/*
- * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
- * Create a new heap for IO context objects
- */
-static void
-IoCtxPoolInit(
- size_t initObjs
- )
-{
- hHeapHandle = HeapCreate(0, initObjs * sizeof(IoCtx_t), 0);
- if (hHeapHandle == NULL) {
- msyslog(LOG_ERR, "Can't initialize Heap: %m");
- exit(1);
- }
-}
-
-/*
- * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
- *
- * Delete the IO context heap
- *
- * Since we do not know what callbacks are pending, we just drop the
- * pool into oblivion. New allocs and frees will fail from this moment,
- * but we simply don't care. At least the normal heap dump stats will
- * show no leaks from IO context blocks. On the downside, we have to
- * track them ourselves if something goes wrong.
- */
-static void
-IoCtxPoolDone(void)
-{
- hHeapHandle = NULL;
-}
-
-/*
- * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
- * Alloc & Free on local heap
- *
- * When the heap handle is NULL, these both will fail; Alloc with a NULL
- * return and Free silently.
- */
-static void * __fastcall
-LocalPoolAlloc(
- size_t size,
- const char * desc
-)
-{
- void * ptr;
-
- /* Windows heaps can't grok zero byte allocation.
- * We just get one byte.
- */
- if (size == 0)
- size = 1;
- if (hHeapHandle != NULL)
- ptr = HeapAlloc(hHeapHandle, HEAP_ZERO_MEMORY, size);
- else
- ptr = NULL;
- DPRINTF(3, ("Allocate '%s', heap=%p, ptr=%p\n",
- desc, hHeapHandle, ptr));
-
- return ptr;
-}
-
-static void __fastcall
-LocalPoolFree(
- void * ptr,
- const char * desc
- )
-{
- DPRINTF(3, ("Free '%s', heap=%p, ptr=%p\n",
- desc, hHeapHandle, ptr));
- if (ptr != NULL && hHeapHandle != NULL)
- HeapFree(hHeapHandle, 0, ptr);
-}
-
-/*
- * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
- * Alloc & Free of Device context
- *
- * When the heap handle is NULL, these both will fail; Alloc with a NULL
- * return and Free silently.
- */
-static DevCtx_t * __fastcall
-DevCtxAlloc(void)
-{
- DevCtx_t * devCtx;
- u_long slot;
-
- /* allocate struct and tag all slots as invalid */
- devCtx = (DevCtx_t *)LocalPoolAlloc(sizeof(DevCtx_t), "DEV ctx");
- if (devCtx != NULL)
- {
- /* The initial COV values make sure there is no busy
- * loop on unused/empty slots.
- */
- devCtx->cov_count = 0;
- for (slot = 0; slot < PPS_QUEUE_LEN; slot++)
- devCtx->pps_buff[slot].cov_count = ~slot;
- }
- return devCtx;
-}
-
-static void __fastcall
-DevCtxFree(
- DevCtx_t * devCtx
- )
-{
- /* this would be the place to get rid of managed ressources. */
- LocalPoolFree(devCtx, "DEV ctx");
-}
-
-static DevCtx_t * __fastcall
-DevCtxAttach(
- DevCtx_t * devCtx
- )
-{
- if (devCtx != NULL)
- InterlockedIncrement(&devCtx->ref_count);
- return devCtx;
-}
-
-static void __fastcall
-DevCtxDetach(
- DevCtx_t * devCtx
- )
-{
- if (devCtx && !InterlockedDecrement(&devCtx->ref_count))
- DevCtxFree(devCtx);
-}
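DevCtxAttach()/DevCtxDetach() survive in the new code (presumably now coming from ntp_iocpltypes.h) and are plain interlocked reference counting: the last detach frees the object, and detach returns NULL so callers can write 'p = Detach(p);' as the patch does with rio->device_ctx. A generic sketch of the idiom with illustrative names (RefObj, RefObjAttach, RefObjDetach), not the actual DevCtx code:

#include <windows.h>
#include <stdlib.h>

typedef struct RefObj {
	volatile LONG	ref_count;	/* 0 after calloc, 1 after first attach */
	/* ... payload ... */
} RefObj;

static RefObj *
RefObjAttach(RefObj *op)
{
	if (op != NULL)
		InterlockedIncrement(&op->ref_count);
	return op;
}

static RefObj *
RefObjDetach(RefObj *op)
{
	if (op != NULL && 0 == InterlockedDecrement(&op->ref_count))
		free(op);	/* last reference is gone */
	return NULL;		/* lets callers do 'p = RefObjDetach(p);' */
}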
/*
- * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
- * Alloc & Free of I/O context
+ * -------------------------------------------------------------------
+ * Windows 2000 bluescreens with bugcheck 0x76 PROCESS_HAS_LOCKED_PAGES
+ * at ntpd process termination when using more than one pending
+ * receive per socket. A runtime version test during startup will
+ * allow using more on newer versions of Windows.
*
- * When the heap handle is NULL, these both will fail; Alloc with a NULL
- * return and Free silently.
+ * perlinger@ntp.org: Considering the quirks fixed in the overlapped
+ * IO handling in recent years, it could even be that this is no longer
+ * an issue. Testing this might be tricky -- who runs a Win2k system
+ * in the year 2016?
*/
-static IoCtx_t * __fastcall
-IoCtxAlloc(
- DevCtx_t * devCtx
- )
-{
- IoCtx_t * ioCtx;
+static size_t s_SockRecvSched = 1; /* possibly adjusted later */
- ioCtx = (IoCtx_t *)LocalPoolAlloc(sizeof(IoCtx_t), "IO ctx");
- if (ioCtx != NULL)
- ioCtx->devCtx = DevCtxAttach(devCtx);
- return ioCtx;
-}
-
-static void __fastcall
-IoCtxFree(
- IoCtx_t * ctx
- )
-{
- if (ctx)
- DevCtxDetach(ctx->devCtx);
- LocalPoolFree(ctx, "IO ctx");
-}
-
-static void __fastcall
-IoCtxReset(
- IoCtx_t * ctx
- )
-{
- RIO_t * rio;
- DevCtx_t * dev;
- if (ctx) {
- rio = ctx->rio;
- dev = ctx->devCtx;
- ZERO(*ctx);
- ctx->rio = rio;
- ctx->devCtx = dev;
- }
-}
/*
* -------------------------------------------------------------------
@@ -408,7 +150,9 @@ static UINT tidCompletionThread;
* dangerous weapon -- it's like SIGKILL.
*/
static unsigned WINAPI
-iocompletionthread(void *NotUsed)
+iocompletionthread(
+ void *NotUsed
+ )
{
DWORD err;
DWORD octets;
@@ -418,19 +162,17 @@ iocompletionthread(void *NotUsed)
UNUSED_ARG(NotUsed);
- /*
- * Socket and refclock receive call gettimeofday() so the I/O
+ /* Socket and refclock receive call gettimeofday() so the I/O
* thread needs to be on the same processor as the main and
* timing threads to ensure consistent QueryPerformanceCounter()
* results.
*
	 * This gets seriously in the way of efficient thread pooling
- * on multicore systems.
+ * on multi-core systems.
*/
lock_thread_to_processor(GetCurrentThread());
- /*
- * Set the thread priority high enough so I/O will preempt
+ /* Set the thread priority high enough so I/O will pre-empt
* normal recv packet processing, but not higher than the timer
* sync thread.
*/
@@ -440,7 +182,7 @@ iocompletionthread(void *NotUsed)
for(;;) {
if (GetQueuedCompletionStatus(
- hIoCompletionPort,
+ hndIOCPLPort,
&octets,
&key,
&pol,
@@ -449,12 +191,12 @@ iocompletionthread(void *NotUsed)
} else {
err = GetLastError();
}
- if (NULL == pol) {
+ if (pol == NULL) {
DPRINTF(2, ("Overlapped IO Thread Exiting\n"));
break; /* fail */
}
lpo = CONTAINEROF(pol, IoCtx_t, ol);
- get_systime(&lpo->RecvTime);
+ get_systime(&lpo->aux.RecvTime);
lpo->byteCount = octets;
lpo->errCode = err;
handler_calls++;
@@ -471,12 +213,21 @@ iocompletionthread(void *NotUsed)
void
init_io_completion_port(void)
{
-#ifdef DEBUG
+ OSVERSIONINFO vi;
+
+# ifdef DEBUG
atexit(&free_io_completion_port_mem);
-#endif
+# endif
+
+ memset(&vi, 0, sizeof(vi));
+ vi.dwOSVersionInfoSize = sizeof(vi);
+
+	/* For Windows Vista (6.0) and later, schedule more than one receive */
+ if (GetVersionEx(&vi) && vi.dwMajorVersion >= 6)
+ s_SockRecvSched = 4;
/* Create the context pool first. */
- IoCtxPoolInit(20);
+ IOCPLPoolInit(20);
/* Create the event used to signal an IO event */
WaitableIoEventHandle = CreateEvent(NULL, FALSE, FALSE, NULL);
@@ -492,9 +243,9 @@ init_io_completion_port(void)
}
/* Create the IO completion port */
- hIoCompletionPort = CreateIoCompletionPort(
+ hndIOCPLPort = CreateIoCompletionPort(
INVALID_HANDLE_VALUE, NULL, 0, 0);
- if (hIoCompletionPort == NULL) {
+ if (hndIOCPLPort == NULL) {
msyslog(LOG_ERR, "Can't create I/O completion port: %m");
exit(1);
}
@@ -505,17 +256,19 @@ init_io_completion_port(void)
WaitHandles[2] = WaitableTimerHandle;
ActiveWaitHandles = 3;
- /*
- * Supply ntp_worker.c with function to add or remove a
+ /* Supply ntp_worker.c with function to add or remove a
* semaphore to the ntpd I/O loop which is signalled by a worker
* when a response is ready. The callback is invoked in the
* parent.
*/
addremove_io_semaphore = &ntpd_addremove_semaphore;
- /*
- * Have one thread servicing I/O. See rationale in front matter.
- */
+ /* Create a true handle for the main thread (APC processing) */
+ DuplicateHandle(GetCurrentProcess(), GetCurrentThread(),
+ GetCurrentProcess(), &hMainThread,
+ 0, FALSE, DUPLICATE_SAME_ACCESS);
+
+ /* Have one thread servicing I/O. See rationale in front matter. */
hIoCompletionThread = (HANDLE)_beginthreadex(
NULL,
0,
@@ -535,35 +288,40 @@ uninit_io_completion_port(
void
)
{
- DWORD rc;
+ DWORD rc;
	/* do nothing if the completion port is already gone. */
- if (NULL == hIoCompletionPort)
+ if (hndIOCPLPort == NULL)
return;
- /*
- * Service thread seems running. Terminate him with grace
+ /* Service thread seems running. Terminate him with grace
* first and force later...
*/
- if (tidCompletionThread != GetCurrentThreadId()) {
- PostQueuedCompletionStatus(hIoCompletionPort, 0, 0, 0);
- rc = WaitForSingleObject(hIoCompletionThread, 5000);
- if (rc == WAIT_TIMEOUT) {
- /* Thread lost. Kill off with TerminateThread. */
- msyslog(LOG_ERR,
- "IO completion thread refuses to terminate");
- TerminateThread(hIoCompletionThread, ~0UL);
- }
+ if (tidCompletionThread != GetCurrentThreadId()) {
+ PostQueuedCompletionStatus(hndIOCPLPort, 0, 0, 0);
+ rc = WaitForSingleObject(hIoCompletionThread, 5000);
+ if (rc == WAIT_TIMEOUT) {
+ /* Thread lost. Kill off with TerminateThread. */
+ msyslog(LOG_ERR,
+ "IO completion thread refuses to terminate");
+ TerminateThread(hIoCompletionThread, ~0UL);
+ }
}
- /* stop using the memory pool */
- IoCtxPoolDone();
+ /* close the additional main thread handle */
+ if (hMainThread) {
+ CloseHandle(hMainThread);
+ hMainThread = NULL;
+ }
+
+ /* stop using the memory pool */
+ IOCPLPoolDone();
/* now reap all handles... */
CloseHandle(hIoCompletionThread);
hIoCompletionThread = NULL;
- CloseHandle(hIoCompletionPort);
- hIoCompletionPort = NULL;
+ CloseHandle(hndIOCPLPort);
+ hndIOCPLPort = NULL;
}
@@ -594,8 +352,7 @@ ntpd_addremove_semaphore(
break;
if (remove) {
- /*
- * If found, eventually swap with last entry to keep
+ /* If found, eventually swap with last entry to keep
* the table dense.
*/
if (hi < ActiveWaitHandles) {
@@ -606,8 +363,7 @@ ntpd_addremove_semaphore(
WaitHandles[ActiveWaitHandles] = NULL;
}
} else {
- /*
- * Make sure the entry is not found and there is enough
+ /* Make sure the entry is not found and there is enough
* room, then append to the table array.
*/
if (hi >= ActiveWaitHandles) {
@@ -621,12 +377,9 @@ ntpd_addremove_semaphore(
#ifdef DEBUG
static void
-free_io_completion_port_mem(
- void
- )
+free_io_completion_port_mem(void)
{
- /*
- * At the moment, do absolutely nothing. Returning memory here
+ /* At the moment, do absolutely nothing. Returning memory here
* requires NO PENDING OVERLAPPED OPERATIONS AT ALL at this
	 * point in time, and as long as we cannot be reasonably sure
	 * about that, the simple advice is:
@@ -636,16 +389,68 @@ free_io_completion_port_mem(
}
#endif /* DEBUG */
-
/*
* -------------------------------------------------------------------
- * Serial IO stuff
+ * APC callback for scheduling interface scans.
+ *
+ * We get an error when trying to send if the network interface is
+ * gone or has lost link. Rescan interfaces to catch on sooner, but no
+ * more often than once per minute. Once ntpd is able to detect
+ * changes without polling this should be unnecessary.
+ */
+static void WINAPI
+apcOnUnexpectedNetworkError(
+ ULONG_PTR arg
+ )
+{
+ static u_long time_next_ifscan_after_error;
+
+ UNUSED_ARG(arg);
+
+ if (time_next_ifscan_after_error < current_time) {
+ time_next_ifscan_after_error = current_time + 60;
+ timer_interfacetimeout(current_time);
+ }
+ DPRINTF(4, ("UnexpectedNetworkError: interface may be down\n"));
+}
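A user-mode APC like the one above only runs once the target thread performs an alertable wait, which is why hMainThread is created via DuplicateHandle() during init and why GetReceivedBuffers() switches to WaitForMultipleObjectsEx(..., TRUE) further down. A compact sketch of the two halves of that mechanism, with illustrative names (apcPing, queue_ping, alertable_wait) and no NTP specifics:

#include <windows.h>
#include <stdio.h>

static void WINAPI
apcPing(ULONG_PTR arg)
{
	/* runs on the thread that entered the alertable wait */
	printf("APC delivered, arg=%lu\n", (unsigned long)arg);
}

static void
queue_ping(HANDLE hTargetThread)
{
	/* needs a real thread handle (e.g. from DuplicateHandle),
	 * not the pseudo handle returned by GetCurrentThread() */
	QueueUserAPC(&apcPing, hTargetThread, 42u);
}

static void
alertable_wait(HANDLE hEvent)
{
	/* returns WAIT_IO_COMPLETION after one or more APCs ran */
	DWORD rc = WaitForSingleObjectEx(hEvent, INFINITE, TRUE);
	(void)rc;
}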
+
+/* -------------------------------------------------------------------
+ *
+ * Prelude to madness -- common error checking code
*
- * Prelude -- common error checking code
* -------------------------------------------------------------------
*/
extern char * NTstrerror(int err, BOOL *bfreebuf);
+static void
+LogIoError(
+ const char * msg,
+ HANDLE hnd,
+ DWORD err
+ )
+{
+ static const char * const rmsg =
+ "LogIoError (unknown source)";
+
+ /* -*- format & print the error message -*-
+ * We have to resort to the low level error formatting functions
+ * here, since the error code can come from an overlapped result.
+	 * Relying on the value being the same as the 'GetLastError()'
+ * result at this point of execution is shaky at best, and using
+ * 'SetLastError()' to force it seems too nasty.
+ */
+ BOOL dynbuf = FALSE;
+ char * msgbuf = NTstrerror(err, &dynbuf);
+ msyslog(LOG_ERR, "%s: hnd=%p, err=%u, '%s'",
+ (msg ? msg : rmsg), hnd, err, msgbuf);
+ if (dynbuf)
+ LocalFree(msgbuf);
+}
+
+/* -------------------------------------------------------------------
+ * synchronous IO request result check (network & serial)
+ * -------------------------------------------------------------------
+ */
static BOOL
IoResultCheck(
DWORD err,
@@ -653,153 +458,252 @@ IoResultCheck(
const char * msg
)
{
- char * msgbuf;
- BOOL dynbuf;
-
- /* If the clock is not / no longer active, assume
- * 'ERROR_OPERATION_ABORTED' and do the necessary cleanup.
- */
- if (ctx->rio && !ctx->rio->active)
- err = ERROR_OPERATION_ABORTED;
-
- switch (err)
- {
+ switch (err) {
/* The first ones are no real errors. */
case ERROR_SUCCESS: /* all is good */
case ERROR_IO_PENDING: /* callback pending */
- return TRUE;
-
- /* the next ones go silently -- only cleanup is done */
- case ERROR_INVALID_PARAMETER: /* handle already closed */
- case ERROR_OPERATION_ABORTED: /* handle closed while wait */
break;
+ /* this defers the error processing to the main thread
+ * and continues silently.
+ */
+ case ERROR_UNEXP_NET_ERR:
+ if (hMainThread) {
+ QueueUserAPC(apcOnUnexpectedNetworkError,
+ hMainThread, ctx->io.sfd);
+ }
+ IoCtxRelease(ctx);
+ return FALSE;
default:
- /*
- * We have to resort to the low level error formatting
- * functions here, since the error code can be an
- * overlapped result. Relying the value to be the same
- * as the 'GetLastError()' result at this point of
- * execution is shaky at best, and using SetLastError()
- * to force it seems too nasty.
- */
- msgbuf = NTstrerror(err, &dynbuf);
- msyslog(LOG_ERR, "%s: err=%u, '%s'", msg, err, msgbuf);
- if (dynbuf)
- LocalFree(msgbuf);
- break;
+ LogIoError(msg, ctx->io.hnd, err);
+ /* the next ones go silently -- only clean-up is done */
+ case ERROR_INVALID_PARAMETER: /* handle already closed (clock)*/
+ case WSAENOTSOCK : /* handle already closed (socket)*/
+ IoCtxRelease(ctx);
+ return FALSE;
}
+ return TRUE;
+}
- /* If we end here, we have to mop up the buffer and context */
- if (ctx->flRawMem) {
- if (ctx->trans_buf)
- free(ctx->trans_buf);
- } else {
- if (ctx->recv_buf)
- freerecvbuf(ctx->recv_buf);
+/* -------------------------------------------------------------------
+ * IO callback context check -- serial (non-network) data streams
+ *
+ * Attention: deletes the IO context when the clock is dead!
+ * -------------------------------------------------------------------
+ */
+static RIO_t*
+getRioFromIoCtx(
+ IoCtx_t * ctx,
+ ULONG_PTR key,
+ const char * msg
+ )
+{
+ /* Make sure the key matches the context info in the shared
+ * lock, the check for errors. If the error indicates the
+	 * lock, then check for errors. If the error indicates the
+ */
+ RIO_t * rio = NULL;
+ SharedLock_t * slock = slAttachShared(ctx->slock);
+ if (NULL != slock) {
+ rio = slock->rsrc.rio;
+ if (key != slock->rsrc.key)
+ rio = NULL;
+ else if (ctx->io.hnd != slock->handles[0])
+ rio = NULL;
+ slDetachShared(slock);
}
- IoCtxFree(ctx);
- return FALSE;
+ if (rio != NULL) switch (ctx->errCode) {
+ /* When we got cancelled, don't spill messages */
+ case ERROR_INVALID_PARAMETER: /* handle already closed (clock) */
+ case ERROR_OPERATION_ABORTED: /* handle closed while wait */
+ case WSAENOTSOCK: /* handle already closed (sock?) */
+ ctx->errCode = ERROR_SUCCESS;
+ rio = NULL;
+ case ERROR_SUCCESS: /* all is good */
+ break;
+ default:
+ /* log error, but return -- caller has to handle this! */
+ LogIoError(msg, ctx->io.hnd, ctx->errCode);
+ break;
+ }
+ if (rio == NULL)
+ IoCtxRelease(ctx);
+ return rio;
}
+/* -------------------------------------------------------------------
+ * IO callback context check -- network sockets
+ *
+ * Attention: deletes the IO context when the endpoint is dead!
+ * -------------------------------------------------------------------
+ */
+static endpt*
+getEndptFromIoCtx(
+ IoCtx_t * ctx,
+ ULONG_PTR key,
+ const char * msg
+ )
+{
+ /* Make sure the key matches the context info in the shared
+ * lock, the check for errors. If the error indicates the
+	 * lock, then check for errors. If the error indicates the
+ *
+ * !Note! Since we use the lowest bit of the key to distinguish
+ * between regular and broadcast socket, we must make sure the
+ * LSB is not used in the reverse-link check. Hence we shift
+ * it out in both the input key and the registered source.
+ */
+ endpt * ep = NULL;
+ SharedLock_t * slock = slAttachShared(ctx->slock);
+ if (slock != NULL) {
+ ep = slock->rsrc.ept;
+ if ((key >> 1) != (slock->rsrc.key >> 1))
+ ep = NULL;
+ else if (ctx->io.hnd != slock->handles[key & 1])
+ ep = NULL;
+ slDetachShared(slock);
+ }
+ if (ep != NULL) switch (ctx->errCode) {
+ case ERROR_UNEXP_NET_ERR:
+ if (hMainThread)
+ QueueUserAPC(apcOnUnexpectedNetworkError,
+ hMainThread, ctx->io.sfd);
+ case ERROR_INVALID_PARAMETER: /* handle already closed (clock?)*/
+ case ERROR_OPERATION_ABORTED: /* handle closed while wait */
+ case WSAENOTSOCK : /* handle already closed (sock) */
+ ctx->errCode = ERROR_SUCCESS;
+ ep = NULL;
+ case ERROR_SUCCESS: /* all is good */
+ break;
+ default:
+ /* log error, but return -- caller has to handle this! */
+ LogIoError(msg, ctx->io.hnd, ctx->errCode);
+ ep = NULL;
+ break;
+ }
+ if (NULL == ep)
+ IoCtxRelease(ctx);
+ return ep;
+}
/*
* -------------------------------------------------------------------
* Serial IO stuff
*
* Part 1 -- COMM event handling
+ *
+ * This is the initial step for serial line input: wait for COM event.
+ * We always check for DCD changes (for user-mode PPS time stamps) and
+ * either a flag char (line feed, for line mode emulation) or any
+ * input character (raw mode). In the callback we decide if we just
+ * have to go on with waiting, or if there is data we must read.
+ * Depending on the mode, we either queue a raw read or a 'regular'
+ * read request.
+ *
+ * !Note! Currently one single IO context circles through the WAIT,
+ * READ and PROCESS stages. For better performance, it might make
+ * sense to have one cycle for the wait, spinning off new read requests
+ * when there is data. There are actually two problems that must be
+ * solved:
+ * - We would need a queue on post-processing.
+ *   - We would need a queue for post-processing.
+ *   - We have to take care of the order of read results. While the
+ *     IOCPL queue guarantees delivery in enqueue order, the order
+ *     of enqueueing is not guaranteed once multiple reads are in
+ *
+ * So, for the time being, we have one request cycling...
* -------------------------------------------------------------------
*/
-static BOOL
+static BOOL __fastcall
QueueSerialWait(
- RIO_t * rio,
- recvbuf_t * buff,
- IoCtx_t * lpo
+ IoCtx_t * lpo,
+ recvbuf_t * buff
)
{
- BOOL rc;
+ static const char * const msg =
+ "QueueSerialWait: cannot wait for COM event";
+ BOOL rc;
+
+ memset(&lpo->aux, 0, sizeof(lpo->aux));
lpo->onIoDone = OnSerialWaitComplete;
lpo->recv_buf = buff;
lpo->flRawMem = 0;
- lpo->rio = rio;
- buff->fd = rio->fd;
-
- rc = WaitCommEvent((HANDLE)_get_osfhandle(rio->fd),
- &lpo->com_events, &lpo->ol);
- if (!rc)
- return IoResultCheck(GetLastError(), lpo,
- "Can't wait on Refclock");
- return TRUE;
+
+ buff->fd = lpo->slock->riofd;
+ /* keep receive position for continuation of partial lines! */
+ rc = WaitCommEvent(lpo->io.hnd, &lpo->aux.com_events, &lpo->ol);
+ return rc || IoResultCheck(GetLastError(), lpo, msg);
}
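For WaitCommEvent() to report anything, the EV_* bits have to be armed on the handle with SetCommMask() when the port is set up (that happens outside this file). A standalone sketch of this first stage of the cycle, assuming hCom is an overlapped COM handle opened elsewhere; wait_for_com_event is an illustrative name, not a function from the patch:

#include <windows.h>

static BOOL
wait_for_com_event(HANDLE hCom, OVERLAPPED *ol, DWORD *events)
{
	/* DCD change (EV_RLSD), flag char (EV_RXFLAG), any char
	 * (EV_RXCHAR) -- the same set the DEBUG check in
	 * OnSerialWaitComplete() expects to see. */
	if (!SetCommMask(hCom, EV_RLSD | EV_RXFLAG | EV_RXCHAR))
		return FALSE;

	/* overlapped WaitCommEvent(): FALSE + ERROR_IO_PENDING means
	 * the wait was queued and completes via the IOCPL port. */
	return WaitCommEvent(hCom, events, ol)
	    || GetLastError() == ERROR_IO_PENDING;
}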
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
static void
OnSerialWaitComplete(
- ULONG_PTR key,
+ ULONG_PTR key,
IoCtx_t * lpo
)
{
- RIO_t * rio;
+ static const char * const msg =
+ "OnSerialWaitComplete: wait for COM event failed";
+
DevCtx_t * dev;
- recvbuf_t * buff;
PPSDataEx_t * ppsbuf;
DWORD modem_status;
u_long covc;
- /* check and bail out if operation failed */
- if (!IoResultCheck(lpo->errCode, lpo,
- "WaitCommEvent failed"))
+ /* Make sure this RIO is not closed. */
+ if (NULL == getRioFromIoCtx(lpo, key, msg))
return;
- /* get & validate context and buffer. */
- rio = (RIO_t *)key;
- buff = lpo->recv_buf;
- dev = lpo->devCtx;
-
- INSIST(rio == lpo->rio);
+ /* start next IO and leave if we hit an error */
+ if (lpo->errCode != ERROR_SUCCESS) {
+ IoCtxStartLocked(lpo, QueueSerialWait, lpo->recv_buf);
+ return;
+ }
#ifdef DEBUG
- if (~(EV_RXFLAG | EV_RLSD | EV_RXCHAR) & lpo->com_events) {
+ if (~(EV_RXFLAG | EV_RLSD | EV_RXCHAR) & lpo->aux.com_events) {
msyslog(LOG_ERR, "WaitCommEvent returned unexpected mask %x",
- lpo->com_events);
+ lpo->aux.com_events);
exit(-1);
}
#endif
- /*
- * Take note of changes on DCD; 'user mode PPS hack'.
- * perlinger@ntp.org suggested a way of solving several problems with
- * this code that makes a lot of sense: move to a putative
+ /* Take note of changes on DCD; 'user mode PPS hack'.
+ * perlinger@ntp.org suggested a way of solving several problems
+ * with this code that makes a lot of sense: move to a putative
* dcdpps-ppsapi-provider.dll.
+ *
+ * perlinger@ntp.org: It came out as loopback-ppsapi-provider
+ * (because it loops back into NTPD), but I had to maintain the
+ * old hack for backward compatibility.
*/
- if (EV_RLSD & lpo->com_events) {
+ if (EV_RLSD & lpo->aux.com_events) {
modem_status = 0;
- GetCommModemStatus((HANDLE)_get_osfhandle(rio->fd),
- &modem_status);
-
- if (dev != NULL) {
+ GetCommModemStatus(lpo->io.hnd, &modem_status);
+ if (NULL != (dev = lpo->devCtx)) {
/* PPS-context available -- use it! */
if (MS_RLSD_ON & modem_status) {
dev->pps_data.cc_assert++;
- dev->pps_data.ts_assert = lpo->RecvTime;
- DPRINTF(2, ("upps-real: fd %d DCD PPS Rise at %s\n", rio->fd,
- ulfptoa(&lpo->RecvTime, 6)));
+ dev->pps_data.ts_assert = lpo->aux.RecvTime;
+ DPRINTF(2, ("upps-real: fd %d DCD PPS Rise at %s\n",
+ lpo->slock->rsrc.rio->fd,
+ ulfptoa(&lpo->aux.RecvTime, 6)));
} else {
dev->pps_data.cc_clear++;
- dev->pps_data.ts_clear = lpo->RecvTime;
- DPRINTF(2, ("upps-real: fd %d DCD PPS Fall at %s\n", rio->fd,
- ulfptoa(&lpo->RecvTime, 6)));
+ dev->pps_data.ts_clear = lpo->aux.RecvTime;
+ DPRINTF(2, ("upps-real: fd %d DCD PPS Fall at %s\n",
+ lpo->slock->rsrc.rio->fd,
+ ulfptoa(&lpo->aux.RecvTime, 6)));
}
- /*
- ** Update PPS buffer, writing from low to high, with index
- ** update as last action. We use interlocked ops and a
- ** volatile data destination to avoid reordering on compiler
- ** and CPU level. The interlocked instruction act as full
- ** barriers -- we need only release semantics, but we don't
- ** have them before VS2010.
- */
+ /* Update PPS buffer, writing from low to high, with index
+ * update as last action. We use interlocked ops and a
+ * volatile data destination to avoid reordering on compiler
+	 * and CPU level. The interlocked instructions act as full
+ * barriers -- we need only release semantics, but we don't
+ * have them before VS2010.
+ */
covc = dev->cov_count + 1u;
ppsbuf = dev->pps_buff + (covc & PPS_QUEUE_MSK);
InterlockedExchange((PLONG)&ppsbuf->cov_count, covc);
@@ -807,33 +711,35 @@ OnSerialWaitComplete(
InterlockedExchange((PLONG)&dev->cov_count, covc);
}
/* perlinger@ntp.org, 2012-11-19
- It can be argued that once you have the PPS API active, you can
- disable the old pps hack. This would give a behaviour that's much
- more like the behaviour under a UN*Xish OS. On the other hand, it
- will give a nasty surprise for people which have until now happily
- taken the pps hack for granted, and after the first complaint, I have
- decided to keep the old implementation unconditionally. So here it is:
-
- /* backward compat: 'usermode-pps-hack' */
+ * It can be argued that once you have the PPS API active, you can
+ * disable the old pps hack. This would give a behaviour that's much
+ * more like the behaviour under a UN*Xish OS. On the other hand, it
+ * will give a nasty surprise for people who have until now happily
+ * taken the pps hack for granted, and after the first complaint, I have
+ * decided to keep the old implementation unconditionally. So here it is:
+ *
+ * backward compat: 'usermode-pps-hack'
+ */
if (MS_RLSD_ON & modem_status) {
- lpo->DCDSTime = lpo->RecvTime;
- lpo->flTsDCDS = 1;
- DPRINTF(2, ("upps-hack: fd %d DCD PPS Rise at %s\n", rio->fd,
- ulfptoa(&lpo->RecvTime, 6)));
+ lpo->aux.DCDSTime = lpo->aux.RecvTime;
+ lpo->aux.flTsDCDS = 1;
+ DPRINTF(2, ("upps-hack: fd %d DCD PPS Rise at %s\n",
+ lpo->slock->rsrc.rio->fd,
+ ulfptoa(&lpo->aux.RecvTime, 6)));
}
}
	/* If IO is ready, read data. Otherwise go back to waiting. */
- if (EV_RXFLAG & lpo->com_events) { /* line discipline */
- lpo->FlagTime = lpo->RecvTime;
- lpo->flTsFlag = 1;
- QueueSerialRead(rio, buff, lpo);
- } else if (EV_RXCHAR & lpo->com_events) { /* raw discipline */
- lpo->FlagTime = lpo->RecvTime;
- lpo->flTsFlag = 1;
- QueueRawSerialRead(rio, buff, lpo);
+ if (EV_RXFLAG & lpo->aux.com_events) { /* line discipline */
+ lpo->aux.FlagTime = lpo->aux.RecvTime;
+ lpo->aux.flTsFlag = 1;
+ IoCtxStartLocked(lpo, QueueSerialRead, lpo->recv_buf);
+ } else if (EV_RXCHAR & lpo->aux.com_events) { /* raw discipline */
+ lpo->aux.FlagTime = lpo->aux.RecvTime;
+ lpo->aux.flTsFlag = 1;
+ IoCtxStartLocked(lpo, QueueRawSerialRead, lpo->recv_buf);
} else { /* idle... */
- QueueSerialWait(rio, buff, lpo);
+ IoCtxStartLocked(lpo, QueueSerialWait, lpo->recv_buf);
}
}
@@ -841,6 +747,41 @@ OnSerialWaitComplete(
* -------------------------------------------------------------------
* Serial IO stuff
*
+ * common for both modes
+ * -------------------------------------------------------------------
+ */
+static BOOL __fastcall
+QueueSerialReadCommon(
+ IoCtx_t * lpo,
+ recvbuf_t * buff
+ )
+{
+ static const char * const msg =
+ "QueueSerialRead: cannot schedule device read";
+
+ BOOL rc;
+
+ /* 'lpo->onIoDone' must be set already! */
+ lpo->recv_buf = buff;
+ lpo->flRawMem = 0;
+
+ /* 'buff->recv_length' must be set already! */
+ buff->fd = lpo->slock->riofd;
+ buff->dstadr = NULL;
+ buff->receiver = process_refclock_packet;
+ buff->recv_peer = lpo->slock->rsrc.rio->srcclock;
+
+ rc = ReadFile(lpo->io.hnd,
+ (char*)buff->recv_buffer + buff->recv_length,
+ sizeof(buff->recv_buffer) - buff->recv_length,
+ NULL, &lpo->ol);
+ return rc || IoResultCheck(GetLastError(), lpo, msg);
+}
+
+/*
+ * -------------------------------------------------------------------
+ * Serial IO stuff
+ *
* Part 2 -- line discipline emulation
*
* Ideally this should *not* be done in the IO completion thread.
@@ -848,37 +789,21 @@ OnSerialWaitComplete(
* -------------------------------------------------------------------
*/
-/*
- * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* Start & Queue a serial read for line discipline emulation.
*/
-static BOOL
+static BOOL __fastcall
QueueSerialRead(
- RIO_t * rio,
- recvbuf_t * buff,
- IoCtx_t * lpo
+ IoCtx_t * lpo,
+ recvbuf_t * buff
)
{
- BOOL rc;
-
lpo->onIoDone = &OnSerialReadComplete;
- lpo->recv_buf = buff;
- lpo->flRawMem = 0;
- lpo->rio = rio;
- buff->fd = rio->fd;
-
- rc = ReadFile((HANDLE)_get_osfhandle(rio->fd),
- (char*)buff->recv_buffer + buff->recv_length,
- sizeof(buff->recv_buffer) - buff->recv_length,
- NULL, &lpo->ol);
- if (!rc)
- return IoResultCheck(GetLastError(), lpo,
- "Can't read from Refclock");
- return TRUE;
+ /* keep 'buff->recv_length' for line continuation! */
+ return QueueSerialReadCommon(lpo, buff);
}
-/*
- * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* IO completion thread callback. Takes a time stamp and offloads the
* real work to the worker pool ASAP.
*/
@@ -888,27 +813,31 @@ OnSerialReadComplete(
IoCtx_t * lpo
)
{
- RIO_t * rio;
- recvbuf_t * buff;
+ static const char * const msg =
+ "OnSerialReadComplete: read from device failed";
- /* check and bail out if operation failed */
- if (!IoResultCheck(lpo->errCode, lpo,
- "Read from Refclock failed"))
+ /* Make sure this RIO is not closed. */
+ if (NULL == getRioFromIoCtx(lpo, key, msg))
return;
- /* get & validate context and buffer. */
- rio = lpo->rio;
- buff = lpo->recv_buf;
- INSIST((ULONG_PTR)rio == key);
+ /* start next IO and leave if we hit an error */
+ if (lpo->errCode != ERROR_SUCCESS)
+ goto wait_again;
- /* Offload to worker pool */
- if (!QueueUserWorkItem(&OnSerialReadWorker, lpo, WT_EXECUTEDEFAULT)) {
- msyslog(LOG_ERR,
- "Can't offload to worker thread, will skip data: %m");
- IoCtxReset(lpo);
- buff->recv_length = 0;
- QueueSerialWait(rio, buff, lpo);
- }
+ /* Offload to worker pool, if there is data */
+ if (lpo->byteCount == 0)
+ goto wait_again;
+
+ if (QueueUserWorkItem(&OnSerialReadWorker, lpo, WT_EXECUTEDEFAULT))
+ return; /* successful regular exit! */
+
+ /* croak as we're throwing away data */
+ msyslog(LOG_ERR,
+ "Can't offload to worker thread, will skip data: %m");
+
+wait_again:
+ /* make sure the read is issued again */
+ IoCtxStartLocked(lpo, QueueSerialWait, lpo->recv_buf);
}
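The offload itself is plain QueueUserWorkItem(): the IO context is handed to an arbitrary pool thread as the single argument, and the IOCPL thread immediately returns to dequeueing. A minimal self-contained sketch of that pattern (demo_work and demo_offload are made-up names):

#include <windows.h>

static DWORD WINAPI
demo_work(void *arg)
{
	/* runs on an arbitrary pool thread -- shared state must be
	 * protected explicitly (cf. the shared-lock note below) */
	(void)arg;
	return 0;
}

static BOOL
demo_offload(void *ctx)
{
	/* WT_EXECUTEDEFAULT queues to a non-IO pool thread */
	return QueueUserWorkItem(&demo_work, ctx, WT_EXECUTEDEFAULT);
}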
@@ -921,38 +850,64 @@ OnSerialReadComplete(
* discipline. Since this involves allocation of additional buffers and
* string parsing/copying, it is offloaded to the worker thread pool so
* the IO completion thread can resume faster.
+ *
+ * !!ATTENTION!!
+ * This function runs on an arbitrary worker thread, and not under the
+ * protection of the shared lock! Accessing the RIO structure must take
+ * the lock explicitly!
*/
static DWORD WINAPI
-OnSerialReadWorker(void * ctx)
+OnSerialReadWorker(
+	void * ctx
+)
{
IoCtx_t * lpo;
+ SharedLock_t * slock;
recvbuf_t * buff, *obuf;
- RIO_t * rio;
char *sptr, *send, *dptr;
BOOL eol;
char ch;
+ BOOL active;
+ u_long rcvcnt;
+ RIO_t * rio;
/* Get context back */
lpo = (IoCtx_t*)ctx;
buff = lpo->recv_buf;
- rio = lpo->rio;
- /*
- * ignore 0 bytes read due to closure on fd.
+
+ /* query the lock structure under mutual exclusion */
+ active = FALSE;
+ rcvcnt = 0;
+ if (NULL != (slock = slAttachShared(lpo->slock))) {
+ if (NULL != (rio = slock->rsrc.rio)) {
+ active = TRUE;
+ rcvcnt = InterlockedIncrement(&rio->recvcount) - 1;
+ }
+ slDetachShared(slock);
+ }
+
+ /* bail out if we're disconnected now */
+ if (!active) {
+ IoCtxRelease(ctx);
+ return 0;
+ }
+
+ /* Ignore zero-byte reads due to closure on fd.
* Eat the first line of input as it's possibly partial.
*/
- if (lpo->byteCount && rio->recvcount++) {
+ if (lpo->byteCount && rcvcnt) {
/* account for additional input */
buff->recv_length += (int)lpo->byteCount;
/*
- * Now mimic the Unix line discipline.
+ * Now mimic the Unix line discipline.
*/
sptr = (char *)buff->recv_buffer;
send = sptr + buff->recv_length;
obuf = NULL;
dptr = NULL;
- /* hack #1: eat away leading CR/LF if here is any */
+ /* hack #1: eat away leading CR/LF if there is any */
while (sptr != send) {
ch = *sptr;
if (ch != '\n' && ch != '\r')
@@ -960,51 +915,47 @@ OnSerialReadWorker(void * ctx)
sptr++;
}
- while (sptr != send)
- {
+ while (sptr != send) {
/* get new buffer to store line */
obuf = get_free_recv_buffer_alloc();
- obuf->fd = rio->fd;
- obuf->receiver = &process_refclock_packet;
- obuf->dstadr = NULL;
- obuf->recv_peer = rio->srcclock;
+ obuf->fd = buff->fd;
+ obuf->receiver = buff->receiver;
+ obuf->dstadr = NULL;
+ obuf->recv_peer = buff->recv_peer;
set_serial_recv_time(obuf, lpo);
- /*
- * Copy data to new buffer, convert CR to LF on
+ /* Copy data to new buffer, convert CR to LF on
* the fly. Stop after either.
*/
dptr = (char *)obuf->recv_buffer;
- eol = FALSE;
+ eol = FALSE;
while (sptr != send && !eol) {
- ch = *sptr++;
- if ('\r' == ch) {
+ ch = *sptr++;
+ if ('\r' == ch)
ch = '\n';
- }
*dptr++ = ch;
eol = ('\n' == ch);
}
obuf->recv_length =
- (int)(dptr - (char *)obuf->recv_buffer);
+ (int)(dptr - (char *)obuf->recv_buffer);
- /*
- * If NL found, push this buffer and prepare to
- * get a new one.
+ /* If NL found, push this buffer and prepare to
+ * get a new one. Be prepared for concurrent
+ * removal of the clock...
*/
if (eol) {
- add_full_recv_buffer(obuf);
- SetEvent(WaitableIoEventHandle);
- obuf = NULL;
+ slQueueLocked(lpo->slock, slRefClockOK, obuf);
+ obuf = NULL; /* consumed in any case */
}
}
- /*
- * If we still have an output buffer, continue to fill
- * it again.
+ /* If we still have an output buffer, prepare it to be
+ * used for added input from the ComPort. Otherwise
+ * use the current input buffer again.
*/
if (obuf) {
obuf->recv_length =
- (int)(dptr - (char *)obuf->recv_buffer);
+ (int)(dptr - (char *)obuf->recv_buffer);
freerecvbuf(buff);
buff = obuf;
} else {
@@ -1015,8 +966,8 @@ OnSerialReadWorker(void * ctx)
buff->recv_length = 0;
}
- IoCtxReset(lpo);
- QueueSerialWait(rio, buff, lpo);
+ /* start next round -- must hold the lock during that! */
+ IoCtxStartLocked(lpo, QueueSerialWait, buff);
return 0;
}
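The net effect of the loop above is easiest to see on concrete input: CR is turned into LF, every completed line is pushed as its own buffer, and a trailing partial line is carried over to the next read. A small stand-alone splitter in the same spirit (split_lines is an illustrative helper, not the code above, and it prints instead of queueing buffers):

#include <stdio.h>

/* Feed 'src'; print one record per completed line and return the
 * number of leftover (partial-line) bytes carried to the next chunk.
 */
static size_t
split_lines(const char *src, size_t len)
{
	char	line[128];
	size_t	fill = 0, i;

	for (i = 0; i < len; i++) {
		char ch = src[i];
		if (ch == '\r')			/* CR -> LF, as above */
			ch = '\n';
		if (fill < sizeof(line) - 1)
			line[fill++] = ch;
		if (ch == '\n') {		/* complete line: emit */
			line[fill] = '\0';
			printf("record: %s", line);
			fill = 0;
		}
	}
	return fill;	/* partial line kept for the next chunk */
}

/* e.g. split_lines("ONE\nTWO", 7) emits one record ("ONE\n") and
 * reports 3 leftover bytes ("TWO"). */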
@@ -1032,89 +983,67 @@ OnSerialReadWorker(void * ctx)
* -------------------------------------------------------------------
*/
-static BOOL
+static BOOL __fastcall
QueueRawSerialRead(
- RIO_t * rio,
- recvbuf_t * buff,
- IoCtx_t * lpo
+ IoCtx_t * lpo,
+ recvbuf_t * buff
)
{
- BOOL rc;
-
- lpo->onIoDone = OnRawSerialReadComplete;
- lpo->recv_buf = buff;
- lpo->flRawMem = 0;
- lpo->rio = rio;
- buff->fd = rio->fd;
-
- rc = ReadFile((HANDLE)_get_osfhandle(rio->fd),
- buff->recv_buffer,
- sizeof(buff->recv_buffer),
- NULL, &lpo->ol);
- if (!rc)
- return IoResultCheck(GetLastError(), lpo,
- "Can't read raw from Refclock");
- return TRUE;
+ lpo->onIoDone = OnRawSerialReadComplete;
+ buff->recv_length = 0;
+ return QueueSerialReadCommon(lpo, buff);
}
-
-static void
+/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+ * IO completion thread callback. Takes a time stamp and offloads the
+ * real work to the worker pool ASAP.
+ */
+static void
OnRawSerialReadComplete(
ULONG_PTR key,
IoCtx_t * lpo
)
{
- RIO_t * rio;
- recvbuf_t * buff;
+ static const char * const msg =
+ "OnRawSerialReadComplete: read from device failed";
- /* check and bail out if operation failed */
- if (!IoResultCheck(lpo->errCode, lpo,
- "Raw read from Refclock failed"))
+ recvbuf_t * buff = lpo->recv_buf;
+ RIO_t * rio = getRioFromIoCtx(lpo, key, msg);
+ /* Make sure this RIO is not closed. */
+ if (rio == NULL)
return;
- /* get & validate context and buffer. */
- rio = lpo->rio;
- buff = lpo->recv_buf;
- INSIST((ULONG_PTR)rio == key);
-
- /* ignore 0 bytes read. */
- if (lpo->byteCount > 0) {
+ /* start next IO and leave if we hit an error */
+ if (lpo->errCode == ERROR_SUCCESS && lpo->byteCount > 0) {
buff->recv_length = (int)lpo->byteCount;
- buff->dstadr = NULL;
- buff->receiver = process_refclock_packet;
- buff->recv_peer = rio->srcclock;
set_serial_recv_time(buff, lpo);
- add_full_recv_buffer(buff);
- SetEvent(WaitableIoEventHandle);
+ slQueueLocked(lpo->slock, slRefClockOK, buff);
buff = get_free_recv_buffer_alloc();
}
-
- buff->recv_length = 0;
- QueueSerialWait(rio, buff, lpo);
+ IoCtxStartLocked(lpo, QueueSerialWait, buff);
}
-static inline void
+static void
set_serial_recv_time(
recvbuf_t * obuf,
IoCtx_t * lpo
)
{
- /*
- * Time stamp assignment is interesting. If we
+ /* Time stamp assignment is interesting. If we
* have a DCD stamp, we use it, otherwise we use
* the FLAG char event time, and if that is also
* not / no longer available we use the arrival
* time.
*/
- if (lpo->flTsDCDS)
- obuf->recv_time = lpo->DCDSTime;
- else if (lpo->flTsFlag)
- obuf->recv_time = lpo->FlagTime;
+ if (lpo->aux.flTsDCDS)
+ obuf->recv_time = lpo->aux.DCDSTime;
+ else if (lpo->aux.flTsFlag)
+ obuf->recv_time = lpo->aux.FlagTime;
else
- obuf->recv_time = lpo->RecvTime;
+ obuf->recv_time = lpo->aux.RecvTime;
- lpo->flTsDCDS = lpo->flTsFlag = 0; /* use only once... */
+ lpo->aux.flTsDCDS = lpo->aux.flTsFlag = 0; /* use only once! */
}
@@ -1138,30 +1067,41 @@ async_write(
unsigned int count
)
{
- IoCtx_t * lpo;
+ static const char * const msg =
+ "async_write: cannot schedule device write";
+ static const char * const dmsg =
+ "overlapped IO data buffer";
+
+ IoCtx_t * lpo = NULL;
+ void * buff = NULL;
+ HANDLE hnd = NULL;
BOOL rc;
- lpo = IoCtxAlloc(NULL);
- if (lpo == NULL) {
- DPRINTF(1, ("async_write: out of memory\n"));
- errno = ENOMEM;
- return -1;
- }
+ hnd = (HANDLE)_get_osfhandle(fd);
+ if (hnd == INVALID_HANDLE_VALUE)
+ goto fail;
+ if (NULL == (buff = IOCPLPoolMemDup(data, count, dmsg)))
+ goto fail;
+ if (NULL == (lpo = IoCtxAlloc(NULL, NULL)))
+ goto fail;
+ lpo->io.hnd = hnd;
lpo->onIoDone = OnSerialWriteComplete;
- lpo->trans_buf = emalloc(count);
+ lpo->trans_buf = buff;
lpo->flRawMem = 1;
- memcpy(lpo->trans_buf, data, count);
- rc = WriteFile((HANDLE)_get_osfhandle(fd),
- lpo->trans_buf, count,
+ rc = WriteFile(lpo->io.hnd, lpo->trans_buf, count,
NULL, &lpo->ol);
- if (!rc && !IoResultCheck(GetLastError(), lpo,
- "Can't write to Refclock")) {
- errno = EBADF;
- return -1;
- }
- return count;
+ if (rc || IoResultCheck(GetLastError(), lpo, msg))
+ return count; /* normal/success return */
+
+ errno = EBADF;
+ return -1;
+
+fail:
+ IoCtxFree(lpo);
+ IOCPLPoolFree(buff, dmsg);
+ return -1;
}
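The IOCPLPoolMemDup() step follows directly from the ownership rule in the front matter: the caller's buffer cannot be assumed to outlive the overlapped write, so the data is copied into memory that stays valid until the completion is dequeued (or is dropped into oblivion on shutdown). A generic sketch of that rule with plain malloc() standing in for the IOCPL pool; overlapped_write_copy is an illustrative name:

#include <windows.h>
#include <stdlib.h>
#include <string.h>

/* Start an overlapped write with a private copy of 'data'.  'ol' is
 * assumed zero-initialized by the caller; a real implementation would
 * stash 'copy' in a per-operation context (as the patch does with
 * lpo->trans_buf) so the completion handler can free it.
 */
static BOOL
overlapped_write_copy(HANDLE h, const void *data, DWORD len,
		      OVERLAPPED *ol)
{
	void *copy = malloc(len ? len : 1);

	if (copy == NULL)
		return FALSE;
	memcpy(copy, data, len);
	if (!WriteFile(h, copy, len, NULL, ol)
	    && GetLastError() != ERROR_IO_PENDING) {
		free(copy);	/* never reached the kernel: safe to free */
		return FALSE;
	}
	/* from here on, 'copy' belongs to the pending operation */
	return TRUE;
}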
static void
@@ -1170,12 +1110,15 @@ OnSerialWriteComplete(
IoCtx_t * lpo
)
{
- /* set RIO and force silent cleanup if no error */
- lpo->rio = (RIO_t *)key;
- if (ERROR_SUCCESS == lpo->errCode)
- lpo->errCode = ERROR_OPERATION_ABORTED;
- IoResultCheck(lpo->errCode, lpo,
- "Write to Refclock failed");
+ /* This is really trivial: Let 'getRioFromIoCtx()' do all the
+ * error processing, and it returns with a valid RIO, just
+ * drop the complete context.
+ */
+ static const char * const msg =
+ "OnSerialWriteComplete: serial output failed";
+
+ if (NULL != getRioFromIoCtx(lpo, key, msg))
+ IoCtxRelease(lpo);
}
@@ -1202,7 +1145,7 @@ OnPpsDummyRead(
RIO_t * rio;
rio = (RIO_t *)key;
- lpo->devCtx = DevCtxAttach(rio->device_context);
+ lpo->devCtx = DevCtxAttach(rio->device_ctx);
SetEvent(lpo->ppswake);
}
@@ -1224,7 +1167,7 @@ ntp_pps_attach_device(
ZERO(myIoCtx);
dev = NULL;
myEvt = CreateEvent(NULL, FALSE, FALSE, NULL);
- if (NULL == myEvt)
+ if (myEvt == NULL)
goto done;
myIoCtx.ppswake = myEvt;
@@ -1271,22 +1214,21 @@ ntp_pps_read(
SetLastError(ERROR_INVALID_PARAMETER);
return FALSE;
}
- /*
- ** Reading from shared memory in a lock-free fashion can be
- ** a bit tricky, since we have to read the components in the
- ** opposite direction from the write, and the compiler must
- ** not reorder the read sequence.
- ** We use interlocked ops and a volatile data source to avoid
- ** reordering on compiler and CPU level. The interlocked
- ** instruction act as full barriers -- we need only aquire
- ** semantics, but we don't have them before VS2010.
- */
+ /* Reading from shared memory in a lock-free fashion can be
+ * a bit tricky, since we have to read the components in the
+ * opposite direction from the write, and the compiler must
+ * not reorder the read sequence.
+ * We use interlocked ops and a volatile data source to avoid
+ * reordering on compiler and CPU level. The interlocked
+	 * instructions act as full barriers -- we need only acquire
+ * semantics, but we don't have them before VS2010.
+ */
repc = 3;
do {
- InterlockedExchange((PLONG)&covc, dev->cov_count);
+ covc = InterlockedExchangeAdd((PLONG)&dev->cov_count, 0);
ppsbuf = dev->pps_buff + (covc & PPS_QUEUE_MSK);
*data = ppsbuf->data;
- InterlockedExchange((PLONG)&guard, ppsbuf->cov_count);
+ guard = InterlockedExchangeAdd((PLONG)&ppsbuf->cov_count, 0);
guard ^= covc;
} while (guard && ~guard && --repc);
@@ -1297,217 +1239,368 @@ ntp_pps_read(
return TRUE;
}
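Writer (in OnSerialWaitComplete above) and reader (ntp_pps_read) form a classic sequence-count protocol over a power-of-two ring: the writer marks the slot, stores the payload, then publishes the new count; the reader loads the published count, copies the slot, and re-checks the slot's count, retrying a bounded number of times if they disagree. A condensed sketch of both sides, stripped of the PPS specifics and using demo_* names:

#include <windows.h>

#define DEMO_SLOTS 8u			/* power of two, like PPS_QUEUE_LEN */
#define DEMO_SMASK (DEMO_SLOTS - 1)

struct demo_slot { volatile LONG seq; volatile LONG payload; };
static struct demo_slot	demo_buf[DEMO_SLOTS];
static volatile LONG	demo_seq;	/* last published sequence */

static void
demo_publish(LONG value)		/* single writer */
{
	LONG seq = demo_seq + 1;
	struct demo_slot *sl = demo_buf + (seq & DEMO_SMASK);

	InterlockedExchange(&sl->seq, seq);	/* mark slot in flux   */
	sl->payload = value;			/* store the payload   */
	InterlockedExchange(&demo_seq, seq);	/* publish new maximum */
}

static BOOL
demo_read(LONG *value)			/* any number of readers */
{
	int tries;

	for (tries = 3; tries > 0; --tries) {
		LONG seq = InterlockedExchangeAdd(&demo_seq, 0);
		struct demo_slot *sl = demo_buf + (seq & DEMO_SMASK);

		*value = sl->payload;
		if (InterlockedExchangeAdd(&sl->seq, 0) == seq)
			return TRUE;	/* consistent snapshot */
	}
	return FALSE;			/* writer was too busy */
}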
-/*
- * Add a reference clock data structures I/O handles to
- * the I/O completion port. Return 1 if any error.
+/* --------------------------------------------------------------------
+ * register and unregister refclock IOs with the IO engine
+ * --------------------------------------------------------------------
+ */
+
+/* Add a reference clock data structure's I/O handles to
+ * the I/O completion port. Return FALSE on any error,
+ * TRUE on success.
*/
-int
+BOOL
io_completion_port_add_clock_io(
RIO_t *rio
)
{
+ static const char * const msgh =
+ "io_completion_port_add_clock_io";
+
IoCtx_t * lpo;
- DevCtx_t * dev;
- recvbuf_t * buff;
HANDLE h;
+ SharedLock_t * slock = NULL;
+
+ /* preset to clear state for error cleanup:*/
+ rio->ioreg_ctx = NULL;
+ rio->device_ctx = NULL;
h = (HANDLE)_get_osfhandle(rio->fd);
- if (NULL == CreateIoCompletionPort(
- h,
- hIoCompletionPort,
- (ULONG_PTR)rio,
- 0)) {
- msyslog(LOG_ERR, "Can't add COM port to i/o completion port: %m");
- return 1;
+ if (h == INVALID_HANDLE_VALUE) {
+ msyslog(LOG_ERR, "%s: COM port FD not valid",
+ msgh);
+ goto fail;
}
- dev = DevCtxAlloc();
- if (NULL == dev) {
- msyslog(LOG_ERR, "Can't allocate device context for i/o completion port: %m");
- return 1;
+ if ( ! (rio->ioreg_ctx = slock = slCreate(rio))) {
+ msyslog(LOG_ERR, "%s: Failed to create shared lock",
+ msgh);
+ goto fail;
}
- rio->device_context = DevCtxAttach(dev);
- lpo = IoCtxAlloc(dev);
- if (NULL == lpo) {
- msyslog(LOG_ERR, "Can't allocate heap for completion port: %m");
- return 1;
+ slock->handles[0] = h;
+ slock->riofd = rio->fd;
+ slock->rsrc.rio = rio;
+
+ if ( ! (rio->device_ctx = DevCtxAlloc())) {
+ msyslog(LOG_ERR, "%s: Failed to allocate device context",
+ msgh);
+ goto fail;
}
- buff = get_free_recv_buffer_alloc();
- buff->recv_length = 0;
- QueueSerialWait(rio, buff, lpo);
- return 0;
+ if ( ! (lpo = IoCtxAlloc(slock, rio->device_ctx))) {
+		msyslog(LOG_ERR, "%s: Failed to allocate IO context",
+ msgh);
+ goto fail;
+ }
+
+ if ( ! CreateIoCompletionPort(h, hndIOCPLPort, (ULONG_PTR)rio, 0)) {
+ msyslog(LOG_ERR, "%s: Can't add COM port to i/o completion port: %m",
+ msgh);
+ goto fail;
+ }
+ lpo->io.hnd = h;
+ return QueueSerialWait(lpo, get_free_recv_buffer_alloc());
+
+fail:
+ rio->ioreg_ctx = slDetach(rio->ioreg_ctx);
+ rio->device_ctx = DevCtxDetach(rio->device_ctx);
+ return FALSE;
}
+/* ----------------------------------------------------------------- */
void
io_completion_port_remove_clock_io(
RIO_t *rio
)
{
- if (rio)
- DevCtxDetach((DevCtx_t *)rio->device_context);
+ SharedLock_t * slock = NULL;
+ if (rio && NULL != (slock = slAttachExclusive(rio->ioreg_ctx))) {
+ slDetach(slock);
+
+ slock->handles[0] = INVALID_HANDLE_VALUE;
+ slock->handles[1] = INVALID_HANDLE_VALUE;
+ slock->rsrc.rio = NULL;
+ slock->riofd = -1;
+
+ rio->device_ctx = DevCtxDetach(rio->device_ctx);
+ rio->ioreg_ctx = slDetachExclusive(slock);
+ }
}
/*
- * Queue a receiver on a socket. Returns 0 if no buffer can be queued
+ * -------------------------------------------------------------------
+ * Socket IO stuff
+ * -------------------------------------------------------------------
+ */
+
+/* Queue a receiver on a socket. Returns 0 if no buffer can be queued
*
- * Note: As per the winsock documentation, we use WSARecvFrom. Using
- * ReadFile() is less efficient.
+ * Note: As per the WINSOCK documentation, we use WSARecvFrom. Using
+ * ReadFile() is less efficient. Also, WSARecvFrom delivers
+ * the remote network address. With ReadFile, getting this
+ * becomes a chore.
*/
-static BOOL
+static BOOL __fastcall
QueueSocketRecv(
- SOCKET s,
- recvbuf_t * buff,
- IoCtx_t * lpo
+ IoCtx_t * lpo,
+ recvbuf_t * buff
)
{
- WSABUF wsabuf;
- DWORD Flags;
- int rc;
+ static const char * const msg =
+ "QueueSocketRecv: cannot schedule socket receive";
+
+ WSABUF wsabuf;
+ DWORD Flags;
+ int rc;
lpo->onIoDone = OnSocketRecv;
lpo->recv_buf = buff;
lpo->flRawMem = 0;
- lpo->rio = NULL;
-
- Flags = 0;
- buff->fd = s;
+
+ buff->fd = lpo->io.sfd;
buff->recv_srcadr_len = sizeof(buff->recv_srcadr);
+ buff->receiver = receive;
+ buff->dstadr = lpo->slock->rsrc.ept;
+
wsabuf.buf = (char *)buff->recv_buffer;
wsabuf.len = sizeof(buff->recv_buffer);
- rc = WSARecvFrom(buff->fd, &wsabuf, 1, NULL, &Flags,
+ Flags = 0; /* in/out parameter, must be valid! */
+ rc = WSARecvFrom(lpo->io.sfd, &wsabuf, 1, NULL, &Flags,
&buff->recv_srcadr.sa, &buff->recv_srcadr_len,
&lpo->ol, NULL);
- if (SOCKET_ERROR == rc)
- return IoResultCheck(GetLastError(), lpo,
- "Can't read from Socket");
- return TRUE;
+ return !rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msg);
}
-
-static void
+/* ----------------------------------------------------------------- */
+static void
OnSocketRecv(
ULONG_PTR key,
IoCtx_t * lpo
)
{
- recvbuf_t * buff;
- recvbuf_t * newbuff;
- struct interface * inter = (struct interface *)key;
-
- REQUIRE(NULL != lpo);
- REQUIRE(NULL != lpo->recv_buf);
+ static const char * const msg =
+ "OnSocketRecv: receive from socket failed";
+
+ recvbuf_t * buff = NULL;
+ SharedLock_t * slock = NULL;
- /* check and bail out if operation failed */
- if (!IoResultCheck(lpo->errCode, lpo,
- "Read from Socket failed"))
+ /* Make sure this endpoint is not closed. */
+ endpt * ep = getEndptFromIoCtx(lpo, key, msg);
+ if (ep == NULL)
return;
- /*
- * Convert the overlapped pointer back to a recvbuf pointer.
- * Fetch items that are lost when the context is queued again.
+ /* We want to start a new read before we process the buffer.
+ * Since we must not use the context object once it is in
+ * another IO, we go through some pains to read everything
+ * before going out for another read request.
+ * We also need an extra hold to the SLOCK structure.
*/
- buff = lpo->recv_buf;
- buff->recv_time = lpo->RecvTime;
- buff->recv_length = (int)lpo->byteCount;
+ slock = slAttach(lpo->slock);
+ if (lpo->errCode == ERROR_SUCCESS && lpo->byteCount > 0) {
+ /* keep input buffer, create new one for IO */
+ buff = lpo->recv_buf;
+ lpo->recv_buf = get_free_recv_buffer_alloc();
- /*
- * Get a new recv buffer for the replacement socket receive
+ buff->recv_time = lpo->aux.RecvTime;
+ buff->recv_length = (int)lpo->byteCount;
+
+ } /* Note: else we use the current buffer again */
+ IoCtxStartLocked(lpo, QueueSocketRecv, lpo->recv_buf);
+ /* below this, any usage of 'lpo' is invalid! */
+
+ /* If we have a buffer, do some bookkeeping and other chores,
+ * then feed it to the input queue. And we can be sure we have
+ * a packet here, so we can update the stats.
*/
- newbuff = get_free_recv_buffer_alloc();
- if (NULL != newbuff) {
- QueueSocketRecv(inter->fd, newbuff, lpo);
- } else {
- IoCtxFree(lpo);
- msyslog(LOG_ERR, "Can't add I/O request to socket");
- }
- DPRINTF(4, ("%sfd %d %s recv packet mode is %d\n",
- (MODE_BROADCAST == get_packet_mode(buff))
+ if (buff != NULL) {
+ INSIST(buff->recv_srcadr_len <= sizeof(buff->recv_srcadr));
+ DPRINTF(4, ("%sfd %d %s recv packet mode is %d\n",
+ (MODE_BROADCAST == get_packet_mode(buff))
? " **** Broadcast "
: "",
- (int)buff->fd, stoa(&buff->recv_srcadr),
- get_packet_mode(buff)));
+ (int)buff->fd, stoa(&buff->recv_srcadr),
+ get_packet_mode(buff)));
+
+ if (slAttachShared(slock)) {
+ BOOL epOK = slEndPointOK(slock);
+ if (epOK)
+ InterlockedIncrement(&slock->rsrc.ept->received);
+ slDetachShared(slock);
+ if (epOK) {
+ InterlockedIncrement(&packets_received);
+ InterlockedIncrement(&handler_pkts);
+ }
+ }
- /*
- * If we keep it add some info to the structure
- */
- if (buff->recv_length && !inter->ignore_packets) {
- INSIST(buff->recv_srcadr_len <= sizeof(buff->recv_srcadr));
- buff->receiver = &receive;
- buff->dstadr = inter;
- packets_received++;
- handler_pkts++;
- inter->received++;
- add_full_recv_buffer(buff);
+ DPRINTF(2, ("Received %d bytes fd %d in buffer %p from %s\n",
+ buff->recv_length, (int)buff->fd, buff,
+ stoa(&buff->recv_srcadr)));
+ slQueueLocked(slock, slEndPointOK, buff);
+ }
+ slDetach(slock);
+}
- DPRINTF(2, ("Received %d bytes fd %d in buffer %p from %s\n",
- buff->recv_length, (int)buff->fd, buff,
- stoa(&buff->recv_srcadr)));
+/* ----------------------------------------------------------------- */
+static void
+OnSocketSend(
+ ULONG_PTR key,
+ IoCtx_t * lpo
+ )
+{
+ /* this is somewhat easier: */
+ static const char * const msg =
+		"OnSocketSend: send to socket failed";
+
+ SharedLock_t * slock = NULL;
+ endpt * ep = getEndptFromIoCtx(lpo, key, msg);
+ /* Make sure this endpoint is not closed. */
+ if (ep == NULL)
+ return;
- /*
- * Now signal we have something to process
- */
- SetEvent(WaitableIoEventHandle);
- } else
- freerecvbuf(buff);
+ if (lpo->errCode != ERROR_SUCCESS)
+ slock = slAttachShared(lpo->slock);
+ if (slock) {
+ BOOL epOK = slEndPointOK(slock);
+ if (epOK) {
+ InterlockedIncrement(&slock->rsrc.ept->notsent);
+ InterlockedDecrement(&slock->rsrc.ept->sent);
+ }
+ slDetachShared(slock);
+ if (epOK) {
+ InterlockedIncrement(&packets_notsent);
+ InterlockedDecrement(&packets_sent);
+ }
+ }
+ IoCtxRelease(lpo);
}
-
-/*
- * Add a socket handle to the I/O completion port, and send
- * NTP_RECVS_PER_SOCKET recv requests to the kernel.
+/* --------------------------------------------------------------------
+ * register and de-register interface endpoints with the IO engine
+ * --------------------------------------------------------------------
*/
-int
-io_completion_port_add_socket(
- SOCKET fd,
- struct interface * inter
+BOOL
+io_completion_port_add_interface(
+ endpt * ep
)
{
- IoCtx_t * lpo;
- recvbuf_t * buff;
- int n;
+ /* Registering an endpoint is simple: allocate a shared lock for
+	 * the endpoint and return whether the allocation was successful.
+ */
+ ep->ioreg_ctx = slCreate(ep);
+ return ep->ioreg_ctx != NULL;
+}
+/* ----------------------------------------------------------------- */
+void
+io_completion_port_remove_interface(
+ endpt * ep
+ )
+{
+ /* Removing an endpoint is simple, too: Lock the shared lock
+ * for write access, then invalidate the handles and the
+ * endpoint pointer. Do an additional detach and leave the
+ * write lock.
+ */
+ SharedLock_t * slock = slAttachExclusive(ep->ioreg_ctx);
+ if (slock != NULL) {
+ slDetach(slock);
- if (fd != INVALID_SOCKET) {
- if (NULL == CreateIoCompletionPort((HANDLE)fd,
- hIoCompletionPort, (ULONG_PTR)inter, 0)) {
- msyslog(LOG_ERR,
- "Can't add socket to i/o completion port: %m");
- return 1;
- }
+ slock->handles[0] = INVALID_HANDLE_VALUE;
+ slock->handles[1] = INVALID_HANDLE_VALUE;
+ slock->rsrc.ept = NULL;
+
+ ep->ioreg_ctx = slDetachExclusive(slock);
}
+}
+
+/* --------------------------------------------------------------------
+ * register and de-register sockets for an endpoint
+ * --------------------------------------------------------------------
+ */
- /*
- * Windows 2000 bluescreens with bugcheck 0x76
- * PROCESS_HAS_LOCKED_PAGES at ntpd process
- * termination when using more than one pending
- * receive per socket. A runtime version test
- * would allow using more on newer versions
- * of Windows.
+/* Add a socket handle to the I/O completion port, and send
+ * 's_SockRecvSched' receive requests to the kernel.
+ */
+BOOL
+io_completion_port_add_socket(
+ SOCKET sfd,
+ endpt * ep,
+ BOOL bcast
+ )
+{
+ /* Assume the endpoint is already registered. Set the socket
+ * handle into the proper slot, and then start up the IO engine.
*/
+ static const char * const msg =
+ "Can't add socket to i/o completion port";
-#define WINDOWS_RECVS_PER_SOCKET 1
+ IoCtx_t * lpo;
+ size_t n;
+ ULONG_PTR key;
+ SharedLock_t * slock = NULL;
- for (n = 0; n < WINDOWS_RECVS_PER_SOCKET; n++) {
+ key = ((ULONG_PTR)ep & ~(ULONG_PTR)1u) + !!bcast;
- buff = get_free_recv_buffer_alloc();
- lpo = IoCtxAlloc(NULL);
- if (lpo == NULL)
- {
- msyslog(LOG_ERR
- , "Can't allocate IO completion context: %m");
- return 1;
- }
+ if (NULL == (slock = slAttachExclusive(ep->ioreg_ctx))) {
+ msyslog(LOG_CRIT, "io_completion_port_add_socket: endpt = %p not registered, exiting",
+ ep);
+ exit(1);
+ } else {
+ endpt * rep = slock->rsrc.ept;
+ slock->handles[!!bcast] = (HANDLE)sfd;
+ slDetachExclusive(slock);
+ INSIST(rep == ep);
+ }
- QueueSocketRecv(fd, buff, lpo);
+ if (NULL == CreateIoCompletionPort((HANDLE)sfd,
+ hndIOCPLPort, key, 0))
+ {
+ msyslog(LOG_ERR, "%s: %m", msg);
+ goto fail;
+ }
+ for (n = s_SockRecvSched; n > 0; --n) {
+ if (NULL == (lpo = IoCtxAlloc(ep->ioreg_ctx, NULL))) {
+ msyslog(LOG_ERR, "%s: no read buffer: %m", msg);
+ goto fail;
+ }
+ lpo->io.sfd = sfd;
+ if (!QueueSocketRecv(lpo, get_free_recv_buffer_alloc()))
+ goto fail;
+ }
+ return TRUE;
+fail:
+ ep->ioreg_ctx = slDetach(ep->ioreg_ctx);
+ return FALSE;
+}
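The completion key registered here does double duty: it is the endpt pointer with its least significant bit replaced by a regular/broadcast flag, which works because the structure is more than byte-aligned; getEndptFromIoCtx() above compares keys shifted right by one for the same reason. A tiny sketch of the encode/decode with demo_* names for illustration:

#include <windows.h>

/* encode: pointer with the LSB replaced by the broadcast flag */
static ULONG_PTR
demo_make_key(const void *ep, BOOL bcast)
{
	return ((ULONG_PTR)ep & ~(ULONG_PTR)1u) + !!bcast;
}

/* decode: strip the flag, or compare two keys flag-blind with the
 * same '>> 1' trick getEndptFromIoCtx() uses */
static void
demo_split_key(ULONG_PTR key, void **ep, BOOL *bcast)
{
	*bcast = (BOOL)(key & 1u);
	*ep = (void *)(key & ~(ULONG_PTR)1u);
}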
+/* ----------------------------------------------------------------- */
+void
+io_completion_port_remove_socket(
+ SOCKET fd,
+ endpt * ep
+ )
+{
+ /* Lock the shared lock for write, then search the given
+ * socket handle and replace it with an invalid handle value.
+ */
+ SharedLock_t * lp = slAttachExclusive(ep->ioreg_ctx);
+ HANDLE sh = (HANDLE)fd;
+ if (lp != NULL) {
+ if (lp->handles[0] == sh)
+ lp->handles[0] = INVALID_HANDLE_VALUE;
+ else if (lp->handles[1] == sh)
+ lp->handles[1] = INVALID_HANDLE_VALUE;
+ slDetachExclusive(lp);
}
- return 0;
}
-/*
- * io_completion_port_sendto() -- sendto() replacement for Windows
+/* --------------------------------------------------------------------
+ * I/O API functions for endpoints / interfaces
+ * --------------------------------------------------------------------
+ */
+
+/* io_completion_port_sendto() -- sendto() replacement for Windows
*
* Returns len after successful send.
* Returns -1 for any error, with the error code available via
@@ -1515,73 +1608,65 @@ io_completion_port_add_socket(
*/
int
io_completion_port_sendto(
- SOCKET fd,
+ endpt * ep,
+ SOCKET sfd,
void * pkt,
size_t len,
sockaddr_u * dest
)
{
- static u_long time_next_ifscan_after_error;
- WSABUF wsabuf;
- DWORD octets_sent;
- DWORD Result;
- int errval;
- int AddrLen;
+ static const char * const msg =
+ "sendto: cannot schedule socket send";
+ static const char * const dmsg =
+ "overlapped IO data buffer";
+
+ IoCtx_t * lpo = NULL;
+ void * dbuf = NULL;
+ WSABUF wsabuf;
+ int rc;
if (len > INT_MAX)
len = INT_MAX;
- wsabuf.buf = (void *)pkt;
- wsabuf.len = (DWORD)len;
- AddrLen = SOCKLEN(dest);
- octets_sent = 0;
-
- Result = WSASendTo(fd, &wsabuf, 1, &octets_sent, 0,
- &dest->sa, AddrLen, NULL, NULL);
- errval = GetLastError();
- if (SOCKET_ERROR == Result) {
- if (ERROR_UNEXP_NET_ERR == errval) {
- /*
- * We get this error when trying to send if the
- * network interface is gone or has lost link.
- * Rescan interfaces to catch on sooner, but no
- * more often than once per minute. Once ntpd
- * is able to detect changes without polling
- * this should be unneccessary
- */
- if (time_next_ifscan_after_error < current_time) {
- time_next_ifscan_after_error = current_time + 60;
- timer_interfacetimeout(current_time);
- }
- DPRINTF(4, ("sendto unexpected network error, interface may be down\n"));
- } else {
- msyslog(LOG_ERR, "WSASendTo(%s) error %m",
- stoa(dest));
- }
- SetLastError(errval);
- return -1;
- }
- if ((DWORD)len != octets_sent) {
- msyslog(LOG_ERR, "WSASendTo(%s) sent %u of %d octets",
- stoa(dest), octets_sent, len);
- SetLastError(ERROR_BAD_LENGTH);
- return -1;
- }
+ if (NULL == (dbuf = IOCPLPoolMemDup(pkt, len, dmsg)))
+ goto fail;
+ /* We register the IO operation against the shared lock here.
+ * This is not strictly necessary, since the callback does not
+ * access the endpoint structure in any way...
+ */
+ if (NULL == (lpo = IoCtxAlloc(ep->ioreg_ctx, NULL)))
+ goto fail;
- DPRINTF(4, ("sendto %s %d octets\n", stoa(dest), len));
+ lpo->onIoDone = OnSocketSend;
+ lpo->trans_buf = dbuf;
+ lpo->flRawMem = 1;
+ lpo->io.sfd = sfd;
- return (int)len;
-}
+ wsabuf.buf = (void*)lpo->trans_buf;
+ wsabuf.len = (DWORD)len;
+ rc = WSASendTo(sfd, &wsabuf, 1, NULL, 0,
+ &dest->sa, SOCKLEN(dest),
+ &lpo->ol, NULL);
+ if (!rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msg))
+ return (int)len; /* normal/success return */
+ errno = EBADF;
+ return -1;
-/*
+fail:
+ IoCtxFree(lpo);
+ IOCPLPoolFree(dbuf, dmsg);
+ return -1;
+}
+
+/* --------------------------------------------------------------------
* GetReceivedBuffers
 * Note that this is in effect the main loop for processing both
 * send and receive requests. This should be reimplemented
*/
int
-GetReceivedBuffers()
+GetReceivedBuffers(void)
{
DWORD index;
HANDLE ready;
@@ -1589,9 +1674,9 @@ GetReceivedBuffers()
have_packet = FALSE;
while (!have_packet) {
- index = WaitForMultipleObjects(ActiveWaitHandles,
- WaitHandles, FALSE,
- INFINITE);
+ index = WaitForMultipleObjectsEx(
+ ActiveWaitHandles, WaitHandles,
+ FALSE, INFINITE, TRUE);
switch (index) {
case WAIT_OBJECT_0 + 0: /* Io event */
@@ -1638,4 +1723,3 @@ GetReceivedBuffers()
#else /*defined(HAVE_IO_COMPLETION_PORT) */
static int NonEmptyCompilationUnit;
#endif /*!defined(HAVE_IO_COMPLETION_PORT) */
-