diff options
author | Dimitry Andric <dim@FreeBSD.org> | 2022-07-04 19:20:19 +0000 |
---|---|---|
committer | Dimitry Andric <dim@FreeBSD.org> | 2023-02-08 19:02:26 +0000 |
commit | 81ad626541db97eb356e2c1d4a20eb2a26a766ab (patch) | |
tree | 311b6a8987c32b1e1dcbab65c54cfac3fdb56175 /contrib/llvm-project/llvm/lib/Support/ThreadPool.cpp | |
parent | 5fff09660e06a66bed6482da9c70df328e16bbb6 (diff) | |
parent | 145449b1e420787bb99721a429341fa6be3adfb6 (diff) |
Diffstat (limited to 'contrib/llvm-project/llvm/lib/Support/ThreadPool.cpp')
-rw-r--r-- | contrib/llvm-project/llvm/lib/Support/ThreadPool.cpp | 171 |
1 files changed, 131 insertions, 40 deletions
diff --git a/contrib/llvm-project/llvm/lib/Support/ThreadPool.cpp b/contrib/llvm-project/llvm/lib/Support/ThreadPool.cpp index 9f92ae1c7a7c..31461e31c65c 100644 --- a/contrib/llvm-project/llvm/lib/Support/ThreadPool.cpp +++ b/contrib/llvm-project/llvm/lib/Support/ThreadPool.cpp @@ -24,11 +24,19 @@ using namespace llvm; #if LLVM_ENABLE_THREADS +// A note on thread groups: Tasks are by default in no group (represented +// by nullptr ThreadPoolTaskGroup pointer in the Tasks queue) and functionality +// here normally works on all tasks regardless of their group (functions +// in that case receive nullptr ThreadPoolTaskGroup pointer as argument). +// A task in a group has a pointer to that ThreadPoolTaskGroup in the Tasks +// queue, and functions called to work only on tasks from one group take that +// pointer. + ThreadPool::ThreadPool(ThreadPoolStrategy S) : Strategy(S), MaxThreadCount(S.compute_thread_count()) {} void ThreadPool::grow(int requested) { - std::unique_lock<std::mutex> LockGuard(ThreadsLock); + llvm::sys::ScopedWriter LockGuard(ThreadsLock); if (Threads.size() >= MaxThreadCount) return; // Already hit the max thread pool size. int newThreadCount = std::min<int>(requested, MaxThreadCount); @@ -36,52 +44,129 @@ void ThreadPool::grow(int requested) { int ThreadID = Threads.size(); Threads.emplace_back([this, ThreadID] { Strategy.apply_thread_strategy(ThreadID); - while (true) { - std::function<void()> Task; - { - std::unique_lock<std::mutex> LockGuard(QueueLock); - // Wait for tasks to be pushed in the queue - QueueCondition.wait(LockGuard, - [&] { return !EnableFlag || !Tasks.empty(); }); - // Exit condition - if (!EnableFlag && Tasks.empty()) - return; - // Yeah, we have a task, grab it and release the lock on the queue - - // We first need to signal that we are active before popping the queue - // in order for wait() to properly detect that even if the queue is - // empty, there is still a task in flight. - ++ActiveThreads; - Task = std::move(Tasks.front()); - Tasks.pop(); - } - // Run the task we just grabbed - Task(); - - bool Notify; - { - // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait() - std::lock_guard<std::mutex> LockGuard(QueueLock); - --ActiveThreads; - Notify = workCompletedUnlocked(); - } - // Notify task completion if this is the last active thread, in case - // someone waits on ThreadPool::wait(). - if (Notify) - CompletionCondition.notify_all(); - } + processTasks(nullptr); }); } } +#ifndef NDEBUG +// The group of the tasks run by the current thread. +static LLVM_THREAD_LOCAL std::vector<ThreadPoolTaskGroup *> + *CurrentThreadTaskGroups = nullptr; +#endif + +// WaitingForGroup == nullptr means all tasks regardless of their group. +void ThreadPool::processTasks(ThreadPoolTaskGroup *WaitingForGroup) { + while (true) { + std::function<void()> Task; + ThreadPoolTaskGroup *GroupOfTask; + { + std::unique_lock<std::mutex> LockGuard(QueueLock); + bool workCompletedForGroup = false; // Result of workCompletedUnlocked() + // Wait for tasks to be pushed in the queue + QueueCondition.wait(LockGuard, [&] { + return !EnableFlag || !Tasks.empty() || + (WaitingForGroup != nullptr && + (workCompletedForGroup = + workCompletedUnlocked(WaitingForGroup))); + }); + // Exit condition + if (!EnableFlag && Tasks.empty()) + return; + if (WaitingForGroup != nullptr && workCompletedForGroup) + return; + // Yeah, we have a task, grab it and release the lock on the queue + + // We first need to signal that we are active before popping the queue + // in order for wait() to properly detect that even if the queue is + // empty, there is still a task in flight. + ++ActiveThreads; + Task = std::move(Tasks.front().first); + GroupOfTask = Tasks.front().second; + // Need to count active threads in each group separately, ActiveThreads + // would never be 0 if waiting for another group inside a wait. + if (GroupOfTask != nullptr) + ++ActiveGroups[GroupOfTask]; // Increment or set to 1 if new item + Tasks.pop_front(); + } +#ifndef NDEBUG + if (CurrentThreadTaskGroups == nullptr) + CurrentThreadTaskGroups = new std::vector<ThreadPoolTaskGroup *>; + CurrentThreadTaskGroups->push_back(GroupOfTask); +#endif + + // Run the task we just grabbed + Task(); + +#ifndef NDEBUG + CurrentThreadTaskGroups->pop_back(); + if (CurrentThreadTaskGroups->empty()) { + delete CurrentThreadTaskGroups; + CurrentThreadTaskGroups = nullptr; + } +#endif + + bool Notify; + bool NotifyGroup; + { + // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait() + std::lock_guard<std::mutex> LockGuard(QueueLock); + --ActiveThreads; + if (GroupOfTask != nullptr) { + auto A = ActiveGroups.find(GroupOfTask); + if (--(A->second) == 0) + ActiveGroups.erase(A); + } + Notify = workCompletedUnlocked(GroupOfTask); + NotifyGroup = GroupOfTask != nullptr && Notify; + } + // Notify task completion if this is the last active thread, in case + // someone waits on ThreadPool::wait(). + if (Notify) + CompletionCondition.notify_all(); + // If this was a task in a group, notify also threads waiting for tasks + // in this function on QueueCondition, to make a recursive wait() return + // after the group it's been waiting for has finished. + if (NotifyGroup) + QueueCondition.notify_all(); + } +} + +bool ThreadPool::workCompletedUnlocked(ThreadPoolTaskGroup *Group) const { + if (Group == nullptr) + return !ActiveThreads && Tasks.empty(); + return ActiveGroups.count(Group) == 0 && + !llvm::any_of(Tasks, + [Group](const auto &T) { return T.second == Group; }); +} + void ThreadPool::wait() { + assert(!isWorkerThread()); // Would deadlock waiting for itself. // Wait for all threads to complete and the queue to be empty std::unique_lock<std::mutex> LockGuard(QueueLock); - CompletionCondition.wait(LockGuard, [&] { return workCompletedUnlocked(); }); + CompletionCondition.wait(LockGuard, + [&] { return workCompletedUnlocked(nullptr); }); +} + +void ThreadPool::wait(ThreadPoolTaskGroup &Group) { + // Wait for all threads in the group to complete. + if (!isWorkerThread()) { + std::unique_lock<std::mutex> LockGuard(QueueLock); + CompletionCondition.wait(LockGuard, + [&] { return workCompletedUnlocked(&Group); }); + return; + } + // Make sure to not deadlock waiting for oneself. + assert(CurrentThreadTaskGroups == nullptr || + !llvm::is_contained(*CurrentThreadTaskGroups, &Group)); + // Handle the case of recursive call from another task in a different group, + // in which case process tasks while waiting to keep the thread busy and avoid + // possible deadlock. + processTasks(&Group); } bool ThreadPool::isWorkerThread() const { - std::unique_lock<std::mutex> LockGuard(ThreadsLock); + llvm::sys::ScopedReader LockGuard(ThreadsLock); llvm::thread::id CurrentThreadId = llvm::this_thread::get_id(); for (const llvm::thread &Thread : Threads) if (CurrentThreadId == Thread.get_id()) @@ -96,7 +181,7 @@ ThreadPool::~ThreadPool() { EnableFlag = false; } QueueCondition.notify_all(); - std::unique_lock<std::mutex> LockGuard(ThreadsLock); + llvm::sys::ScopedReader LockGuard(ThreadsLock); for (auto &Worker : Threads) Worker.join(); } @@ -115,12 +200,18 @@ ThreadPool::ThreadPool(ThreadPoolStrategy S) : MaxThreadCount(1) { void ThreadPool::wait() { // Sequential implementation running the tasks while (!Tasks.empty()) { - auto Task = std::move(Tasks.front()); - Tasks.pop(); + auto Task = std::move(Tasks.front().first); + Tasks.pop_front(); Task(); } } +void ThreadPool::wait(ThreadPoolTaskGroup &) { + // Simply wait for all, this works even if recursive (the running task + // is already removed from the queue). + wait(); +} + bool ThreadPool::isWorkerThread() const { report_fatal_error("LLVM compiled without multithreading"); } |