diff options
Diffstat (limited to 'zen/thread.h')
-rwxr-xr-x | zen/thread.h | 56 |
1 files changed, 41 insertions, 15 deletions
diff --git a/zen/thread.h b/zen/thread.h index ee36f305..5828d07a 100755 --- a/zen/thread.h +++ b/zen/thread.h @@ -119,7 +119,7 @@ class Protected { public: Protected() {} - Protected(const T& value) : value_(value) {} + Protected(T& value) : value_(value) {} //Protected( T&& tmp ) : value_(std::move(tmp)) {} <- wait until needed template <class Function> @@ -161,28 +161,46 @@ public: ThreadGroup& operator=(ThreadGroup&& tmp) noexcept { swap(tmp); return *this; } //noexcept *required* to support move for reallocations in std::vector and std::swap!!! - //context of controlling thread, non-blocking: - void run(Function&& wi) + //context of controlling OR worker thread, non-blocking: + void run(Function&& wi /*should throw ThreadInterruption when needed*/) { - size_t tasksPending = 0; { std::lock_guard<std::mutex> dummy(workLoad_->lock); + workLoad_->tasks.push_back(std::move(wi)); - tasksPending = ++(workLoad_->tasksPending); + const size_t tasksPending = ++(workLoad_->tasksPending); + + if (worker_.size() < std::min(tasksPending, threadCountMax_)) + addWorkerThread(); } workLoad_->conditionNewTask.notify_all(); - - if (worker_.size() < std::min(tasksPending, threadCountMax_)) - addWorkerThread(); } //context of controlling thread, blocking: void wait() { - std::unique_lock<std::mutex> dummy(workLoad_->lock); - workLoad_->conditionTasksDone.wait(dummy, [&tasksPending = workLoad_->tasksPending] { return tasksPending == 0; }); + //perf: no difference in xBRZ test case compared to std::condition_variable-based implementation + auto promiseDone = std::make_shared<std::promise<void>>(); // + std::future<void> allDone = promiseDone->get_future(); + + notifyWhenDone([promiseDone] { promiseDone->set_value(); }); //std::function doesn't support construction involving move-only types! + //use reference? => not guaranteed safe, e.g. promise object theoretically might be accessed inside set_value() after future gets signalled + + allDone.get(); + } + + //non-blocking wait()-alternative: context of controlling thread: + void notifyWhenDone(const std::function<void()>& onCompletion /*noexcept! runs on worker thread!*/) + { + std::lock_guard<std::mutex> dummy(workLoad_->lock); + + if (workLoad_->tasksPending == 0) + onCompletion(); + else + workLoad_->onCompletionCallbacks.push_back(onCompletion); } + //context of controlling thread: void detach() { detach_ = true; } //not expected to also interrupt! private: @@ -204,13 +222,21 @@ private: Function task = std::move(wl->tasks. front()); //noexcept thanks to move /**/ wl->tasks.pop_front(); // - dummy.unlock(); - - task(); + dummy.unlock(); + task(); //throw ThreadInterruption? dummy.lock(); + if (--(wl->tasksPending) == 0) - wl->conditionTasksDone.notify_all(); //too difficult to notify outside the lock + if (!wl->onCompletionCallbacks.empty()) + { + std::vector<std::function<void()>> callbacks; + callbacks.swap(wl->onCompletionCallbacks); + + dummy.unlock(); + for (const auto& cb : callbacks) cb(); //noexcept! + dummy.lock(); + } } }); } @@ -230,7 +256,7 @@ private: RingBuffer<Function> tasks; //FIFO! :) size_t tasksPending = 0; std::condition_variable conditionNewTask; - std::condition_variable conditionTasksDone; + std::vector<std::function<void()>> onCompletionCallbacks; }; std::vector<InterruptibleThread> worker_; |