diff options
Diffstat (limited to 'lib/parallel_scan.cpp')
-rw-r--r-- | lib/parallel_scan.cpp | 37 |
1 files changed, 17 insertions, 20 deletions
diff --git a/lib/parallel_scan.cpp b/lib/parallel_scan.cpp index 94f6b0f4..37dd350e 100644 --- a/lib/parallel_scan.cpp +++ b/lib/parallel_scan.cpp @@ -13,11 +13,11 @@ #include <zen/thread.h> //includes <boost/thread.hpp> #include <zen/scope_guard.h> #include <zen/fixed_list.h> +#include <boost/detail/atomic_count.hpp> using namespace zen; - namespace { /* @@ -111,8 +111,6 @@ DiskInfo retrieveDiskInfo(const Zstring& pathName) return output; } - -#elif defined FFS_LINUX #endif */ @@ -195,16 +193,16 @@ public: if (!errorMsg.empty() && !errorResponse.get()) { FillBufferCallback::HandleError rv = callback.reportError(copyStringTo<std::wstring>(errorMsg)); //throw! - errorResponse.reset(new FillBufferCallback::HandleError(rv)); + errorResponse = make_unique<FillBufferCallback::HandleError>(rv); dummy.unlock(); //optimization for condition_variable::notify_one() conditionGotResponse.notify_all(); //instead of notify_one(); workaround bug: https://svn.boost.org/trac/boost/ticket/7796 } } - void setNotifyingThread(size_t threadID) { notifyingThreadID = threadID; } //context of main thread + void incrementNotifyingThreadId() { ++notifyingThreadID; } //context of main thread - void reportCurrentFile(const Zstring& filename, size_t threadID) //context of worker thread + void reportCurrentFile(const Zstring& filename, long threadID) //context of worker thread { if (threadID != notifyingThreadID) return; //only one thread at a time may report status @@ -213,7 +211,7 @@ public: currentStatus.clear(); } - void reportCurrentStatus(const std::wstring& status, size_t threadID) //context of worker thread + void reportCurrentStatus(const std::wstring& status, long threadID) //context of worker thread { if (threadID != notifyingThreadID) return; //only one thread may report status @@ -264,7 +262,7 @@ private: std::unique_ptr<FillBufferCallback::HandleError> errorResponse; //---- status updates ---- - volatile size_t notifyingThreadID; //theoretically racy, but there is nothing that could go wrong... + boost::detail::atomic_count notifyingThreadID; //CAVEAT: do NOT use boost::thread::id as long as this showstopper exists: https://svn.boost.org/trac/boost/ticket/5754 boost::mutex lockCurrentStatus; //use a different lock for current file: continue traversing while some thread may process an error Zstring currentFile; //only one of these two is filled at a time! @@ -282,7 +280,7 @@ private: struct TraverserShared { public: - TraverserShared(size_t threadID, + TraverserShared(long threadID, SymLinkHandling handleSymlinks, const HardFilter::FilterRef& filter, std::set<Zstring>& failedReads, @@ -299,7 +297,7 @@ public: std::set<Zstring>& failedReads_; //relative postfixed names of directories that could not be read (empty for root) AsyncCallback& acb_; - size_t threadID_; + long threadID_; }; @@ -445,7 +443,7 @@ DirCallback::HandleError DirCallback::onError(const std::wstring& msg) class DstHackCallbackImpl : public DstHackCallback { public: - DstHackCallbackImpl(AsyncCallback& acb, size_t threadID) : + DstHackCallbackImpl(AsyncCallback& acb, long threadID) : acb_(acb), threadID_(threadID), textApplyingDstHack(replaceCpy(_("Encoding extended time information: %x"), L"%x", L"\n%x")) {} @@ -457,7 +455,7 @@ private: } AsyncCallback& acb_; - size_t threadID_; + long threadID_; const std::wstring textApplyingDstHack; }; #endif @@ -467,7 +465,7 @@ private: class WorkerThread { public: - WorkerThread(size_t threadID, + WorkerThread(long threadID, const std::shared_ptr<AsyncCallback>& acb, const DirectoryKey& dirKey, DirectoryValue& dirOutput) : @@ -504,7 +502,7 @@ public: } private: - size_t threadID_; + long threadID_; std::shared_ptr<AsyncCallback> acb_; const DirectoryKey dirKey_; DirectoryValue& dirOutput_; @@ -531,7 +529,7 @@ void zen::fillBuffer(const std::set<DirectoryKey>& keysToRead, //in }); }); - std::shared_ptr<AsyncCallback> acb = std::make_shared<AsyncCallback>(); + auto acb = std::make_shared<AsyncCallback>(); //init worker threads std::for_each(keysToRead.begin(), keysToRead.end(), @@ -540,18 +538,15 @@ void zen::fillBuffer(const std::set<DirectoryKey>& keysToRead, //in assert(buf.find(key) == buf.end()); DirectoryValue& dirOutput = buf[key]; - const size_t threadId = worker.size(); + const long threadId = static_cast<long>(worker.size()); worker.emplace_back(WorkerThread(threadId, acb, key, dirOutput)); }); //wait until done - size_t threadId = 0; - for (auto it = worker.begin(); it != worker.end(); ++it, ++threadId) + for (auto it = worker.begin(); it != worker.end(); ++it) { boost::thread& wt = *it; - acb->setNotifyingThread(threadId); //process info messages of first (active) thread only - do { //update status @@ -561,6 +556,8 @@ void zen::fillBuffer(const std::set<DirectoryKey>& keysToRead, //in acb->processErrors(callback); } while (!wt.timed_join(boost::posix_time::milliseconds(updateInterval))); + + acb->incrementNotifyingThreadId(); //process info messages of one thread at a time only } guardWorker.dismiss(); |