summaryrefslogtreecommitdiff
path: root/zen/thread.h
diff options
context:
space:
mode:
authorB Stack <bgstack15@gmail.com>2020-09-01 00:24:17 +0000
committerB Stack <bgstack15@gmail.com>2020-09-01 00:24:17 +0000
commit5a3f52b016581a6a0cb4513614b6c620d365dde2 (patch)
treeacfdfb3e1046db87040477033fda0df76d92916a /zen/thread.h
parentMerge branch '11.0' into 'master' (diff)
parentadd upstream 11.1 (diff)
downloadFreeFileSync-5a3f52b016581a6a0cb4513614b6c620d365dde2.tar.gz
FreeFileSync-5a3f52b016581a6a0cb4513614b6c620d365dde2.tar.bz2
FreeFileSync-5a3f52b016581a6a0cb4513614b6c620d365dde2.zip
Merge branch '11.1' into 'master'11.1
add upstream 11.1 See merge request opensource-tracking/FreeFileSync!25
Diffstat (limited to 'zen/thread.h')
-rw-r--r--zen/thread.h223
1 files changed, 105 insertions, 118 deletions
diff --git a/zen/thread.h b/zen/thread.h
index 99e61e1f..1bea95ea 100644
--- a/zen/thread.h
+++ b/zen/thread.h
@@ -12,81 +12,89 @@
#include "scope_guard.h"
#include "ring_buffer.h"
#include "string_tools.h"
+#include "zstring.h"
namespace zen
{
class InterruptionStatus;
+//migrate towards https://en.cppreference.com/w/cpp/thread/jthread
class InterruptibleThread
{
public:
InterruptibleThread() {}
- InterruptibleThread (InterruptibleThread&&) noexcept = default;
- InterruptibleThread& operator=(InterruptibleThread&&) noexcept = default;
+ InterruptibleThread (InterruptibleThread&& ) noexcept = default;
+ InterruptibleThread& operator=(InterruptibleThread&& tmp) noexcept //don't use swap() but end stdThread_ life time immediately
+ {
+ if (joinable())
+ {
+ requestStop();
+ join();
+ }
+ stdThread_ = std::move(tmp.stdThread_);
+ intStatus_ = std::move(tmp.intStatus_);
+ return *this;
+ }
template <class Function>
- InterruptibleThread(Function&& f);
+ explicit InterruptibleThread(Function&& f);
+
+ ~InterruptibleThread()
+ {
+ if (joinable())
+ {
+ requestStop();
+ join();
+ }
+ }
bool joinable () const { return stdThread_.joinable(); }
- void interrupt();
+ void requestStop();
void join () { stdThread_.join(); }
void detach () { stdThread_.detach(); }
- template <class Rep, class Period>
- bool tryJoinFor(const std::chrono::duration<Rep, Period>& relTime)
- {
- if (threadCompleted_.wait_for(relTime) != std::future_status::ready)
- return false;
-
- stdThread_.join(); //runs thread-local destructors => this better be fast!!!
- return true;
- }
-
private:
std::thread stdThread_;
std::shared_ptr<InterruptionStatus> intStatus_ = std::make_shared<InterruptionStatus>();
- std::future<void> threadCompleted_;
};
+
+class ThreadStopRequest {};
+
//context of worker thread:
-void interruptionPoint(); //throw ThreadInterruption
+void interruptionPoint(); //throw ThreadStopRequest
template<class Predicate>
-void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred); //throw ThreadInterruption
+void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred); //throw ThreadStopRequest
template <class Rep, class Period>
-void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime); //throw ThreadInterruption
-
-void setCurrentThreadName(const char* threadName);
+void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime); //throw ThreadStopRequest
-uint64_t getThreadId(); //simple integer thread id, unlike boost::thread::id: https://svn.boost.org/trac/boost/ticket/5754
-uint64_t getMainThreadId();
+void setCurrentThreadName(const Zstring& threadName);
-inline bool runningMainThread() { return getThreadId() == getMainThreadId(); }
+bool runningOnMainThread();
//------------------------------------------------------------------------------------------
-/*
-std::async replacement without crappy semantics:
- 1. guaranteed to run asynchronously
- 2. does not follow C++11 [futures.async], Paragraph 5, where std::future waits for thread in destructor
-
-Example:
- Zstring dirPath = ...
- auto ft = zen::runAsync([=]{ return zen::dirExists(dirPath); });
- if (ft.wait_for(std::chrono::milliseconds(200)) == std::future_status::ready && ft.get())
- //dir existing
-*/
+/* std::async replacement without crappy semantics:
+ 1. guaranteed to run asynchronously
+ 2. does not follow C++11 [futures.async], Paragraph 5, where std::future waits for thread in destructor
+
+ Example:
+ Zstring dirPath = ...
+ auto ft = zen::runAsync([=]{ return zen::dirExists(dirPath); });
+ if (ft.wait_for(std::chrono::milliseconds(200)) == std::future_status::ready && ft.get())
+ //dir existing */
template <class Function>
auto runAsync(Function&& fun);
//wait for all with a time limit: return true if *all* results are available!
//TODO: use std::when_all when available
template<class InputIterator, class Duration>
-bool wait_for_all_timed(InputIterator first, InputIterator last, const Duration& wait_duration);
+bool waitForAllTimed(InputIterator first, InputIterator last, const Duration& wait_duration);
template<typename T> inline
-bool isReady(const std::future<T>& f) { return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready; }
+bool isReady(const std::future<T>& f) { assert(f.valid()); return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready; }
//------------------------------------------------------------------------------------------
//wait until first job is successful or all failed
@@ -115,13 +123,13 @@ private:
//------------------------------------------------------------------------------------------
//value associated with mutex and guaranteed protected access:
-//TODO: use std::synchronized_value when available
+//TODO: use std::synchronized_value when available http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2017/p0290r2.html
template <class T>
class Protected
{
public:
Protected() {}
- Protected(T& value) : value_(value) {}
+ explicit Protected(T& value) : value_(value) {}
//Protected(T&& tmp ) : value_(std::move(tmp)) {} <- wait until needed
template <class Function>
@@ -145,26 +153,24 @@ template <class Function>
class ThreadGroup
{
public:
- ThreadGroup(size_t threadCountMax, const std::string& groupName) : threadCountMax_(threadCountMax), groupName_(groupName)
+ ThreadGroup(size_t threadCountMax, const Zstring& groupName) : threadCountMax_(threadCountMax), groupName_(groupName)
{ if (threadCountMax == 0) throw std::logic_error("Contract violation! " + std::string(__FILE__) + ':' + numberTo<std::string>(__LINE__)); }
+ ThreadGroup (ThreadGroup&& tmp) noexcept = default; //noexcept *required* to support move for reallocations in std::vector and std::swap!!!
+ ThreadGroup& operator=(ThreadGroup&& tmp) noexcept = default; //don't use swap() but end worker_ life time immediately
+
~ThreadGroup()
{
- for (InterruptibleThread& w : worker_) w.interrupt(); //interrupt all first, then join
- for (InterruptibleThread& w : worker_) detach_ ? w.detach() : w.join();
- }
+ for (InterruptibleThread& w : worker_)
+ w.requestStop(); //stop *all* at the same time before join!
- ThreadGroup(ThreadGroup&& tmp) noexcept :
- worker_ (std::move(tmp.worker_)),
- workLoad_ (std::move(tmp.workLoad_)),
- detach_ (tmp.detach_),
- threadCountMax_(tmp.threadCountMax_),
- groupName_ (std::move(tmp.groupName_)) { tmp.worker_.clear(); /*just in case: make sure destructor is no-op!*/ }
-
- ThreadGroup& operator=(ThreadGroup&& tmp) noexcept { swap(tmp); return *this; } //noexcept *required* to support move for reallocations in std::vector and std::swap!!!
+ if (detach_) //detach() without requestStop() doesn't make sense
+ for (InterruptibleThread& w : worker_)
+ w.detach();
+ }
//context of controlling OR worker thread, non-blocking:
- void run(Function&& wi /*should throw ThreadInterruption when needed*/, bool insertFront = false)
+ void run(Function&& wi /*should throw ThreadStopRequest when needed*/, bool insertFront = false)
{
{
std::lock_guard dummy(workLoad_->lock);
@@ -214,22 +220,22 @@ private:
void addWorkerThread()
{
- std::string threadName = groupName_ + '[' + numberTo<std::string>(worker_.size() + 1) + '/' + numberTo<std::string>(threadCountMax_) + ']';
+ Zstring threadName = groupName_ + Zstr('[') + numberTo<Zstring>(worker_.size() + 1) + Zstr('/') + numberTo<Zstring>(threadCountMax_) + Zstr(']');
- worker_.emplace_back([wl = workLoad_, threadName = std::move(threadName)] //don't capture "this"! consider detach() and swap()
+ worker_.emplace_back([wl = workLoad_, threadName = std::move(threadName)] //don't capture "this"! consider detach() and move operations
{
- setCurrentThreadName(threadName.c_str());
+ setCurrentThreadName(threadName);
std::unique_lock dummy(wl->lock);
for (;;)
{
- interruptibleWait(wl->conditionNewTask, dummy, [&tasks = wl->tasks] { return !tasks.empty(); }); //throw ThreadInterruption
+ interruptibleWait(wl->conditionNewTask, dummy, [&tasks = wl->tasks] { return !tasks.empty(); }); //throw ThreadStopRequest
Function task = std::move(wl->tasks. front()); //noexcept thanks to move
/**/ wl->tasks.pop_front(); //
dummy.unlock();
- task(); //throw ThreadInterruption?
+ task(); //throw ThreadStopRequest?
dummy.lock();
if (--(wl->tasksPending) == 0)
@@ -239,22 +245,14 @@ private:
callbacks.swap(wl->onCompletionCallbacks);
dummy.unlock();
- for (const auto& cb : callbacks) cb(); //noexcept!
+ for (const auto& cb : callbacks)
+ cb(); //noexcept!
dummy.lock();
}
}
});
}
- void swap(ThreadGroup& other)
- {
- std::swap(worker_, other.worker_);
- std::swap(workLoad_, other.workLoad_);
- std::swap(detach_, other.detach_);
- std::swap(threadCountMax_, other.threadCountMax_);
- std::swap(groupName_, other.groupName_);
- }
-
struct WorkLoad
{
std::mutex lock;
@@ -268,7 +266,7 @@ private:
std::shared_ptr<WorkLoad> workLoad_ = std::make_shared<WorkLoad>();
bool detach_ = false;
size_t threadCountMax_;
- std::string groupName_;
+ Zstring groupName_;
};
@@ -313,12 +311,12 @@ auto runAsync(Function&& fun)
template<class InputIterator, class Duration> inline
-bool wait_for_all_timed(InputIterator first, InputIterator last, const Duration& duration)
+bool waitForAllTimed(InputIterator first, InputIterator last, const Duration& duration)
{
const std::chrono::steady_clock::time_point stopTime = std::chrono::steady_clock::now() + duration;
for (; first != last; ++first)
- if (first->wait_until(stopTime) != std::future_status::ready)
- return false; //time elapsed
+ if (first->wait_until(stopTime) == std::future_status::timeout)
+ return false;
return true;
}
@@ -360,7 +358,7 @@ private:
std::mutex lockResult_;
size_t jobsFinished_ = 0; //
- std::optional<T> result_; //our condition is: "have result" or "jobsFinished_ == jobsTotal"
+ std::optional<T> result_; //our condition is: "have result" or "jobsFinished_ == jobsTotal"
std::condition_variable conditionJobDone_;
};
@@ -390,16 +388,13 @@ std::optional<T> AsyncFirstResult<T>::get() const { return asyncResult_->getResu
//------------------------------------------------------------------------------------------
-class ThreadInterruption {};
-
-
class InterruptionStatus
{
public:
//context of InterruptibleThread instance:
- void interrupt()
+ void requestStop()
{
- interrupted_ = true;
+ stopRequested_ = true;
{
std::lock_guard dummy(lockSleep_); //needed! makes sure the following signal is not lost!
@@ -414,34 +409,34 @@ public:
}
//context of worker thread:
- void checkInterruption() //throw ThreadInterruption
+ void throwIfStopped() //throw ThreadStopRequest
{
- if (interrupted_)
- throw ThreadInterruption();
+ if (stopRequested_)
+ throw ThreadStopRequest();
}
//context of worker thread:
template<class Predicate>
- void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred) //throw ThreadInterruption
+ void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred) //throw ThreadStopRequest
{
setConditionVar(&cv);
ZEN_ON_SCOPE_EXIT(setConditionVar(nullptr));
- //"interrupted_" is not protected by cv's mutex => signal may get lost!!! e.g. after condition was checked but before the wait begins
+ //"stopRequested_" is not protected by cv's mutex => signal may get lost!!! e.g. after condition was checked but before the wait begins
//=> add artifical time out to mitigate! CPU: 0.25% vs 0% for longer time out!
- while (!cv.wait_for(lock, std::chrono::milliseconds(1), [&] { return this->interrupted_ || pred(); }))
+ while (!cv.wait_for(lock, std::chrono::milliseconds(1), [&] { return this->stopRequested_ || pred(); }))
;
- checkInterruption(); //throw ThreadInterruption
+ throwIfStopped(); //throw ThreadStopRequest
}
//context of worker thread:
template <class Rep, class Period>
- void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime) //throw ThreadInterruption
+ void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime) //throw ThreadStopRequest
{
std::unique_lock lock(lockSleep_);
- if (conditionSleepInterruption_.wait_for(lock, relTime, [this] { return static_cast<bool>(this->interrupted_); }))
- throw ThreadInterruption();
+ if (conditionSleepInterruption_.wait_for(lock, relTime, [this] { return static_cast<bool>(this->stopRequested_); }))
+ throw ThreadStopRequest();
}
private:
@@ -451,7 +446,7 @@ private:
activeCondition_ = cv;
}
- std::atomic<bool> interrupted_{ false }; //std:atomic is uninitialized by default!!!
+ std::atomic<bool> stopRequested_{ false }; //std:atomic is uninitialized by default!!!
//"The default constructor is trivial: no initialization takes place other than zero initialization of static and thread-local objects."
std::condition_variable* activeCondition_ = nullptr;
@@ -464,43 +459,40 @@ private:
namespace impl
{
-inline
-InterruptionStatus*& refThreadLocalInterruptionStatus()
-{
- //thread_local with non-POD seems to cause memory leaks on VS 14 => pointer only is fine:
- thread_local InterruptionStatus* threadLocalInterruptionStatus = nullptr;
- return threadLocalInterruptionStatus;
-}
+//thread_local with non-POD seems to cause memory leaks on VS 14 => pointer only is fine:
+inline thread_local InterruptionStatus* threadLocalInterruptionStatus = nullptr;
}
+
//context of worker thread:
inline
-void interruptionPoint() //throw ThreadInterruption
+void interruptionPoint() //throw ThreadStopRequest
{
- assert(impl::refThreadLocalInterruptionStatus());
- if (impl::refThreadLocalInterruptionStatus())
- impl::refThreadLocalInterruptionStatus()->checkInterruption(); //throw ThreadInterruption
+ assert(impl::threadLocalInterruptionStatus);
+ if (impl::threadLocalInterruptionStatus)
+ impl::threadLocalInterruptionStatus->throwIfStopped(); //throw ThreadStopRequest
}
//context of worker thread:
template<class Predicate> inline
-void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred) //throw ThreadInterruption
+void interruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lock, Predicate pred) //throw ThreadStopRequest
{
- assert(impl::refThreadLocalInterruptionStatus());
- if (impl::refThreadLocalInterruptionStatus())
- impl::refThreadLocalInterruptionStatus()->interruptibleWait(cv, lock, pred);
+ assert(impl::threadLocalInterruptionStatus);
+ if (impl::threadLocalInterruptionStatus)
+ impl::threadLocalInterruptionStatus->interruptibleWait(cv, lock, pred);
else
cv.wait(lock, pred);
}
+
//context of worker thread:
template <class Rep, class Period> inline
-void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime) //throw ThreadInterruption
+void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime) //throw ThreadStopRequest
{
- assert(impl::refThreadLocalInterruptionStatus());
- if (impl::refThreadLocalInterruptionStatus())
- impl::refThreadLocalInterruptionStatus()->interruptibleSleep(relTime);
+ assert(impl::threadLocalInterruptionStatus);
+ if (impl::threadLocalInterruptionStatus)
+ impl::threadLocalInterruptionStatus->interruptibleSleep(relTime);
else
std::this_thread::sleep_for(relTime);
}
@@ -509,29 +501,24 @@ void interruptibleSleep(const std::chrono::duration<Rep, Period>& relTime) //thr
template <class Function> inline
InterruptibleThread::InterruptibleThread(Function&& f)
{
- std::promise<void> pFinished;
- threadCompleted_ = pFinished.get_future();
-
stdThread_ = std::thread([f = std::forward<Function>(f),
- intStatus = this->intStatus_,
- pFinished = std::move(pFinished)]() mutable
+ intStatus = this->intStatus_]() mutable
{
- assert(!impl::refThreadLocalInterruptionStatus());
- impl::refThreadLocalInterruptionStatus() = intStatus.get();
- ZEN_ON_SCOPE_EXIT(impl::refThreadLocalInterruptionStatus() = nullptr);
- ZEN_ON_SCOPE_EXIT(pFinished.set_value());
+ assert(!impl::threadLocalInterruptionStatus);
+ impl::threadLocalInterruptionStatus = intStatus.get();
+ ZEN_ON_SCOPE_EXIT(impl::threadLocalInterruptionStatus = nullptr);
try
{
- f(); //throw ThreadInterruption
+ f(); //throw ThreadStopRequest
}
- catch (ThreadInterruption&) {}
+ catch (ThreadStopRequest&) {}
});
}
inline
-void InterruptibleThread::interrupt() { intStatus_->interrupt(); }
+void InterruptibleThread::requestStop() { intStatus_->requestStop(); }
}
#endif //THREAD_H_7896323423432235246427
bgstack15