// ***************************************************************************** // * This file is part of the FreeFileSync project. It is distributed under * // * GNU General Public License: http://www.gnu.org/licenses/gpl-3.0 * // * Copyright (C) Zenju (zenju AT freefilesync DOT org) - All Rights Reserved * // ***************************************************************************** #ifndef THREAD_H_7896323423432235246427 #define THREAD_H_7896323423432235246427 #include #include #include "scope_guard.h" #include "type_traits.h" #include "optional.h" namespace zen { class InterruptionStatus; class InterruptibleThread { public: InterruptibleThread() {} InterruptibleThread (InterruptibleThread&& tmp) = default; InterruptibleThread& operator=(InterruptibleThread&& tmp) = default; template InterruptibleThread(Function&& f); bool joinable () const { return stdThread_.joinable(); } void interrupt(); void join () { stdThread_.join(); } void detach () { stdThread_.detach(); } template bool tryJoinFor(const std::chrono::duration& relTime) { if (threadCompleted_.wait_for(relTime) == std::future_status::ready) { stdThread_.join(); //runs thread-local destructors => this better be fast!!! return true; } return false; } private: std::thread stdThread_; std::shared_ptr intStatus_; std::future threadCompleted_; }; //context of worker thread: void interruptionPoint(); //throw ThreadInterruption template void interruptibleWait(std::condition_variable& cv, std::unique_lock& lock, Predicate pred); //throw ThreadInterruption template void interruptibleSleep(const std::chrono::duration& relTime); //throw ThreadInterruption uint64_t getThreadId(); //simple integer thread id, unlike boost::thread::id: https://svn.boost.org/trac/boost/ticket/5754 //------------------------------------------------------------------------------------------ /* std::async replacement without crappy semantics: 1. guaranteed to run asynchronously 2. does not follow C++11 [futures.async], Paragraph 5, where std::future waits for thread in destructor Example: Zstring dirpath = ... auto ft = zen::runAsync([=](){ return zen::dirExists(dirpath); }); if (ft.wait_for(std::chrono::milliseconds(200)) == std::future_status::ready && ft.get()) //dir exising */ template auto runAsync(Function&& fun); //wait for all with a time limit: return true if *all* results are available! template bool wait_for_all_timed(InputIterator first, InputIterator last, const Duration& wait_duration); template inline bool isReady(const std::future& f) { return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready; } //------------------------------------------------------------------------------------------ //wait until first job is successful or all failed: substitute until std::when_any is available template class GetFirstResult { public: GetFirstResult(); template void addJob(Fun&& f); //f must return a zen::Opt containing a value if successful template bool timedWait(const Duration& duration) const; //true: "get()" is ready, false: time elapsed //return first value or none if all jobs failed; blocks until result is ready! Opt get() const; //may be called only once! private: class AsyncResult; std::shared_ptr asyncResult_; size_t jobsTotal_ = 0; }; //------------------------------------------------------------------------------------------ //value associated with mutex and guaranteed protected access: template class Protected { public: Protected() {} Protected(const T& value) : value_(value) {} template void access(Function fun) { std::lock_guard dummy(lockValue); fun(value_); } private: Protected (const Protected&) = delete; Protected& operator=(const Protected&) = delete; std::mutex lockValue; T value_{}; }; //###################### implementation ###################### namespace impl { template inline auto runAsync(Function&& fun, TrueType /*copy-constructible*/) { using ResultType = decltype(fun()); //note: std::packaged_task does NOT support move-only function objects! std::packaged_task pt(std::forward(fun)); auto fut = pt.get_future(); std::thread(std::move(pt)).detach(); //we have to explicitly detach since C++11: [thread.thread.destr] ~thread() calls std::terminate() if joinable()!!! return fut; } template inline auto runAsync(Function&& fun, FalseType /*copy-constructible*/) { //support move-only function objects! auto sharedFun = std::make_shared(std::forward(fun)); return runAsync([sharedFun] { return (*sharedFun)(); }, TrueType()); } } template inline auto runAsync(Function&& fun) { return impl::runAsync(std::forward(fun), StaticBool::value>()); } template inline bool wait_for_all_timed(InputIterator first, InputIterator last, const Duration& duration) { const std::chrono::steady_clock::time_point stopTime = std::chrono::steady_clock::now() + duration; for (; first != last; ++first) if (first->wait_until(stopTime) != std::future_status::ready) return false; //time elapsed return true; } template class GetFirstResult::AsyncResult { public: //context: worker threads void reportFinished(Opt&& result) { { std::lock_guard dummy(lockResult_); ++jobsFinished_; if (!result_) result_ = std::move(result); } conditionJobDone_.notify_all(); //better notify all, considering bugs like: https://svn.boost.org/trac/boost/ticket/7796 } //context: main thread template bool waitForResult(size_t jobsTotal, const Duration& duration) { std::unique_lock dummy(lockResult_); return conditionJobDone_.wait_for(dummy, duration, [&] { return this->jobDone(jobsTotal); }); } Opt getResult(size_t jobsTotal) { std::unique_lock dummy(lockResult_); conditionJobDone_.wait(dummy, [&] { return this->jobDone(jobsTotal); }); return std::move(result_); } private: bool jobDone(size_t jobsTotal) const { return result_ || (jobsFinished_ >= jobsTotal); } //call while locked! std::mutex lockResult_; size_t jobsFinished_ = 0; // Opt result_; //our condition is: "have result" or "jobsFinished_ == jobsTotal" std::condition_variable conditionJobDone_; }; template inline GetFirstResult::GetFirstResult() : asyncResult_(std::make_shared()) {} template template inline void GetFirstResult::addJob(Fun&& f) //f must return a zen::Opt containing a value on success { std::thread t([asyncResult = this->asyncResult_, f = std::forward(f)] { asyncResult->reportFinished(f()); }); ++jobsTotal_; t.detach(); //we have to be explicit since C++11: [thread.thread.destr] ~thread() calls std::terminate() if joinable()!!! } template template inline bool GetFirstResult::timedWait(const Duration& duration) const { return asyncResult_->waitForResult(jobsTotal_, duration); } template inline Opt GetFirstResult::get() const { return asyncResult_->getResult(jobsTotal_); } //------------------------------------------------------------------------------------------ //thread_local with non-POD seems to cause memory leaks on VS 14 => pointer only is fine: #define ZEN_THREAD_LOCAL_SPECIFIER __thread class ThreadInterruption {}; class InterruptionStatus { public: //context of InterruptibleThread instance: void interrupt() { interrupted = true; { std::lock_guard dummy(lockSleep); //needed! makes sure the following signal is not lost! //usually we'd make "interrupted" non-atomic, but this is already given due to interruptibleWait() handling } conditionSleepInterruption.notify_all(); std::lock_guard dummy(lockConditionPtr); if (activeCondition) activeCondition->notify_all(); //signal may get lost! //alternative design locking the cv's mutex here could be dangerous: potential for dead lock! } //context of worker thread: void checkInterruption() //throw ThreadInterruption { if (interrupted) throw ThreadInterruption(); } //context of worker thread: template void interruptibleWait(std::condition_variable& cv, std::unique_lock& lock, Predicate pred) //throw ThreadInterruption { setConditionVar(&cv); ZEN_ON_SCOPE_EXIT(setConditionVar(nullptr)); //"interrupted" is not protected by cv's mutex => signal may get lost!!! => add artifical time out to mitigate! CPU: 0.25% vs 0% for longer time out! while (!cv.wait_for(lock, std::chrono::milliseconds(1), [&] { return this->interrupted || pred(); })) ; checkInterruption(); //throw ThreadInterruption } //context of worker thread: template void interruptibleSleep(const std::chrono::duration& relTime) //throw ThreadInterruption { std::unique_lock lock(lockSleep); if (conditionSleepInterruption.wait_for(lock, relTime, [this] { return static_cast(this->interrupted); })) throw ThreadInterruption(); } private: void setConditionVar(std::condition_variable* cv) { std::lock_guard dummy(lockConditionPtr); activeCondition = cv; } std::atomic interrupted{ false }; //std:atomic is uninitialized by default!!! //"The default constructor is trivial: no initialization takes place other than zero initialization of static and thread-local objects." std::condition_variable* activeCondition = nullptr; std::mutex lockConditionPtr; //serialize pointer access (only!) std::condition_variable conditionSleepInterruption; std::mutex lockSleep; }; namespace impl { inline InterruptionStatus*& refThreadLocalInterruptionStatus() { static ZEN_THREAD_LOCAL_SPECIFIER InterruptionStatus* threadLocalInterruptionStatus = nullptr; return threadLocalInterruptionStatus; } } //context of worker thread: inline void interruptionPoint() //throw ThreadInterruption { assert(impl::refThreadLocalInterruptionStatus()); if (impl::refThreadLocalInterruptionStatus()) impl::refThreadLocalInterruptionStatus()->checkInterruption(); //throw ThreadInterruption } //context of worker thread: template inline void interruptibleWait(std::condition_variable& cv, std::unique_lock& lock, Predicate pred) //throw ThreadInterruption { assert(impl::refThreadLocalInterruptionStatus()); if (impl::refThreadLocalInterruptionStatus()) impl::refThreadLocalInterruptionStatus()->interruptibleWait(cv, lock, pred); else cv.wait(lock, pred); } //context of worker thread: template inline void interruptibleSleep(const std::chrono::duration& relTime) //throw ThreadInterruption { assert(impl::refThreadLocalInterruptionStatus()); if (impl::refThreadLocalInterruptionStatus()) impl::refThreadLocalInterruptionStatus()->interruptibleSleep(relTime); else std::this_thread::sleep_for(relTime); } template inline InterruptibleThread::InterruptibleThread(Function&& f) : intStatus_(std::make_shared()) { std::promise pFinished; threadCompleted_ = pFinished.get_future(); stdThread_ = std::thread([f = std::forward(f), intStatus = this->intStatus_, pFinished = std::move(pFinished)]() mutable { assert(!impl::refThreadLocalInterruptionStatus()); impl::refThreadLocalInterruptionStatus() = intStatus.get(); ZEN_ON_SCOPE_EXIT(impl::refThreadLocalInterruptionStatus() = nullptr); ZEN_ON_SCOPE_EXIT(pFinished.set_value()); try { f(); //throw ThreadInterruption } catch (ThreadInterruption&) {} }); } inline void InterruptibleThread::interrupt() { intStatus_->interrupt(); } inline uint64_t getThreadId() { //obviously "gettid()" is not available on Ubuntu/Debian/Suse => use the OpenSSL approach: static_assert(sizeof(uint64_t) >= sizeof(void*), ""); return reinterpret_cast(static_cast(&errno)); } } #endif //THREAD_H_7896323423432235246427