summary | refs | log | tree | commit | diff
path: root/lib/parallel_scan.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/parallel_scan.cpp')
-rw-r--r-- lib/parallel_scan.cpp 96
1 files changed, 37 insertions, 59 deletions
diff --git a/lib/parallel_scan.cpp b/lib/parallel_scan.cpp
index a31e30ee..94f6b0f4 100644
--- a/lib/parallel_scan.cpp
+++ b/lib/parallel_scan.cpp
@@ -138,30 +138,21 @@ Windows 7: Windows XP:
2 Threads: 42s | 11s 2 Threads: 38s | 8s
=> Traversing does not take any advantage of file locality so that even multiple threads operating on the same disk impose no performance overhead! (even faster on XP)
-*/
-
std::vector<std::set<DirectoryKey>> separateByDistinctDisk(const std::set<DirectoryKey>& dirkeys)
{
- //see perf note: use one thread per dirkey:
- typedef std::map<int, std::set<DirectoryKey>> DiskKeyMapping;
+ //use one thread per physical disk:
+ typedef std::map<DiskInfo, std::set<DirectoryKey>> DiskKeyMapping;
DiskKeyMapping tmp;
- int index = 0;
std::for_each(dirkeys.begin(), dirkeys.end(),
- [&](const DirectoryKey& key) { tmp[++index].insert(key); });
+ [&](const DirectoryKey& key) { tmp[retrieveDiskInfo(key.dirnameFull_)].insert(key); });
- /*
- //use one thread per physical disk:
- typedef std::map<DiskInfo, std::set<DirectoryKey>> DiskKeyMapping;
- DiskKeyMapping tmp;
- std::for_each(dirkeys.begin(), dirkeys.end(),
- [&](const DirectoryKey& key) { tmp[retrieveDiskInfo(key.dirnameFull_)].insert(key); });
- */
std::vector<std::set<DirectoryKey>> buckets;
std::transform(tmp.begin(), tmp.end(), std::back_inserter(buckets),
[&](const DiskKeyMapping::value_type& diskToKey) { return diskToKey.second; });
return buckets;
}
+*/
//------------------------------------------------------------------------------------------
typedef Zbase<wchar_t, StorageRefCountThreadSafe> BasicWString; //thread safe string class for UI texts
@@ -193,7 +184,7 @@ public:
errorResponse.reset();
dummy.unlock(); //optimization for condition_variable::notify_one()
- conditionCanReportError.notify_one();
+ conditionCanReportError.notify_all(); //instead of notify_one(); workaround bug: https://svn.boost.org/trac/boost/ticket/7796
return rv;
}
@@ -207,7 +198,7 @@ public:
errorResponse.reset(new FillBufferCallback::HandleError(rv));
dummy.unlock(); //optimization for condition_variable::notify_one()
- conditionGotResponse.notify_one();
+ conditionGotResponse.notify_all(); //instead of notify_one(); workaround bug: https://svn.boost.org/trac/boost/ticket/7796
}
}
@@ -470,57 +461,53 @@ private:
const std::wstring textApplyingDstHack;
};
#endif
-//------------------------------------------------------------------------------------------
+//------------------------------------------------------------------------------------------
class WorkerThread
{
public:
WorkerThread(size_t threadID,
const std::shared_ptr<AsyncCallback>& acb,
- const std::vector<std::pair<DirectoryKey, DirectoryValue*>>& workload) :
+ const DirectoryKey& dirKey,
+ DirectoryValue& dirOutput) :
threadID_(threadID),
acb_(acb),
- workload_(workload) {}
+ dirKey_(dirKey),
+ dirOutput_(dirOutput) {}
void operator()() //thread entry
{
acb_->incActiveWorker();
ZEN_ON_SCOPE_EXIT(acb_->decActiveWorker(););
- std::for_each(workload_.begin(), workload_.end(),
- [&](std::pair<DirectoryKey, DirectoryValue*>& item)
- {
- const Zstring& directoryName = item.first.dirnameFull_;
- DirectoryValue& dirVal = *item.second;
-
- acb_->reportCurrentFile(directoryName, threadID_); //just in case first directory access is blocking
+ acb_->reportCurrentFile(dirKey_.dirnameFull_, threadID_); //just in case first directory access is blocking
- TraverserShared travCfg(threadID_,
- item.first.handleSymlinks_, //shared by all(!) instances of DirCallback while traversing a folder hierarchy
- item.first.filter_,
- dirVal.failedReads,
- *acb_);
+ TraverserShared travCfg(threadID_,
+ dirKey_.handleSymlinks_, //shared by all(!) instances of DirCallback while traversing a folder hierarchy
+ dirKey_.filter_,
+ dirOutput_.failedReads,
+ *acb_);
- DirCallback traverser(travCfg,
- Zstring(),
- dirVal.dirCont);
+ DirCallback traverser(travCfg,
+ Zstring(),
+ dirOutput_.dirCont);
- DstHackCallback* dstCallbackPtr = nullptr;
+ DstHackCallback* dstCallbackPtr = nullptr;
#ifdef FFS_WIN
- DstHackCallbackImpl dstCallback(*acb_, threadID_);
- dstCallbackPtr = &dstCallback;
+ DstHackCallbackImpl dstCallback(*acb_, threadID_);
+ dstCallbackPtr = &dstCallback;
#endif
- //get all files and folders from directoryPostfixed (and subdirectories)
- traverseFolder(directoryName, traverser, dstCallbackPtr); //exceptions may be thrown!
- });
+ //get all files and folders from directoryPostfixed (and subdirectories)
+ traverseFolder(dirKey_.dirnameFull_, traverser, dstCallbackPtr); //exceptions may be thrown!
}
private:
size_t threadID_;
std::shared_ptr<AsyncCallback> acb_;
- std::vector<std::pair<DirectoryKey, DirectoryValue*>> workload_;
+ const DirectoryKey dirKey_;
+ DirectoryValue& dirOutput_;
};
}
@@ -532,13 +519,11 @@ void zen::fillBuffer(const std::set<DirectoryKey>& keysToRead, //in
{
buf.clear();
- std::vector<std::set<DirectoryKey>> buckets = separateByDistinctDisk(keysToRead); //one bucket per physical device
-
FixedList<boost::thread> worker; //note: we cannot use std::vector<boost::thread>: compiler error on GCC 4.7, probably a boost screw-up
zen::ScopeGuard guardWorker = zen::makeGuard([&]
{
- std::for_each(worker.begin(), worker.end(), [](boost::thread& wt) { wt.interrupt(); }); //interrupt all at once, then join
+ std::for_each(worker.begin(), worker.end(), [](boost::thread& wt) { wt.interrupt(); }); //interrupt all at once first, then join
std::for_each(worker.begin(), worker.end(), [](boost::thread& wt)
{
if (wt.joinable()) //= precondition of thread::join(), which throws an exception if violated!
@@ -549,28 +534,21 @@ void zen::fillBuffer(const std::set<DirectoryKey>& keysToRead, //in
std::shared_ptr<AsyncCallback> acb = std::make_shared<AsyncCallback>();
//init worker threads
- for (auto iter = buckets.begin(); iter != buckets.end(); ++iter)
+ std::for_each(keysToRead.begin(), keysToRead.end(),
+ [&](const DirectoryKey& key)
{
- const std::set<DirectoryKey>& bucket = *iter;
-
- std::vector<std::pair<DirectoryKey, DirectoryValue*>> workload;
- std::for_each(bucket.begin(), bucket.end(),
- [&](const DirectoryKey& key)
- {
- auto rv = buf.insert(std::make_pair(key, DirectoryValue()));
- assert(rv.second);
- workload.push_back(std::make_pair(key, &rv.first->second));
- });
+ assert(buf.find(key) == buf.end());
+ DirectoryValue& dirOutput = buf[key];
- const size_t threadId = iter - buckets.begin();
- worker.emplace_back(WorkerThread(threadId, acb, workload));
- }
+ const size_t threadId = worker.size();
+ worker.emplace_back(WorkerThread(threadId, acb, key, dirOutput));
+ });
//wait until done
size_t threadId = 0;
- for (auto iter = worker.begin(); iter != worker.end(); ++iter, ++threadId)
+ for (auto it = worker.begin(); it != worker.end(); ++it, ++threadId)
{
- boost::thread& wt = *iter;
+ boost::thread& wt = *it;
acb->setNotifyingThread(threadId); //process info messages of first (active) thread only
bgstack15