diff options
author | Daniel Wilhelm <shieldwed@outlook.com> | 2018-05-09 00:13:16 +0200 |
---|---|---|
committer | Daniel Wilhelm <shieldwed@outlook.com> | 2018-05-09 00:13:16 +0200 |
commit | 2dd6739826c86ca96a6a1548fd2e0fb7c0eb8dd0 (patch) | |
tree | 39d2d1eed28bf2c231839dd118905fb34327628d /zen/thread.h | |
parent | 9.9 (diff) | |
download | FreeFileSync-2dd6739826c86ca96a6a1548fd2e0fb7c0eb8dd0.tar.gz FreeFileSync-2dd6739826c86ca96a6a1548fd2e0fb7c0eb8dd0.tar.bz2 FreeFileSync-2dd6739826c86ca96a6a1548fd2e0fb7c0eb8dd0.zip |
10.0
Diffstat (limited to 'zen/thread.h')
-rwxr-xr-x | zen/thread.h | 72 |
1 files changed, 65 insertions, 7 deletions
diff --git a/zen/thread.h b/zen/thread.h index 3721b3c7..ed61e06b 100755 --- a/zen/thread.h +++ b/zen/thread.h @@ -71,8 +71,8 @@ std::async replacement without crappy semantics: 2. does not follow C++11 [futures.async], Paragraph 5, where std::future waits for thread in destructor Example: - Zstring dirpath = ... - auto ft = zen::runAsync([=]{ return zen::dirExists(dirpath); }); + Zstring dirPath = ... + auto ft = zen::runAsync([=]{ return zen::dirExists(dirPath); }); if (ft.wait_for(std::chrono::milliseconds(200)) == std::future_status::ready && ft.get()) //dir exising */ @@ -120,10 +120,10 @@ public: Protected(const T& value) : value_(value) {} template <class Function> - void access(Function fun) + auto access(Function fun) //-> decltype(fun(std::declval<T&>())) { std::lock_guard<std::mutex> dummy(lockValue_); - fun(value_); + return fun(value_); } private: @@ -134,6 +134,60 @@ private: T value_{}; }; +//------------------------------------------------------------------------------------------ + +template <class Function> +class ThreadGroup +{ +public: + ThreadGroup(size_t threadCount, const std::string& groupName) + { + for (size_t i = 0; i < threadCount; ++i) + worker_.emplace_back([this, groupName, i, threadCount] + { + setCurrentThreadName((groupName + "[" + numberTo<std::string>(i + 1) + "/" + numberTo<std::string>(threadCount) + "]").c_str()); + for (;;) + getNextWorkItem()(); //throw ThreadInterruption + }); + } + ~ThreadGroup() + { + for (InterruptibleThread& w : worker_) w.interrupt(); //interrupt all first, then join + for (InterruptibleThread& w : worker_) w.join(); + } + + //context of controlling thread, non-blocking: + void run(Function&& wi) + { + assert(!worker_.empty()); + { + std::lock_guard<std::mutex> dummy(lockWork_); + workItems_.push_back(std::move(wi)); + } + conditionNewWork_.notify_all(); + } + +private: + ThreadGroup (const ThreadGroup&) = delete; + ThreadGroup& operator=(const ThreadGroup&) = delete; + + //context of worker threads, blocking: + Function getNextWorkItem() //throw ThreadInterruption + { + std::unique_lock<std::mutex> dummy(lockWork_); + + interruptibleWait(conditionNewWork_, dummy, [this] { return !workItems_.empty(); }); //throw ThreadInterruption + warn_static("implement FIFO!?") + + Function wi = std::move(workItems_. back()); // + /**/ workItems_.pop_back(); //noexcept thanks to move + return wi; // + } + std::vector<InterruptibleThread> worker_; + std::mutex lockWork_; + std::vector<Function> workItems_; + std::condition_variable conditionNewWork_; +}; @@ -222,10 +276,9 @@ public: private: bool jobDone(size_t jobsTotal) const { return result_ || (jobsFinished_ >= jobsTotal); } //call while locked! - std::mutex lockResult_; size_t jobsFinished_ = 0; // - Opt<T> result_; //our condition is: "have result" or "jobsFinished_ == jobsTotal" + Opt<T> result_; //our condition is: "have result" or "jobsFinished_ == jobsTotal" std::condition_variable conditionJobDone_; }; @@ -256,7 +309,11 @@ Opt<T> GetFirstResult<T>::get() const { return asyncResult_->getResult(jobsTotal //------------------------------------------------------------------------------------------ //thread_local with non-POD seems to cause memory leaks on VS 14 => pointer only is fine: +#if defined __GNUC__ || defined __clang__ #define ZEN_THREAD_LOCAL_SPECIFIER __thread +#else + #error "Game over!" +#endif class ThreadInterruption {}; @@ -296,7 +353,8 @@ public: setConditionVar(&cv); ZEN_ON_SCOPE_EXIT(setConditionVar(nullptr)); - //"interrupted_" is not protected by cv's mutex => signal may get lost!!! => add artifical time out to mitigate! CPU: 0.25% vs 0% for longer time out! + //"interrupted_" is not protected by cv's mutex => signal may get lost!!! e.g. after condition was checked but before the wait begins + //=> add artifical time out to mitigate! CPU: 0.25% vs 0% for longer time out! while (!cv.wait_for(lock, std::chrono::milliseconds(1), [&] { return this->interrupted_ || pred(); })) ; |