summaryrefslogtreecommitdiff
path: root/zen/thread.h
diff options
context:
space:
mode:
authorDaniel Wilhelm <shieldwed@outlook.com>2018-06-30 12:43:08 +0200
committerDaniel Wilhelm <shieldwed@outlook.com>2018-06-30 12:43:08 +0200
commita98326eb2954ac1e79f5eac28dbeab3ec15e047f (patch)
treebb16257a1894b488e365851273735ec13a9442ef /zen/thread.h
parent10.0 (diff)
downloadFreeFileSync-a98326eb2954ac1e79f5eac28dbeab3ec15e047f.tar.gz
FreeFileSync-a98326eb2954ac1e79f5eac28dbeab3ec15e047f.tar.bz2
FreeFileSync-a98326eb2954ac1e79f5eac28dbeab3ec15e047f.zip
10.1
Diffstat (limited to 'zen/thread.h')
-rwxr-xr-xzen/thread.h154
1 files changed, 89 insertions, 65 deletions
diff --git a/zen/thread.h b/zen/thread.h
index ed61e06b..ee36f305 100755
--- a/zen/thread.h
+++ b/zen/thread.h
@@ -10,9 +10,9 @@
#include <thread>
#include <future>
#include "scope_guard.h"
-#include "type_traits.h"
+#include "ring_buffer.h"
#include "optional.h"
- #include <sys/prctl.h>
+#include "string_tools.h"
namespace zen
@@ -23,8 +23,8 @@ class InterruptibleThread
{
public:
InterruptibleThread() {}
- InterruptibleThread (InterruptibleThread&& tmp) = default;
- InterruptibleThread& operator=(InterruptibleThread&& tmp) = default;
+ InterruptibleThread (InterruptibleThread&&) noexcept = default;
+ InterruptibleThread& operator=(InterruptibleThread&&) noexcept = default;
template <class Function>
InterruptibleThread(Function&& f);
@@ -46,7 +46,7 @@ public:
private:
std::thread stdThread_;
- std::shared_ptr<InterruptionStatus> intStatus_{ std::make_shared<InterruptionStatus>() };
+ std::shared_ptr<InterruptionStatus> intStatus_ = std::make_shared<InterruptionStatus>();
std::future<void> threadCompleted_;
};
@@ -62,7 +62,9 @@ void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime); //th
void setCurrentThreadName(const char* threadName);
uint64_t getThreadId(); //simple integer thread id, unlike boost::thread::id: https://svn.boost.org/trac/boost/ticket/5754
+uint64_t getMainThreadId();
+inline bool runningMainThread() { return getThreadId() == getMainThreadId(); }
//------------------------------------------------------------------------------------------
/*
@@ -118,6 +120,7 @@ class Protected
public:
Protected() {}
Protected(const T& value) : value_(value) {}
+ //Protected( T&& tmp ) : value_(std::move(tmp)) {} <- wait until needed
template <class Function>
auto access(Function fun) //-> decltype(fun(std::declval<T&>()))
@@ -140,53 +143,101 @@ template <class Function>
class ThreadGroup
{
public:
- ThreadGroup(size_t threadCount, const std::string& groupName)
- {
- for (size_t i = 0; i < threadCount; ++i)
- worker_.emplace_back([this, groupName, i, threadCount]
- {
- setCurrentThreadName((groupName + "[" + numberTo<std::string>(i + 1) + "/" + numberTo<std::string>(threadCount) + "]").c_str());
- for (;;)
- getNextWorkItem()(); //throw ThreadInterruption
- });
- }
+ ThreadGroup(size_t threadCountMax, const std::string& groupName) : threadCountMax_(threadCountMax), groupName_(groupName)
+ { if (threadCountMax == 0) throw std::logic_error("Contract violation! " + std::string(__FILE__) + ":" + numberTo<std::string>(__LINE__)); }
+
~ThreadGroup()
{
for (InterruptibleThread& w : worker_) w.interrupt(); //interrupt all first, then join
- for (InterruptibleThread& w : worker_) w.join();
+ for (InterruptibleThread& w : worker_) detach_ ? w.detach() : w.join();
}
+ ThreadGroup(ThreadGroup&& tmp) noexcept :
+ worker_ (std::move(tmp.worker_)),
+ workLoad_ (std::move(tmp.workLoad_)),
+ detach_ (tmp.detach_),
+ threadCountMax_(tmp.threadCountMax_),
+ groupName_ (std::move(tmp.groupName_)) { tmp.worker_.clear(); /*just in case: make sure destructor is no-op!*/ }
+
+ ThreadGroup& operator=(ThreadGroup&& tmp) noexcept { swap(tmp); return *this; } //noexcept *required* to support move for reallocations in std::vector and std::swap!!!
+
//context of controlling thread, non-blocking:
void run(Function&& wi)
{
- assert(!worker_.empty());
+ size_t tasksPending = 0;
{
- std::lock_guard<std::mutex> dummy(lockWork_);
- workItems_.push_back(std::move(wi));
+ std::lock_guard<std::mutex> dummy(workLoad_->lock);
+ workLoad_->tasks.push_back(std::move(wi));
+ tasksPending = ++(workLoad_->tasksPending);
}
- conditionNewWork_.notify_all();
+ workLoad_->conditionNewTask.notify_all();
+
+ if (worker_.size() < std::min(tasksPending, threadCountMax_))
+ addWorkerThread();
}
+ //context of controlling thread, blocking:
+ void wait()
+ {
+ std::unique_lock<std::mutex> dummy(workLoad_->lock);
+ workLoad_->conditionTasksDone.wait(dummy, [&tasksPending = workLoad_->tasksPending] { return tasksPending == 0; });
+ }
+
+ void detach() { detach_ = true; } //not expected to also interrupt!
+
private:
ThreadGroup (const ThreadGroup&) = delete;
ThreadGroup& operator=(const ThreadGroup&) = delete;
- //context of worker threads, blocking:
- Function getNextWorkItem() //throw ThreadInterruption
+ void addWorkerThread()
{
- std::unique_lock<std::mutex> dummy(lockWork_);
+ std::string threadName = groupName_ + '[' + numberTo<std::string>(worker_.size() + 1) + '/' + numberTo<std::string>(threadCountMax_) + ']';
+
+ worker_.emplace_back([wl = workLoad_, threadName = std::move(threadName)] //don't capture "this"! consider detach() and swap()
+ {
+ setCurrentThreadName(threadName.c_str());
+
+ std::unique_lock<std::mutex> dummy(wl->lock);
+ for (;;)
+ {
+ interruptibleWait(wl->conditionNewTask, dummy, [&tasks = wl->tasks] { return !tasks.empty(); }); //throw ThreadInterruption
+
+ Function task = std::move(wl->tasks. front()); //noexcept thanks to move
+ /**/ wl->tasks.pop_front(); //
+ dummy.unlock();
- interruptibleWait(conditionNewWork_, dummy, [this] { return !workItems_.empty(); }); //throw ThreadInterruption
- warn_static("implement FIFO!?")
+ task();
- Function wi = std::move(workItems_. back()); //
- /**/ workItems_.pop_back(); //noexcept thanks to move
- return wi; //
+ dummy.lock();
+ if (--(wl->tasksPending) == 0)
+ wl->conditionTasksDone.notify_all(); //too difficult to notify outside the lock
+ }
+ });
}
+
+ void swap(ThreadGroup& other)
+ {
+ std::swap(worker_, other.worker_);
+ std::swap(workLoad_, other.workLoad_);
+ std::swap(detach_, other.detach_);
+ std::swap(threadCountMax_, other.threadCountMax_);
+ std::swap(groupName_, other.groupName_);
+ }
+
+ struct WorkLoad
+ {
+ std::mutex lock;
+ RingBuffer<Function> tasks; //FIFO! :)
+ size_t tasksPending = 0;
+ std::condition_variable conditionNewTask;
+ std::condition_variable conditionTasksDone;
+ };
+
std::vector<InterruptibleThread> worker_;
- std::mutex lockWork_;
- std::vector<Function> workItems_;
- std::condition_variable conditionNewWork_;
+ std::shared_ptr<WorkLoad> workLoad_ = std::make_shared<WorkLoad>();
+ bool detach_ = false;
+ size_t threadCountMax_;
+ std::string groupName_;
};
@@ -201,7 +252,7 @@ private:
namespace impl
{
template <class Function> inline
-auto runAsync(Function&& fun, TrueType /*copy-constructible*/)
+auto runAsync(Function&& fun, std::true_type /*copy-constructible*/)
{
using ResultType = decltype(fun());
@@ -214,11 +265,11 @@ auto runAsync(Function&& fun, TrueType /*copy-constructible*/)
template <class Function> inline
-auto runAsync(Function&& fun, FalseType /*copy-constructible*/)
+auto runAsync(Function&& fun, std::false_type /*copy-constructible*/)
{
//support move-only function objects!
auto sharedFun = std::make_shared<Function>(std::forward<Function>(fun));
- return runAsync([sharedFun] { return (*sharedFun)(); }, TrueType());
+ return runAsync([sharedFun] { return (*sharedFun)(); }, std::true_type());
}
}
@@ -226,7 +277,7 @@ auto runAsync(Function&& fun, FalseType /*copy-constructible*/)
template <class Function> inline
auto runAsync(Function&& fun)
{
- return impl::runAsync(std::forward<Function>(fun), StaticBool<std::is_copy_constructible<Function>::value>());
+ return impl::runAsync(std::forward<Function>(fun), std::is_copy_constructible<Function>());
}
@@ -308,14 +359,6 @@ Opt<T> GetFirstResult<T>::get() const { return asyncResult_->getResult(jobsTotal
//------------------------------------------------------------------------------------------
-//thread_local with non-POD seems to cause memory leaks on VS 14 => pointer only is fine:
-#if defined __GNUC__ || defined __clang__
- #define ZEN_THREAD_LOCAL_SPECIFIER __thread
-#else
- #error "Game over!"
-#endif
-
-
class ThreadInterruption {};
@@ -354,7 +397,7 @@ public:
ZEN_ON_SCOPE_EXIT(setConditionVar(nullptr));
//"interrupted_" is not protected by cv's mutex => signal may get lost!!! e.g. after condition was checked but before the wait begins
- //=> add artifical time out to mitigate! CPU: 0.25% vs 0% for longer time out!
+ //=> add artifical time out to mitigate! CPU: 0.25% vs 0% for longer time out!
while (!cv.wait_for(lock, std::chrono::milliseconds(1), [&] { return this->interrupted_ || pred(); }))
;
@@ -393,7 +436,8 @@ namespace impl
inline
InterruptionStatus*& refThreadLocalInterruptionStatus()
{
- static ZEN_THREAD_LOCAL_SPECIFIER InterruptionStatus* threadLocalInterruptionStatus = nullptr;
+ //thread_local with non-POD seems to cause memory leaks on VS 14 => pointer only is fine:
+ thread_local InterruptionStatus* threadLocalInterruptionStatus = nullptr;
return threadLocalInterruptionStatus;
}
}
@@ -457,26 +501,6 @@ InterruptibleThread::InterruptibleThread(Function&& f)
inline
void InterruptibleThread::interrupt() { intStatus_->interrupt(); }
-
-
-
-
-inline
-void setCurrentThreadName(const char* threadName)
-{
- ::prctl(PR_SET_NAME, threadName, 0, 0, 0);
-
-}
-
-
-inline
-uint64_t getThreadId()
-{
- //obviously "gettid()" is not available on Ubuntu/Debian/Suse => use the OpenSSL approach:
- static_assert(sizeof(uint64_t) >= sizeof(void*), "");
- return reinterpret_cast<uint64_t>(static_cast<void*>(&errno));
-
-}
}
#endif //THREAD_H_7896323423432235246427
bgstack15