diff options
author | Dimitry Andric <dim@FreeBSD.org> | 2017-04-16 16:01:22 +0000 |
---|---|---|
committer | Dimitry Andric <dim@FreeBSD.org> | 2017-04-16 16:01:22 +0000 |
commit | 71d5a2540a98c81f5bcaeb48805e0e2881f530ef (patch) | |
tree | 5343938942df402b49ec7300a1c25a2d4ccd5821 /unittests/ExecutionEngine/Orc/QueueChannel.h | |
parent | 31bbf64f3a4974a2d6c8b3b27ad2f519caf74057 (diff) |
Diffstat (limited to 'unittests/ExecutionEngine/Orc/QueueChannel.h')
-rw-r--r-- | unittests/ExecutionEngine/Orc/QueueChannel.h | 146 |
1 files changed, 146 insertions, 0 deletions
diff --git a/unittests/ExecutionEngine/Orc/QueueChannel.h b/unittests/ExecutionEngine/Orc/QueueChannel.h new file mode 100644 index 000000000000..3d1058a83ebc --- /dev/null +++ b/unittests/ExecutionEngine/Orc/QueueChannel.h @@ -0,0 +1,146 @@ +//===----------------------- Queue.h - RPC Queue ------------------*-c++-*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// + +#ifndef LLVM_UNITTESTS_EXECUTIONENGINE_ORC_QUEUECHANNEL_H +#define LLVM_UNITTESTS_EXECUTIONENGINE_ORC_QUEUECHANNEL_H + +#include "llvm/ExecutionEngine/Orc/RawByteChannel.h" +#include "llvm/Support/Error.h" + +#include <queue> +#include <condition_variable> + +namespace llvm { + +class QueueChannelError : public ErrorInfo<QueueChannelError> { +public: + static char ID; +}; + +class QueueChannelClosedError + : public ErrorInfo<QueueChannelClosedError, QueueChannelError> { +public: + static char ID; + std::error_code convertToErrorCode() const override { + return inconvertibleErrorCode(); + } + + void log(raw_ostream &OS) const override { + OS << "Queue closed"; + } +}; + +class Queue : public std::queue<char> { +public: + using ErrorInjector = std::function<Error()>; + + Queue() + : ReadError([]() { return Error::success(); }), + WriteError([]() { return Error::success(); }) {} + + Queue(const Queue&) = delete; + Queue& operator=(const Queue&) = delete; + Queue(Queue&&) = delete; + Queue& operator=(Queue&&) = delete; + + std::mutex &getMutex() { return M; } + std::condition_variable &getCondVar() { return CV; } + Error checkReadError() { return ReadError(); } + Error checkWriteError() { return WriteError(); } + void setReadError(ErrorInjector NewReadError) { + { + std::lock_guard<std::mutex> Lock(M); + ReadError = std::move(NewReadError); + } + CV.notify_one(); + } + void setWriteError(ErrorInjector NewWriteError) { + std::lock_guard<std::mutex> Lock(M); + WriteError = std::move(NewWriteError); + } +private: + std::mutex M; + std::condition_variable CV; + std::function<Error()> ReadError, WriteError; +}; + +class QueueChannel : public orc::rpc::RawByteChannel { +public: + QueueChannel(std::shared_ptr<Queue> InQueue, + std::shared_ptr<Queue> OutQueue) + : InQueue(InQueue), OutQueue(OutQueue) {} + + QueueChannel(const QueueChannel&) = delete; + QueueChannel& operator=(const QueueChannel&) = delete; + QueueChannel(QueueChannel&&) = delete; + QueueChannel& operator=(QueueChannel&&) = delete; + + Error readBytes(char *Dst, unsigned Size) override { + std::unique_lock<std::mutex> Lock(InQueue->getMutex()); + while (Size) { + { + Error Err = InQueue->checkReadError(); + while (!Err && InQueue->empty()) { + InQueue->getCondVar().wait(Lock); + Err = InQueue->checkReadError(); + } + if (Err) + return Err; + } + *Dst++ = InQueue->front(); + --Size; + ++NumRead; + InQueue->pop(); + } + return Error::success(); + } + + Error appendBytes(const char *Src, unsigned Size) override { + std::unique_lock<std::mutex> Lock(OutQueue->getMutex()); + while (Size--) { + if (Error Err = OutQueue->checkWriteError()) + return Err; + OutQueue->push(*Src++); + ++NumWritten; + } + OutQueue->getCondVar().notify_one(); + return Error::success(); + } + + Error send() override { return Error::success(); } + + void close() { + auto ChannelClosed = []() { return make_error<QueueChannelClosedError>(); }; + InQueue->setReadError(ChannelClosed); + InQueue->setWriteError(ChannelClosed); + OutQueue->setReadError(ChannelClosed); + OutQueue->setWriteError(ChannelClosed); + } + + uint64_t NumWritten = 0; + uint64_t NumRead = 0; + +private: + + std::shared_ptr<Queue> InQueue; + std::shared_ptr<Queue> OutQueue; +}; + +inline std::pair<std::unique_ptr<QueueChannel>, std::unique_ptr<QueueChannel>> +createPairedQueueChannels() { + auto Q1 = std::make_shared<Queue>(); + auto Q2 = std::make_shared<Queue>(); + auto C1 = llvm::make_unique<QueueChannel>(Q1, Q2); + auto C2 = llvm::make_unique<QueueChannel>(Q2, Q1); + return std::make_pair(std::move(C1), std::move(C2)); +} + +} + +#endif |