diff options
Diffstat (limited to 'zen/thread.h')
-rw-r--r-- | zen/thread.h | 285 |
1 files changed, 228 insertions, 57 deletions
diff --git a/zen/thread.h b/zen/thread.h index 6d647de8..a3b8760b 100644 --- a/zen/thread.h +++ b/zen/thread.h @@ -4,39 +4,62 @@ // * Copyright (C) Zenju (zenju AT gmx DOT de) - All Rights Reserved * // ************************************************************************** -#ifndef BOOST_THREAD_WRAP_H_78963234 -#define BOOST_THREAD_WRAP_H_78963234 - -//temporary solution until C++11 thread becomes fully available (considering std::thread's non-interruptibility and std::async craziness, this may be NEVER) -#include <memory> - -//workaround this pathetic boost thread warning mess -#ifdef __GNUC__ - #pragma GCC diagnostic push - #pragma GCC diagnostic ignored "-Wswitch-enum" - #pragma GCC diagnostic ignored "-Wstrict-aliasing" - #pragma GCC diagnostic ignored "-Wredundant-decls" - #pragma GCC diagnostic ignored "-Wshadow" - #ifndef __clang__ //clang defines __GNUC__, but doesn't support this warning - #pragma GCC diagnostic ignored "-Wunused-local-typedefs" - #endif -#endif -#ifdef _MSC_VER - #pragma warning(push) - #pragma warning(disable: 4702 4913) //unreachable code; user defined binary operator ',' exists but no overload could convert all operands, default built-in binary operator ',' used -#endif +#ifndef STD_THREAD_WRAP_H_7896323423432 +#define STD_THREAD_WRAP_H_7896323423432 -#include <boost/thread.hpp> - -#ifdef __GNUC__ - #pragma GCC diagnostic pop -#endif -#ifdef _MSC_VER - #pragma warning(pop) -#endif +#include <thread> +#include <future> +#include <zen/scope_guard.h> +#include <zen/type_traits.h> namespace zen { +class InterruptionStatus; + + +class InterruptibleThread +{ +public: + InterruptibleThread() {} + InterruptibleThread (InterruptibleThread&& tmp) = default; + InterruptibleThread& operator=(InterruptibleThread&& tmp) = default; + + template <class Function> + InterruptibleThread(Function f); + + bool joinable () const { return stdThread.joinable(); } + void interrupt(); + void join () { stdThread.join(); } + void detach () { stdThread.detach(); } + + template <class Rep, class Period> + bool tryJoinFor(const std::chrono::duration<Rep, Period>& relTime) + { + if (threadCompleted.wait_for(relTime) == std::future_status::ready) + { + stdThread.join(); //runs thread-local destructors => this better be fast!!! + return true; + } + return false; + } + +private: + std::thread stdThread; + std::shared_ptr<InterruptionStatus> intStatus_; + std::future<void> threadCompleted; +}; + +//context of worker thread: +void interruptionPoint(); //throw ThreadInterruption + +template<class Predicate> +void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred); //throw ThreadInterruption + +template <class Rep, class Period> +void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime); //throw ThreadInterruption + +//------------------------------------------------------------------------------------------ + /* std::async replacement without crappy semantics: 1. guaranteed to run asynchronously @@ -45,16 +68,20 @@ std::async replacement without crappy semantics: Example: Zstring dirpath = ... auto ft = zen::runAsync([=](){ return zen::dirExists(dirpath); }); - if (ft.wait_for(boost::chrono::milliseconds(200)) == boost::future_status::ready && ft.get()) + if (ft.wait_for(std::chrono::milliseconds(200)) == std::future_status::ready && ft.get()) //dir exising */ template <class Function> -auto runAsync(Function fun) -> boost::unique_future<decltype(fun())>; +auto runAsync(Function fun) -> std::future<decltype(fun())>; //wait for all with a time limit: return true if *all* results are available! template<class InputIterator, class Duration> bool wait_for_all_timed(InputIterator first, InputIterator last, const Duration& wait_duration); +template<typename T> inline +bool isReady(const std::future<T>& f) { return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready; } +//------------------------------------------------------------------------------------------ + //wait until first job is successful or all failed: substitute until std::when_any is available template <class T> class GetFirstResult @@ -77,6 +104,7 @@ private: size_t jobsTotal_; }; +//------------------------------------------------------------------------------------------ //value associated with mutex and guaranteed protected access: template <class T> @@ -89,7 +117,7 @@ public: template <class Function> void access(Function fun) { - boost::lock_guard<boost::mutex> dummy(lockValue); + std::lock_guard<std::mutex> dummy(lockValue); fun(value_); } @@ -97,7 +125,7 @@ private: Protected (const Protected&) = delete; Protected& operator=(const Protected&) = delete; - boost::mutex lockValue; + std::mutex lockValue; T value_; }; @@ -110,22 +138,15 @@ private: //###################### implementation ###################### -#ifndef BOOST_HAS_THREADS - #error just some paranoia check... -#endif template <class Function> inline -auto runAsync(Function fun) -> boost::unique_future<decltype(fun())> +auto runAsync(Function fun) -> std::future<decltype(fun())> { typedef decltype(fun()) ResultType; -#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK //mirror "boost/thread/future.hpp", hopefully they know what they're doing - boost::packaged_task<ResultType()> pt(std::move(fun)); -#else - boost::packaged_task<ResultType> pt(std::move(fun)); -#endif + std::packaged_task<ResultType()> pt(std::move(fun)); auto fut = pt.get_future(); - boost::thread(std::move(pt)).detach(); //we have to explicitly detach since C++11: [thread.thread.destr] ~thread() calls std::terminate() if joinable()!!! + std::thread(std::move(pt)).detach(); //we have to explicitly detach since C++11: [thread.thread.destr] ~thread() calls std::terminate() if joinable()!!! return fut; } @@ -133,9 +154,9 @@ auto runAsync(Function fun) -> boost::unique_future<decltype(fun())> template<class InputIterator, class Duration> inline bool wait_for_all_timed(InputIterator first, InputIterator last, const Duration& duration) { - const boost::chrono::steady_clock::time_point endTime = boost::chrono::steady_clock::now() + duration; + const std::chrono::steady_clock::time_point endTime = std::chrono::steady_clock::now() + duration; for (; first != last; ++first) - if (first->wait_until(endTime) != boost::future_status::ready) + if (first->wait_until(endTime) != std::future_status::ready) return false; //time elapsed return true; } @@ -155,27 +176,26 @@ public: void reportFinished(std::unique_ptr<T>&& result) { { - boost::lock_guard<boost::mutex> dummy(lockResult); + std::lock_guard<std::mutex> dummy(lockResult); ++jobsFinished; if (!result_) result_ = std::move(result); } - conditionJobDone.notify_all(); //instead of notify_one(); workaround bug: https://svn.boost.org/trac/boost/ticket/7796 - //condition handling, see: http://www.boost.org/doc/libs/1_43_0/doc/html/thread/synchronization.html#thread.synchronization.condvar_ref + conditionJobDone.notify_all(); //better notify all, considering bugs like: https://svn.boost.org/trac/boost/ticket/7796 } //context: main thread template <class Duration> bool waitForResult(size_t jobsTotal, const Duration& duration) { - boost::unique_lock<boost::mutex> dummy(lockResult); - return conditionJobDone.wait_for(dummy, duration, [&] { return this->jobDone(jobsTotal); }); //throw boost::thread_interrupted + std::unique_lock<std::mutex> dummy(lockResult); + return conditionJobDone.wait_for(dummy, duration, [&] { return this->jobDone(jobsTotal); }); } std::unique_ptr<T> getResult(size_t jobsTotal) { - boost::unique_lock<boost::mutex> dummy(lockResult); - conditionJobDone.wait(dummy, [&] { return this->jobDone(jobsTotal); }); //throw boost::thread_interrupted + std::unique_lock<std::mutex> dummy(lockResult); + conditionJobDone.wait(dummy, [&] { return this->jobDone(jobsTotal); }); #ifndef NDEBUG assert(!returnedResult); @@ -191,10 +211,10 @@ private: bool returnedResult; #endif - boost::mutex lockResult; + std::mutex lockResult; size_t jobsFinished; // std::unique_ptr<T> result_; //our condition is: "have result" or "jobsFinished == jobsTotal" - boost::condition_variable conditionJobDone; + std::condition_variable conditionJobDone; }; @@ -207,8 +227,7 @@ template <class T> template <class Fun> inline void GetFirstResult<T>::addJob(Fun f) //f must return a std::unique_ptr<T> containing a value on success { - auto asyncResult = this->asyncResult_; //capture member variable, not "this"! - boost::thread t([asyncResult, f] { asyncResult->reportFinished(f()); }); + std::thread t([asyncResult = this->asyncResult_, f = std::move(f)] { asyncResult->reportFinished(f()); }); ++jobsTotal_; t.detach(); //we have to be explicit since C++11: [thread.thread.destr] ~thread() calls std::terminate() if joinable()!!! } @@ -221,6 +240,158 @@ bool GetFirstResult<T>::timedWait(const Duration& duration) const { return async template <class T> inline std::unique_ptr<T> GetFirstResult<T>::get() const { return asyncResult_->getResult(jobsTotal_); } + +//------------------------------------------------------------------------------------------ + +//thread_local with non-POD seems to cause memory leaks on VS 14 => pointer only is fine: +#ifdef _MSC_VER + #define ZEN_THREAD_LOCAL_SPECIFIER __declspec(thread) +#elif defined __GNUC__ || defined __clang__ + #define ZEN_THREAD_LOCAL_SPECIFIER __thread +#else + #error "game over" +#endif + + +class ThreadInterruption {}; + + +class InterruptionStatus +{ +public: + //context of InterruptibleThread instance: + void interrupt() + { + interrupted = true; + + { + std::lock_guard<std::mutex> dummy(lockSleep); //needed! makes sure the following signal is not lost! + //usually we'd make "interrupted" non-atomic, but this is already given due to interruptibleWait() handling + } + conditionSleepInterruption.notify_all(); + + std::lock_guard<std::mutex> dummy(lockConditionPtr); + if (activeCondition) + activeCondition->notify_all(); //signal may get lost! + //alternative design locking the cv's mutex here could be dangerous: potential for dead lock! + } + + //context of worker thread: + void checkInterruption() //throw ThreadInterruption + { + if (interrupted) + throw ThreadInterruption(); + } + + //context of worker thread: + template<class Predicate> + void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred) //throw ThreadInterruption + { + setConditionVar(&cv); + ZEN_ON_SCOPE_EXIT(setConditionVar(nullptr)); + + //"interrupted" is not protected by cv's mutex => signal may get lost!!! => add artifical time out to mitigate! CPU: 0.25% vs 0% for longer time out! + while (!cv.wait_for(lock, std::chrono::milliseconds(1), [&] { return this->interrupted || pred(); })) + ; + + checkInterruption(); //throw ThreadInterruption + } + + //context of worker thread: + template <class Rep, class Period> + void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime) //throw ThreadInterruption + { + std::unique_lock<std::mutex> lock(lockSleep); + if (conditionSleepInterruption.wait_for(lock, relTime, [&] { return static_cast<bool>(this->interrupted); })) + throw ThreadInterruption(); + } + +private: + void setConditionVar(std::condition_variable* cv) + { + std::lock_guard<std::mutex> dummy(lockConditionPtr); + activeCondition = cv; + } + + std::atomic<bool> interrupted{ false }; //std:atomic is uninitialized by default! + + std::condition_variable* activeCondition = nullptr; + std::mutex lockConditionPtr; //serialize pointer access (only!) + + std::condition_variable conditionSleepInterruption; + std::mutex lockSleep; +}; + + +namespace impl +{ +inline +InterruptionStatus*& refThreadLocalInterruptionStatus() +{ + static ZEN_THREAD_LOCAL_SPECIFIER InterruptionStatus* threadLocalInterruptionStatus = nullptr; + return threadLocalInterruptionStatus; +} +} + +//context of worker thread: +inline +void interruptionPoint() //throw ThreadInterruption +{ + assert(impl::refThreadLocalInterruptionStatus()); + if (impl::refThreadLocalInterruptionStatus()) + impl::refThreadLocalInterruptionStatus()->checkInterruption(); //throw ThreadInterruption +} + + +//context of worker thread: +template<class Predicate> inline +void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred) //throw ThreadInterruption +{ + assert(impl::refThreadLocalInterruptionStatus()); + if (impl::refThreadLocalInterruptionStatus()) + impl::refThreadLocalInterruptionStatus()->interruptibleWait(cv, lock, pred); + else + cv.wait(lock, pred); +} + +//context of worker thread: +template <class Rep, class Period> inline +void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime) //throw ThreadInterruption +{ + assert(impl::refThreadLocalInterruptionStatus()); + if (impl::refThreadLocalInterruptionStatus()) + impl::refThreadLocalInterruptionStatus()->interruptibleSleep(relTime); + else + std::this_thread::sleep_for(relTime); +} + + +template <class Function> inline +InterruptibleThread::InterruptibleThread(Function f) : intStatus_(std::make_shared<InterruptionStatus>()) +{ + std::promise<void> pFinished; + threadCompleted = pFinished.get_future(); + + stdThread = std::thread([f = std::move(f), + intStatus = this->intStatus_, + pFinished = std::move(pFinished)]() mutable + { + assert(!impl::refThreadLocalInterruptionStatus()); + impl::refThreadLocalInterruptionStatus() = intStatus.get(); + ZEN_ON_SCOPE_EXIT(impl::refThreadLocalInterruptionStatus() = nullptr); + ZEN_ON_SCOPE_EXIT(pFinished.set_value()); + + try + { + f(); //throw ThreadInterruption + } + catch (ThreadInterruption&) {} + }); +} + + +inline +void InterruptibleThread::interrupt() { intStatus_->interrupt(); } } -#endif //BOOST_THREAD_WRAP_H_78963234 +#endif //STD_THREAD_WRAP_H_7896323423432 |