// ************************************************************************** // * This file is part of the FreeFileSync project. It is distributed under * // * GNU General Public License: http://www.gnu.org/licenses/gpl.html * // * Copyright (C) 2008-2011 ZenJu (zhnmju123 AT gmx.de) * // ************************************************************************** #include "parallel_scan.h" #include #include "db_file.h" #include "lock_holder.h" #include "../shared/i18n.h" #include "../shared/file_traverser.h" #include "../shared/file_error.h" #include "../shared/string_conv.h" #include "../shared/check_exist.h" #include "../shared/boost_thread_wrap.h" //include #include "loki/ScopeGuard.h" //#include "../shared/file_id.h" /* #ifdef FFS_WIN #include //includes "windows.h" #include "WinIoCtl.h" #elif defined FFS_LINUX #endif */ using namespace zen; #ifndef BOOST_HAS_THREADS #error just some paranoia check... #endif namespace { /* #ifdef FFS_WIN struct DiskInfo { DiskInfo() : driveType(DRIVE_UNKNOWN), diskID(-1) {} UINT driveType; int diskID; // -1 if id could not be determined, this one is filled if driveType == DRIVE_FIXED or DRIVE_REMOVABLE; }; inline bool operator<(const DiskInfo& lhs, const DiskInfo& rhs) { if (lhs.driveType != rhs.driveType) return lhs.driveType < rhs.driveType; if (lhs.diskID < 0 || rhs.diskID < 0) return false; //consider "same", reason: one volume may be uniquely associated with one disk, while the other volume is associated to the same disk AND another one! //volume <-> disk is 0..N:1..N return lhs.diskID < rhs.diskID ; } DiskInfo retrieveDiskInfo(const Zstring& pathName) { std::vector volName(std::max(pathName.size(), static_cast(10000))); DiskInfo output; //full pathName need not yet exist! if (!::GetVolumePathName(pathName.c_str(), //__in LPCTSTR lpszFileName, &volName[0], //__out LPTSTR lpszVolumePathName, static_cast(volName.size()))) //__in DWORD cchBufferLength return output; const Zstring rootPathName = &volName[0]; output.driveType = ::GetDriveType(rootPathName.c_str()); if (output.driveType == DRIVE_NO_ROOT_DIR) //these two should be the same error category output.driveType = DRIVE_UNKNOWN; if (output.driveType != DRIVE_FIXED && output.driveType != DRIVE_REMOVABLE) return output; //no reason to get disk ID //go and find disk id: //format into form: "\\.\C:" Zstring volnameFmt = rootPathName; if (endsWith(volnameFmt, FILE_NAME_SEPARATOR)) volnameFmt.resize(volnameFmt.size() - 1); volnameFmt = L"\\\\.\\" + volnameFmt; HANDLE hVolume = ::CreateFile(volnameFmt.c_str(), 0, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING, 0, NULL); if (hVolume == INVALID_HANDLE_VALUE) return output; Loki::ScopeGuard dummy = Loki::MakeGuard(::CloseHandle, hVolume); (void)dummy; //silence warning "unused variable" std::vector buffer(sizeof(VOLUME_DISK_EXTENTS) + sizeof(DISK_EXTENT)); //reserve buffer for at most one disk! call below will then fail if volume spans multiple disks! DWORD bytesReturned = 0; if (!::DeviceIoControl(hVolume, // handle to device IOCTL_VOLUME_GET_VOLUME_DISK_EXTENTS, // dwIoControlCode NULL, // lpInBuffer 0, // nInBufferSize &buffer[0], // output buffer static_cast(buffer.size()), // size of output buffer &bytesReturned, // number of bytes returned NULL)) // OVERLAPPED structure return output; const VOLUME_DISK_EXTENTS& volDisks = *reinterpret_cast(&buffer[0]); if (volDisks.NumberOfDiskExtents != 1) return output; output.diskID = volDisks.Extents[0].DiskNumber; return output; } #elif defined FFS_LINUX #endif */ /* PERF NOTE -------------------------------------------- |Testcase: Reading from two different disks| -------------------------------------------- Windows 7: 1st(unbuffered) |2nd (OS buffered) ---------------------------------- 1 Thread: 57s | 8s 2 Threads: 39s | 7s -------------------------------------------------- |Testcase: Reading two directories from same disk| -------------------------------------------------- Windows 7: Windows XP: 1st(unbuffered) |2nd (OS buffered) 1st(unbuffered) |2nd (OS buffered) ---------------------------------- ---------------------------------- 1 Thread: 41s | 13s 1 Thread: 45s | 13s 2 Threads: 42s | 11s 2 Threads: 38s | 8s => Traversing does not take any advantage of file locality so that even multiple threads operating on the same disk impose no performance overhead. */ std::vector> separateByDistinctDisk(const std::set& dirkeys) { //see perf note: use one thread per dirkey: typedef std::map> DiskKeyMapping; DiskKeyMapping tmp; int index = 0; std::for_each(dirkeys.begin(), dirkeys.end(), [&](const DirectoryKey& key) { tmp[++index].insert(key); }); /* //use one thread per physical disk: typedef std::map> DiskKeyMapping; DiskKeyMapping tmp; std::for_each(dirkeys.begin(), dirkeys.end(), [&](const DirectoryKey& key) { tmp[retrieveDiskInfo(key.dirnameFull_)].insert(key); }); */ std::vector> buckets; std::transform(tmp.begin(), tmp.end(), std::back_inserter(buckets), [&](const DiskKeyMapping::value_type& diskToKey) { return diskToKey.second; }); return buckets; } //------------------------------------------------------------------------------------------ typedef Zbase BasicWString; //thread safe string class for UI texts class AsyncCallback { public: AsyncCallback() : notifyingThreadID(-1), textScanning(_("Scanning:")), itemsScanned(0), activeWorker(0) {} FillBufferCallback::HandleError reportError(const std::wstring& msg) //blocking call: context of worker thread { boost::unique_lock dummy(lockErrorMsg); while(!errorMsg.empty() || errorResponse.get()) conditionCanReportError.timed_wait(dummy, boost::posix_time::milliseconds(50)); //interruption point! errorMsg = BasicWString(msg); while(!errorResponse.get()) conditionGotResponse.timed_wait(dummy, boost::posix_time::milliseconds(50)); //interruption point! FillBufferCallback::HandleError rv = *errorResponse; errorMsg.clear(); errorResponse.reset(); conditionCanReportError.notify_one(); return rv; } void processErrors(FillBufferCallback& callback) //context of main thread, call repreatedly { boost::lock_guard dummy(lockErrorMsg); if (!errorMsg.empty() && !errorResponse.get()) { FillBufferCallback::HandleError rv = callback.reportError(cvrtString(errorMsg)); //throw! errorResponse.reset(new FillBufferCallback::HandleError(rv)); conditionGotResponse.notify_one(); } } void setNotifyingThread(int threadID) { notifyingThreadID = threadID; } //context of main thread void reportCurrentFile(const Zstring& filename, int threadID) //context of worker thread { if (threadID != notifyingThreadID) return; //only one thread may report status boost::lock_guard dummy(lockCurrentStatus); currentFile = filename; currentStatus.clear(); } void reportCurrentStatus(const std::wstring& status, int threadID) //context of worker thread { if (threadID != notifyingThreadID) return; //only one thread may report status boost::lock_guard dummy(lockCurrentStatus); currentFile.clear(); currentStatus = BasicWString(status); //we cannot assume std::wstring to be thread safe (yet)! } std::wstring getCurrentStatus() //context of main thread, call repreatedly { std::wstring filename; std::wstring statusMsg; { boost::lock_guard dummy(lockCurrentStatus); if (!currentFile.empty()) filename = utf8CvrtTo(currentFile); else if (!currentStatus.empty()) statusMsg = cvrtString(currentStatus); } if (!filename.empty()) { std::wstring statusText = cvrtString(textScanning); const long activeCount = activeWorker; if (activeCount >= 2) { statusText += L" " + _P("[1 Thread]", "[%x Threads]", activeCount); replace(statusText, L"%x", toString(activeCount)); } statusText += std::wstring(L" \n") + L'\"' + filename + L'\"'; return statusText; } else return statusMsg; } void incItemsScanned() { ++itemsScanned; } long getItemsScanned() const { return itemsScanned; } void incActiveWorker() { ++activeWorker; } void decActiveWorker() { --activeWorker; } long getActiveWorker() const { return activeWorker; } private: //---- error handling ---- boost::mutex lockErrorMsg; boost::condition_variable conditionCanReportError; boost::condition_variable conditionGotResponse; BasicWString errorMsg; std::unique_ptr errorResponse; //---- status updates ---- volatile int notifyingThreadID; //theoretically racy, but there is nothing that could go wrong... //CAVEAT: do NOT use boost::thread::id as long as this showstopper exists: https://svn.boost.org/trac/boost/ticket/5754 boost::mutex lockCurrentStatus; //use a different lock for current file: continue traversing while some thread may process an error Zstring currentFile; //only one of these two is filled at a time! BasicWString currentStatus; // const BasicWString textScanning; //this one is (currently) not shared and could be made a std::wstring, but we stay consistent and use thread-safe variables in this class only! //---- status updates II (lock free) ---- boost::detail::atomic_count itemsScanned; boost::detail::atomic_count activeWorker; }; //------------------------------------------------------------------------------------------------- class DirCallback; struct TraverserShared { public: TraverserShared(int threadID, SymLinkHandling handleSymlinks, const HardFilter::FilterRef& filter, std::set& failedReads, AsyncCallback& acb) : handleSymlinks_(handleSymlinks), filterInstance(filter), failedReads_(failedReads), acb_(acb), threadID_(threadID) {} typedef std::shared_ptr CallbackPointer; std::vector callBackBox; //collection of callback pointers to handle ownership const SymLinkHandling handleSymlinks_; const HardFilter::FilterRef filterInstance; //always bound! std::set& failedReads_; //relative postfixed names of directories that could not be read (empty for root) std::set excludedDirs; AsyncCallback& acb_; int threadID_; }; class DirCallback : public zen::TraverseCallback { public: DirCallback(TraverserShared& config, const Zstring& relNameParentPf, //postfixed with FILE_NAME_SEPARATOR! DirContainer& output) : cfg(config), relNameParentPf_(relNameParentPf), output_(output) {} virtual void onFile (const Zchar* shortName, const Zstring& fullName, const FileInfo& details); virtual void onSymlink(const Zchar* shortName, const Zstring& fullName, const SymlinkInfo& details); virtual ReturnValDir onDir (const Zchar* shortName, const Zstring& fullName); virtual HandleError onError (const std::wstring& errorText); private: TraverserShared& cfg; const Zstring relNameParentPf_; DirContainer& output_; }; void DirCallback::onFile(const Zchar* shortName, const Zstring& fullName, const FileInfo& details) { boost::this_thread::interruption_point(); const Zstring fileNameShort = shortName; //do not list the database file(s) sync.ffs_db, sync.x64.ffs_db, etc. or lock files if (endsWith(fileNameShort, SYNC_DB_FILE_ENDING) || endsWith(fileNameShort, LOCK_FILE_ENDING)) return; //update status information no matter whether object is excluded or not! cfg.acb_.reportCurrentFile(fullName, cfg.threadID_); //------------------------------------------------------------------------------------ //apply filter before processing (use relative name!) if (!cfg.filterInstance->passFileFilter(relNameParentPf_ + fileNameShort)) return; // std::string fileId = details.fileSize >= 1024 * 1024U ? // util::retrieveFileID(fullName) : // std::string(); /* Perf test Windows 7, SSD, 350k files, 50k dirs, files > 1MB: 7000 regular: 6.9s ID per file: 43.9s ID per file > 1MB: 7.2s ID per dir: 8.4s Linux: retrieveFileID takes about 50% longer in VM! (avoidable because of redundant stat() call!) */ output_.addSubFile(fileNameShort, FileDescriptor(details.lastWriteTimeRaw, details.fileSize)); cfg.acb_.incItemsScanned(); //add 1 element to the progress indicator } void DirCallback::onSymlink(const Zchar* shortName, const Zstring& fullName, const SymlinkInfo& details) { boost::this_thread::interruption_point(); if (cfg.handleSymlinks_ == SYMLINK_IGNORE) return; //update status information no matter whether object is excluded or not! cfg.acb_.reportCurrentFile(fullName, cfg.threadID_); //------------------------------------------------------------------------------------ const Zstring& relName = relNameParentPf_ + shortName; //apply filter before processing (use relative name!) if (!cfg.filterInstance->passFileFilter(relName)) //always use file filter: Link type may not be "stable" on Linux! return; output_.addSubLink(shortName, LinkDescriptor(details.lastWriteTimeRaw, details.targetPath, details.dirLink ? LinkDescriptor::TYPE_DIR : LinkDescriptor::TYPE_FILE)); cfg.acb_.incItemsScanned(); //add 1 element to the progress indicator } TraverseCallback::ReturnValDir DirCallback::onDir(const Zchar* shortName, const Zstring& fullName) { boost::this_thread::interruption_point(); //update status information no matter whether object is excluded or not! cfg.acb_.reportCurrentFile(fullName, cfg.threadID_); //------------------------------------------------------------------------------------ const Zstring& relName = relNameParentPf_ + shortName; //apply filter before processing (use relative name!) bool subObjMightMatch = true; const bool passFilter = cfg.filterInstance->passDirFilter(relName, &subObjMightMatch); if (!passFilter && !subObjMightMatch) return Loki::Int2Type(); //do NOT traverse subdirs //else: attention! ensure directory filtering is applied later to exclude actually filtered directories DirContainer& subDir = output_.addSubDir(shortName); if (passFilter) cfg.acb_.incItemsScanned(); //add 1 element to the progress indicator else cfg.excludedDirs.insert(&subDir); TraverserShared::CallbackPointer subDirCallback = std::make_shared(cfg, relName + FILE_NAME_SEPARATOR, subDir); cfg.callBackBox.push_back(subDirCallback); //handle lifetime return ReturnValDir(Loki::Int2Type(), *subDirCallback); } DirCallback::HandleError DirCallback::onError(const std::wstring& errorText) { switch (cfg.acb_.reportError(errorText)) { case FillBufferCallback::TRAV_ERROR_IGNORE: cfg.failedReads_.insert(relNameParentPf_); return TRAV_ERROR_IGNORE; case FillBufferCallback::TRAV_ERROR_RETRY: return TRAV_ERROR_RETRY; } assert(false); return TRAV_ERROR_IGNORE; } #ifdef FFS_WIN class DstHackCallbackImpl : public DstHackCallback { public: DstHackCallbackImpl(AsyncCallback& acb, int threadID) : acb_(acb), threadID_(threadID), textApplyingDstHack(toZ(_("Encoding extended time information: %x")).Replace(Zstr("%x"), Zstr("\n\"%x\""))) {} private: virtual void requestUiRefresh(const Zstring& filename) //applying DST hack imposes significant one-time performance drawback => callback to inform user { Zstring statusText = textApplyingDstHack; replace(statusText, Zstr("%x"), filename); acb_.reportCurrentStatus(utf8CvrtTo(statusText), threadID_); } AsyncCallback& acb_; int threadID_; const Zstring textApplyingDstHack; }; #endif //------------------------------------------------------------------------------------------ template inline void map_remove_if(M& map, Predicate p) { for (auto iter = map.begin(); iter != map.end();) if (p(*iter)) map.erase(iter++); else ++iter; } void removeFilteredDirs(DirContainer& dirCont, const std::set& excludedDirs) { //process subdirs recursively std::for_each(dirCont.dirs.begin(), dirCont.dirs.end(), [&](std::pair& item) { removeFilteredDirs(item.second, excludedDirs); }); //remove superfluous directories map_remove_if(dirCont.dirs, [&](std::pair& item) -> bool { DirContainer& subDir = item.second; return subDir.dirs .empty() && subDir.files.empty() && subDir.links.empty() && excludedDirs.find(&subDir) != excludedDirs.end(); }); } class WorkerThread { public: WorkerThread(int threadID, const std::shared_ptr& acb, const std::vector>& workload) : threadID_(threadID), acb_(acb), workload_(workload) {} void operator()() //thread entry { acb_->incActiveWorker(); Loki::ScopeGuard dummy = Loki::MakeGuard([&]() { acb_->decActiveWorker(); }); (void)dummy; std::for_each(workload_.begin(), workload_.end(), [&](std::pair& item) { const Zstring& directoryName = item.first.dirnameFull_; DirectoryValue& dirVal = *item.second; acb_->reportCurrentFile(directoryName, threadID_); //just in case directory existence check is blocking! if (!directoryName.empty() && util::dirExistsAsync(directoryName).get()) //blocking + interruption point! //folder existence already checked in startCompareProcess(): do not treat as error when arriving here! //perf note: missing network drives should not delay here, as Windows buffers result of last existence check for a short time { TraverserShared travCfg(threadID_, item.first.handleSymlinks_, //shared by all(!) instances of DirCallback while traversing a folder hierarchy item.first.filter_, dirVal.failedReads, *acb_); DirCallback traverser(travCfg, Zstring(), dirVal.dirCont); bool followSymlinks = false; switch (item.first.handleSymlinks_) { case SYMLINK_IGNORE: followSymlinks = false; //=> symlinks will be reported via onSymlink() where they are excluded break; case SYMLINK_USE_DIRECTLY: followSymlinks = false; break; case SYMLINK_FOLLOW_LINK: followSymlinks = true; break; } DstHackCallback* dstCallbackPtr = NULL; #ifdef FFS_WIN DstHackCallbackImpl dstCallback(*acb_, threadID_); dstCallbackPtr = &dstCallback; #endif //get all files and folders from directoryPostfixed (and subdirectories) traverseFolder(directoryName, followSymlinks, traverser, dstCallbackPtr); //exceptions may be thrown! //attention: some filtered directories are still in the comparison result! (see include filter handling!) if (!travCfg.excludedDirs.empty()) removeFilteredDirs(dirVal.dirCont, travCfg.excludedDirs); //remove all excluded directories (but keeps those serving as parent folders for not excl. elements) } }); } private: int threadID_; std::shared_ptr acb_; std::vector> workload_; }; } void zen::fillBuffer(const std::set& keysToRead, //in std::map& buf, //out FillBufferCallback& callback, size_t statusInterval) { buf.clear(); std::vector> buckets = separateByDistinctDisk(keysToRead); //one bucket per physical device std::vector worker; //note: GCC doesn't allow to construct an array of empty threads since they would be initialized by const boost::thread& worker.reserve(buckets.size()); Loki::ScopeGuard guardWorker = Loki::MakeGuard([&]() { std::for_each(worker.begin(), worker.end(), std::mem_fun_ref(&boost::thread::interrupt)); //interrupt all at once, then join std::for_each(worker.begin(), worker.end(), std::mem_fun_ref(&boost::thread::join)); }); std::shared_ptr acb = std::make_shared(); //init worker threads for (auto iter = buckets.begin(); iter != buckets.end(); ++iter) { int threadID = iter - buckets.begin(); const std::set& bucket = *iter; std::vector> workload; std::for_each(bucket.begin(), bucket.end(), [&](const DirectoryKey& key) { auto rv = buf.insert(std::make_pair(key, DirectoryValue())); assert(rv.second); workload.push_back(std::make_pair(key, &rv.first->second)); }); worker.push_back(boost::thread(WorkerThread(threadID, acb, workload))); } //wait until done for (auto iter = worker.begin(); iter != worker.end(); ++iter) { boost::thread& wt = *iter; int threadID = iter - worker.begin(); acb->setNotifyingThread(threadID); //process info messages of first (active) thread only do { //update status callback.reportStatus(acb->getCurrentStatus(), acb->getItemsScanned()); //throw! //process errors acb->processErrors(callback); } while (!wt.timed_join(boost::posix_time::milliseconds(statusInterval))); } guardWorker.Dismiss(); }