diff options
Diffstat (limited to 'lib/parallel_scan.cpp')
-rw-r--r-- | lib/parallel_scan.cpp | 600 |
1 files changed, 600 insertions, 0 deletions
diff --git a/lib/parallel_scan.cpp b/lib/parallel_scan.cpp new file mode 100644 index 00000000..29d87ee7 --- /dev/null +++ b/lib/parallel_scan.cpp @@ -0,0 +1,600 @@ +// ************************************************************************** +// * This file is part of the FreeFileSync project. It is distributed under * +// * GNU General Public License: http://www.gnu.org/licenses/gpl.html * +// * Copyright (C) 2008-2011 ZenJu (zhnmju123 AT gmx.de) * +// ************************************************************************** + +#include "parallel_scan.h" +#include <boost/detail/atomic_count.hpp> +#include "db_file.h" +#include "lock_holder.h" +#include <zen/file_traverser.h> +#include <zen/file_error.h> +#include <wx+/string_conv.h> +#include <zen/thread.h> //includes <boost/thread.hpp> +#include <zen/scope_guard.h> + +using namespace zen; + + +#ifndef BOOST_HAS_THREADS +#error just some paranoia check... +#endif + + +namespace +{ +/* +#ifdef FFS_WIN + +struct DiskInfo +{ + DiskInfo() : + driveType(DRIVE_UNKNOWN), + diskID(-1) {} + + UINT driveType; + int diskID; // -1 if id could not be determined, this one is filled if driveType == DRIVE_FIXED or DRIVE_REMOVABLE; +}; + +inline +bool operator<(const DiskInfo& lhs, const DiskInfo& rhs) +{ + if (lhs.driveType != rhs.driveType) + return lhs.driveType < rhs.driveType; + + if (lhs.diskID < 0 || rhs.diskID < 0) + return false; + //consider "same", reason: one volume may be uniquely associated with one disk, while the other volume is associated to the same disk AND another one! + //volume <-> disk is 0..N:1..N + + return lhs.diskID < rhs.diskID ; +} + + +DiskInfo retrieveDiskInfo(const Zstring& pathName) +{ + std::vector<wchar_t> volName(std::max(pathName.size(), static_cast<size_t>(10000))); + + DiskInfo output; + + //full pathName need not yet exist! + if (!::GetVolumePathName(pathName.c_str(), //__in LPCTSTR lpszFileName, + &volName[0], //__out LPTSTR lpszVolumePathName, + static_cast<DWORD>(volName.size()))) //__in DWORD cchBufferLength + return output; + + const Zstring rootPathName = &volName[0]; + + output.driveType = ::GetDriveType(rootPathName.c_str()); + + if (output.driveType == DRIVE_NO_ROOT_DIR) //these two should be the same error category + output.driveType = DRIVE_UNKNOWN; + + if (output.driveType != DRIVE_FIXED && output.driveType != DRIVE_REMOVABLE) + return output; //no reason to get disk ID + + //go and find disk id: + + //format into form: "\\.\C:" + Zstring volnameFmt = rootPathName; + if (endsWith(volnameFmt, FILE_NAME_SEPARATOR)) + volnameFmt.resize(volnameFmt.size() - 1); + volnameFmt = L"\\\\.\\" + volnameFmt; + + HANDLE hVolume = ::CreateFile(volnameFmt.c_str(), + 0, + FILE_SHARE_READ | FILE_SHARE_WRITE, + 0, + OPEN_EXISTING, + 0, + NULL); + if (hVolume == INVALID_HANDLE_VALUE) + return output; + ZEN_ON_BLOCK_EXIT(::CloseHandle(hVolume)); + + std::vector<char> buffer(sizeof(VOLUME_DISK_EXTENTS) + sizeof(DISK_EXTENT)); //reserve buffer for at most one disk! call below will then fail if volume spans multiple disks! + + DWORD bytesReturned = 0; + if (!::DeviceIoControl(hVolume, // handle to device + IOCTL_VOLUME_GET_VOLUME_DISK_EXTENTS, // dwIoControlCode + NULL, // lpInBuffer + 0, // nInBufferSize + &buffer[0], // output buffer + static_cast<DWORD>(buffer.size()), // size of output buffer + &bytesReturned, // number of bytes returned + NULL)) // OVERLAPPED structure + return output; + + const VOLUME_DISK_EXTENTS& volDisks = *reinterpret_cast<VOLUME_DISK_EXTENTS*>(&buffer[0]); + + if (volDisks.NumberOfDiskExtents != 1) + return output; + + output.diskID = volDisks.Extents[0].DiskNumber; + + return output; +} + +#elif defined FFS_LINUX +#endif +*/ + +/* +PERF NOTE + +-------------------------------------------- +|Testcase: Reading from two different disks| +-------------------------------------------- +Windows 7: + 1st(unbuffered) |2nd (OS buffered) + ---------------------------------- +1 Thread: 57s | 8s +2 Threads: 39s | 7s + +-------------------------------------------------- +|Testcase: Reading two directories from same disk| +-------------------------------------------------- +Windows 7: Windows XP: + 1st(unbuffered) |2nd (OS buffered) 1st(unbuffered) |2nd (OS buffered) + ---------------------------------- ---------------------------------- +1 Thread: 41s | 13s 1 Thread: 45s | 13s +2 Threads: 42s | 11s 2 Threads: 38s | 8s + +=> Traversing does not take any advantage of file locality so that even multiple threads operating on the same disk impose no performance overhead! +*/ + + +std::vector<std::set<DirectoryKey>> separateByDistinctDisk(const std::set<DirectoryKey>& dirkeys) +{ + //see perf note: use one thread per dirkey: + typedef std::map<int, std::set<DirectoryKey>> DiskKeyMapping; + DiskKeyMapping tmp; + int index = 0; + std::for_each(dirkeys.begin(), dirkeys.end(), + [&](const DirectoryKey& key) { tmp[++index].insert(key); }); + + /* + //use one thread per physical disk: + typedef std::map<DiskInfo, std::set<DirectoryKey>> DiskKeyMapping; + DiskKeyMapping tmp; + std::for_each(dirkeys.begin(), dirkeys.end(), + [&](const DirectoryKey& key) { tmp[retrieveDiskInfo(key.dirnameFull_)].insert(key); }); + */ + std::vector<std::set<DirectoryKey>> buckets; + std::transform(tmp.begin(), tmp.end(), std::back_inserter(buckets), + [&](const DiskKeyMapping::value_type& diskToKey) { return diskToKey.second; }); + return buckets; +} + +//------------------------------------------------------------------------------------------ +typedef Zbase<wchar_t, StorageRefCountThreadSafe> BasicWString; //thread safe string class for UI texts + + +class AsyncCallback +{ +public: + AsyncCallback() : + notifyingThreadID(-1), + textScanning(_("Scanning:")), + itemsScanned(0), + activeWorker(0) {} + + FillBufferCallback::HandleError reportError(const std::wstring& msg) //blocking call: context of worker thread + { + boost::unique_lock<boost::mutex> dummy(lockErrorMsg); + while (!errorMsg.empty() || errorResponse.get()) + conditionCanReportError.timed_wait(dummy, boost::posix_time::milliseconds(50)); //interruption point! + + errorMsg = BasicWString(msg); + + while (!errorResponse.get()) + conditionGotResponse.timed_wait(dummy, boost::posix_time::milliseconds(50)); //interruption point! + + FillBufferCallback::HandleError rv = *errorResponse; + + errorMsg.clear(); + errorResponse.reset(); + + //dummy.unlock(); + conditionCanReportError.notify_one(); + + return rv; + } + + void processErrors(FillBufferCallback& callback) //context of main thread, call repreatedly + { + boost::lock_guard<boost::mutex> dummy(lockErrorMsg); + if (!errorMsg.empty() && !errorResponse.get()) + { + FillBufferCallback::HandleError rv = callback.reportError(cvrtString<std::wstring>(errorMsg)); //throw! + errorResponse.reset(new FillBufferCallback::HandleError(rv)); + + //dummy.unlock(); + conditionGotResponse.notify_one(); + } + } + + void setNotifyingThread(int threadID) { notifyingThreadID = threadID; } //context of main thread + + void reportCurrentFile(const Zstring& filename, int threadID) //context of worker thread + { + if (threadID != notifyingThreadID) return; //only one thread may report status + + boost::lock_guard<boost::mutex> dummy(lockCurrentStatus); + currentFile = filename; + currentStatus.clear(); + } + + void reportCurrentStatus(const std::wstring& status, int threadID) //context of worker thread + { + if (threadID != notifyingThreadID) return; //only one thread may report status + + boost::lock_guard<boost::mutex> dummy(lockCurrentStatus); + currentFile.clear(); + currentStatus = BasicWString(status); //we cannot assume std::wstring to be thread safe (yet)! + } + + std::wstring getCurrentStatus() //context of main thread, call repreatedly + { + std::wstring filename; + std::wstring statusMsg; + { + boost::lock_guard<boost::mutex> dummy(lockCurrentStatus); + if (!currentFile.empty()) + filename = utf8CvrtTo<std::wstring>(currentFile); + else if (!currentStatus.empty()) + statusMsg = cvrtString<std::wstring>(currentStatus); + } + + if (!filename.empty()) + { + std::wstring statusText = cvrtString<std::wstring>(textScanning); + const long activeCount = activeWorker; + if (activeCount >= 2) + { + statusText += L" " + _P("[1 Thread]", "[%x Threads]", activeCount); + replace(statusText, L"%x", toString<std::wstring>(activeCount)); + } + statusText += std::wstring(L" \n") + L'\"' + filename + L'\"'; + return statusText; + } + else + return statusMsg; + } + + void incItemsScanned() { ++itemsScanned; } + long getItemsScanned() const { return itemsScanned; } + + void incActiveWorker() { ++activeWorker; } + void decActiveWorker() { --activeWorker; } + long getActiveWorker() const { return activeWorker; } + +private: + //---- error handling ---- + boost::mutex lockErrorMsg; + boost::condition_variable conditionCanReportError; + boost::condition_variable conditionGotResponse; + BasicWString errorMsg; + std::unique_ptr<FillBufferCallback::HandleError> errorResponse; + + //---- status updates ---- + volatile int notifyingThreadID; //theoretically racy, but there is nothing that could go wrong... + //CAVEAT: do NOT use boost::thread::id as long as this showstopper exists: https://svn.boost.org/trac/boost/ticket/5754 + boost::mutex lockCurrentStatus; //use a different lock for current file: continue traversing while some thread may process an error + Zstring currentFile; //only one of these two is filled at a time! + BasicWString currentStatus; // + + const BasicWString textScanning; //this one is (currently) not shared and could be made a std::wstring, but we stay consistent and use thread-safe variables in this class only! + + //---- status updates II (lock free) ---- + boost::detail::atomic_count itemsScanned; + boost::detail::atomic_count activeWorker; +}; +//------------------------------------------------------------------------------------------------- + +class DirCallback; + +struct TraverserShared +{ +public: + TraverserShared(int threadID, + SymLinkHandling handleSymlinks, + const HardFilter::FilterRef& filter, + std::set<Zstring>& failedReads, + AsyncCallback& acb) : + handleSymlinks_(handleSymlinks), + filterInstance(filter), + failedReads_(failedReads), + acb_(acb), + threadID_(threadID) {} + + typedef std::shared_ptr<DirCallback> CallbackPointer; + std::vector<CallbackPointer> callBackBox; //collection of callback pointers to handle ownership + + const SymLinkHandling handleSymlinks_; + const HardFilter::FilterRef filterInstance; //always bound! + + std::set<Zstring>& failedReads_; //relative postfixed names of directories that could not be read (empty for root) + + AsyncCallback& acb_; + int threadID_; +}; + + +class DirCallback : public zen::TraverseCallback +{ +public: + DirCallback(TraverserShared& config, + const Zstring& relNameParentPf, //postfixed with FILE_NAME_SEPARATOR! + DirContainer& output) : + cfg(config), + relNameParentPf_(relNameParentPf), + output_(output) {} + + virtual void onFile (const Zchar* shortName, const Zstring& fullName, const FileInfo& details); + virtual void onSymlink(const Zchar* shortName, const Zstring& fullName, const SymlinkInfo& details); + virtual ReturnValDir onDir (const Zchar* shortName, const Zstring& fullName); + virtual HandleError onError (const std::wstring& errorText); + +private: + TraverserShared& cfg; + const Zstring relNameParentPf_; + DirContainer& output_; +}; + + +void DirCallback::onFile(const Zchar* shortName, const Zstring& fullName, const FileInfo& details) +{ + boost::this_thread::interruption_point(); + + const Zstring fileNameShort = shortName; + + //do not list the database file(s) sync.ffs_db, sync.x64.ffs_db, etc. or lock files + if (endsWith(fileNameShort, SYNC_DB_FILE_ENDING) || + endsWith(fileNameShort, LOCK_FILE_ENDING)) + return; + + //update status information no matter whether object is excluded or not! + cfg.acb_.reportCurrentFile(fullName, cfg.threadID_); + + //------------------------------------------------------------------------------------ + //apply filter before processing (use relative name!) + if (!cfg.filterInstance->passFileFilter(relNameParentPf_ + fileNameShort)) + return; + + // std::string fileId = details.fileSize >= 1024 * 1024U ? + // util::retrieveFileID(fullName) : + // std::string(); + /* + Perf test Windows 7, SSD, 350k files, 50k dirs, files > 1MB: 7000 + regular: 6.9s + ID per file: 43.9s + ID per file > 1MB: 7.2s + ID per dir: 8.4s + + Linux: retrieveFileID takes about 50% longer in VM! (avoidable because of redundant stat() call!) + */ + + output_.addSubFile(fileNameShort, FileDescriptor(details.lastWriteTimeRaw, details.fileSize)); + + cfg.acb_.incItemsScanned(); //add 1 element to the progress indicator +} + + +void DirCallback::onSymlink(const Zchar* shortName, const Zstring& fullName, const SymlinkInfo& details) +{ + boost::this_thread::interruption_point(); + + if (cfg.handleSymlinks_ == SYMLINK_IGNORE) + return; + + //update status information no matter whether object is excluded or not! + cfg.acb_.reportCurrentFile(fullName, cfg.threadID_); + + //------------------------------------------------------------------------------------ + const Zstring& relName = relNameParentPf_ + shortName; + + //apply filter before processing (use relative name!) + if (!cfg.filterInstance->passFileFilter(relName)) //always use file filter: Link type may not be "stable" on Linux! + return; + + output_.addSubLink(shortName, LinkDescriptor(details.lastWriteTimeRaw, details.targetPath, details.dirLink ? LinkDescriptor::TYPE_DIR : LinkDescriptor::TYPE_FILE)); + + cfg.acb_.incItemsScanned(); //add 1 element to the progress indicator +} + + +TraverseCallback::ReturnValDir DirCallback::onDir(const Zchar* shortName, const Zstring& fullName) +{ + boost::this_thread::interruption_point(); + + //update status information no matter whether object is excluded or not! + cfg.acb_.reportCurrentFile(fullName, cfg.threadID_); + + //------------------------------------------------------------------------------------ + const Zstring& relName = relNameParentPf_ + shortName; + + //apply filter before processing (use relative name!) + bool subObjMightMatch = true; + const bool passFilter = cfg.filterInstance->passDirFilter(relName, &subObjMightMatch); + if (!passFilter && !subObjMightMatch) + return Int2Type<ReturnValDir::TRAVERSING_DIR_IGNORE>(); //do NOT traverse subdirs + //else: attention! ensure directory filtering is applied later to exclude actually filtered directories + + DirContainer& subDir = output_.addSubDir(shortName); + if (passFilter) + cfg.acb_.incItemsScanned(); //add 1 element to the progress indicator + + TraverserShared::CallbackPointer subDirCallback = std::make_shared<DirCallback>(cfg, relName + FILE_NAME_SEPARATOR, subDir); + cfg.callBackBox.push_back(subDirCallback); //handle lifetime + return ReturnValDir(Int2Type<ReturnValDir::TRAVERSING_DIR_CONTINUE>(), *subDirCallback); +} + + +DirCallback::HandleError DirCallback::onError(const std::wstring& errorText) +{ + switch (cfg.acb_.reportError(errorText)) + { + case FillBufferCallback::TRAV_ERROR_IGNORE: + cfg.failedReads_.insert(relNameParentPf_); + return TRAV_ERROR_IGNORE; + + case FillBufferCallback::TRAV_ERROR_RETRY: + return TRAV_ERROR_RETRY; + } + + assert(false); + return TRAV_ERROR_IGNORE; +} + + +#ifdef FFS_WIN +class DstHackCallbackImpl : public DstHackCallback +{ +public: + DstHackCallbackImpl(AsyncCallback& acb, int threadID) : + acb_(acb), + threadID_(threadID), + textApplyingDstHack(toZ(replaceCpy(_("Encoding extended time information: %x"), L"%x", L"\n\"%x\""))) {} + +private: + virtual void requestUiRefresh(const Zstring& filename) //applying DST hack imposes significant one-time performance drawback => callback to inform user + { + const Zstring statusText = replaceCpy(textApplyingDstHack, Zstr("%x"), filename); + acb_.reportCurrentStatus(utf8CvrtTo<std::wstring>(statusText), threadID_); + } + + AsyncCallback& acb_; + int threadID_; + const Zstring textApplyingDstHack; +}; +#endif +//------------------------------------------------------------------------------------------ + + +class WorkerThread +{ +public: + WorkerThread(int threadID, + const std::shared_ptr<AsyncCallback>& acb, + const std::vector<std::pair<DirectoryKey, DirectoryValue*>>& workload) : + threadID_(threadID), + acb_(acb), + workload_(workload) {} + + void operator()() //thread entry + { + acb_->incActiveWorker(); + ZEN_ON_BLOCK_EXIT(acb_->decActiveWorker();); + + std::for_each(workload_.begin(), workload_.end(), + [&](std::pair<DirectoryKey, DirectoryValue*>& item) + { + const Zstring& directoryName = item.first.dirnameFull_; + DirectoryValue& dirVal = *item.second; + + acb_->reportCurrentFile(directoryName, threadID_); //just in case first directory access is blocking + + TraverserShared travCfg(threadID_, + item.first.handleSymlinks_, //shared by all(!) instances of DirCallback while traversing a folder hierarchy + item.first.filter_, + dirVal.failedReads, + *acb_); + + DirCallback traverser(travCfg, + Zstring(), + dirVal.dirCont); + + bool followSymlinks = false; + switch (item.first.handleSymlinks_) + { + case SYMLINK_IGNORE: + followSymlinks = false; //=> symlinks will be reported via onSymlink() where they are excluded + break; + case SYMLINK_USE_DIRECTLY: + followSymlinks = false; + break; + case SYMLINK_FOLLOW_LINK: + followSymlinks = true; + break; + } + + DstHackCallback* dstCallbackPtr = NULL; +#ifdef FFS_WIN + DstHackCallbackImpl dstCallback(*acb_, threadID_); + dstCallbackPtr = &dstCallback; +#endif + + //get all files and folders from directoryPostfixed (and subdirectories) + traverseFolder(directoryName, followSymlinks, traverser, dstCallbackPtr); //exceptions may be thrown! + }); + } + +private: + int threadID_; + std::shared_ptr<AsyncCallback> acb_; + std::vector<std::pair<DirectoryKey, DirectoryValue*>> workload_; +}; +} + + +void zen::fillBuffer(const std::set<DirectoryKey>& keysToRead, //in + std::map<DirectoryKey, DirectoryValue>& buf, //out + FillBufferCallback& callback, + size_t updateInterval) +{ + buf.clear(); + + std::vector<std::set<DirectoryKey>> buckets = separateByDistinctDisk(keysToRead); //one bucket per physical device + + std::vector<boost::thread> worker; //note: GCC doesn't allow to construct an array of empty threads since they would be initialized by const boost::thread& + worker.reserve(buckets.size()); + + zen::ScopeGuard guardWorker = zen::makeGuard([&]() + { + std::for_each(worker.begin(), worker.end(), std::mem_fun_ref(&boost::thread::interrupt)); //interrupt all at once, then join + std::for_each(worker.begin(), worker.end(), std::mem_fun_ref(&boost::thread::join)); + }); + + std::shared_ptr<AsyncCallback> acb = std::make_shared<AsyncCallback>(); + + //init worker threads + for (auto iter = buckets.begin(); iter != buckets.end(); ++iter) + { + int threadID = iter - buckets.begin(); + const std::set<DirectoryKey>& bucket = *iter; + + std::vector<std::pair<DirectoryKey, DirectoryValue*>> workload; + std::for_each(bucket.begin(), bucket.end(), + [&](const DirectoryKey& key) + { + auto rv = buf.insert(std::make_pair(key, DirectoryValue())); + assert(rv.second); + workload.push_back(std::make_pair(key, &rv.first->second)); + }); + + worker.push_back(boost::thread(WorkerThread(threadID, acb, workload))); + } + + //wait until done + for (auto iter = worker.begin(); iter != worker.end(); ++iter) + { + boost::thread& wt = *iter; + int threadID = iter - worker.begin(); + + acb->setNotifyingThread(threadID); //process info messages of first (active) thread only + + do + { + //update status + callback.reportStatus(acb->getCurrentStatus(), acb->getItemsScanned()); //throw! + + //process errors + acb->processErrors(callback); + } + while (!wt.timed_join(boost::posix_time::milliseconds(updateInterval))); + } + + guardWorker.dismiss(); +} |