diff options
Diffstat (limited to 'zen/thread.h')
-rw-r--r-- | zen/thread.h | 119 |
1 files changed, 113 insertions, 6 deletions
diff --git a/zen/thread.h b/zen/thread.h index 4598ea99..ba9a46e2 100644 --- a/zen/thread.h +++ b/zen/thread.h @@ -8,6 +8,8 @@ #define BOOST_THREAD_WRAP_H //temporary solution until C++11 thread becomes fully available +#include <vector> +#include <memory> #ifdef __MINGW32__ #pragma GCC diagnostic push @@ -39,12 +41,31 @@ Example: template <class Function> auto async(Function fun) -> boost::unique_future<decltype(fun())>; +//wait for all with a time limit: return true if *all* results are available! template<class InputIterator, class Duration> -void wait_for_all_timed(InputIterator first, InputIterator last, const Duration& wait_duration); +bool wait_for_all_timed(InputIterator first, InputIterator last, const Duration& wait_duration); +//wait until first job is successful or all failed +template <class T> +class RunUntilFirstHit +{ +public: + RunUntilFirstHit(); + + template <class Fun> + void addJob(Fun f); //f must return a std::unique_ptr<T> containing a value if successful + template <class Duration> + bool timedWait(const Duration& duration) const; //true: "get()" is ready, false: time elapsed + //return first value or none if all jobs failed; blocks until result is ready! + std::unique_ptr<T> get() const; //must be called only once! +private: + class AsyncResult; + std::vector<boost::thread> workload; + std::shared_ptr<AsyncResult> result; +}; @@ -72,21 +93,107 @@ auto async2(Function fun) -> boost::unique_future<T> //workaround VS2010 bug: bo } -template <class Function> inline auto async(Function fun) -> boost::unique_future<decltype(fun())> { return async2<decltype(fun())>(fun); } +template <class Function> inline +auto async(Function fun) -> boost::unique_future<decltype(fun())> { return async2<decltype(fun())>(fun); } template<class InputIterator, class Duration> inline -void wait_for_all_timed(InputIterator first, InputIterator last, const Duration& wait_duration) +bool wait_for_all_timed(InputIterator first, InputIterator last, const Duration& wait_duration) { const boost::system_time endTime = boost::get_system_time() + wait_duration; while (first != last) { - first->timed_wait_until(endTime); - if (boost::get_system_time() >= endTime) - return; + if (!first->timed_wait_until(endTime)) + return false; //time elapsed ++first; } + return true; } + + +template <class T> +class RunUntilFirstHit<T>::AsyncResult +{ +public: + AsyncResult() : +#ifndef NDEBUG + returnedResult(false), +#endif + jobsFinished(0) {} + + //context: worker threads + void reportFinished(std::unique_ptr<T>&& result) + { + { + boost::unique_lock<boost::mutex> dummy(lockResult); + ++jobsFinished; + if (!result_) + result_ = std::move(result); + } + conditionJobDone.notify_one(); + //condition handling, see: http://www.boost.org/doc/libs/1_43_0/doc/html/thread/synchronization.html#thread.synchronization.condvar_ref + } + + //context: main thread + template <class Duration> + bool waitForResult(size_t jobsTotal, const Duration& duration) + { + boost::unique_lock<boost::mutex> dummy(lockResult); + return conditionJobDone.timed_wait(dummy, duration, [&] { return this->jobDone(jobsTotal); }); + //use timed_wait predicate if exitting before condition is reached: http://www.boost.org/doc/libs/1_49_0/doc/html/thread/synchronization.html#thread.synchronization.condvar_ref.condition_variable.timed_wait_rel + } + + std::unique_ptr<T> getResult(size_t jobsTotal) + { + boost::unique_lock<boost::mutex> dummy(lockResult); + + while (!jobDone(jobsTotal)) + conditionJobDone.timed_wait(dummy, boost::posix_time::milliseconds(50)); //interruption point! + +#ifndef NDEBUG + assert(!returnedResult); + returnedResult = true; +#endif + return std::move(result_); + } + +private: + bool jobDone(size_t jobsTotal) const { return result_ || (jobsFinished >= jobsTotal); } //call while locked! +#ifndef NDEBUG + bool returnedResult; +#endif + + boost::mutex lockResult; + size_t jobsFinished; // + std::unique_ptr<T> result_; //our condition is: "have result" or "jobsFinished == jobsTotal" + boost::condition_variable conditionJobDone; +}; + + + +template <class T> inline +RunUntilFirstHit<T>::RunUntilFirstHit() : result(std::make_shared<AsyncResult>()) {} + + +template <class T> +template <class Fun> inline +void RunUntilFirstHit<T>::addJob(Fun f) //f must return a std::unique_ptr<T> containing a value on success +{ + auto result2 = result; //VC11: this is ridiculous!!! + workload.push_back(boost::thread([result2, f] + { + result2->reportFinished(f()); + })); +} + + +template <class T> +template <class Duration> inline +bool RunUntilFirstHit<T>::timedWait(const Duration& duration) const { return result->waitForResult(workload.size(), duration); } + + +template <class T> inline +std::unique_ptr<T> RunUntilFirstHit<T>::get() const { return result->getResult(workload.size()); } } #endif //BOOST_THREAD_WRAP_H |