diff options
Diffstat (limited to 'zen/thread.h')
-rwxr-xr-x | zen/thread.h | 154 |
1 files changed, 89 insertions, 65 deletions
diff --git a/zen/thread.h b/zen/thread.h index ed61e06b..ee36f305 100755 --- a/zen/thread.h +++ b/zen/thread.h @@ -10,9 +10,9 @@ #include <thread> #include <future> #include "scope_guard.h" -#include "type_traits.h" +#include "ring_buffer.h" #include "optional.h" - #include <sys/prctl.h> +#include "string_tools.h" namespace zen @@ -23,8 +23,8 @@ class InterruptibleThread { public: InterruptibleThread() {} - InterruptibleThread (InterruptibleThread&& tmp) = default; - InterruptibleThread& operator=(InterruptibleThread&& tmp) = default; + InterruptibleThread (InterruptibleThread&&) noexcept = default; + InterruptibleThread& operator=(InterruptibleThread&&) noexcept = default; template <class Function> InterruptibleThread(Function&& f); @@ -46,7 +46,7 @@ public: private: std::thread stdThread_; - std::shared_ptr<InterruptionStatus> intStatus_{ std::make_shared<InterruptionStatus>() }; + std::shared_ptr<InterruptionStatus> intStatus_ = std::make_shared<InterruptionStatus>(); std::future<void> threadCompleted_; }; @@ -62,7 +62,9 @@ void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime); //th void setCurrentThreadName(const char* threadName); uint64_t getThreadId(); //simple integer thread id, unlike boost::thread::id: https://svn.boost.org/trac/boost/ticket/5754 +uint64_t getMainThreadId(); +inline bool runningMainThread() { return getThreadId() == getMainThreadId(); } //------------------------------------------------------------------------------------------ /* @@ -118,6 +120,7 @@ class Protected public: Protected() {} Protected(const T& value) : value_(value) {} + //Protected( T&& tmp ) : value_(std::move(tmp)) {} <- wait until needed template <class Function> auto access(Function fun) //-> decltype(fun(std::declval<T&>())) @@ -140,53 +143,101 @@ template <class Function> class ThreadGroup { public: - ThreadGroup(size_t threadCount, const std::string& groupName) - { - for (size_t i = 0; i < threadCount; ++i) - worker_.emplace_back([this, groupName, i, threadCount] - { - setCurrentThreadName((groupName + "[" + numberTo<std::string>(i + 1) + "/" + numberTo<std::string>(threadCount) + "]").c_str()); - for (;;) - getNextWorkItem()(); //throw ThreadInterruption - }); - } + ThreadGroup(size_t threadCountMax, const std::string& groupName) : threadCountMax_(threadCountMax), groupName_(groupName) + { if (threadCountMax == 0) throw std::logic_error("Contract violation! " + std::string(__FILE__) + ":" + numberTo<std::string>(__LINE__)); } + ~ThreadGroup() { for (InterruptibleThread& w : worker_) w.interrupt(); //interrupt all first, then join - for (InterruptibleThread& w : worker_) w.join(); + for (InterruptibleThread& w : worker_) detach_ ? w.detach() : w.join(); } + ThreadGroup(ThreadGroup&& tmp) noexcept : + worker_ (std::move(tmp.worker_)), + workLoad_ (std::move(tmp.workLoad_)), + detach_ (tmp.detach_), + threadCountMax_(tmp.threadCountMax_), + groupName_ (std::move(tmp.groupName_)) { tmp.worker_.clear(); /*just in case: make sure destructor is no-op!*/ } + + ThreadGroup& operator=(ThreadGroup&& tmp) noexcept { swap(tmp); return *this; } //noexcept *required* to support move for reallocations in std::vector and std::swap!!! + //context of controlling thread, non-blocking: void run(Function&& wi) { - assert(!worker_.empty()); + size_t tasksPending = 0; { - std::lock_guard<std::mutex> dummy(lockWork_); - workItems_.push_back(std::move(wi)); + std::lock_guard<std::mutex> dummy(workLoad_->lock); + workLoad_->tasks.push_back(std::move(wi)); + tasksPending = ++(workLoad_->tasksPending); } - conditionNewWork_.notify_all(); + workLoad_->conditionNewTask.notify_all(); + + if (worker_.size() < std::min(tasksPending, threadCountMax_)) + addWorkerThread(); } + //context of controlling thread, blocking: + void wait() + { + std::unique_lock<std::mutex> dummy(workLoad_->lock); + workLoad_->conditionTasksDone.wait(dummy, [&tasksPending = workLoad_->tasksPending] { return tasksPending == 0; }); + } + + void detach() { detach_ = true; } //not expected to also interrupt! + private: ThreadGroup (const ThreadGroup&) = delete; ThreadGroup& operator=(const ThreadGroup&) = delete; - //context of worker threads, blocking: - Function getNextWorkItem() //throw ThreadInterruption + void addWorkerThread() { - std::unique_lock<std::mutex> dummy(lockWork_); + std::string threadName = groupName_ + '[' + numberTo<std::string>(worker_.size() + 1) + '/' + numberTo<std::string>(threadCountMax_) + ']'; + + worker_.emplace_back([wl = workLoad_, threadName = std::move(threadName)] //don't capture "this"! consider detach() and swap() + { + setCurrentThreadName(threadName.c_str()); + + std::unique_lock<std::mutex> dummy(wl->lock); + for (;;) + { + interruptibleWait(wl->conditionNewTask, dummy, [&tasks = wl->tasks] { return !tasks.empty(); }); //throw ThreadInterruption + + Function task = std::move(wl->tasks. front()); //noexcept thanks to move + /**/ wl->tasks.pop_front(); // + dummy.unlock(); - interruptibleWait(conditionNewWork_, dummy, [this] { return !workItems_.empty(); }); //throw ThreadInterruption - warn_static("implement FIFO!?") + task(); - Function wi = std::move(workItems_. back()); // - /**/ workItems_.pop_back(); //noexcept thanks to move - return wi; // + dummy.lock(); + if (--(wl->tasksPending) == 0) + wl->conditionTasksDone.notify_all(); //too difficult to notify outside the lock + } + }); } + + void swap(ThreadGroup& other) + { + std::swap(worker_, other.worker_); + std::swap(workLoad_, other.workLoad_); + std::swap(detach_, other.detach_); + std::swap(threadCountMax_, other.threadCountMax_); + std::swap(groupName_, other.groupName_); + } + + struct WorkLoad + { + std::mutex lock; + RingBuffer<Function> tasks; //FIFO! :) + size_t tasksPending = 0; + std::condition_variable conditionNewTask; + std::condition_variable conditionTasksDone; + }; + std::vector<InterruptibleThread> worker_; - std::mutex lockWork_; - std::vector<Function> workItems_; - std::condition_variable conditionNewWork_; + std::shared_ptr<WorkLoad> workLoad_ = std::make_shared<WorkLoad>(); + bool detach_ = false; + size_t threadCountMax_; + std::string groupName_; }; @@ -201,7 +252,7 @@ private: namespace impl { template <class Function> inline -auto runAsync(Function&& fun, TrueType /*copy-constructible*/) +auto runAsync(Function&& fun, std::true_type /*copy-constructible*/) { using ResultType = decltype(fun()); @@ -214,11 +265,11 @@ auto runAsync(Function&& fun, TrueType /*copy-constructible*/) template <class Function> inline -auto runAsync(Function&& fun, FalseType /*copy-constructible*/) +auto runAsync(Function&& fun, std::false_type /*copy-constructible*/) { //support move-only function objects! auto sharedFun = std::make_shared<Function>(std::forward<Function>(fun)); - return runAsync([sharedFun] { return (*sharedFun)(); }, TrueType()); + return runAsync([sharedFun] { return (*sharedFun)(); }, std::true_type()); } } @@ -226,7 +277,7 @@ auto runAsync(Function&& fun, FalseType /*copy-constructible*/) template <class Function> inline auto runAsync(Function&& fun) { - return impl::runAsync(std::forward<Function>(fun), StaticBool<std::is_copy_constructible<Function>::value>()); + return impl::runAsync(std::forward<Function>(fun), std::is_copy_constructible<Function>()); } @@ -308,14 +359,6 @@ Opt<T> GetFirstResult<T>::get() const { return asyncResult_->getResult(jobsTotal //------------------------------------------------------------------------------------------ -//thread_local with non-POD seems to cause memory leaks on VS 14 => pointer only is fine: -#if defined __GNUC__ || defined __clang__ - #define ZEN_THREAD_LOCAL_SPECIFIER __thread -#else - #error "Game over!" -#endif - - class ThreadInterruption {}; @@ -354,7 +397,7 @@ public: ZEN_ON_SCOPE_EXIT(setConditionVar(nullptr)); //"interrupted_" is not protected by cv's mutex => signal may get lost!!! e.g. after condition was checked but before the wait begins - //=> add artifical time out to mitigate! CPU: 0.25% vs 0% for longer time out! + //=> add artifical time out to mitigate! CPU: 0.25% vs 0% for longer time out! while (!cv.wait_for(lock, std::chrono::milliseconds(1), [&] { return this->interrupted_ || pred(); })) ; @@ -393,7 +436,8 @@ namespace impl inline InterruptionStatus*& refThreadLocalInterruptionStatus() { - static ZEN_THREAD_LOCAL_SPECIFIER InterruptionStatus* threadLocalInterruptionStatus = nullptr; + //thread_local with non-POD seems to cause memory leaks on VS 14 => pointer only is fine: + thread_local InterruptionStatus* threadLocalInterruptionStatus = nullptr; return threadLocalInterruptionStatus; } } @@ -457,26 +501,6 @@ InterruptibleThread::InterruptibleThread(Function&& f) inline void InterruptibleThread::interrupt() { intStatus_->interrupt(); } - - - - -inline -void setCurrentThreadName(const char* threadName) -{ - ::prctl(PR_SET_NAME, threadName, 0, 0, 0); - -} - - -inline -uint64_t getThreadId() -{ - //obviously "gettid()" is not available on Ubuntu/Debian/Suse => use the OpenSSL approach: - static_assert(sizeof(uint64_t) >= sizeof(void*), ""); - return reinterpret_cast<uint64_t>(static_cast<void*>(&errno)); - -} } #endif //THREAD_H_7896323423432235246427 |