diff options
author | Alexander Motin <mav@FreeBSD.org> | 2014-05-08 16:59:36 +0000 |
---|---|---|
committer | Alexander Motin <mav@FreeBSD.org> | 2014-05-08 16:59:36 +0000 |
commit | afb8674c798ab3348c48e1bc37d746f04f647c7c (patch) | |
tree | 5719ad8f13ab407a7152d049dbc67572936ffb15 /cddl/compat | |
parent | fa1142344294e57f9f646eb16897e9bea93424e5 (diff) | |
download | src-test2-afb8674c798ab3348c48e1bc37d746f04f647c7c.tar.gz src-test2-afb8674c798ab3348c48e1bc37d746f04f647c7c.zip |
Notes
Diffstat (limited to 'cddl/compat')
-rw-r--r-- | cddl/compat/opensolaris/include/thread_pool.h | 99 | ||||
-rw-r--r-- | cddl/compat/opensolaris/misc/thread_pool.c | 430 | ||||
-rw-r--r-- | cddl/compat/opensolaris/misc/thread_pool_impl.h | 99 |
3 files changed, 598 insertions, 30 deletions
diff --git a/cddl/compat/opensolaris/include/thread_pool.h b/cddl/compat/opensolaris/include/thread_pool.h index 25ac55dedea7..3bd23a6dd2aa 100644 --- a/cddl/compat/opensolaris/include/thread_pool.h +++ b/cddl/compat/opensolaris/include/thread_pool.h @@ -1,39 +1,78 @@ -/*- - * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org> - * All rights reserved. +/* + * CDDL HEADER START * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. + * The contents of this file are subject to the terms of the + * Common Development and Distribution License (the "License"). + * You may not use this file except in compliance with the License. * - * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE - * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL - * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS - * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) - * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT - * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY - * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF - * SUCH DAMAGE. + * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE + * or http://www.opensolaris.org/os/licensing. + * See the License for the specific language governing permissions + * and limitations under the License. * + * When distributing Covered Code, include this CDDL HEADER in each + * file and include the License file at usr/src/OPENSOLARIS.LICENSE. + * If applicable, add the following below this CDDL HEADER, with the + * fields enclosed by brackets "[]" replaced with your own identifying + * information: Portions Copyright [yyyy] [name of copyright owner] + * + * CDDL HEADER END + */ + +/* + * Copyright 2006 Sun Microsystems, Inc. All rights reserved. + * Use is subject to license terms. + */ + +/* * $FreeBSD$ */ -#ifndef _OPENSOLARIS_THREAD_POOL_H_ -#define _OPENSOLARIS_THREAD_POOL_H_ +#ifndef _THREAD_POOL_H_ +#define _THREAD_POOL_H_ + +#pragma ident "%Z%%M% %I% %E% SMI" + +#include <sys/types.h> +#include <thread.h> +#include <pthread.h> + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct tpool tpool_t; /* opaque thread pool descriptor */ + +#if defined(__STDC__) + +extern tpool_t *tpool_create(uint_t min_threads, uint_t max_threads, + uint_t linger, pthread_attr_t *attr); +extern int tpool_dispatch(tpool_t *tpool, + void (*func)(void *), void *arg); +extern void tpool_destroy(tpool_t *tpool); +extern void tpool_abandon(tpool_t *tpool); +extern void tpool_wait(tpool_t *tpool); +extern void tpool_suspend(tpool_t *tpool); +extern int tpool_suspended(tpool_t *tpool); +extern void tpool_resume(tpool_t *tpool); +extern int tpool_member(tpool_t *tpool); + +#else /* Non ANSI */ + +extern tpool_t *tpool_create(); +extern int tpool_dispatch(); +extern void tpool_destroy(); +extern void tpool_abandon(); +extern void tpool_wait(); +extern void tpool_suspend(); +extern int tpool_suspended(); +extern void tpool_resume(); +extern int tpool_member(); -typedef int tpool_t; +#endif /* __STDC__ */ -#define tpool_create(a, b, c, d) (0) -#define tpool_dispatch(pool, func, arg) func(arg) -#define tpool_wait(pool) do { } while (0) -#define tpool_destroy(pool) do { } while (0) +#ifdef __cplusplus +} +#endif -#endif /* !_OPENSOLARIS_THREAD_POOL_H_ */ +#endif /* _THREAD_POOL_H_ */ diff --git a/cddl/compat/opensolaris/misc/thread_pool.c b/cddl/compat/opensolaris/misc/thread_pool.c new file mode 100644 index 000000000000..a6a834fb2bbd --- /dev/null +++ b/cddl/compat/opensolaris/misc/thread_pool.c @@ -0,0 +1,430 @@ +/* + * CDDL HEADER START + * + * The contents of this file are subject to the terms of the + * Common Development and Distribution License (the "License"). + * You may not use this file except in compliance with the License. + * + * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE + * or http://www.opensolaris.org/os/licensing. + * See the License for the specific language governing permissions + * and limitations under the License. + * + * When distributing Covered Code, include this CDDL HEADER in each + * file and include the License file at usr/src/OPENSOLARIS.LICENSE. + * If applicable, add the following below this CDDL HEADER, with the + * fields enclosed by brackets "[]" replaced with your own identifying + * information: Portions Copyright [yyyy] [name of copyright owner] + * + * CDDL HEADER END + */ + +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved. + * Use is subject to license terms. + */ + +#include <sys/cdefs.h> +__FBSDID("$FreeBSD$"); + +#pragma ident "%Z%%M% %I% %E% SMI" + +#include <stdlib.h> +#include <signal.h> +#include <errno.h> +#include "thread_pool_impl.h" + +typedef void (*_Voidfp)(void*); /* pointer to extern "C" function */ + +static void +delete_pool(tpool_t *tpool) +{ + tpool_job_t *job; + + /* + * There should be no pending jobs, but just in case... + */ + for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) { + tpool->tp_head = job->tpj_next; + free(job); + } + (void) pthread_attr_destroy(&tpool->tp_attr); + free(tpool); +} + +/* + * Worker thread is terminating. + */ +static void +worker_cleanup(void *arg) +{ + tpool_t *tpool = arg; + + if (--tpool->tp_current == 0 && + (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { + if (tpool->tp_flags & TP_ABANDON) { + pthread_mutex_unlock(&tpool->tp_mutex); + delete_pool(tpool); + return; + } + if (tpool->tp_flags & TP_DESTROY) + (void) pthread_cond_broadcast(&tpool->tp_busycv); + } + pthread_mutex_unlock(&tpool->tp_mutex); +} + +static void +notify_waiters(tpool_t *tpool) +{ + if (tpool->tp_head == NULL && tpool->tp_active == NULL) { + tpool->tp_flags &= ~TP_WAIT; + (void) pthread_cond_broadcast(&tpool->tp_waitcv); + } +} + +/* + * Called by a worker thread on return from a tpool_dispatch()d job. + */ +static void +job_cleanup(void *arg) +{ + tpool_t *tpool = arg; + pthread_t my_tid = pthread_self(); + tpool_active_t *activep; + tpool_active_t **activepp; + + pthread_mutex_lock(&tpool->tp_mutex); + /* CSTYLED */ + for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) { + activep = *activepp; + if (activep->tpa_tid == my_tid) { + *activepp = activep->tpa_next; + break; + } + } + if (tpool->tp_flags & TP_WAIT) + notify_waiters(tpool); +} + +static void * +tpool_worker(void *arg) +{ + tpool_t *tpool = (tpool_t *)arg; + int elapsed; + tpool_job_t *job; + void (*func)(void *); + tpool_active_t active; + sigset_t maskset; + + pthread_mutex_lock(&tpool->tp_mutex); + pthread_cleanup_push(worker_cleanup, tpool); + + /* + * This is the worker's main loop. + * It will only be left if a timeout or an error has occured. + */ + active.tpa_tid = pthread_self(); + for (;;) { + elapsed = 0; + tpool->tp_idle++; + if (tpool->tp_flags & TP_WAIT) + notify_waiters(tpool); + while ((tpool->tp_head == NULL || + (tpool->tp_flags & TP_SUSPEND)) && + !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { + if (tpool->tp_current <= tpool->tp_minimum || + tpool->tp_linger == 0) { + (void) pthread_cond_wait(&tpool->tp_workcv, + &tpool->tp_mutex); + } else { + struct timespec timeout; + + clock_gettime(CLOCK_MONOTONIC, &timeout); + timeout.tv_sec += tpool->tp_linger; + if (pthread_cond_timedwait(&tpool->tp_workcv, + &tpool->tp_mutex, &timeout) != 0) { + elapsed = 1; + break; + } + } + } + tpool->tp_idle--; + if (tpool->tp_flags & TP_DESTROY) + break; + if (tpool->tp_flags & TP_ABANDON) { + /* can't abandon a suspended pool */ + if (tpool->tp_flags & TP_SUSPEND) { + tpool->tp_flags &= ~TP_SUSPEND; + (void) pthread_cond_broadcast(&tpool->tp_workcv); + } + if (tpool->tp_head == NULL) + break; + } + if ((job = tpool->tp_head) != NULL && + !(tpool->tp_flags & TP_SUSPEND)) { + elapsed = 0; + func = job->tpj_func; + arg = job->tpj_arg; + tpool->tp_head = job->tpj_next; + if (job == tpool->tp_tail) + tpool->tp_tail = NULL; + tpool->tp_njobs--; + active.tpa_next = tpool->tp_active; + tpool->tp_active = &active; + pthread_mutex_unlock(&tpool->tp_mutex); + pthread_cleanup_push(job_cleanup, tpool); + free(job); + /* + * Call the specified function. + */ + func(arg); + /* + * We don't know what this thread has been doing, + * so we reset its signal mask and cancellation + * state back to the initial values. + */ + sigfillset(&maskset); + (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL); + (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, + NULL); + (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, + NULL); + pthread_cleanup_pop(1); + } + if (elapsed && tpool->tp_current > tpool->tp_minimum) { + /* + * We timed out and there is no work to be done + * and the number of workers exceeds the minimum. + * Exit now to reduce the size of the pool. + */ + break; + } + } + pthread_cleanup_pop(1); + return (arg); +} + +/* + * Create a worker thread, with all signals blocked. + */ +static int +create_worker(tpool_t *tpool) +{ + sigset_t maskset, oset; + pthread_t thread; + int error; + + sigfillset(&maskset); + (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); + error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool); + (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); + return (error); +} + +tpool_t * +tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger, + pthread_attr_t *attr) +{ + tpool_t *tpool; + int error; + + if (min_threads > max_threads || max_threads < 1) { + errno = EINVAL; + return (NULL); + } + + tpool = malloc(sizeof (*tpool)); + if (tpool == NULL) { + errno = ENOMEM; + return (NULL); + } + bzero(tpool, sizeof(*tpool)); + (void) pthread_mutex_init(&tpool->tp_mutex, NULL); + (void) pthread_cond_init(&tpool->tp_busycv, NULL); + (void) pthread_cond_init(&tpool->tp_workcv, NULL); + (void) pthread_cond_init(&tpool->tp_waitcv, NULL); + tpool->tp_minimum = min_threads; + tpool->tp_maximum = max_threads; + tpool->tp_linger = linger; + + /* make all pool threads be detached daemon threads */ + (void) pthread_attr_init(&tpool->tp_attr); + (void) pthread_attr_setdetachstate(&tpool->tp_attr, + PTHREAD_CREATE_DETACHED); + + return (tpool); +} + +/* + * Dispatch a work request to the thread pool. + * If there are idle workers, awaken one. + * Else, if the maximum number of workers has + * not been reached, spawn a new worker thread. + * Else just return with the job added to the queue. + */ +int +tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg) +{ + tpool_job_t *job; + + if ((job = malloc(sizeof (*job))) == NULL) + return (-1); + bzero(job, sizeof(*job)); + job->tpj_next = NULL; + job->tpj_func = func; + job->tpj_arg = arg; + + pthread_mutex_lock(&tpool->tp_mutex); + + if (tpool->tp_head == NULL) + tpool->tp_head = job; + else + tpool->tp_tail->tpj_next = job; + tpool->tp_tail = job; + tpool->tp_njobs++; + + if (!(tpool->tp_flags & TP_SUSPEND)) { + if (tpool->tp_idle > 0) + (void) pthread_cond_signal(&tpool->tp_workcv); + else if (tpool->tp_current < tpool->tp_maximum && + create_worker(tpool) == 0) + tpool->tp_current++; + } + + pthread_mutex_unlock(&tpool->tp_mutex); + return (0); +} + +/* + * Assumes: by the time tpool_destroy() is called no one will use this + * thread pool in any way and no one will try to dispatch entries to it. + * Calling tpool_destroy() from a job in the pool will cause deadlock. + */ +void +tpool_destroy(tpool_t *tpool) +{ + tpool_active_t *activep; + + pthread_mutex_lock(&tpool->tp_mutex); + pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex); + + /* mark the pool as being destroyed; wakeup idle workers */ + tpool->tp_flags |= TP_DESTROY; + tpool->tp_flags &= ~TP_SUSPEND; + (void) pthread_cond_broadcast(&tpool->tp_workcv); + + /* cancel all active workers */ + for (activep = tpool->tp_active; activep; activep = activep->tpa_next) + (void) pthread_cancel(activep->tpa_tid); + + /* wait for all active workers to finish */ + while (tpool->tp_active != NULL) { + tpool->tp_flags |= TP_WAIT; + (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); + } + + /* the last worker to terminate will wake us up */ + while (tpool->tp_current != 0) + (void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex); + + pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */ + delete_pool(tpool); +} + +/* + * Like tpool_destroy(), but don't cancel workers or wait for them to finish. + * The last worker to terminate will delete the pool. + */ +void +tpool_abandon(tpool_t *tpool) +{ + + pthread_mutex_lock(&tpool->tp_mutex); + if (tpool->tp_current == 0) { + /* no workers, just delete the pool */ + pthread_mutex_unlock(&tpool->tp_mutex); + delete_pool(tpool); + } else { + /* wake up all workers, last one will delete the pool */ + tpool->tp_flags |= TP_ABANDON; + tpool->tp_flags &= ~TP_SUSPEND; + (void) pthread_cond_broadcast(&tpool->tp_workcv); + pthread_mutex_unlock(&tpool->tp_mutex); + } +} + +/* + * Wait for all jobs to complete. + * Calling tpool_wait() from a job in the pool will cause deadlock. + */ +void +tpool_wait(tpool_t *tpool) +{ + + pthread_mutex_lock(&tpool->tp_mutex); + pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex); + while (tpool->tp_head != NULL || tpool->tp_active != NULL) { + tpool->tp_flags |= TP_WAIT; + (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); + } + pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */ +} + +void +tpool_suspend(tpool_t *tpool) +{ + + pthread_mutex_lock(&tpool->tp_mutex); + tpool->tp_flags |= TP_SUSPEND; + pthread_mutex_unlock(&tpool->tp_mutex); +} + +int +tpool_suspended(tpool_t *tpool) +{ + int suspended; + + pthread_mutex_lock(&tpool->tp_mutex); + suspended = (tpool->tp_flags & TP_SUSPEND) != 0; + pthread_mutex_unlock(&tpool->tp_mutex); + + return (suspended); +} + +void +tpool_resume(tpool_t *tpool) +{ + int excess; + + pthread_mutex_lock(&tpool->tp_mutex); + if (!(tpool->tp_flags & TP_SUSPEND)) { + pthread_mutex_unlock(&tpool->tp_mutex); + return; + } + tpool->tp_flags &= ~TP_SUSPEND; + (void) pthread_cond_broadcast(&tpool->tp_workcv); + excess = tpool->tp_njobs - tpool->tp_idle; + while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) { + if (create_worker(tpool) != 0) + break; /* pthread_create() failed */ + tpool->tp_current++; + } + pthread_mutex_unlock(&tpool->tp_mutex); +} + +int +tpool_member(tpool_t *tpool) +{ + pthread_t my_tid = pthread_self(); + tpool_active_t *activep; + + pthread_mutex_lock(&tpool->tp_mutex); + for (activep = tpool->tp_active; activep; activep = activep->tpa_next) { + if (activep->tpa_tid == my_tid) { + pthread_mutex_unlock(&tpool->tp_mutex); + return (1); + } + } + pthread_mutex_unlock(&tpool->tp_mutex); + return (0); +} diff --git a/cddl/compat/opensolaris/misc/thread_pool_impl.h b/cddl/compat/opensolaris/misc/thread_pool_impl.h new file mode 100644 index 000000000000..bc98ac8b1b37 --- /dev/null +++ b/cddl/compat/opensolaris/misc/thread_pool_impl.h @@ -0,0 +1,99 @@ +/* + * CDDL HEADER START + * + * The contents of this file are subject to the terms of the + * Common Development and Distribution License (the "License"). + * You may not use this file except in compliance with the License. + * + * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE + * or http://www.opensolaris.org/os/licensing. + * See the License for the specific language governing permissions + * and limitations under the License. + * + * When distributing Covered Code, include this CDDL HEADER in each + * file and include the License file at usr/src/OPENSOLARIS.LICENSE. + * If applicable, add the following below this CDDL HEADER, with the + * fields enclosed by brackets "[]" replaced with your own identifying + * information: Portions Copyright [yyyy] [name of copyright owner] + * + * CDDL HEADER END + */ + +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved. + * Use is subject to license terms. + */ + +/* + * $FreeBSD$ + */ + +#ifndef _THREAD_POOL_IMPL_H +#define _THREAD_POOL_IMPL_H + +#pragma ident "%Z%%M% %I% %E% SMI" + +#include <thread_pool.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Thread pool implementation definitions. + * See <thread_pool.h> for interface declarations. + */ + +/* + * FIFO queued job + */ +typedef struct tpool_job tpool_job_t; +struct tpool_job { + tpool_job_t *tpj_next; /* list of jobs */ + void (*tpj_func)(void *); /* function to call */ + void *tpj_arg; /* its argument */ +}; + +/* + * List of active threads, linked through their stacks. + */ +typedef struct tpool_active tpool_active_t; +struct tpool_active { + tpool_active_t *tpa_next; /* list of active threads */ + pthread_t tpa_tid; /* active thread id */ +}; + +/* + * The thread pool. + */ +struct tpool { + tpool_t *tp_forw; /* circular list of all thread pools */ + tpool_t *tp_back; + mutex_t tp_mutex; /* protects the pool data */ + cond_t tp_busycv; /* synchronization in tpool_dispatch */ + cond_t tp_workcv; /* synchronization with workers */ + cond_t tp_waitcv; /* synchronization in tpool_wait() */ + tpool_active_t *tp_active; /* threads performing work */ + tpool_job_t *tp_head; /* FIFO job queue */ + tpool_job_t *tp_tail; + pthread_attr_t tp_attr; /* attributes of the workers */ + int tp_flags; /* see below */ + uint_t tp_linger; /* seconds before idle workers exit */ + int tp_njobs; /* number of jobs in job queue */ + int tp_minimum; /* minimum number of worker threads */ + int tp_maximum; /* maximum number of worker threads */ + int tp_current; /* current number of worker threads */ + int tp_idle; /* number of idle workers */ +}; + +/* tp_flags */ +#define TP_WAIT 0x01 /* waiting in tpool_wait() */ +#define TP_SUSPEND 0x02 /* pool is being suspended */ +#define TP_DESTROY 0x04 /* pool is being destroyed */ +#define TP_ABANDON 0x08 /* pool is abandoned (auto-destroy) */ + +#ifdef __cplusplus +} +#endif + +#endif /* _THREAD_POOL_IMPL_H */ |