diff options
author | B Stack <bgstack15@gmail.com> | 2020-08-31 20:07:13 -0400 |
---|---|---|
committer | B Stack <bgstack15@gmail.com> | 2020-08-31 20:07:13 -0400 |
commit | 8a27fa9c617533e76673ce61a65e2ba869b52208 (patch) | |
tree | acfdfb3e1046db87040477033fda0df76d92916a /zen/thread.h | |
parent | Merge branch '11.0' into 'master' (diff) | |
download | FreeFileSync-8a27fa9c617533e76673ce61a65e2ba869b52208.tar.gz FreeFileSync-8a27fa9c617533e76673ce61a65e2ba869b52208.tar.bz2 FreeFileSync-8a27fa9c617533e76673ce61a65e2ba869b52208.zip |
add upstream 11.1
Diffstat (limited to 'zen/thread.h')
-rw-r--r-- | zen/thread.h | 223 |
1 files changed, 105 insertions, 118 deletions
diff --git a/zen/thread.h b/zen/thread.h index 99e61e1f..1bea95ea 100644 --- a/zen/thread.h +++ b/zen/thread.h @@ -12,81 +12,89 @@ #include "scope_guard.h" #include "ring_buffer.h" #include "string_tools.h" +#include "zstring.h" namespace zen { class InterruptionStatus; +//migrate towards https://en.cppreference.com/w/cpp/thread/jthread class InterruptibleThread { public: InterruptibleThread() {} - InterruptibleThread (InterruptibleThread&&) noexcept = default; - InterruptibleThread& operator=(InterruptibleThread&&) noexcept = default; + InterruptibleThread (InterruptibleThread&& ) noexcept = default; + InterruptibleThread& operator=(InterruptibleThread&& tmp) noexcept //don't use swap() but end stdThread_ life time immediately + { + if (joinable()) + { + requestStop(); + join(); + } + stdThread_ = std::move(tmp.stdThread_); + intStatus_ = std::move(tmp.intStatus_); + return *this; + } template <class Function> - InterruptibleThread(Function&& f); + explicit InterruptibleThread(Function&& f); + + ~InterruptibleThread() + { + if (joinable()) + { + requestStop(); + join(); + } + } bool joinable () const { return stdThread_.joinable(); } - void interrupt(); + void requestStop(); void join () { stdThread_.join(); } void detach () { stdThread_.detach(); } - template <class Rep, class Period> - bool tryJoinFor(const std::chrono::duration<Rep, Period>& relTime) - { - if (threadCompleted_.wait_for(relTime) != std::future_status::ready) - return false; - - stdThread_.join(); //runs thread-local destructors => this better be fast!!! - return true; - } - private: std::thread stdThread_; std::shared_ptr<InterruptionStatus> intStatus_ = std::make_shared<InterruptionStatus>(); - std::future<void> threadCompleted_; }; + +class ThreadStopRequest {}; + //context of worker thread: -void interruptionPoint(); //throw ThreadInterruption +void interruptionPoint(); //throw ThreadStopRequest template<class Predicate> -void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred); //throw ThreadInterruption +void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred); //throw ThreadStopRequest template <class Rep, class Period> -void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime); //throw ThreadInterruption - -void setCurrentThreadName(const char* threadName); +void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime); //throw ThreadStopRequest -uint64_t getThreadId(); //simple integer thread id, unlike boost::thread::id: https://svn.boost.org/trac/boost/ticket/5754 -uint64_t getMainThreadId(); +void setCurrentThreadName(const Zstring& threadName); -inline bool runningMainThread() { return getThreadId() == getMainThreadId(); } +bool runningOnMainThread(); //------------------------------------------------------------------------------------------ -/* -std::async replacement without crappy semantics: - 1. guaranteed to run asynchronously - 2. does not follow C++11 [futures.async], Paragraph 5, where std::future waits for thread in destructor - -Example: - Zstring dirPath = ... - auto ft = zen::runAsync([=]{ return zen::dirExists(dirPath); }); - if (ft.wait_for(std::chrono::milliseconds(200)) == std::future_status::ready && ft.get()) - //dir existing -*/ +/* std::async replacement without crappy semantics: + 1. guaranteed to run asynchronously + 2. does not follow C++11 [futures.async], Paragraph 5, where std::future waits for thread in destructor + + Example: + Zstring dirPath = ... + auto ft = zen::runAsync([=]{ return zen::dirExists(dirPath); }); + if (ft.wait_for(std::chrono::milliseconds(200)) == std::future_status::ready && ft.get()) + //dir existing */ template <class Function> auto runAsync(Function&& fun); //wait for all with a time limit: return true if *all* results are available! //TODO: use std::when_all when available template<class InputIterator, class Duration> -bool wait_for_all_timed(InputIterator first, InputIterator last, const Duration& wait_duration); +bool waitForAllTimed(InputIterator first, InputIterator last, const Duration& wait_duration); template<typename T> inline -bool isReady(const std::future<T>& f) { return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready; } +bool isReady(const std::future<T>& f) { assert(f.valid()); return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready; } //------------------------------------------------------------------------------------------ //wait until first job is successful or all failed @@ -115,13 +123,13 @@ private: //------------------------------------------------------------------------------------------ //value associated with mutex and guaranteed protected access: -//TODO: use std::synchronized_value when available +//TODO: use std::synchronized_value when available http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2017/p0290r2.html template <class T> class Protected { public: Protected() {} - Protected(T& value) : value_(value) {} + explicit Protected(T& value) : value_(value) {} //Protected(T&& tmp ) : value_(std::move(tmp)) {} <- wait until needed template <class Function> @@ -145,26 +153,24 @@ template <class Function> class ThreadGroup { public: - ThreadGroup(size_t threadCountMax, const std::string& groupName) : threadCountMax_(threadCountMax), groupName_(groupName) + ThreadGroup(size_t threadCountMax, const Zstring& groupName) : threadCountMax_(threadCountMax), groupName_(groupName) { if (threadCountMax == 0) throw std::logic_error("Contract violation! " + std::string(__FILE__) + ':' + numberTo<std::string>(__LINE__)); } + ThreadGroup (ThreadGroup&& tmp) noexcept = default; //noexcept *required* to support move for reallocations in std::vector and std::swap!!! + ThreadGroup& operator=(ThreadGroup&& tmp) noexcept = default; //don't use swap() but end worker_ life time immediately + ~ThreadGroup() { - for (InterruptibleThread& w : worker_) w.interrupt(); //interrupt all first, then join - for (InterruptibleThread& w : worker_) detach_ ? w.detach() : w.join(); - } + for (InterruptibleThread& w : worker_) + w.requestStop(); //stop *all* at the same time before join! - ThreadGroup(ThreadGroup&& tmp) noexcept : - worker_ (std::move(tmp.worker_)), - workLoad_ (std::move(tmp.workLoad_)), - detach_ (tmp.detach_), - threadCountMax_(tmp.threadCountMax_), - groupName_ (std::move(tmp.groupName_)) { tmp.worker_.clear(); /*just in case: make sure destructor is no-op!*/ } - - ThreadGroup& operator=(ThreadGroup&& tmp) noexcept { swap(tmp); return *this; } //noexcept *required* to support move for reallocations in std::vector and std::swap!!! + if (detach_) //detach() without requestStop() doesn't make sense + for (InterruptibleThread& w : worker_) + w.detach(); + } //context of controlling OR worker thread, non-blocking: - void run(Function&& wi /*should throw ThreadInterruption when needed*/, bool insertFront = false) + void run(Function&& wi /*should throw ThreadStopRequest when needed*/, bool insertFront = false) { { std::lock_guard dummy(workLoad_->lock); @@ -214,22 +220,22 @@ private: void addWorkerThread() { - std::string threadName = groupName_ + '[' + numberTo<std::string>(worker_.size() + 1) + '/' + numberTo<std::string>(threadCountMax_) + ']'; + Zstring threadName = groupName_ + Zstr('[') + numberTo<Zstring>(worker_.size() + 1) + Zstr('/') + numberTo<Zstring>(threadCountMax_) + Zstr(']'); - worker_.emplace_back([wl = workLoad_, threadName = std::move(threadName)] //don't capture "this"! consider detach() and swap() + worker_.emplace_back([wl = workLoad_, threadName = std::move(threadName)] //don't capture "this"! consider detach() and move operations { - setCurrentThreadName(threadName.c_str()); + setCurrentThreadName(threadName); std::unique_lock dummy(wl->lock); for (;;) { - interruptibleWait(wl->conditionNewTask, dummy, [&tasks = wl->tasks] { return !tasks.empty(); }); //throw ThreadInterruption + interruptibleWait(wl->conditionNewTask, dummy, [&tasks = wl->tasks] { return !tasks.empty(); }); //throw ThreadStopRequest Function task = std::move(wl->tasks. front()); //noexcept thanks to move /**/ wl->tasks.pop_front(); // dummy.unlock(); - task(); //throw ThreadInterruption? + task(); //throw ThreadStopRequest? dummy.lock(); if (--(wl->tasksPending) == 0) @@ -239,22 +245,14 @@ private: callbacks.swap(wl->onCompletionCallbacks); dummy.unlock(); - for (const auto& cb : callbacks) cb(); //noexcept! + for (const auto& cb : callbacks) + cb(); //noexcept! dummy.lock(); } } }); } - void swap(ThreadGroup& other) - { - std::swap(worker_, other.worker_); - std::swap(workLoad_, other.workLoad_); - std::swap(detach_, other.detach_); - std::swap(threadCountMax_, other.threadCountMax_); - std::swap(groupName_, other.groupName_); - } - struct WorkLoad { std::mutex lock; @@ -268,7 +266,7 @@ private: std::shared_ptr<WorkLoad> workLoad_ = std::make_shared<WorkLoad>(); bool detach_ = false; size_t threadCountMax_; - std::string groupName_; + Zstring groupName_; }; @@ -313,12 +311,12 @@ auto runAsync(Function&& fun) template<class InputIterator, class Duration> inline -bool wait_for_all_timed(InputIterator first, InputIterator last, const Duration& duration) +bool waitForAllTimed(InputIterator first, InputIterator last, const Duration& duration) { const std::chrono::steady_clock::time_point stopTime = std::chrono::steady_clock::now() + duration; for (; first != last; ++first) - if (first->wait_until(stopTime) != std::future_status::ready) - return false; //time elapsed + if (first->wait_until(stopTime) == std::future_status::timeout) + return false; return true; } @@ -360,7 +358,7 @@ private: std::mutex lockResult_; size_t jobsFinished_ = 0; // - std::optional<T> result_; //our condition is: "have result" or "jobsFinished_ == jobsTotal" + std::optional<T> result_; //our condition is: "have result" or "jobsFinished_ == jobsTotal" std::condition_variable conditionJobDone_; }; @@ -390,16 +388,13 @@ std::optional<T> AsyncFirstResult<T>::get() const { return asyncResult_->getResu //------------------------------------------------------------------------------------------ -class ThreadInterruption {}; - - class InterruptionStatus { public: //context of InterruptibleThread instance: - void interrupt() + void requestStop() { - interrupted_ = true; + stopRequested_ = true; { std::lock_guard dummy(lockSleep_); //needed! makes sure the following signal is not lost! @@ -414,34 +409,34 @@ public: } //context of worker thread: - void checkInterruption() //throw ThreadInterruption + void throwIfStopped() //throw ThreadStopRequest { - if (interrupted_) - throw ThreadInterruption(); + if (stopRequested_) + throw ThreadStopRequest(); } //context of worker thread: template<class Predicate> - void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred) //throw ThreadInterruption + void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred) //throw ThreadStopRequest { setConditionVar(&cv); ZEN_ON_SCOPE_EXIT(setConditionVar(nullptr)); - //"interrupted_" is not protected by cv's mutex => signal may get lost!!! e.g. after condition was checked but before the wait begins + //"stopRequested_" is not protected by cv's mutex => signal may get lost!!! e.g. after condition was checked but before the wait begins //=> add artifical time out to mitigate! CPU: 0.25% vs 0% for longer time out! - while (!cv.wait_for(lock, std::chrono::milliseconds(1), [&] { return this->interrupted_ || pred(); })) + while (!cv.wait_for(lock, std::chrono::milliseconds(1), [&] { return this->stopRequested_ || pred(); })) ; - checkInterruption(); //throw ThreadInterruption + throwIfStopped(); //throw ThreadStopRequest } //context of worker thread: template <class Rep, class Period> - void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime) //throw ThreadInterruption + void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime) //throw ThreadStopRequest { std::unique_lock lock(lockSleep_); - if (conditionSleepInterruption_.wait_for(lock, relTime, [this] { return static_cast<bool>(this->interrupted_); })) - throw ThreadInterruption(); + if (conditionSleepInterruption_.wait_for(lock, relTime, [this] { return static_cast<bool>(this->stopRequested_); })) + throw ThreadStopRequest(); } private: @@ -451,7 +446,7 @@ private: activeCondition_ = cv; } - std::atomic<bool> interrupted_{ false }; //std:atomic is uninitialized by default!!! + std::atomic<bool> stopRequested_{ false }; //std:atomic is uninitialized by default!!! //"The default constructor is trivial: no initialization takes place other than zero initialization of static and thread-local objects." std::condition_variable* activeCondition_ = nullptr; @@ -464,43 +459,40 @@ private: namespace impl { -inline -InterruptionStatus*& refThreadLocalInterruptionStatus() -{ - //thread_local with non-POD seems to cause memory leaks on VS 14 => pointer only is fine: - thread_local InterruptionStatus* threadLocalInterruptionStatus = nullptr; - return threadLocalInterruptionStatus; -} +//thread_local with non-POD seems to cause memory leaks on VS 14 => pointer only is fine: +inline thread_local InterruptionStatus* threadLocalInterruptionStatus = nullptr; } + //context of worker thread: inline -void interruptionPoint() //throw ThreadInterruption +void interruptionPoint() //throw ThreadStopRequest { - assert(impl::refThreadLocalInterruptionStatus()); - if (impl::refThreadLocalInterruptionStatus()) - impl::refThreadLocalInterruptionStatus()->checkInterruption(); //throw ThreadInterruption + assert(impl::threadLocalInterruptionStatus); + if (impl::threadLocalInterruptionStatus) + impl::threadLocalInterruptionStatus->throwIfStopped(); //throw ThreadStopRequest } //context of worker thread: template<class Predicate> inline -void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred) //throw ThreadInterruption +void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred) //throw ThreadStopRequest { - assert(impl::refThreadLocalInterruptionStatus()); - if (impl::refThreadLocalInterruptionStatus()) - impl::refThreadLocalInterruptionStatus()->interruptibleWait(cv, lock, pred); + assert(impl::threadLocalInterruptionStatus); + if (impl::threadLocalInterruptionStatus) + impl::threadLocalInterruptionStatus->interruptibleWait(cv, lock, pred); else cv.wait(lock, pred); } + //context of worker thread: template <class Rep, class Period> inline -void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime) //throw ThreadInterruption +void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime) //throw ThreadStopRequest { - assert(impl::refThreadLocalInterruptionStatus()); - if (impl::refThreadLocalInterruptionStatus()) - impl::refThreadLocalInterruptionStatus()->interruptibleSleep(relTime); + assert(impl::threadLocalInterruptionStatus); + if (impl::threadLocalInterruptionStatus) + impl::threadLocalInterruptionStatus->interruptibleSleep(relTime); else std::this_thread::sleep_for(relTime); } @@ -509,29 +501,24 @@ void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime) //thr template <class Function> inline InterruptibleThread::InterruptibleThread(Function&& f) { - std::promise<void> pFinished; - threadCompleted_ = pFinished.get_future(); - stdThread_ = std::thread([f = std::forward<Function>(f), - intStatus = this->intStatus_, - pFinished = std::move(pFinished)]() mutable + intStatus = this->intStatus_]() mutable { - assert(!impl::refThreadLocalInterruptionStatus()); - impl::refThreadLocalInterruptionStatus() = intStatus.get(); - ZEN_ON_SCOPE_EXIT(impl::refThreadLocalInterruptionStatus() = nullptr); - ZEN_ON_SCOPE_EXIT(pFinished.set_value()); + assert(!impl::threadLocalInterruptionStatus); + impl::threadLocalInterruptionStatus = intStatus.get(); + ZEN_ON_SCOPE_EXIT(impl::threadLocalInterruptionStatus = nullptr); try { - f(); //throw ThreadInterruption + f(); //throw ThreadStopRequest } - catch (ThreadInterruption&) {} + catch (ThreadStopRequest&) {} }); } inline -void InterruptibleThread::interrupt() { intStatus_->interrupt(); } +void InterruptibleThread::requestStop() { intStatus_->requestStop(); } } #endif //THREAD_H_7896323423432235246427 |