summaryrefslogtreecommitdiff
path: root/zen/thread.h
diff options
context:
space:
mode:
Diffstat (limited to 'zen/thread.h')
-rwxr-xr-xzen/thread.h72
1 files changed, 65 insertions, 7 deletions
diff --git a/zen/thread.h b/zen/thread.h
index 3721b3c7..ed61e06b 100755
--- a/zen/thread.h
+++ b/zen/thread.h
@@ -71,8 +71,8 @@ std::async replacement without crappy semantics:
2. does not follow C++11 [futures.async], Paragraph 5, where std::future waits for thread in destructor
Example:
- Zstring dirpath = ...
- auto ft = zen::runAsync([=]{ return zen::dirExists(dirpath); });
+ Zstring dirPath = ...
+ auto ft = zen::runAsync([=]{ return zen::dirExists(dirPath); });
if (ft.wait_for(std::chrono::milliseconds(200)) == std::future_status::ready && ft.get())
//dir exising
*/
@@ -120,10 +120,10 @@ public:
Protected(const T& value) : value_(value) {}
template <class Function>
- void access(Function fun)
+ auto access(Function fun) //-> decltype(fun(std::declval<T&>()))
{
std::lock_guard<std::mutex> dummy(lockValue_);
- fun(value_);
+ return fun(value_);
}
private:
@@ -134,6 +134,60 @@ private:
T value_{};
};
+//------------------------------------------------------------------------------------------
+
+template <class Function>
+class ThreadGroup
+{
+public:
+ ThreadGroup(size_t threadCount, const std::string& groupName)
+ {
+ for (size_t i = 0; i < threadCount; ++i)
+ worker_.emplace_back([this, groupName, i, threadCount]
+ {
+ setCurrentThreadName((groupName + "[" + numberTo<std::string>(i + 1) + "/" + numberTo<std::string>(threadCount) + "]").c_str());
+ for (;;)
+ getNextWorkItem()(); //throw ThreadInterruption
+ });
+ }
+ ~ThreadGroup()
+ {
+ for (InterruptibleThread& w : worker_) w.interrupt(); //interrupt all first, then join
+ for (InterruptibleThread& w : worker_) w.join();
+ }
+
+ //context of controlling thread, non-blocking:
+ void run(Function&& wi)
+ {
+ assert(!worker_.empty());
+ {
+ std::lock_guard<std::mutex> dummy(lockWork_);
+ workItems_.push_back(std::move(wi));
+ }
+ conditionNewWork_.notify_all();
+ }
+
+private:
+ ThreadGroup (const ThreadGroup&) = delete;
+ ThreadGroup& operator=(const ThreadGroup&) = delete;
+
+ //context of worker threads, blocking:
+ Function getNextWorkItem() //throw ThreadInterruption
+ {
+ std::unique_lock<std::mutex> dummy(lockWork_);
+
+ interruptibleWait(conditionNewWork_, dummy, [this] { return !workItems_.empty(); }); //throw ThreadInterruption
+ warn_static("implement FIFO!?")
+
+ Function wi = std::move(workItems_. back()); //
+ /**/ workItems_.pop_back(); //noexcept thanks to move
+ return wi; //
+ }
+ std::vector<InterruptibleThread> worker_;
+ std::mutex lockWork_;
+ std::vector<Function> workItems_;
+ std::condition_variable conditionNewWork_;
+};
@@ -222,10 +276,9 @@ public:
private:
bool jobDone(size_t jobsTotal) const { return result_ || (jobsFinished_ >= jobsTotal); } //call while locked!
-
std::mutex lockResult_;
size_t jobsFinished_ = 0; //
- Opt<T> result_; //our condition is: "have result" or "jobsFinished_ == jobsTotal"
+ Opt<T> result_; //our condition is: "have result" or "jobsFinished_ == jobsTotal"
std::condition_variable conditionJobDone_;
};
@@ -256,7 +309,11 @@ Opt<T> GetFirstResult<T>::get() const { return asyncResult_->getResult(jobsTotal
//------------------------------------------------------------------------------------------
//thread_local with non-POD seems to cause memory leaks on VS 14 => pointer only is fine:
+#if defined __GNUC__ || defined __clang__
#define ZEN_THREAD_LOCAL_SPECIFIER __thread
+#else
+ #error "Game over!"
+#endif
class ThreadInterruption {};
@@ -296,7 +353,8 @@ public:
setConditionVar(&cv);
ZEN_ON_SCOPE_EXIT(setConditionVar(nullptr));
- //"interrupted_" is not protected by cv's mutex => signal may get lost!!! => add artifical time out to mitigate! CPU: 0.25% vs 0% for longer time out!
+ //"interrupted_" is not protected by cv's mutex => signal may get lost!!! e.g. after condition was checked but before the wait begins
+ //=> add artifical time out to mitigate! CPU: 0.25% vs 0% for longer time out!
while (!cv.wait_for(lock, std::chrono::milliseconds(1), [&] { return this->interrupted_ || pred(); }))
;
bgstack15