diff options
Diffstat (limited to 'zen/thread.h')
-rw-r--r-- | zen/thread.h | 63 |
1 files changed, 43 insertions, 20 deletions
diff --git a/zen/thread.h b/zen/thread.h index c9b4c76f..6d647de8 100644 --- a/zen/thread.h +++ b/zen/thread.h @@ -39,23 +39,23 @@ namespace zen { /* std::async replacement without crappy semantics: - 1. guaranteed to run asynchronous + 1. guaranteed to run asynchronously 2. does not follow C++11 [futures.async], Paragraph 5, where std::future waits for thread in destructor Example: Zstring dirpath = ... - auto ft = zen::async([=](){ return zen::dirExists(dirpath); }); - if (ft.timed_wait(boost::posix_time::milliseconds(200)) && ft.get()) + auto ft = zen::runAsync([=](){ return zen::dirExists(dirpath); }); + if (ft.wait_for(boost::chrono::milliseconds(200)) == boost::future_status::ready && ft.get()) //dir exising */ template <class Function> -auto async(Function fun) -> boost::unique_future<decltype(fun())>; +auto runAsync(Function fun) -> boost::unique_future<decltype(fun())>; //wait for all with a time limit: return true if *all* results are available! template<class InputIterator, class Duration> bool wait_for_all_timed(InputIterator first, InputIterator last, const Duration& wait_duration); -//wait until first job is successful or all failed +//wait until first job is successful or all failed: substitute until std::when_any is available template <class T> class GetFirstResult { @@ -73,11 +73,36 @@ public: private: class AsyncResult; - std::shared_ptr<AsyncResult> asyncResult; + std::shared_ptr<AsyncResult> asyncResult_; size_t jobsTotal_; }; +//value associated with mutex and guaranteed protected access: +template <class T> +class Protected +{ +public: + Protected() : value_() {} + Protected(const T& value) : value_(value) {} + + template <class Function> + void access(Function fun) + { + boost::lock_guard<boost::mutex> dummy(lockValue); + fun(value_); + } + +private: + Protected (const Protected&) = delete; + Protected& operator=(const Protected&) = delete; + + boost::mutex lockValue; + T value_; +}; + + + @@ -90,12 +115,12 @@ private: #endif template <class Function> inline -auto async(Function fun) -> boost::unique_future<decltype(fun())> +auto runAsync(Function fun) -> boost::unique_future<decltype(fun())> { typedef decltype(fun()) ResultType; #if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK //mirror "boost/thread/future.hpp", hopefully they know what they're doing - boost::packaged_task<ResultType()> pt(std::move(fun)); //packaged task seems to even require r-value reference: https://sourceforge.net/p/freefilesync/bugs/234/ + boost::packaged_task<ResultType()> pt(std::move(fun)); #else boost::packaged_task<ResultType> pt(std::move(fun)); #endif @@ -106,11 +131,11 @@ auto async(Function fun) -> boost::unique_future<decltype(fun())> template<class InputIterator, class Duration> inline -bool wait_for_all_timed(InputIterator first, InputIterator last, const Duration& wait_duration) +bool wait_for_all_timed(InputIterator first, InputIterator last, const Duration& duration) { - const boost::system_time endTime = boost::get_system_time() + wait_duration; + const boost::chrono::steady_clock::time_point endTime = boost::chrono::steady_clock::now() + duration; for (; first != last; ++first) - if (!first->timed_wait_until(endTime)) + if (first->wait_until(endTime) != boost::future_status::ready) return false; //time elapsed return true; } @@ -144,15 +169,13 @@ public: bool waitForResult(size_t jobsTotal, const Duration& duration) { boost::unique_lock<boost::mutex> dummy(lockResult); - return conditionJobDone.timed_wait(dummy, duration, [&] { return this->jobDone(jobsTotal); }); - //use timed_wait predicate if exitting before condition is reached: http://www.boost.org/doc/libs/1_49_0/doc/html/thread/synchronization.html#thread.synchronization.condvar_ref.condition_variable.timed_wait_rel + return conditionJobDone.wait_for(dummy, duration, [&] { return this->jobDone(jobsTotal); }); //throw boost::thread_interrupted } std::unique_ptr<T> getResult(size_t jobsTotal) { boost::unique_lock<boost::mutex> dummy(lockResult); - while (!jobDone(jobsTotal)) - conditionJobDone.timed_wait(dummy, boost::posix_time::milliseconds(50)); //interruption point! + conditionJobDone.wait(dummy, [&] { return this->jobDone(jobsTotal); }); //throw boost::thread_interrupted #ifndef NDEBUG assert(!returnedResult); @@ -177,15 +200,15 @@ private: template <class T> inline -GetFirstResult<T>::GetFirstResult() : asyncResult(std::make_shared<AsyncResult>()), jobsTotal_(0) {} +GetFirstResult<T>::GetFirstResult() : asyncResult_(std::make_shared<AsyncResult>()), jobsTotal_(0) {} template <class T> template <class Fun> inline void GetFirstResult<T>::addJob(Fun f) //f must return a std::unique_ptr<T> containing a value on success { - auto asyncResult2 = asyncResult; //capture member variable, not "this"! - boost::thread t([asyncResult2, f] { asyncResult2->reportFinished(f()); }); + auto asyncResult = this->asyncResult_; //capture member variable, not "this"! + boost::thread t([asyncResult, f] { asyncResult->reportFinished(f()); }); ++jobsTotal_; t.detach(); //we have to be explicit since C++11: [thread.thread.destr] ~thread() calls std::terminate() if joinable()!!! } @@ -193,11 +216,11 @@ void GetFirstResult<T>::addJob(Fun f) //f must return a std::unique_ptr<T> conta template <class T> template <class Duration> inline -bool GetFirstResult<T>::timedWait(const Duration& duration) const { return asyncResult->waitForResult(jobsTotal_, duration); } +bool GetFirstResult<T>::timedWait(const Duration& duration) const { return asyncResult_->waitForResult(jobsTotal_, duration); } template <class T> inline -std::unique_ptr<T> GetFirstResult<T>::get() const { return asyncResult->getResult(jobsTotal_); } +std::unique_ptr<T> GetFirstResult<T>::get() const { return asyncResult_->getResult(jobsTotal_); } } #endif //BOOST_THREAD_WRAP_H_78963234 |