summaryrefslogtreecommitdiff
path: root/zen/thread.h
diff options
context:
space:
mode:
Diffstat (limited to 'zen/thread.h')
-rwxr-xr-xzen/thread.h56
1 files changed, 41 insertions, 15 deletions
diff --git a/zen/thread.h b/zen/thread.h
index ee36f305..5828d07a 100755
--- a/zen/thread.h
+++ b/zen/thread.h
@@ -119,7 +119,7 @@ class Protected
{
public:
Protected() {}
- Protected(const T& value) : value_(value) {}
+ Protected(T& value) : value_(value) {}
//Protected( T&& tmp ) : value_(std::move(tmp)) {} <- wait until needed
template <class Function>
@@ -161,28 +161,46 @@ public:
ThreadGroup& operator=(ThreadGroup&& tmp) noexcept { swap(tmp); return *this; } //noexcept *required* to support move for reallocations in std::vector and std::swap!!!
- //context of controlling thread, non-blocking:
- void run(Function&& wi)
+ //context of controlling OR worker thread, non-blocking:
+ void run(Function&& wi /*should throw ThreadInterruption when needed*/)
{
- size_t tasksPending = 0;
{
std::lock_guard<std::mutex> dummy(workLoad_->lock);
+
workLoad_->tasks.push_back(std::move(wi));
- tasksPending = ++(workLoad_->tasksPending);
+ const size_t tasksPending = ++(workLoad_->tasksPending);
+
+ if (worker_.size() < std::min(tasksPending, threadCountMax_))
+ addWorkerThread();
}
workLoad_->conditionNewTask.notify_all();
-
- if (worker_.size() < std::min(tasksPending, threadCountMax_))
- addWorkerThread();
}
//context of controlling thread, blocking:
void wait()
{
- std::unique_lock<std::mutex> dummy(workLoad_->lock);
- workLoad_->conditionTasksDone.wait(dummy, [&tasksPending = workLoad_->tasksPending] { return tasksPending == 0; });
+ //perf: no difference in xBRZ test case compared to std::condition_variable-based implementation
+ auto promiseDone = std::make_shared<std::promise<void>>(); //
+ std::future<void> allDone = promiseDone->get_future();
+
+ notifyWhenDone([promiseDone] { promiseDone->set_value(); }); //std::function doesn't support construction involving move-only types!
+ //use reference? => not guaranteed safe, e.g. promise object theoretically might be accessed inside set_value() after future gets signalled
+
+ allDone.get();
+ }
+
+ //non-blocking wait()-alternative: context of controlling thread:
+ void notifyWhenDone(const std::function<void()>& onCompletion /*noexcept! runs on worker thread!*/)
+ {
+ std::lock_guard<std::mutex> dummy(workLoad_->lock);
+
+ if (workLoad_->tasksPending == 0)
+ onCompletion();
+ else
+ workLoad_->onCompletionCallbacks.push_back(onCompletion);
}
+ //context of controlling thread:
void detach() { detach_ = true; } //not expected to also interrupt!
private:
@@ -204,13 +222,21 @@ private:
Function task = std::move(wl->tasks. front()); //noexcept thanks to move
/**/ wl->tasks.pop_front(); //
- dummy.unlock();
-
- task();
+ dummy.unlock();
+ task(); //throw ThreadInterruption?
dummy.lock();
+
if (--(wl->tasksPending) == 0)
- wl->conditionTasksDone.notify_all(); //too difficult to notify outside the lock
+ if (!wl->onCompletionCallbacks.empty())
+ {
+ std::vector<std::function<void()>> callbacks;
+ callbacks.swap(wl->onCompletionCallbacks);
+
+ dummy.unlock();
+ for (const auto& cb : callbacks) cb(); //noexcept!
+ dummy.lock();
+ }
}
});
}
@@ -230,7 +256,7 @@ private:
RingBuffer<Function> tasks; //FIFO! :)
size_t tasksPending = 0;
std::condition_variable conditionNewTask;
- std::condition_variable conditionTasksDone;
+ std::vector<std::function<void()>> onCompletionCallbacks;
};
std::vector<InterruptibleThread> worker_;
bgstack15