diff options
| author | Dimitry Andric <dim@FreeBSD.org> | 2017-04-16 16:01:22 +0000 | 
|---|---|---|
| committer | Dimitry Andric <dim@FreeBSD.org> | 2017-04-16 16:01:22 +0000 | 
| commit | 71d5a2540a98c81f5bcaeb48805e0e2881f530ef (patch) | |
| tree | 5343938942df402b49ec7300a1c25a2d4ccd5821 /unittests/ExecutionEngine/Orc/QueueChannel.h | |
| parent | 31bbf64f3a4974a2d6c8b3b27ad2f519caf74057 (diff) | |
Diffstat (limited to 'unittests/ExecutionEngine/Orc/QueueChannel.h')
| -rw-r--r-- | unittests/ExecutionEngine/Orc/QueueChannel.h | 146 | 
1 files changed, 146 insertions, 0 deletions
| diff --git a/unittests/ExecutionEngine/Orc/QueueChannel.h b/unittests/ExecutionEngine/Orc/QueueChannel.h new file mode 100644 index 000000000000..3d1058a83ebc --- /dev/null +++ b/unittests/ExecutionEngine/Orc/QueueChannel.h @@ -0,0 +1,146 @@ +//===----------------------- Queue.h - RPC Queue ------------------*-c++-*-===// +// +//                     The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// + +#ifndef LLVM_UNITTESTS_EXECUTIONENGINE_ORC_QUEUECHANNEL_H +#define LLVM_UNITTESTS_EXECUTIONENGINE_ORC_QUEUECHANNEL_H + +#include "llvm/ExecutionEngine/Orc/RawByteChannel.h" +#include "llvm/Support/Error.h" + +#include <queue> +#include <condition_variable> + +namespace llvm { + +class QueueChannelError : public ErrorInfo<QueueChannelError> { +public: +  static char ID; +}; + +class QueueChannelClosedError +    : public ErrorInfo<QueueChannelClosedError, QueueChannelError> { +public: +  static char ID; +  std::error_code convertToErrorCode() const override { +    return inconvertibleErrorCode(); +  } + +  void log(raw_ostream &OS) const override { +    OS << "Queue closed"; +  } +}; + +class Queue : public std::queue<char> { +public: +  using ErrorInjector = std::function<Error()>; + +  Queue() +    : ReadError([]() { return Error::success(); }), +      WriteError([]() { return Error::success(); }) {} + +  Queue(const Queue&) = delete; +  Queue& operator=(const Queue&) = delete; +  Queue(Queue&&) = delete; +  Queue& operator=(Queue&&) = delete; + +  std::mutex &getMutex() { return M; } +  std::condition_variable &getCondVar() { return CV; } +  Error checkReadError() { return ReadError(); } +  Error checkWriteError() { return WriteError(); } +  void setReadError(ErrorInjector NewReadError) { +    { +      std::lock_guard<std::mutex> Lock(M); +      ReadError = std::move(NewReadError); +    } +    CV.notify_one(); +  } +  void setWriteError(ErrorInjector NewWriteError) { +    std::lock_guard<std::mutex> Lock(M); +    WriteError = std::move(NewWriteError); +  } +private: +  std::mutex M; +  std::condition_variable CV; +  std::function<Error()> ReadError, WriteError; +}; + +class QueueChannel : public orc::rpc::RawByteChannel { +public: +  QueueChannel(std::shared_ptr<Queue> InQueue, +               std::shared_ptr<Queue> OutQueue) +      : InQueue(InQueue), OutQueue(OutQueue) {} + +  QueueChannel(const QueueChannel&) = delete; +  QueueChannel& operator=(const QueueChannel&) = delete; +  QueueChannel(QueueChannel&&) = delete; +  QueueChannel& operator=(QueueChannel&&) = delete; + +  Error readBytes(char *Dst, unsigned Size) override { +    std::unique_lock<std::mutex> Lock(InQueue->getMutex()); +    while (Size) { +      { +        Error Err = InQueue->checkReadError(); +        while (!Err && InQueue->empty()) { +          InQueue->getCondVar().wait(Lock); +          Err = InQueue->checkReadError(); +        } +        if (Err) +          return Err; +      } +      *Dst++ = InQueue->front(); +      --Size; +      ++NumRead; +      InQueue->pop(); +    } +    return Error::success(); +  } + +  Error appendBytes(const char *Src, unsigned Size) override { +    std::unique_lock<std::mutex> Lock(OutQueue->getMutex()); +    while (Size--) { +      if (Error Err = OutQueue->checkWriteError()) +        return Err; +      OutQueue->push(*Src++); +      ++NumWritten; +    } +    OutQueue->getCondVar().notify_one(); +    return Error::success(); +  } + +  Error send() override { return Error::success(); } + +  void close() { +    auto ChannelClosed = []() { return make_error<QueueChannelClosedError>(); }; +    InQueue->setReadError(ChannelClosed); +    InQueue->setWriteError(ChannelClosed); +    OutQueue->setReadError(ChannelClosed); +    OutQueue->setWriteError(ChannelClosed); +  } + +  uint64_t NumWritten = 0; +  uint64_t NumRead = 0; + +private: + +  std::shared_ptr<Queue> InQueue; +  std::shared_ptr<Queue> OutQueue; +}; + +inline std::pair<std::unique_ptr<QueueChannel>, std::unique_ptr<QueueChannel>> +createPairedQueueChannels() { +  auto Q1 = std::make_shared<Queue>(); +  auto Q2 = std::make_shared<Queue>(); +  auto C1 = llvm::make_unique<QueueChannel>(Q1, Q2); +  auto C2 = llvm::make_unique<QueueChannel>(Q2, Q1); +  return std::make_pair(std::move(C1), std::move(C2)); +} + +} + +#endif | 
