diff options
author | Dimitry Andric <dim@FreeBSD.org> | 2023-07-26 19:03:47 +0000 |
---|---|---|
committer | Dimitry Andric <dim@FreeBSD.org> | 2023-07-26 19:04:23 +0000 |
commit | 7fa27ce4a07f19b07799a767fc29416f3b625afb (patch) | |
tree | 27825c83636c4de341eb09a74f49f5d38a15d165 /llvm/lib/Support/Parallel.cpp | |
parent | e3b557809604d036af6e00c60f012c2025b59a5e (diff) |
Diffstat (limited to 'llvm/lib/Support/Parallel.cpp')
-rw-r--r-- | llvm/lib/Support/Parallel.cpp | 106 |
1 file changed, 69 insertions, 37 deletions
diff --git a/llvm/lib/Support/Parallel.cpp b/llvm/lib/Support/Parallel.cpp index 23ed9d813548..9b14b05b5211 100644 --- a/llvm/lib/Support/Parallel.cpp +++ b/llvm/lib/Support/Parallel.cpp @@ -12,8 +12,8 @@ #include "llvm/Support/Threading.h" #include <atomic> +#include <deque> #include <future> -#include <stack> #include <thread> #include <vector> @@ -24,11 +24,11 @@ namespace parallel { #if LLVM_ENABLE_THREADS #ifdef _WIN32 -static thread_local unsigned threadIndex; +static thread_local unsigned threadIndex = UINT_MAX; -unsigned getThreadIndex() { return threadIndex; } +unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; } #else -thread_local unsigned threadIndex; +thread_local unsigned threadIndex = UINT_MAX; #endif namespace detail { @@ -39,7 +39,8 @@ namespace { class Executor { public: virtual ~Executor() = default; - virtual void add(std::function<void()> func) = 0; + virtual void add(std::function<void()> func, bool Sequential = false) = 0; + virtual size_t getThreadCount() const = 0; static Executor *getDefaultExecutor(); }; @@ -49,13 +50,16 @@ public: class ThreadPoolExecutor : public Executor { public: explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) { - unsigned ThreadCount = S.compute_thread_count(); + ThreadCount = S.compute_thread_count(); // Spawn all but one of the threads in another thread as spawning threads // can take a while. Threads.reserve(ThreadCount); Threads.resize(1); std::lock_guard<std::mutex> Lock(Mutex); - Threads[0] = std::thread([this, ThreadCount, S] { + // Use operator[] before creating the thread to avoid data race in .size() + // in "safe libc++" mode. 
+ auto &Thread0 = Threads[0]; + Thread0 = std::thread([this, S] { for (unsigned I = 1; I < ThreadCount; ++I) { Threads.emplace_back([=] { work(S, I); }); if (Stop) @@ -94,36 +98,61 @@ public: static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); } }; - void add(std::function<void()> F) override { + void add(std::function<void()> F, bool Sequential = false) override { { std::lock_guard<std::mutex> Lock(Mutex); - WorkStack.push(std::move(F)); + if (Sequential) + WorkQueueSequential.emplace_front(std::move(F)); + else + WorkQueue.emplace_back(std::move(F)); } Cond.notify_one(); } + size_t getThreadCount() const override { return ThreadCount; } + private: + bool hasSequentialTasks() const { + return !WorkQueueSequential.empty() && !SequentialQueueIsLocked; + } + + bool hasGeneralTasks() const { return !WorkQueue.empty(); } + void work(ThreadPoolStrategy S, unsigned ThreadID) { threadIndex = ThreadID; S.apply_thread_strategy(ThreadID); while (true) { std::unique_lock<std::mutex> Lock(Mutex); - Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); }); + Cond.wait(Lock, [&] { + return Stop || hasGeneralTasks() || hasSequentialTasks(); + }); if (Stop) break; - auto Task = std::move(WorkStack.top()); - WorkStack.pop(); + bool Sequential = hasSequentialTasks(); + if (Sequential) + SequentialQueueIsLocked = true; + else + assert(hasGeneralTasks()); + + auto &Queue = Sequential ? 
WorkQueueSequential : WorkQueue; + auto Task = std::move(Queue.back()); + Queue.pop_back(); Lock.unlock(); Task(); + if (Sequential) + SequentialQueueIsLocked = false; } } std::atomic<bool> Stop{false}; - std::stack<std::function<void()>> WorkStack; + std::atomic<bool> SequentialQueueIsLocked{false}; + std::deque<std::function<void()>> WorkQueue; + std::deque<std::function<void()>> WorkQueueSequential; std::mutex Mutex; std::condition_variable Cond; std::promise<void> ThreadsCreated; std::vector<std::thread> Threads; + unsigned ThreadCount; }; Executor *Executor::getDefaultExecutor() { @@ -153,54 +182,53 @@ Executor *Executor::getDefaultExecutor() { } } // namespace } // namespace detail -#endif -static std::atomic<int> TaskGroupInstances; +size_t getThreadCount() { + return detail::Executor::getDefaultExecutor()->getThreadCount(); +} +#endif // Latch::sync() called by the dtor may cause one thread to block. If is a dead // lock if all threads in the default executor are blocked. To prevent the dead -// lock, only allow the first TaskGroup to run tasks parallelly. In the scenario +// lock, only allow the root TaskGroup to run tasks parallelly. In the scenario // of nested parallel_for_each(), only the outermost one runs parallelly. -TaskGroup::TaskGroup() : Parallel(TaskGroupInstances++ == 0) {} +TaskGroup::TaskGroup() +#if LLVM_ENABLE_THREADS + : Parallel((parallel::strategy.ThreadsRequested != 1) && + (threadIndex == UINT_MAX)) {} +#else + : Parallel(false) {} +#endif TaskGroup::~TaskGroup() { // We must ensure that all the workloads have finished before decrementing the // instances count. 
L.sync(); - --TaskGroupInstances; } -void TaskGroup::spawn(std::function<void()> F) { +void TaskGroup::spawn(std::function<void()> F, bool Sequential) { #if LLVM_ENABLE_THREADS if (Parallel) { L.inc(); - detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] { - F(); - L.dec(); - }); + detail::Executor::getDefaultExecutor()->add( + [&, F = std::move(F)] { + F(); + L.dec(); + }, + Sequential); return; } #endif F(); } -void TaskGroup::execute(std::function<void()> F) { - if (parallel::strategy.ThreadsRequested == 1) - F(); - else - spawn(F); -} } // namespace parallel } // namespace llvm void llvm::parallelFor(size_t Begin, size_t End, llvm::function_ref<void(size_t)> Fn) { - // If we have zero or one items, then do not incur the overhead of spinning up - // a task group. They are surprisingly expensive, and because they do not - // support nested parallelism, a single entry task group can block parallel - // execution underneath them. #if LLVM_ENABLE_THREADS - auto NumItems = End - Begin; - if (NumItems > 1 && parallel::strategy.ThreadsRequested != 1) { + if (parallel::strategy.ThreadsRequested != 1) { + auto NumItems = End - Begin; // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling // overhead on large inputs. auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup; @@ -214,8 +242,12 @@ void llvm::parallelFor(size_t Begin, size_t End, Fn(I); }); } - for (; Begin != End; ++Begin) - Fn(Begin); + if (Begin != End) { + TG.spawn([=, &Fn] { + for (size_t I = Begin; I != End; ++I) + Fn(I); + }); + } return; } #endif |