summaryrefslogtreecommitdiff
path: root/shared/parallelCall.cpp
blob: ce0a3633ec25687b877d1631f20ab1fbc3aff304 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// **************************************************************************
// * This file is part of the FreeFileSync project. It is distributed under *
// * GNU General Public License: http://www.gnu.org/licenses/gpl.html       *
// * Copyright (C) 2008-2010 ZenJu (zhnmju123 AT gmx.de)                    *
// **************************************************************************
//
#include "parallelCall.h"
#include <wx/thread.h>
#include <stdexcept>
#include <wx/string.h>
#include <wx/msgdlg.h>

namespace
{
class WorkerThread : public wxThread
{
public:
    WorkerThread(const Async::AsyncProcess& proc,
                 const boost::shared_ptr<wxMutex>& mainIsListening,
                 const boost::shared_ptr<wxCondition>& procHasFinished) :
        wxThread(wxTHREAD_DETACHED),
        proc_(proc),                       //copy input data by value
        mainIsListening_(mainIsListening), //
        procHasFinished_(procHasFinished)  //
    {
        if (Create() != wxTHREAD_NO_ERROR)
            throw std::runtime_error("Error creating async worker thread!");

        if (Run() != wxTHREAD_NO_ERROR)
            throw std::runtime_error("Error starting async worker thread!");
    }

    ExitCode Entry()
    {
        try
        {
            proc_->doWork();

            //notify that work is done
            wxMutexLocker dummy(*mainIsListening_);
            procHasFinished_->Signal();
        }
        catch (const std::exception& e) //exceptions must be catched per thread
        {
            wxMessageBox(wxString::FromAscii(e.what()), _("An exception occured!"), wxOK | wxICON_ERROR);
        }
        catch (...) //exceptions must be catched per thread
        {
            wxMessageBox(wxT("Unknown exception in worker thread"), _("An exception occured!"), wxOK | wxICON_ERROR);
        }

        return 0;
    }

private:
    Async::AsyncProcess proc_;
    boost::shared_ptr<wxMutex> mainIsListening_;      //shared pointer is safe to use in MT context (same guarantee like builtin types!)
    boost::shared_ptr<wxCondition> procHasFinished_;  //http://www.boost.org/doc/libs/1_43_0/libs/smart_ptr/shared_ptr.htm#ThreadSafety
};
}


Async::Result Async::execute(AsyncProcess proc, size_t maxWait) //maxWait = max. wait time in milliseconds
{
    boost::shared_ptr<wxMutex> mainIsListening(new wxMutex);
    boost::shared_ptr<wxCondition> procHasFinished(new wxCondition(*mainIsListening));
    wxMutexLocker dummy(*mainIsListening); //the mutex should be initially locked (= "main not listening")

    new WorkerThread(proc, mainIsListening, procHasFinished);

    return procHasFinished->WaitTimeout(static_cast<unsigned long>(maxWait)) == wxCOND_NO_ERROR ? WORK_DONE : TIMEOUT;
}


//    ------------------------------------------------------
//    |Pattern: workload queue and multiple worker threads |
//    ------------------------------------------------------
//typedef std::vector<DirectoryDescrType*> Workload;
//
//class ThreadSorting : public wxThread
//{
//public:
//    ThreadSorting(wxCriticalSection& syncWorkload, Workload& workload) :
//            wxThread(wxTHREAD_JOINABLE),
//            syncWorkload_(syncWorkload),
//            workload_(workload)
//    {
//        if (Create() != wxTHREAD_NO_ERROR)
//            throw RuntimeException(wxString(wxT("Error creating thread for sorting!")));
//    }
//
//    ~ThreadSorting() {}
//
//
//    ExitCode Entry()
//    {
//        while (true)
//        {
//            DirectoryDescrType* descr = NULL;
//            {  //see if there is work to do...
//                wxCriticalSectionLocker dummy(syncWorkload_);
//                if (workload_.empty())
//                    return 0;
//                else
//                {
//                    descr = workload_.back();
//                    workload_.pop_back();
//                }
//            }
//            //do work
//            std::sort(descr->begin(), descr->end());
//        }
//    }
//
//private:
//    wxCriticalSection& syncWorkload_;
//    Workload& workload_;
//};
//
//
//void DirectoryDescrBuffer::preFillBuffers(const std::vector<FolderPairCfg>& fpConfigFormatted)
//{
//    //assemble workload
//	...
//
//    //we use binary search when comparing the directory structures: so sort() first
//    const int CPUCount = wxThread::GetCPUCount();
//    if (CPUCount >= 2) //do it the multithreaded way:
//    {
//        wxCriticalSection syncWorkload;
//
//        typedef std::vector<boost::shared_ptr<ThreadSorting> > ThreadContainer;
//        ThreadContainer sortThreads;
//        sortThreads.reserve(CPUCount);
//
//        //start CPUCount worker threads
//        for (size_t i = 0; i < std::min(static_cast<size_t>(CPUCount), workload.size()); ++i)
//        {
//            boost::shared_ptr<ThreadSorting> newWorker(new ThreadSorting(syncWorkload, workload));
//
//            if (newWorker->Run() != wxTHREAD_NO_ERROR)
//                throw RuntimeException(wxString(wxT("Error starting thread for sorting!")));
//
//            sortThreads.push_back(newWorker);
//        }
//
//        //wait until all worker are finished
//        for (ThreadContainer::iterator i = sortThreads.begin(); i != sortThreads.end(); ++i)
//        {
//            if ((*i)->Wait() != 0)
//                throw RuntimeException(wxString(wxT("Error waiting for thread (sorting)!")));
//        }
//    }
//    else //single threaded
//    {
//        for (Workload::iterator i = workload.begin(); i != workload.end(); ++i)
//            std::sort((*i)->begin(), (*i)->end());
//    }
//}
bgstack15