summaryrefslogtreecommitdiff
path: root/lib/parallel_scan.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/parallel_scan.cpp')
-rw-r--r--lib/parallel_scan.cpp37
1 files changed, 17 insertions, 20 deletions
diff --git a/lib/parallel_scan.cpp b/lib/parallel_scan.cpp
index 94f6b0f4..37dd350e 100644
--- a/lib/parallel_scan.cpp
+++ b/lib/parallel_scan.cpp
@@ -13,11 +13,11 @@
#include <zen/thread.h> //includes <boost/thread.hpp>
#include <zen/scope_guard.h>
#include <zen/fixed_list.h>
+#include <boost/detail/atomic_count.hpp>
using namespace zen;
-
namespace
{
/*
@@ -111,8 +111,6 @@ DiskInfo retrieveDiskInfo(const Zstring& pathName)
return output;
}
-
-#elif defined FFS_LINUX
#endif
*/
@@ -195,16 +193,16 @@ public:
if (!errorMsg.empty() && !errorResponse.get())
{
FillBufferCallback::HandleError rv = callback.reportError(copyStringTo<std::wstring>(errorMsg)); //throw!
- errorResponse.reset(new FillBufferCallback::HandleError(rv));
+ errorResponse = make_unique<FillBufferCallback::HandleError>(rv);
dummy.unlock(); //optimization for condition_variable::notify_one()
conditionGotResponse.notify_all(); //instead of notify_one(); workaround bug: https://svn.boost.org/trac/boost/ticket/7796
}
}
- void setNotifyingThread(size_t threadID) { notifyingThreadID = threadID; } //context of main thread
+ void incrementNotifyingThreadId() { ++notifyingThreadID; } //context of main thread
- void reportCurrentFile(const Zstring& filename, size_t threadID) //context of worker thread
+ void reportCurrentFile(const Zstring& filename, long threadID) //context of worker thread
{
if (threadID != notifyingThreadID) return; //only one thread at a time may report status
@@ -213,7 +211,7 @@ public:
currentStatus.clear();
}
- void reportCurrentStatus(const std::wstring& status, size_t threadID) //context of worker thread
+ void reportCurrentStatus(const std::wstring& status, long threadID) //context of worker thread
{
if (threadID != notifyingThreadID) return; //only one thread may report status
@@ -264,7 +262,7 @@ private:
std::unique_ptr<FillBufferCallback::HandleError> errorResponse;
//---- status updates ----
- volatile size_t notifyingThreadID; //theoretically racy, but there is nothing that could go wrong...
+ boost::detail::atomic_count notifyingThreadID;
//CAVEAT: do NOT use boost::thread::id as long as this showstopper exists: https://svn.boost.org/trac/boost/ticket/5754
boost::mutex lockCurrentStatus; //use a different lock for current file: continue traversing while some thread may process an error
Zstring currentFile; //only one of these two is filled at a time!
@@ -282,7 +280,7 @@ private:
struct TraverserShared
{
public:
- TraverserShared(size_t threadID,
+ TraverserShared(long threadID,
SymLinkHandling handleSymlinks,
const HardFilter::FilterRef& filter,
std::set<Zstring>& failedReads,
@@ -299,7 +297,7 @@ public:
std::set<Zstring>& failedReads_; //relative postfixed names of directories that could not be read (empty for root)
AsyncCallback& acb_;
- size_t threadID_;
+ long threadID_;
};
@@ -445,7 +443,7 @@ DirCallback::HandleError DirCallback::onError(const std::wstring& msg)
class DstHackCallbackImpl : public DstHackCallback
{
public:
- DstHackCallbackImpl(AsyncCallback& acb, size_t threadID) :
+ DstHackCallbackImpl(AsyncCallback& acb, long threadID) :
acb_(acb),
threadID_(threadID),
textApplyingDstHack(replaceCpy(_("Encoding extended time information: %x"), L"%x", L"\n%x")) {}
@@ -457,7 +455,7 @@ private:
}
AsyncCallback& acb_;
- size_t threadID_;
+ long threadID_;
const std::wstring textApplyingDstHack;
};
#endif
@@ -467,7 +465,7 @@ private:
class WorkerThread
{
public:
- WorkerThread(size_t threadID,
+ WorkerThread(long threadID,
const std::shared_ptr<AsyncCallback>& acb,
const DirectoryKey& dirKey,
DirectoryValue& dirOutput) :
@@ -504,7 +502,7 @@ public:
}
private:
- size_t threadID_;
+ long threadID_;
std::shared_ptr<AsyncCallback> acb_;
const DirectoryKey dirKey_;
DirectoryValue& dirOutput_;
@@ -531,7 +529,7 @@ void zen::fillBuffer(const std::set<DirectoryKey>& keysToRead, //in
});
});
- std::shared_ptr<AsyncCallback> acb = std::make_shared<AsyncCallback>();
+ auto acb = std::make_shared<AsyncCallback>();
//init worker threads
std::for_each(keysToRead.begin(), keysToRead.end(),
@@ -540,18 +538,15 @@ void zen::fillBuffer(const std::set<DirectoryKey>& keysToRead, //in
assert(buf.find(key) == buf.end());
DirectoryValue& dirOutput = buf[key];
- const size_t threadId = worker.size();
+ const long threadId = static_cast<long>(worker.size());
worker.emplace_back(WorkerThread(threadId, acb, key, dirOutput));
});
//wait until done
- size_t threadId = 0;
- for (auto it = worker.begin(); it != worker.end(); ++it, ++threadId)
+ for (auto it = worker.begin(); it != worker.end(); ++it)
{
boost::thread& wt = *it;
- acb->setNotifyingThread(threadId); //process info messages of first (active) thread only
-
do
{
//update status
@@ -561,6 +556,8 @@ void zen::fillBuffer(const std::set<DirectoryKey>& keysToRead, //in
acb->processErrors(callback);
}
while (!wt.timed_join(boost::posix_time::milliseconds(updateInterval)));
+
+ acb->incrementNotifyingThreadId(); //process info messages of one thread at a time only
}
guardWorker.dismiss();
bgstack15