summaryrefslogtreecommitdiff
path: root/zen/thread.h
blob: a834f070081264fa246ecd4826c3bffc4dbbb3ab (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
// **************************************************************************
// * This file is part of the FreeFileSync project. It is distributed under *
// * GNU General Public License: http://www.gnu.org/licenses/gpl.html       *
// * Copyright (C) Zenju (zenju AT gmx DOT de) - All Rights Reserved        *
// **************************************************************************

#ifndef BOOST_THREAD_WRAP_H_78963234
#define BOOST_THREAD_WRAP_H_78963234

//temporary solution until C++11 thread becomes fully available (considering std::thread's non-interruptibility and std::async craziness, this may be NEVER)
#include <memory>

//workaround this pathetic boost thread warning mess
#ifdef __GNUC__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wswitch-enum"
#pragma GCC diagnostic ignored "-Wstrict-aliasing"
#pragma GCC diagnostic ignored "-Wredundant-decls"
#pragma GCC diagnostic ignored "-Wshadow"
#ifndef __clang__ //clang defines __GNUC__, but doesn't support this warning
#pragma GCC diagnostic ignored "-Wunused-local-typedefs"
#endif
#endif
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4702 4913) //unreachable code; user defined binary operator ',' exists but no overload could convert all operands, default built-in binary operator ',' used
#endif

#include <boost/thread.hpp>

#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif
#ifdef _MSC_VER
#pragma warning(pop)
#endif

namespace zen
{
/*
std::async replacement without crappy semantics:
    1. guaranteed to run asynchronously: "fun" is always executed on a new boost::thread, which is detached immediately
    2. does not follow C++11 [futures.async], Paragraph 5, where std::future waits for thread in destructor

Parameter:
    fun: callable taking no arguments; it is moved into the task that runs on the new thread

Returns:
    boost::unique_future holding the result of fun(); destroying the future does not wait for the thread

Example:
        Zstring dirpath = ...
        auto ft = zen::async([=](){ return zen::dirExists(dirpath); });
        if (ft.timed_wait(boost::posix_time::milliseconds(200)) && ft.get())
            //dir existing
*/
template <class Function>
auto async(Function fun) -> boost::unique_future<decltype(fun())>;

//wait for all with a time limit: return true if *all* results are available!
// - [first, last): range of objects providing "timed_wait_until(boost::system_time)", e.g. boost::unique_future
// - wait_duration: total time budget for the whole range (not per element); it is added to boost::get_system_time() to form a single deadline
// - returns false as soon as one element is not ready when the deadline is reached; remaining elements are not checked
template<class InputIterator, class Duration>
bool wait_for_all_timed(InputIterator first, InputIterator last, const Duration& wait_duration);

//wait until first job is successful or all failed
//Runs each added job on its own detached thread and hands out the first non-empty result.
//NOTE(review): jobsTotal_ is a plain counter accessed without locking => addJob(), timedWait() and get() are presumably meant to be called from a single thread; confirm with callers
template <class T>
class GetFirstResult
{
public:
    //creates the shared result state; no jobs are running yet
    GetFirstResult();

    //start "f" on a new, detached thread and count it as a pending job
    template <class Fun>
    void addJob(Fun f); //f must return a std::unique_ptr<T> containing a value if successful

    //wait at most "duration" until a job delivered a value or all added jobs have finished
    //if no job was added at all, the result counts as ready immediately
    template <class Duration>
    bool timedWait(const Duration& duration) const; //true: "get()" is ready, false: time elapsed

    //return first value or none if all jobs failed; blocks until result is ready!
    //the stored value is moved out => a second call would not see it anymore (checked by an assert in debug builds)
    std::unique_ptr<T> get() const; //may be called only once!

private:
    class AsyncResult; //synchronized result storage, defined in the implementation section below
    std::shared_ptr<AsyncResult> asyncResult; //each worker thread holds its own copy of this pointer, so the state stays valid even if this object is destroyed first
    size_t jobsTotal_; //number of jobs started via addJob()
};







//###################### implementation ######################
#ifndef BOOST_HAS_THREADS
#error just some paranoia check...
#endif

//run "fun" on a new, detached thread and return a future for its result
template <class Function> inline
auto async(Function fun) -> boost::unique_future<decltype(fun())>
{
    typedef decltype(fun()) Result;

    //the template argument expected by packaged_task depends on the boost configuration: mirror "boost/thread/future.hpp", hopefully they know what they're doing
#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK
    boost::packaged_task<Result()> task(std::move(fun)); //packaged task seems to even require r-value reference: https://sourceforge.net/p/freefilesync/bugs/234/
#else
    boost::packaged_task<Result> task(std::move(fun));
#endif

    //fetch the future first: the task object itself is moved into the thread right after
    auto pendingResult = task.get_future();

    boost::thread worker(std::move(task));
    worker.detach(); //we have to explicitly detach since C++11: [thread.thread.destr] ~thread() calls std::terminate() if joinable()!!!

    return std::move(pendingResult); //compiler error without "move", why needed???
}


template<class InputIterator, class Duration> inline
bool wait_for_all_timed(InputIterator first, InputIterator last, const Duration& wait_duration)
{
    const boost::system_time endTime = boost::get_system_time() + wait_duration;
    for (; first != last; ++first)
        if (!first->timed_wait_until(endTime))
            return false; //time elapsed
    return true;
}


//result storage shared between the worker threads (writers) and the waiting thread (reader); all state is protected by "lockResult"
template <class T>
class GetFirstResult<T>::AsyncResult
{
public:
    AsyncResult() :
#ifndef NDEBUG
        returnedResult(false),
#endif
        jobsFinished(0) {}

    //context: worker threads
    //register one finished job; its result is stored only if no value has been stored before
    void reportFinished(std::unique_ptr<T>&& result)
    {
        {
            boost::lock_guard<boost::mutex> guard(lockResult);
            if (!result_)
                result_ = std::move(result);
            ++jobsFinished;
        }
        //notify after releasing the lock
        conditionJobDone.notify_all(); //instead of notify_one(); workaround bug: https://svn.boost.org/trac/boost/ticket/7796
        //condition handling, see: http://www.boost.org/doc/libs/1_43_0/doc/html/thread/synchronization.html#thread.synchronization.condvar_ref
    }

    //context: main thread
    //wait at most "duration" for the condition "have result or all jobs finished"; returns whether the condition holds
    template <class Duration>
    bool waitForResult(size_t jobsTotal, const Duration& duration)
    {
        boost::unique_lock<boost::mutex> guard(lockResult);

        auto isReady = [&] { return this->jobDone(jobsTotal); };
        return conditionJobDone.timed_wait(guard, duration, isReady);
        //use timed_wait predicate if exiting before condition is reached: http://www.boost.org/doc/libs/1_49_0/doc/html/thread/synchronization.html#thread.synchronization.condvar_ref.condition_variable.timed_wait_rel
    }

    //block until the condition "have result or all jobs finished" holds, then move the stored result out (may be empty)
    std::unique_ptr<T> getResult(size_t jobsTotal)
    {
        boost::unique_lock<boost::mutex> guard(lockResult);
        for (;;)
        {
            if (jobDone(jobsTotal))
                break;
            //wake up at least every 50 ms and re-check the condition
            conditionJobDone.timed_wait(guard, boost::posix_time::milliseconds(50)); //interruption point!
        }

#ifndef NDEBUG
        //the result is moved out below: retrieving it twice is a usage error
        assert(!returnedResult);
        returnedResult = true;
#endif
        return std::move(result_);
    }

private:
    //call while locked!
    bool jobDone(size_t jobsTotal) const
    {
        if (result_)
            return true;
        return jobsFinished >= jobsTotal;
    }

#ifndef NDEBUG
    bool returnedResult; //debug only: set once getResult() has handed out the result
#endif

    boost::mutex lockResult;
    size_t jobsFinished;        //
    std::unique_ptr<T> result_; //our condition is: "have result" or "jobsFinished == jobsTotal"
    boost::condition_variable conditionJobDone;
};



//set up the shared result state; no job has been started yet
template <class T> inline
GetFirstResult<T>::GetFirstResult() :
    asyncResult(std::make_shared<AsyncResult>()),
    jobsTotal_(0)
{
}


//run "f" on a new thread and report its return value to the shared result state
template <class T>
template <class Fun> inline
void GetFirstResult<T>::addJob(Fun f) //f must return a std::unique_ptr<T> containing a value on success
{
    //the thread gets its own copy of the shared pointer: capture member variable, not "this"!
    auto sharedState = asyncResult;
    boost::thread worker([sharedState, f] { sharedState->reportFinished(f()); });

    ++jobsTotal_; //only reached if the thread could be created
    worker.detach(); //we have to be explicit since C++11: [thread.thread.destr] ~thread() calls std::terminate() if joinable()!!!
}


//forward to the shared state: true if a result is available or all started jobs have finished before "duration" elapsed
template <class T>
template <class Duration> inline
bool GetFirstResult<T>::timedWait(const Duration& duration) const
{
    return asyncResult->waitForResult(jobsTotal_, duration);
}


//forward to the shared state: blocks until a result is available or all started jobs have finished
template <class T> inline
std::unique_ptr<T> GetFirstResult<T>::get() const
{
    return asyncResult->getResult(jobsTotal_);
}
}

#endif //BOOST_THREAD_WRAP_H_78963234
bgstack15