// ***************************************************************************** // * This file is part of the FreeFileSync project. It is distributed under * // * GNU General Public License: https://www.gnu.org/licenses/gpl-3.0 * // * Copyright (C) Zenju (zenju AT freefilesync DOT org) - All Rights Reserved * // ***************************************************************************** #ifndef THREAD_H_7896323423432235246427 #define THREAD_H_7896323423432235246427 #include <thread> #include <future> #include "scope_guard.h" #include "ring_buffer.h" #include "string_tools.h" #include "zstring.h" namespace zen { class InterruptionStatus; //migrate towards https://en.cppreference.com/w/cpp/thread/jthread class InterruptibleThread { public: InterruptibleThread() {} InterruptibleThread (InterruptibleThread&& ) noexcept = default; InterruptibleThread& operator=(InterruptibleThread&& tmp) noexcept //don't use swap() but end stdThread_ life time immediately { if (joinable()) { requestStop(); join(); } stdThread_ = std::move(tmp.stdThread_); intStatus_ = std::move(tmp.intStatus_); return *this; } template <class Function> explicit InterruptibleThread(Function&& f); ~InterruptibleThread() { if (joinable()) { requestStop(); join(); } } bool joinable () const { return stdThread_.joinable(); } void requestStop(); void join () { stdThread_.join(); } void detach () { stdThread_.detach(); } private: std::thread stdThread_; std::shared_ptr<InterruptionStatus> intStatus_ = std::make_shared<InterruptionStatus>(); }; class ThreadStopRequest {}; //context of worker thread: void interruptionPoint(); //throw ThreadStopRequest template<class Predicate> void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred); //throw ThreadStopRequest template <class Rep, class Period> void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime); //throw ThreadStopRequest void setCurrentThreadName(const Zstring& threadName); bool runningOnMainThread(); //------------------------------------------------------------------------------------------ /* std::async replacement without crappy semantics: 1. guaranteed to run asynchronously 2. does not follow C++11 [futures.async], Paragraph 5, where std::future waits for thread in destructor Example: Zstring dirPath = ... auto ft = zen::runAsync([=]{ return zen::dirExists(dirPath); }); if (ft.wait_for(std::chrono::milliseconds(200)) == std::future_status::ready && ft.get()) //dir existing */ template <class Function> auto runAsync(Function&& fun); //wait for all with a time limit: return true if *all* results are available! //TODO: use std::when_all when available template<class InputIterator, class Duration> bool waitForAllTimed(InputIterator first, InputIterator last, const Duration& wait_duration); template<typename T> inline bool isReady(const std::future<T>& f) { assert(f.valid()); return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready; } //------------------------------------------------------------------------------------------ //wait until first job is successful or all failed //TODO: use std::when_any when available template <class T> class AsyncFirstResult { public: AsyncFirstResult(); template <class Fun> void addJob(Fun&& f); //f must return a std::optional<T> containing a value if successful template <class Duration> bool timedWait(const Duration& duration) const; //true: "get()" is ready, false: time elapsed //return first value or none if all jobs failed; blocks until result is ready! std::optional<T> get() const; //may be called only once! private: class AsyncResult; std::shared_ptr<AsyncResult> asyncResult_; size_t jobsTotal_ = 0; }; //------------------------------------------------------------------------------------------ //value associated with mutex and guaranteed protected access: //TODO: use std::synchronized_value when available http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2017/p0290r2.html template <class T> class Protected { public: Protected() {} explicit Protected(T& value) : value_(value) {} //Protected(T&& tmp ) : value_(std::move(tmp)) {} <- wait until needed template <class Function> auto access(Function fun) //-> decltype(fun(std::declval<T&>())) { std::lock_guard dummy(lockValue_); return fun(value_); } private: Protected (const Protected&) = delete; Protected& operator=(const Protected&) = delete; std::mutex lockValue_; T value_{}; }; //------------------------------------------------------------------------------------------ template <class Function> class ThreadGroup { public: ThreadGroup(size_t threadCountMax, const Zstring& groupName) : threadCountMax_(threadCountMax), groupName_(groupName) { if (threadCountMax == 0) throw std::logic_error("Contract violation! " + std::string(__FILE__) + ':' + numberTo<std::string>(__LINE__)); } ThreadGroup (ThreadGroup&& tmp) noexcept = default; //noexcept *required* to support move for reallocations in std::vector and std::swap!!! ThreadGroup& operator=(ThreadGroup&& tmp) noexcept = default; //don't use swap() but end worker_ life time immediately ~ThreadGroup() { for (InterruptibleThread& w : worker_) w.requestStop(); //stop *all* at the same time before join! if (detach_) //detach() without requestStop() doesn't make sense for (InterruptibleThread& w : worker_) w.detach(); } //context of controlling OR worker thread, non-blocking: void run(Function&& wi /*should throw ThreadStopRequest when needed*/, bool insertFront = false) { { std::lock_guard dummy(workLoad_.ref().lock); if (insertFront) workLoad_.ref().tasks.push_front(std::move(wi)); else workLoad_.ref().tasks.push_back(std::move(wi)); const size_t tasksPending = ++(workLoad_.ref().tasksPending); if (worker_.size() < std::min(tasksPending, threadCountMax_)) addWorkerThread(); } workLoad_.ref().conditionNewTask.notify_all(); } //context of controlling thread, blocking: void wait() { //perf: no difference in xBRZ test case compared to std::condition_variable-based implementation auto promiseDone = std::make_shared<std::promise<void>>(); // std::future<void> allDone = promiseDone->get_future(); notifyWhenDone([promiseDone] { promiseDone->set_value(); }); //std::function doesn't support construction involving move-only types! //use reference? => potential lifetime issue, e.g. promise object theoretically might be accessed inside set_value() after future gets signalled allDone.get(); } //non-blocking wait()-alternative: context of controlling thread: void notifyWhenDone(const std::function<void()>& onCompletion /*noexcept! runs on worker thread!*/) { std::lock_guard dummy(workLoad_.ref().lock); if (workLoad_.ref().tasksPending == 0) onCompletion(); else workLoad_.ref().onCompletionCallbacks.push_back(onCompletion); } //context of controlling thread: void detach() { detach_ = true; } //not expected to also interrupt! private: ThreadGroup (const ThreadGroup&) = delete; ThreadGroup& operator=(const ThreadGroup&) = delete; void addWorkerThread() { Zstring threadName = groupName_ + Zstr('[') + numberTo<Zstring>(worker_.size() + 1) + Zstr('/') + numberTo<Zstring>(threadCountMax_) + Zstr(']'); worker_.emplace_back([workLoad_ /*clang bug*/= workLoad_ /*share ownership!*/, threadName = std::move(threadName)]() mutable //don't capture "this"! consider detach() and move operations { setCurrentThreadName(threadName); WorkLoad& workLoad = workLoad_.ref(); std::unique_lock dummy(workLoad.lock); for (;;) { interruptibleWait(workLoad.conditionNewTask, dummy, [&tasks = workLoad.tasks] { return !tasks.empty(); }); //throw ThreadStopRequest Function task = std::move(workLoad.tasks. front()); //noexcept thanks to move /**/ workLoad.tasks.pop_front(); // dummy.unlock(); task(); //throw ThreadStopRequest? dummy.lock(); if (--(workLoad.tasksPending) == 0) if (!workLoad.onCompletionCallbacks.empty()) { std::vector<std::function<void()>> callbacks; callbacks.swap(workLoad.onCompletionCallbacks); dummy.unlock(); for (const auto& cb : callbacks) cb(); //noexcept! dummy.lock(); } } }); } struct WorkLoad { std::mutex lock; RingBuffer<Function> tasks; //FIFO! :) size_t tasksPending = 0; std::condition_variable conditionNewTask; std::vector<std::function<void()>> onCompletionCallbacks; }; std::vector<InterruptibleThread> worker_; SharedRef<WorkLoad> workLoad_ = makeSharedRef<WorkLoad>(); bool detach_ = false; size_t threadCountMax_; Zstring groupName_; }; //###################### implementation ###################### namespace impl { template <class Function> inline auto runAsync(Function&& fun, std::true_type /*copy-constructible*/) { using ResultType = decltype(fun()); //note: std::packaged_task does NOT support move-only function objects! std::packaged_task<ResultType()> pt(std::forward<Function>(fun)); auto fut = pt.get_future(); std::thread(std::move(pt)).detach(); //we have to explicitly detach since C++11: [thread.thread.destr] ~thread() calls std::terminate() if joinable()!!! return fut; } template <class Function> inline auto runAsync(Function&& fun, std::false_type /*copy-constructible*/) { //support move-only function objects! auto sharedFun = std::make_shared<Function>(std::forward<Function>(fun)); return runAsync([sharedFun] { return (*sharedFun)(); }, std::true_type()); } } template <class Function> inline auto runAsync(Function&& fun) { return impl::runAsync(std::forward<Function>(fun), std::is_copy_constructible<Function>()); } template<class InputIterator, class Duration> inline bool waitForAllTimed(InputIterator first, InputIterator last, const Duration& duration) { const std::chrono::steady_clock::time_point stopTime = std::chrono::steady_clock::now() + duration; for (; first != last; ++first) if (first->wait_until(stopTime) == std::future_status::timeout) return false; return true; } template <class T> class AsyncFirstResult<T>::AsyncResult { public: //context: worker threads void reportFinished(std::optional<T>&& result) { { std::lock_guard dummy(lockResult_); ++jobsFinished_; if (!result_) result_ = std::move(result); } conditionJobDone_.notify_all(); //better notify all, considering bugs like: https://svn.boost.org/trac/boost/ticket/7796 } //context: main thread template <class Duration> bool waitForResult(size_t jobsTotal, const Duration& duration) { std::unique_lock dummy(lockResult_); return conditionJobDone_.wait_for(dummy, duration, [&] { return this->jobDone(jobsTotal); }); } std::optional<T> getResult(size_t jobsTotal) { std::unique_lock dummy(lockResult_); conditionJobDone_.wait(dummy, [&] { return this->jobDone(jobsTotal); }); return std::move(result_); } private: bool jobDone(size_t jobsTotal) const { return result_ || (jobsFinished_ >= jobsTotal); } //call while locked! std::mutex lockResult_; size_t jobsFinished_ = 0; // std::optional<T> result_; //our condition is: "have result" or "jobsFinished_ == jobsTotal" std::condition_variable conditionJobDone_; }; template <class T> inline AsyncFirstResult<T>::AsyncFirstResult() : asyncResult_(std::make_shared<AsyncResult>()) {} template <class T> template <class Fun> inline void AsyncFirstResult<T>::addJob(Fun&& f) //f must return a std::optional<T> containing a value on success { std::thread t([asyncResult = this->asyncResult_, f = std::forward<Fun>(f)] { asyncResult->reportFinished(f()); }); ++jobsTotal_; t.detach(); //we have to be explicit since C++11: [thread.thread.destr] ~thread() calls std::terminate() if joinable()!!! } template <class T> template <class Duration> inline bool AsyncFirstResult<T>::timedWait(const Duration& duration) const { return asyncResult_->waitForResult(jobsTotal_, duration); } template <class T> inline std::optional<T> AsyncFirstResult<T>::get() const { return asyncResult_->getResult(jobsTotal_); } //------------------------------------------------------------------------------------------ class InterruptionStatus { public: //context of InterruptibleThread instance: void requestStop() { stopRequested_ = true; { std::lock_guard dummy(lockSleep_); //needed! makes sure the following signal is not lost! //usually we'd make "interrupted" non-atomic, but this is already given due to interruptibleWait() handling } conditionSleepInterruption_.notify_all(); std::lock_guard dummy(lockConditionPtr_); if (activeCondition_) activeCondition_->notify_all(); //signal may get lost! //alternative design locking the cv's mutex here could be dangerous: potential for dead lock! } //context of worker thread: void throwIfStopped() //throw ThreadStopRequest { if (stopRequested_) throw ThreadStopRequest(); } //context of worker thread: template<class Predicate> void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred) //throw ThreadStopRequest { setConditionVar(&cv); ZEN_ON_SCOPE_EXIT(setConditionVar(nullptr)); //"stopRequested_" is not protected by cv's mutex => signal may get lost!!! e.g. after condition was checked but before the wait begins //=> add artifical time out to mitigate! CPU: 0.25% vs 0% for longer time out! while (!cv.wait_for(lock, std::chrono::milliseconds(1), [&] { return this->stopRequested_ || pred(); })) ; throwIfStopped(); //throw ThreadStopRequest } //context of worker thread: template <class Rep, class Period> void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime) //throw ThreadStopRequest { std::unique_lock lock(lockSleep_); if (conditionSleepInterruption_.wait_for(lock, relTime, [this] { return static_cast<bool>(this->stopRequested_); })) throw ThreadStopRequest(); } private: void setConditionVar(std::condition_variable* cv) { std::lock_guard dummy(lockConditionPtr_); activeCondition_ = cv; } std::atomic<bool> stopRequested_{false}; //std:atomic is uninitialized by default!!! //"The default constructor is trivial: no initialization takes place other than zero initialization of static and thread-local objects." std::condition_variable* activeCondition_ = nullptr; std::mutex lockConditionPtr_; //serialize pointer access (only!) std::condition_variable conditionSleepInterruption_; std::mutex lockSleep_; }; namespace impl { //thread_local with non-POD seems to cause memory leaks on VS 14 => pointer only is fine: inline thread_local InterruptionStatus* threadLocalInterruptionStatus = nullptr; } //context of worker thread: inline void interruptionPoint() //throw ThreadStopRequest { assert(impl::threadLocalInterruptionStatus); if (impl::threadLocalInterruptionStatus) impl::threadLocalInterruptionStatus->throwIfStopped(); //throw ThreadStopRequest } //context of worker thread: template<class Predicate> inline void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred) //throw ThreadStopRequest { assert(impl::threadLocalInterruptionStatus); if (impl::threadLocalInterruptionStatus) impl::threadLocalInterruptionStatus->interruptibleWait(cv, lock, pred); else cv.wait(lock, pred); } //context of worker thread: template <class Rep, class Period> inline void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime) //throw ThreadStopRequest { assert(impl::threadLocalInterruptionStatus); if (impl::threadLocalInterruptionStatus) impl::threadLocalInterruptionStatus->interruptibleSleep(relTime); else std::this_thread::sleep_for(relTime); } template <class Function> inline InterruptibleThread::InterruptibleThread(Function&& f) { stdThread_ = std::thread([f = std::forward<Function>(f), intStatus = this->intStatus_]() mutable { assert(!impl::threadLocalInterruptionStatus); impl::threadLocalInterruptionStatus = intStatus.get(); ZEN_ON_SCOPE_EXIT(impl::threadLocalInterruptionStatus = nullptr); try { f(); //throw ThreadStopRequest } catch (ThreadStopRequest&) {} }); } inline void InterruptibleThread::requestStop() { intStatus_->requestStop(); } } #endif //THREAD_H_7896323423432235246427