diff options
Diffstat (limited to 'lib/libtpool')
-rw-r--r-- | lib/libtpool/Makefile.am | 11 | ||||
-rw-r--r-- | lib/libtpool/thread_pool.c | 612 | ||||
-rw-r--r-- | lib/libtpool/thread_pool_impl.h | 94 |
3 files changed, 0 insertions, 717 deletions
diff --git a/lib/libtpool/Makefile.am b/lib/libtpool/Makefile.am deleted file mode 100644 index 5a2b8a5701da..000000000000 --- a/lib/libtpool/Makefile.am +++ /dev/null @@ -1,11 +0,0 @@ -libtpool_la_CFLAGS = $(AM_CFLAGS) $(LIBRARY_CFLAGS) -libtpool_la_CFLAGS += -fvisibility=hidden -# https://gcc.gnu.org/bugzilla/show_bug.cgi?id=61118 -libtpool_la_CFLAGS += $(NO_CLOBBERED) - -noinst_LTLIBRARIES += libtpool.la -CPPCHECKTARGETS += libtpool.la - -libtpool_la_SOURCES = \ - %D%/thread_pool.c \ - %D%/thread_pool_impl.h diff --git a/lib/libtpool/thread_pool.c b/lib/libtpool/thread_pool.c deleted file mode 100644 index 39b92ae81166..000000000000 --- a/lib/libtpool/thread_pool.c +++ /dev/null @@ -1,612 +0,0 @@ -// SPDX-License-Identifier: CDDL-1.0 -/* - * CDDL HEADER START - * - * The contents of this file are subject to the terms of the - * Common Development and Distribution License (the "License"). - * You may not use this file except in compliance with the License. - * - * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE - * or https://opensource.org/licenses/CDDL-1.0. - * See the License for the specific language governing permissions - * and limitations under the License. - * - * When distributing Covered Code, include this CDDL HEADER in each - * file and include the License file at usr/src/OPENSOLARIS.LICENSE. - * If applicable, add the following below this CDDL HEADER, with the - * fields enclosed by brackets "[]" replaced with your own identifying - * information: Portions Copyright [yyyy] [name of copyright owner] - * - * CDDL HEADER END - */ - -/* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved. - * Use is subject to license terms. - */ - -#include <stdlib.h> -#include <signal.h> -#include <errno.h> -#include <assert.h> -#include <limits.h> -#include "thread_pool_impl.h" - -static pthread_mutex_t thread_pool_lock = PTHREAD_MUTEX_INITIALIZER; -static tpool_t *thread_pools = NULL; - -static void -delete_pool(tpool_t *tpool) -{ - tpool_job_t *job; - - ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL); - - /* - * Unlink the pool from the global list of all pools. - */ - (void) pthread_mutex_lock(&thread_pool_lock); - if (thread_pools == tpool) - thread_pools = tpool->tp_forw; - if (thread_pools == tpool) - thread_pools = NULL; - else { - tpool->tp_back->tp_forw = tpool->tp_forw; - tpool->tp_forw->tp_back = tpool->tp_back; - } - pthread_mutex_unlock(&thread_pool_lock); - - /* - * There should be no pending jobs, but just in case... - */ - for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) { - tpool->tp_head = job->tpj_next; - free(job); - } - (void) pthread_attr_destroy(&tpool->tp_attr); - free(tpool); -} - -/* - * Worker thread is terminating. - */ -static void -worker_cleanup(void *arg) -{ - tpool_t *tpool = (tpool_t *)arg; - - if (--tpool->tp_current == 0 && - (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { - if (tpool->tp_flags & TP_ABANDON) { - pthread_mutex_unlock(&tpool->tp_mutex); - delete_pool(tpool); - return; - } - if (tpool->tp_flags & TP_DESTROY) - (void) pthread_cond_broadcast(&tpool->tp_busycv); - } - pthread_mutex_unlock(&tpool->tp_mutex); -} - -static void -notify_waiters(tpool_t *tpool) -{ - if (tpool->tp_head == NULL && tpool->tp_active == NULL) { - tpool->tp_flags &= ~TP_WAIT; - (void) pthread_cond_broadcast(&tpool->tp_waitcv); - } -} - -/* - * Called by a worker thread on return from a tpool_dispatch()d job. - */ -static void -job_cleanup(void *arg) -{ - tpool_t *tpool = (tpool_t *)arg; - - pthread_t my_tid = pthread_self(); - tpool_active_t *activep; - tpool_active_t **activepp; - - pthread_mutex_lock(&tpool->tp_mutex); - for (activepp = &tpool->tp_active; ; activepp = &activep->tpa_next) { - activep = *activepp; - if (activep->tpa_tid == my_tid) { - *activepp = activep->tpa_next; - break; - } - } - if (tpool->tp_flags & TP_WAIT) - notify_waiters(tpool); -} - -static void * -tpool_worker(void *arg) -{ - tpool_t *tpool = (tpool_t *)arg; - int elapsed; - tpool_job_t *job; - void (*func)(void *); - tpool_active_t active; - - pthread_mutex_lock(&tpool->tp_mutex); - pthread_cleanup_push(worker_cleanup, tpool); - - /* - * This is the worker's main loop. - * It will only be left if a timeout or an error has occurred. - */ - active.tpa_tid = pthread_self(); - for (;;) { - elapsed = 0; - tpool->tp_idle++; - if (tpool->tp_flags & TP_WAIT) - notify_waiters(tpool); - while ((tpool->tp_head == NULL || - (tpool->tp_flags & TP_SUSPEND)) && - !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { - if (tpool->tp_current <= tpool->tp_minimum || - tpool->tp_linger == 0) { - (void) pthread_cond_wait(&tpool->tp_workcv, - &tpool->tp_mutex); - } else { - struct timespec ts; - - clock_gettime(CLOCK_REALTIME, &ts); - ts.tv_sec += tpool->tp_linger; - - if (pthread_cond_timedwait(&tpool->tp_workcv, - &tpool->tp_mutex, &ts) != 0) { - elapsed = 1; - break; - } - } - } - tpool->tp_idle--; - if (tpool->tp_flags & TP_DESTROY) - break; - if (tpool->tp_flags & TP_ABANDON) { - /* can't abandon a suspended pool */ - if (tpool->tp_flags & TP_SUSPEND) { - tpool->tp_flags &= ~TP_SUSPEND; - (void) pthread_cond_broadcast( - &tpool->tp_workcv); - } - if (tpool->tp_head == NULL) - break; - } - if ((job = tpool->tp_head) != NULL && - !(tpool->tp_flags & TP_SUSPEND)) { - elapsed = 0; - func = job->tpj_func; - arg = job->tpj_arg; - tpool->tp_head = job->tpj_next; - if (job == tpool->tp_tail) - tpool->tp_tail = NULL; - tpool->tp_njobs--; - active.tpa_next = tpool->tp_active; - tpool->tp_active = &active; - pthread_mutex_unlock(&tpool->tp_mutex); - pthread_cleanup_push(job_cleanup, tpool); - free(job); - - sigset_t maskset; - (void) pthread_sigmask(SIG_SETMASK, NULL, &maskset); - - /* - * Call the specified function. - */ - func(arg); - /* - * We don't know what this thread has been doing, - * so we reset its signal mask and cancellation - * state back to the values prior to calling func(). - */ - (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL); - (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, - NULL); - (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, - NULL); - pthread_cleanup_pop(1); - } - if (elapsed && tpool->tp_current > tpool->tp_minimum) { - /* - * We timed out and there is no work to be done - * and the number of workers exceeds the minimum. - * Exit now to reduce the size of the pool. - */ - break; - } - } - pthread_cleanup_pop(1); - return (arg); -} - -/* - * Create a worker thread, with default signals blocked. - */ -static int -create_worker(tpool_t *tpool) -{ - pthread_t thread; - sigset_t oset; - int error; - - (void) pthread_sigmask(SIG_SETMASK, NULL, &oset); - error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool); - (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); - return (error); -} - - -/* - * pthread_attr_clone: make a copy of a pthread_attr_t. When old_attr - * is NULL initialize the cloned attr using default values. - */ -static int -pthread_attr_clone(pthread_attr_t *attr, const pthread_attr_t *old_attr) -{ - int error; - - error = pthread_attr_init(attr); - if (error || (old_attr == NULL)) - return (error); - -#ifdef __GLIBC__ - cpu_set_t cpuset; - size_t cpusetsize = sizeof (cpuset); - error = pthread_attr_getaffinity_np(old_attr, cpusetsize, &cpuset); - if (error == 0) - error = pthread_attr_setaffinity_np(attr, cpusetsize, &cpuset); - if (error) - goto error; -#endif /* __GLIBC__ */ - - int detachstate; - error = pthread_attr_getdetachstate(old_attr, &detachstate); - if (error == 0) - error = pthread_attr_setdetachstate(attr, detachstate); - if (error) - goto error; - - size_t guardsize; - error = pthread_attr_getguardsize(old_attr, &guardsize); - if (error == 0) - error = pthread_attr_setguardsize(attr, guardsize); - if (error) - goto error; - - int inheritsched; - error = pthread_attr_getinheritsched(old_attr, &inheritsched); - if (error == 0) - error = pthread_attr_setinheritsched(attr, inheritsched); - if (error) - goto error; - - struct sched_param param; - error = pthread_attr_getschedparam(old_attr, ¶m); - if (error == 0) - error = pthread_attr_setschedparam(attr, ¶m); - if (error) - goto error; - - int policy; - error = pthread_attr_getschedpolicy(old_attr, &policy); - if (error == 0) - error = pthread_attr_setschedpolicy(attr, policy); - if (error) - goto error; - - int scope; - error = pthread_attr_getscope(old_attr, &scope); - if (error == 0) - error = pthread_attr_setscope(attr, scope); - if (error) - goto error; - - void *stackaddr; - size_t stacksize; - error = pthread_attr_getstack(old_attr, &stackaddr, &stacksize); - if (error == 0) - error = pthread_attr_setstack(attr, stackaddr, stacksize); - if (error) - goto error; - - return (0); -error: - pthread_attr_destroy(attr); - return (error); -} - -tpool_t * -tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger, - pthread_attr_t *attr) -{ - tpool_t *tpool; - void *stackaddr; - size_t stacksize; - size_t minstack; - int error; - - if (min_threads > max_threads || max_threads < 1) { - errno = EINVAL; - return (NULL); - } - if (attr != NULL) { - if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) { - errno = EINVAL; - return (NULL); - } - /* - * Allow only one thread in the pool with a specified stack. - * Require threads to have at least the minimum stack size. - */ - minstack = PTHREAD_STACK_MIN; - if (stackaddr != NULL) { - if (stacksize < minstack || max_threads != 1) { - errno = EINVAL; - return (NULL); - } - } else if (stacksize != 0 && stacksize < minstack) { - errno = EINVAL; - return (NULL); - } - } - - tpool = calloc(1, sizeof (*tpool)); - if (tpool == NULL) { - errno = ENOMEM; - return (NULL); - } - (void) pthread_mutex_init(&tpool->tp_mutex, NULL); - (void) pthread_cond_init(&tpool->tp_busycv, NULL); - (void) pthread_cond_init(&tpool->tp_workcv, NULL); - (void) pthread_cond_init(&tpool->tp_waitcv, NULL); - tpool->tp_minimum = min_threads; - tpool->tp_maximum = max_threads; - tpool->tp_linger = linger; - - /* - * We cannot just copy the attribute pointer. - * We need to initialize a new pthread_attr_t structure - * with the values from the user-supplied pthread_attr_t. - * If the attribute pointer is NULL, we need to initialize - * the new pthread_attr_t structure with default values. - */ - error = pthread_attr_clone(&tpool->tp_attr, attr); - if (error) { - free(tpool); - errno = error; - return (NULL); - } - - /* make all pool threads be detached daemon threads */ - (void) pthread_attr_setdetachstate(&tpool->tp_attr, - PTHREAD_CREATE_DETACHED); - - /* insert into the global list of all thread pools */ - pthread_mutex_lock(&thread_pool_lock); - if (thread_pools == NULL) { - tpool->tp_forw = tpool; - tpool->tp_back = tpool; - thread_pools = tpool; - } else { - thread_pools->tp_back->tp_forw = tpool; - tpool->tp_forw = thread_pools; - tpool->tp_back = thread_pools->tp_back; - thread_pools->tp_back = tpool; - } - pthread_mutex_unlock(&thread_pool_lock); - - return (tpool); -} - -/* - * Dispatch a work request to the thread pool. - * If there are idle workers, awaken one. - * Else, if the maximum number of workers has - * not been reached, spawn a new worker thread. - * Else just return with the job added to the queue. - */ -int -tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg) -{ - tpool_job_t *job; - - ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); - - if ((job = calloc(1, sizeof (*job))) == NULL) - return (-1); - job->tpj_next = NULL; - job->tpj_func = func; - job->tpj_arg = arg; - - pthread_mutex_lock(&tpool->tp_mutex); - - if (!(tpool->tp_flags & TP_SUSPEND)) { - if (tpool->tp_idle > 0) - (void) pthread_cond_signal(&tpool->tp_workcv); - else if (tpool->tp_current >= tpool->tp_maximum) { - /* At worker limit. Leave task on queue */ - } else { - if (create_worker(tpool) == 0) { - /* Started a new worker thread */ - tpool->tp_current++; - } else if (tpool->tp_current > 0) { - /* Leave task on queue */ - } else { - /* Cannot start a single worker! */ - pthread_mutex_unlock(&tpool->tp_mutex); - free(job); - return (-1); - } - } - } - - if (tpool->tp_head == NULL) - tpool->tp_head = job; - else - tpool->tp_tail->tpj_next = job; - tpool->tp_tail = job; - tpool->tp_njobs++; - - pthread_mutex_unlock(&tpool->tp_mutex); - return (0); -} - -static void -tpool_cleanup(void *arg) -{ - tpool_t *tpool = (tpool_t *)arg; - - pthread_mutex_unlock(&tpool->tp_mutex); -} - -/* - * Assumes: by the time tpool_destroy() is called no one will use this - * thread pool in any way and no one will try to dispatch entries to it. - * Calling tpool_destroy() from a job in the pool will cause deadlock. - */ -void -tpool_destroy(tpool_t *tpool) -{ - tpool_active_t *activep; - - ASSERT(!tpool_member(tpool)); - ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); - - pthread_mutex_lock(&tpool->tp_mutex); - pthread_cleanup_push(tpool_cleanup, tpool); - - /* mark the pool as being destroyed; wakeup idle workers */ - tpool->tp_flags |= TP_DESTROY; - tpool->tp_flags &= ~TP_SUSPEND; - (void) pthread_cond_broadcast(&tpool->tp_workcv); - - /* cancel all active workers */ - for (activep = tpool->tp_active; activep; activep = activep->tpa_next) - (void) pthread_cancel(activep->tpa_tid); - - /* wait for all active workers to finish */ - while (tpool->tp_active != NULL) { - tpool->tp_flags |= TP_WAIT; - (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); - } - - /* the last worker to terminate will wake us up */ - while (tpool->tp_current != 0) - (void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex); - - pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */ - delete_pool(tpool); -} - -/* - * Like tpool_destroy(), but don't cancel workers or wait for them to finish. - * The last worker to terminate will delete the pool. - */ -void -tpool_abandon(tpool_t *tpool) -{ - ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); - - pthread_mutex_lock(&tpool->tp_mutex); - if (tpool->tp_current == 0) { - /* no workers, just delete the pool */ - pthread_mutex_unlock(&tpool->tp_mutex); - delete_pool(tpool); - } else { - /* wake up all workers, last one will delete the pool */ - tpool->tp_flags |= TP_ABANDON; - tpool->tp_flags &= ~TP_SUSPEND; - (void) pthread_cond_broadcast(&tpool->tp_workcv); - pthread_mutex_unlock(&tpool->tp_mutex); - } -} - -/* - * Wait for all jobs to complete. - * Calling tpool_wait() from a job in the pool will cause deadlock. - */ -void -tpool_wait(tpool_t *tpool) -{ - ASSERT(!tpool_member(tpool)); - ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); - - pthread_mutex_lock(&tpool->tp_mutex); - pthread_cleanup_push(tpool_cleanup, tpool); - while (tpool->tp_head != NULL || tpool->tp_active != NULL) { - tpool->tp_flags |= TP_WAIT; - (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); - ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); - } - pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */ -} - -void -tpool_suspend(tpool_t *tpool) -{ - ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); - - pthread_mutex_lock(&tpool->tp_mutex); - tpool->tp_flags |= TP_SUSPEND; - pthread_mutex_unlock(&tpool->tp_mutex); -} - -int -tpool_suspended(tpool_t *tpool) -{ - int suspended; - - ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); - - pthread_mutex_lock(&tpool->tp_mutex); - suspended = (tpool->tp_flags & TP_SUSPEND) != 0; - pthread_mutex_unlock(&tpool->tp_mutex); - - return (suspended); -} - -void -tpool_resume(tpool_t *tpool) -{ - int excess; - - ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); - - pthread_mutex_lock(&tpool->tp_mutex); - if (!(tpool->tp_flags & TP_SUSPEND)) { - pthread_mutex_unlock(&tpool->tp_mutex); - return; - } - tpool->tp_flags &= ~TP_SUSPEND; - (void) pthread_cond_broadcast(&tpool->tp_workcv); - excess = tpool->tp_njobs - tpool->tp_idle; - while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) { - if (create_worker(tpool) != 0) - break; /* pthread_create() failed */ - tpool->tp_current++; - } - pthread_mutex_unlock(&tpool->tp_mutex); -} - -int -tpool_member(tpool_t *tpool) -{ - pthread_t my_tid = pthread_self(); - tpool_active_t *activep; - - ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); - - pthread_mutex_lock(&tpool->tp_mutex); - for (activep = tpool->tp_active; activep; activep = activep->tpa_next) { - if (activep->tpa_tid == my_tid) { - pthread_mutex_unlock(&tpool->tp_mutex); - return (1); - } - } - pthread_mutex_unlock(&tpool->tp_mutex); - return (0); -} diff --git a/lib/libtpool/thread_pool_impl.h b/lib/libtpool/thread_pool_impl.h deleted file mode 100644 index e2bffd37d522..000000000000 --- a/lib/libtpool/thread_pool_impl.h +++ /dev/null @@ -1,94 +0,0 @@ -// SPDX-License-Identifier: CDDL-1.0 -/* - * CDDL HEADER START - * - * The contents of this file are subject to the terms of the - * Common Development and Distribution License (the "License"). - * You may not use this file except in compliance with the License. - * - * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE - * or https://opensource.org/licenses/CDDL-1.0. - * See the License for the specific language governing permissions - * and limitations under the License. - * - * When distributing Covered Code, include this CDDL HEADER in each - * file and include the License file at usr/src/OPENSOLARIS.LICENSE. - * If applicable, add the following below this CDDL HEADER, with the - * fields enclosed by brackets "[]" replaced with your own identifying - * information: Portions Copyright [yyyy] [name of copyright owner] - * - * CDDL HEADER END - */ - -/* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved. - * Use is subject to license terms. - */ - -#ifndef _THREAD_POOL_IMPL_H -#define _THREAD_POOL_IMPL_H - -#include <thread_pool.h> - -#ifdef __cplusplus -extern "C" { -#endif - -/* - * Thread pool implementation definitions. - * See <thread_pool.h> for interface declarations. - */ - -/* - * FIFO queued job - */ -typedef struct tpool_job tpool_job_t; -struct tpool_job { - tpool_job_t *tpj_next; /* list of jobs */ - void (*tpj_func)(void *); /* function to call */ - void *tpj_arg; /* its argument */ -}; - -/* - * List of active threads, linked through their stacks. - */ -typedef struct tpool_active tpool_active_t; -struct tpool_active { - tpool_active_t *tpa_next; /* list of active threads */ - pthread_t tpa_tid; /* active thread id */ -}; - -/* - * The thread pool. - */ -struct tpool { - tpool_t *tp_forw; /* circular list of all thread pools */ - tpool_t *tp_back; - pthread_mutex_t tp_mutex; /* protects the pool data */ - pthread_cond_t tp_busycv; /* synchronization in tpool_dispatch */ - pthread_cond_t tp_workcv; /* synchronization with workers */ - pthread_cond_t tp_waitcv; /* synchronization in tpool_wait() */ - tpool_active_t *tp_active; /* threads performing work */ - tpool_job_t *tp_head; /* FIFO job queue */ - tpool_job_t *tp_tail; - pthread_attr_t tp_attr; /* attributes of the workers */ - int tp_flags; /* see below */ - uint_t tp_linger; /* seconds before idle workers exit */ - int tp_njobs; /* number of jobs in job queue */ - int tp_minimum; /* minimum number of worker threads */ - int tp_maximum; /* maximum number of worker threads */ - int tp_current; /* current number of worker threads */ - int tp_idle; /* number of idle workers */ -}; - -/* tp_flags */ -#define TP_WAIT 0x01 /* waiting in tpool_wait() */ -#define TP_SUSPEND 0x02 /* pool is being suspended */ -#define TP_DESTROY 0x04 /* pool is being destroyed */ -#define TP_ABANDON 0x08 /* pool is abandoned (auto-destroy) */ - -#ifdef __cplusplus -} -#endif - -#endif /* _THREAD_POOL_IMPL_H */ |