summaryrefslogtreecommitdiff
path: root/zen/thread.h
diff options
context:
space:
mode:
Diffstat (limited to 'zen/thread.h')
-rw-r--r--zen/thread.h285
1 files changed, 228 insertions, 57 deletions
diff --git a/zen/thread.h b/zen/thread.h
index 6d647de8..a3b8760b 100644
--- a/zen/thread.h
+++ b/zen/thread.h
@@ -4,39 +4,62 @@
// * Copyright (C) Zenju (zenju AT gmx DOT de) - All Rights Reserved *
// **************************************************************************
-#ifndef BOOST_THREAD_WRAP_H_78963234
-#define BOOST_THREAD_WRAP_H_78963234
-
-//temporary solution until C++11 thread becomes fully available (considering std::thread's non-interruptibility and std::async craziness, this may be NEVER)
-#include <memory>
-
-//workaround this pathetic boost thread warning mess
-#ifdef __GNUC__
- #pragma GCC diagnostic push
- #pragma GCC diagnostic ignored "-Wswitch-enum"
- #pragma GCC diagnostic ignored "-Wstrict-aliasing"
- #pragma GCC diagnostic ignored "-Wredundant-decls"
- #pragma GCC diagnostic ignored "-Wshadow"
- #ifndef __clang__ //clang defines __GNUC__, but doesn't support this warning
- #pragma GCC diagnostic ignored "-Wunused-local-typedefs"
- #endif
-#endif
-#ifdef _MSC_VER
- #pragma warning(push)
- #pragma warning(disable: 4702 4913) //unreachable code; user defined binary operator ',' exists but no overload could convert all operands, default built-in binary operator ',' used
-#endif
+#ifndef STD_THREAD_WRAP_H_7896323423432
+#define STD_THREAD_WRAP_H_7896323423432
-#include <boost/thread.hpp>
-
-#ifdef __GNUC__
- #pragma GCC diagnostic pop
-#endif
-#ifdef _MSC_VER
- #pragma warning(pop)
-#endif
+#include <thread>
+#include <future>
+#include <zen/scope_guard.h>
+#include <zen/type_traits.h>
namespace zen
{
+class InterruptionStatus;
+
+
+class InterruptibleThread
+{
+public:
+ InterruptibleThread() {}
+ InterruptibleThread (InterruptibleThread&& tmp) = default;
+ InterruptibleThread& operator=(InterruptibleThread&& tmp) = default;
+
+ template <class Function>
+ InterruptibleThread(Function f);
+
+ bool joinable () const { return stdThread.joinable(); }
+ void interrupt();
+ void join () { stdThread.join(); }
+ void detach () { stdThread.detach(); }
+
+ template <class Rep, class Period>
+ bool tryJoinFor(const std::chrono::duration<Rep, Period>& relTime)
+ {
+ if (threadCompleted.wait_for(relTime) == std::future_status::ready)
+ {
+ stdThread.join(); //runs thread-local destructors => this better be fast!!!
+ return true;
+ }
+ return false;
+ }
+
+private:
+ std::thread stdThread;
+ std::shared_ptr<InterruptionStatus> intStatus_;
+ std::future<void> threadCompleted;
+};
+
+//context of worker thread:
+void interruptionPoint(); //throw ThreadInterruption
+
+template<class Predicate>
+void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred); //throw ThreadInterruption
+
+template <class Rep, class Period>
+void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime); //throw ThreadInterruption
+
+//------------------------------------------------------------------------------------------
+
/*
std::async replacement without crappy semantics:
1. guaranteed to run asynchronously
@@ -45,16 +68,20 @@ std::async replacement without crappy semantics:
Example:
Zstring dirpath = ...
auto ft = zen::runAsync([=](){ return zen::dirExists(dirpath); });
- if (ft.wait_for(boost::chrono::milliseconds(200)) == boost::future_status::ready && ft.get())
+ if (ft.wait_for(std::chrono::milliseconds(200)) == std::future_status::ready && ft.get())
//dir exising
*/
template <class Function>
-auto runAsync(Function fun) -> boost::unique_future<decltype(fun())>;
+auto runAsync(Function fun) -> std::future<decltype(fun())>;
//wait for all with a time limit: return true if *all* results are available!
template<class InputIterator, class Duration>
bool wait_for_all_timed(InputIterator first, InputIterator last, const Duration& wait_duration);
+template<typename T> inline
+bool isReady(const std::future<T>& f) { return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready; }
+//------------------------------------------------------------------------------------------
+
//wait until first job is successful or all failed: substitute until std::when_any is available
template <class T>
class GetFirstResult
@@ -77,6 +104,7 @@ private:
size_t jobsTotal_;
};
+//------------------------------------------------------------------------------------------
//value associated with mutex and guaranteed protected access:
template <class T>
@@ -89,7 +117,7 @@ public:
template <class Function>
void access(Function fun)
{
- boost::lock_guard<boost::mutex> dummy(lockValue);
+ std::lock_guard<std::mutex> dummy(lockValue);
fun(value_);
}
@@ -97,7 +125,7 @@ private:
Protected (const Protected&) = delete;
Protected& operator=(const Protected&) = delete;
- boost::mutex lockValue;
+ std::mutex lockValue;
T value_;
};
@@ -110,22 +138,15 @@ private:
//###################### implementation ######################
-#ifndef BOOST_HAS_THREADS
- #error just some paranoia check...
-#endif
template <class Function> inline
-auto runAsync(Function fun) -> boost::unique_future<decltype(fun())>
+auto runAsync(Function fun) -> std::future<decltype(fun())>
{
typedef decltype(fun()) ResultType;
-#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK //mirror "boost/thread/future.hpp", hopefully they know what they're doing
- boost::packaged_task<ResultType()> pt(std::move(fun));
-#else
- boost::packaged_task<ResultType> pt(std::move(fun));
-#endif
+ std::packaged_task<ResultType()> pt(std::move(fun));
auto fut = pt.get_future();
- boost::thread(std::move(pt)).detach(); //we have to explicitly detach since C++11: [thread.thread.destr] ~thread() calls std::terminate() if joinable()!!!
+ std::thread(std::move(pt)).detach(); //we have to explicitly detach since C++11: [thread.thread.destr] ~thread() calls std::terminate() if joinable()!!!
return fut;
}
@@ -133,9 +154,9 @@ auto runAsync(Function fun) -> boost::unique_future<decltype(fun())>
template<class InputIterator, class Duration> inline
bool wait_for_all_timed(InputIterator first, InputIterator last, const Duration& duration)
{
- const boost::chrono::steady_clock::time_point endTime = boost::chrono::steady_clock::now() + duration;
+ const std::chrono::steady_clock::time_point endTime = std::chrono::steady_clock::now() + duration;
for (; first != last; ++first)
- if (first->wait_until(endTime) != boost::future_status::ready)
+ if (first->wait_until(endTime) != std::future_status::ready)
return false; //time elapsed
return true;
}
@@ -155,27 +176,26 @@ public:
void reportFinished(std::unique_ptr<T>&& result)
{
{
- boost::lock_guard<boost::mutex> dummy(lockResult);
+ std::lock_guard<std::mutex> dummy(lockResult);
++jobsFinished;
if (!result_)
result_ = std::move(result);
}
- conditionJobDone.notify_all(); //instead of notify_one(); workaround bug: https://svn.boost.org/trac/boost/ticket/7796
- //condition handling, see: http://www.boost.org/doc/libs/1_43_0/doc/html/thread/synchronization.html#thread.synchronization.condvar_ref
+ conditionJobDone.notify_all(); //better notify all, considering bugs like: https://svn.boost.org/trac/boost/ticket/7796
}
//context: main thread
template <class Duration>
bool waitForResult(size_t jobsTotal, const Duration& duration)
{
- boost::unique_lock<boost::mutex> dummy(lockResult);
- return conditionJobDone.wait_for(dummy, duration, [&] { return this->jobDone(jobsTotal); }); //throw boost::thread_interrupted
+ std::unique_lock<std::mutex> dummy(lockResult);
+ return conditionJobDone.wait_for(dummy, duration, [&] { return this->jobDone(jobsTotal); });
}
std::unique_ptr<T> getResult(size_t jobsTotal)
{
- boost::unique_lock<boost::mutex> dummy(lockResult);
- conditionJobDone.wait(dummy, [&] { return this->jobDone(jobsTotal); }); //throw boost::thread_interrupted
+ std::unique_lock<std::mutex> dummy(lockResult);
+ conditionJobDone.wait(dummy, [&] { return this->jobDone(jobsTotal); });
#ifndef NDEBUG
assert(!returnedResult);
@@ -191,10 +211,10 @@ private:
bool returnedResult;
#endif
- boost::mutex lockResult;
+ std::mutex lockResult;
size_t jobsFinished; //
std::unique_ptr<T> result_; //our condition is: "have result" or "jobsFinished == jobsTotal"
- boost::condition_variable conditionJobDone;
+ std::condition_variable conditionJobDone;
};
@@ -207,8 +227,7 @@ template <class T>
template <class Fun> inline
void GetFirstResult<T>::addJob(Fun f) //f must return a std::unique_ptr<T> containing a value on success
{
- auto asyncResult = this->asyncResult_; //capture member variable, not "this"!
- boost::thread t([asyncResult, f] { asyncResult->reportFinished(f()); });
+ std::thread t([asyncResult = this->asyncResult_, f = std::move(f)] { asyncResult->reportFinished(f()); });
++jobsTotal_;
t.detach(); //we have to be explicit since C++11: [thread.thread.destr] ~thread() calls std::terminate() if joinable()!!!
}
@@ -221,6 +240,158 @@ bool GetFirstResult<T>::timedWait(const Duration& duration) const { return async
template <class T> inline
std::unique_ptr<T> GetFirstResult<T>::get() const { return asyncResult_->getResult(jobsTotal_); }
+
+//------------------------------------------------------------------------------------------
+
+//thread_local with non-POD seems to cause memory leaks on VS 14 => pointer only is fine:
+#ifdef _MSC_VER
+ #define ZEN_THREAD_LOCAL_SPECIFIER __declspec(thread)
+#elif defined __GNUC__ || defined __clang__
+ #define ZEN_THREAD_LOCAL_SPECIFIER __thread
+#else
+ #error "game over"
+#endif
+
+
+class ThreadInterruption {};
+
+
+class InterruptionStatus
+{
+public:
+ //context of InterruptibleThread instance:
+ void interrupt()
+ {
+ interrupted = true;
+
+ {
+ std::lock_guard<std::mutex> dummy(lockSleep); //needed! makes sure the following signal is not lost!
+ //usually we'd make "interrupted" non-atomic, but this is already given due to interruptibleWait() handling
+ }
+ conditionSleepInterruption.notify_all();
+
+ std::lock_guard<std::mutex> dummy(lockConditionPtr);
+ if (activeCondition)
+ activeCondition->notify_all(); //signal may get lost!
+ //alternative design locking the cv's mutex here could be dangerous: potential for dead lock!
+ }
+
+ //context of worker thread:
+ void checkInterruption() //throw ThreadInterruption
+ {
+ if (interrupted)
+ throw ThreadInterruption();
+ }
+
+ //context of worker thread:
+ template<class Predicate>
+ void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred) //throw ThreadInterruption
+ {
+ setConditionVar(&cv);
+ ZEN_ON_SCOPE_EXIT(setConditionVar(nullptr));
+
+ //"interrupted" is not protected by cv's mutex => signal may get lost!!! => add artifical time out to mitigate! CPU: 0.25% vs 0% for longer time out!
+ while (!cv.wait_for(lock, std::chrono::milliseconds(1), [&] { return this->interrupted || pred(); }))
+ ;
+
+ checkInterruption(); //throw ThreadInterruption
+ }
+
+ //context of worker thread:
+ template <class Rep, class Period>
+ void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime) //throw ThreadInterruption
+ {
+ std::unique_lock<std::mutex> lock(lockSleep);
+ if (conditionSleepInterruption.wait_for(lock, relTime, [&] { return static_cast<bool>(this->interrupted); }))
+ throw ThreadInterruption();
+ }
+
+private:
+ void setConditionVar(std::condition_variable* cv)
+ {
+ std::lock_guard<std::mutex> dummy(lockConditionPtr);
+ activeCondition = cv;
+ }
+
+ std::atomic<bool> interrupted{ false }; //std:atomic is uninitialized by default!
+
+ std::condition_variable* activeCondition = nullptr;
+ std::mutex lockConditionPtr; //serialize pointer access (only!)
+
+ std::condition_variable conditionSleepInterruption;
+ std::mutex lockSleep;
+};
+
+
+namespace impl
+{
+inline
+InterruptionStatus*& refThreadLocalInterruptionStatus()
+{
+ static ZEN_THREAD_LOCAL_SPECIFIER InterruptionStatus* threadLocalInterruptionStatus = nullptr;
+ return threadLocalInterruptionStatus;
+}
+}
+
+//context of worker thread:
+inline
+void interruptionPoint() //throw ThreadInterruption
+{
+ assert(impl::refThreadLocalInterruptionStatus());
+ if (impl::refThreadLocalInterruptionStatus())
+ impl::refThreadLocalInterruptionStatus()->checkInterruption(); //throw ThreadInterruption
+}
+
+
+//context of worker thread:
+template<class Predicate> inline
+void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred) //throw ThreadInterruption
+{
+ assert(impl::refThreadLocalInterruptionStatus());
+ if (impl::refThreadLocalInterruptionStatus())
+ impl::refThreadLocalInterruptionStatus()->interruptibleWait(cv, lock, pred);
+ else
+ cv.wait(lock, pred);
+}
+
+//context of worker thread:
+template <class Rep, class Period> inline
+void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime) //throw ThreadInterruption
+{
+ assert(impl::refThreadLocalInterruptionStatus());
+ if (impl::refThreadLocalInterruptionStatus())
+ impl::refThreadLocalInterruptionStatus()->interruptibleSleep(relTime);
+ else
+ std::this_thread::sleep_for(relTime);
+}
+
+
+template <class Function> inline
+InterruptibleThread::InterruptibleThread(Function f) : intStatus_(std::make_shared<InterruptionStatus>())
+{
+ std::promise<void> pFinished;
+ threadCompleted = pFinished.get_future();
+
+ stdThread = std::thread([f = std::move(f),
+ intStatus = this->intStatus_,
+ pFinished = std::move(pFinished)]() mutable
+ {
+ assert(!impl::refThreadLocalInterruptionStatus());
+ impl::refThreadLocalInterruptionStatus() = intStatus.get();
+ ZEN_ON_SCOPE_EXIT(impl::refThreadLocalInterruptionStatus() = nullptr);
+ ZEN_ON_SCOPE_EXIT(pFinished.set_value());
+
+ try
+ {
+ f(); //throw ThreadInterruption
+ }
+ catch (ThreadInterruption&) {}
+ });
+}
+
+
+inline
+void InterruptibleThread::interrupt() { intStatus_->interrupt(); }
}
-#endif //BOOST_THREAD_WRAP_H_78963234
+#endif //STD_THREAD_WRAP_H_7896323423432
bgstack15