summaryrefslogtreecommitdiff
path: root/lib/Core/TaskGroup.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Core/TaskGroup.cpp')
-rw-r--r--lib/Core/TaskGroup.cpp141
1 files changed, 141 insertions, 0 deletions
diff --git a/lib/Core/TaskGroup.cpp b/lib/Core/TaskGroup.cpp
new file mode 100644
index 000000000000..d4de48ce3dc4
--- /dev/null
+++ b/lib/Core/TaskGroup.cpp
@@ -0,0 +1,141 @@
+//===- lld/Core/TaskGroup.cpp - Task Group --------------------------------===//
+//
+// The LLVM Linker
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#include "lld/Core/TaskGroup.h"
+#include "llvm/Config/llvm-config.h"
+
+#include <atomic>
+#include <stack>
+#include <thread>
+
+#if defined(_MSC_VER) && LLVM_ENABLE_THREADS
+#include <concrt.h>
+#include <ppl.h>
+#endif
+
+using namespace lld;
+
+namespace {
+
+/// \brief An abstract class that takes closures and runs them asynchronously.
+class Executor {
+public:
+ virtual ~Executor() = default;
+ virtual void add(std::function<void()> func) = 0;
+
+ static Executor *getDefaultExecutor();
+};
+
+#if !LLVM_ENABLE_THREADS
+class SyncExecutor : public Executor {
+public:
+ virtual void add(std::function<void()> F) { F(); }
+};
+
+Executor *Executor::getDefaultExecutor() {
+ static SyncExecutor Exec;
+ return &Exec;
+}
+
+#elif defined(_MSC_VER)
+/// \brief An Executor that runs tasks via ConcRT.
+class ConcRTExecutor : public Executor {
+ struct Taskish {
+ Taskish(std::function<void()> Task) : Task(Task) {}
+
+ std::function<void()> Task;
+
+ static void run(void *P) {
+ Taskish *Self = static_cast<Taskish *>(P);
+ Self->Task();
+ concurrency::Free(Self);
+ }
+ };
+
+public:
+ virtual void add(std::function<void()> F) {
+ Concurrency::CurrentScheduler::ScheduleTask(
+ Taskish::run, new (concurrency::Alloc(sizeof(Taskish))) Taskish(F));
+ }
+};
+
+Executor *Executor::getDefaultExecutor() {
+ static ConcRTExecutor exec;
+ return &exec;
+}
+
+#else
+/// \brief An implementation of an Executor that runs closures on a thread pool
+/// in filo order.
+class ThreadPoolExecutor : public Executor {
+public:
+ explicit ThreadPoolExecutor(
+ unsigned ThreadCount = std::thread::hardware_concurrency())
+ : Done(ThreadCount) {
+ // Spawn all but one of the threads in another thread as spawning threads
+ // can take a while.
+ std::thread([&, ThreadCount] {
+ for (size_t i = 1; i < ThreadCount; ++i) {
+ std::thread([=] { work(); }).detach();
+ }
+ work();
+ }).detach();
+ }
+
+ ~ThreadPoolExecutor() override {
+ std::unique_lock<std::mutex> Lock(Mutex);
+ Stop = true;
+ Lock.unlock();
+ Cond.notify_all();
+ // Wait for ~Latch.
+ }
+
+ void add(std::function<void()> F) override {
+ std::unique_lock<std::mutex> Lock(Mutex);
+ WorkStack.push(F);
+ Lock.unlock();
+ Cond.notify_one();
+ }
+
+private:
+ void work() {
+ while (true) {
+ std::unique_lock<std::mutex> Lock(Mutex);
+ Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
+ if (Stop)
+ break;
+ auto Task = WorkStack.top();
+ WorkStack.pop();
+ Lock.unlock();
+ Task();
+ }
+ Done.dec();
+ }
+
+ std::atomic<bool> Stop{false};
+ std::stack<std::function<void()>> WorkStack;
+ std::mutex Mutex;
+ std::condition_variable Cond;
+ Latch Done;
+};
+
+Executor *Executor::getDefaultExecutor() {
+ static ThreadPoolExecutor exec;
+ return &exec;
+}
+#endif
+}
+
+void TaskGroup::spawn(std::function<void()> f) {
+ _latch.inc();
+ Executor::getDefaultExecutor()->add([&, f] {
+ f();
+ _latch.dec();
+ });
+}