diff options
Diffstat (limited to 'lib/parallel_scan.cpp')
-rw-r--r-- | lib/parallel_scan.cpp | 96 |
1 files changed, 37 insertions, 59 deletions
diff --git a/lib/parallel_scan.cpp b/lib/parallel_scan.cpp index a31e30ee..94f6b0f4 100644 --- a/lib/parallel_scan.cpp +++ b/lib/parallel_scan.cpp @@ -138,30 +138,21 @@ Windows 7: Windows XP: 2 Threads: 42s | 11s 2 Threads: 38s | 8s => Traversing does not take any advantage of file locality so that even multiple threads operating on the same disk impose no performance overhead! (even faster on XP) -*/ - std::vector<std::set<DirectoryKey>> separateByDistinctDisk(const std::set<DirectoryKey>& dirkeys) { - //see perf note: use one thread per dirkey: - typedef std::map<int, std::set<DirectoryKey>> DiskKeyMapping; + //use one thread per physical disk: + typedef std::map<DiskInfo, std::set<DirectoryKey>> DiskKeyMapping; DiskKeyMapping tmp; - int index = 0; std::for_each(dirkeys.begin(), dirkeys.end(), - [&](const DirectoryKey& key) { tmp[++index].insert(key); }); + [&](const DirectoryKey& key) { tmp[retrieveDiskInfo(key.dirnameFull_)].insert(key); }); - /* - //use one thread per physical disk: - typedef std::map<DiskInfo, std::set<DirectoryKey>> DiskKeyMapping; - DiskKeyMapping tmp; - std::for_each(dirkeys.begin(), dirkeys.end(), - [&](const DirectoryKey& key) { tmp[retrieveDiskInfo(key.dirnameFull_)].insert(key); }); - */ std::vector<std::set<DirectoryKey>> buckets; std::transform(tmp.begin(), tmp.end(), std::back_inserter(buckets), [&](const DiskKeyMapping::value_type& diskToKey) { return diskToKey.second; }); return buckets; } +*/ //------------------------------------------------------------------------------------------ typedef Zbase<wchar_t, StorageRefCountThreadSafe> BasicWString; //thread safe string class for UI texts @@ -193,7 +184,7 @@ public: errorResponse.reset(); dummy.unlock(); //optimization for condition_variable::notify_one() - conditionCanReportError.notify_one(); + conditionCanReportError.notify_all(); //instead of notify_one(); workaround bug: https://svn.boost.org/trac/boost/ticket/7796 return rv; } @@ -207,7 +198,7 @@ public: errorResponse.reset(new FillBufferCallback::HandleError(rv)); dummy.unlock(); //optimization for condition_variable::notify_one() - conditionGotResponse.notify_one(); + conditionGotResponse.notify_all(); //instead of notify_one(); workaround bug: https://svn.boost.org/trac/boost/ticket/7796 } } @@ -470,57 +461,53 @@ private: const std::wstring textApplyingDstHack; }; #endif -//------------------------------------------------------------------------------------------ +//------------------------------------------------------------------------------------------ class WorkerThread { public: WorkerThread(size_t threadID, const std::shared_ptr<AsyncCallback>& acb, - const std::vector<std::pair<DirectoryKey, DirectoryValue*>>& workload) : + const DirectoryKey& dirKey, + DirectoryValue& dirOutput) : threadID_(threadID), acb_(acb), - workload_(workload) {} + dirKey_(dirKey), + dirOutput_(dirOutput) {} void operator()() //thread entry { acb_->incActiveWorker(); ZEN_ON_SCOPE_EXIT(acb_->decActiveWorker();); - std::for_each(workload_.begin(), workload_.end(), - [&](std::pair<DirectoryKey, DirectoryValue*>& item) - { - const Zstring& directoryName = item.first.dirnameFull_; - DirectoryValue& dirVal = *item.second; - - acb_->reportCurrentFile(directoryName, threadID_); //just in case first directory access is blocking + acb_->reportCurrentFile(dirKey_.dirnameFull_, threadID_); //just in case first directory access is blocking - TraverserShared travCfg(threadID_, - item.first.handleSymlinks_, //shared by all(!) instances of DirCallback while traversing a folder hierarchy - item.first.filter_, - dirVal.failedReads, - *acb_); + TraverserShared travCfg(threadID_, + dirKey_.handleSymlinks_, //shared by all(!) instances of DirCallback while traversing a folder hierarchy + dirKey_.filter_, + dirOutput_.failedReads, + *acb_); - DirCallback traverser(travCfg, - Zstring(), - dirVal.dirCont); + DirCallback traverser(travCfg, + Zstring(), + dirOutput_.dirCont); - DstHackCallback* dstCallbackPtr = nullptr; + DstHackCallback* dstCallbackPtr = nullptr; #ifdef FFS_WIN - DstHackCallbackImpl dstCallback(*acb_, threadID_); - dstCallbackPtr = &dstCallback; + DstHackCallbackImpl dstCallback(*acb_, threadID_); + dstCallbackPtr = &dstCallback; #endif - //get all files and folders from directoryPostfixed (and subdirectories) - traverseFolder(directoryName, traverser, dstCallbackPtr); //exceptions may be thrown! - }); + //get all files and folders from directoryPostfixed (and subdirectories) + traverseFolder(dirKey_.dirnameFull_, traverser, dstCallbackPtr); //exceptions may be thrown! } private: size_t threadID_; std::shared_ptr<AsyncCallback> acb_; - std::vector<std::pair<DirectoryKey, DirectoryValue*>> workload_; + const DirectoryKey dirKey_; + DirectoryValue& dirOutput_; }; } @@ -532,13 +519,11 @@ void zen::fillBuffer(const std::set<DirectoryKey>& keysToRead, //in { buf.clear(); - std::vector<std::set<DirectoryKey>> buckets = separateByDistinctDisk(keysToRead); //one bucket per physical device - FixedList<boost::thread> worker; //note: we cannot use std::vector<boost::thread>: compiler error on GCC 4.7, probably a boost screw-up zen::ScopeGuard guardWorker = zen::makeGuard([&] { - std::for_each(worker.begin(), worker.end(), [](boost::thread& wt) { wt.interrupt(); }); //interrupt all at once, then join + std::for_each(worker.begin(), worker.end(), [](boost::thread& wt) { wt.interrupt(); }); //interrupt all at once first, then join std::for_each(worker.begin(), worker.end(), [](boost::thread& wt) { if (wt.joinable()) //= precondition of thread::join(), which throws an exception if violated! @@ -549,28 +534,21 @@ void zen::fillBuffer(const std::set<DirectoryKey>& keysToRead, //in std::shared_ptr<AsyncCallback> acb = std::make_shared<AsyncCallback>(); //init worker threads - for (auto iter = buckets.begin(); iter != buckets.end(); ++iter) + std::for_each(keysToRead.begin(), keysToRead.end(), + [&](const DirectoryKey& key) { - const std::set<DirectoryKey>& bucket = *iter; - - std::vector<std::pair<DirectoryKey, DirectoryValue*>> workload; - std::for_each(bucket.begin(), bucket.end(), - [&](const DirectoryKey& key) - { - auto rv = buf.insert(std::make_pair(key, DirectoryValue())); - assert(rv.second); - workload.push_back(std::make_pair(key, &rv.first->second)); - }); + assert(buf.find(key) == buf.end()); + DirectoryValue& dirOutput = buf[key]; - const size_t threadId = iter - buckets.begin(); - worker.emplace_back(WorkerThread(threadId, acb, workload)); - } + const size_t threadId = worker.size(); + worker.emplace_back(WorkerThread(threadId, acb, key, dirOutput)); + }); //wait until done size_t threadId = 0; - for (auto iter = worker.begin(); iter != worker.end(); ++iter, ++threadId) + for (auto it = worker.begin(); it != worker.end(); ++it, ++threadId) { - boost::thread& wt = *iter; + boost::thread& wt = *it; acb->setNotifyingThread(threadId); //process info messages of first (active) thread only |