summaryrefslogtreecommitdiff
path: root/zen/stream_buffer.h
diff options
context:
space:
mode:
Diffstat (limited to 'zen/stream_buffer.h')
-rw-r--r--zen/stream_buffer.h161
1 files changed, 161 insertions, 0 deletions
diff --git a/zen/stream_buffer.h b/zen/stream_buffer.h
new file mode 100644
index 00000000..8b8cd0d7
--- /dev/null
+++ b/zen/stream_buffer.h
@@ -0,0 +1,161 @@
+// *****************************************************************************
+// * This file is part of the FreeFileSync project. It is distributed under *
+// * GNU General Public License: https://www.gnu.org/licenses/gpl-3.0 *
+// * Copyright (C) Zenju (zenju AT freefilesync DOT org) - All Rights Reserved *
+// *****************************************************************************
+
+#ifndef STREAM_BUFFER_H_08492572089560298
+#define STREAM_BUFFER_H_08492572089560298
+
+#include <condition_variable>
+#include "ring_buffer.h"
+#include "string_tools.h"
+
+
+namespace zen
+{
+/* implement streaming API on top of libcurl's icky callback-based design
+ => support copying arbitrarily-large files: https://freefilesync.org/forum/viewtopic.php?t=4471
+ => maximum performance through async processing (prefetching + output buffer!)
+ => cost per worker thread creation ~ 1/20 ms */
+class AsyncStreamBuffer
+{
+public:
+ explicit AsyncStreamBuffer(size_t bufferSize) : bufSize_(bufferSize) { ringBuf_.reserve(bufferSize); }
+
+ //context of input thread, blocking
+ //return "bytesToRead" bytes unless end of stream!
+ size_t read(void* buffer, size_t bytesToRead) //throw <write error>
+ {
+ if (bytesToRead == 0) //"read() with a count of 0 returns zero" => indistinguishable from end of file! => check!
+ throw std::logic_error("Contract violation! " + std::string(__FILE__) + ':' + numberTo<std::string>(__LINE__));
+
+ auto it = static_cast<std::byte*>(buffer);
+ const auto itEnd = it + bytesToRead;
+
+ for (std::unique_lock dummy(lockStream_); it != itEnd;)
+ {
+ assert(!errorRead_);
+ conditionBytesWritten_.wait(dummy, [this] { return errorWrite_ || !ringBuf_.empty() || eof_; });
+
+ if (errorWrite_)
+ std::rethrow_exception(errorWrite_); //throw <write error>
+
+ const size_t junkSize = std::min(static_cast<size_t>(itEnd - it), ringBuf_.size());
+ ringBuf_.extract_front(it, it + junkSize);
+ it += junkSize;
+
+ conditionBytesRead_.notify_all();
+
+ if (eof_) //end of file
+ break;
+ }
+
+ const size_t bytesRead = it - static_cast<std::byte*>(buffer);
+ totalBytesRead_ += bytesRead;
+ return bytesRead;
+ }
+
+ //context of output thread, blocking
+ void write(const void* buffer, size_t bytesToWrite) //throw <read error>
+ {
+ totalBytesWritten_ += bytesToWrite; //bytes already processed as far as raw FTP access is concerned
+
+ auto it = static_cast<const std::byte*>(buffer);
+ const auto itEnd = it + bytesToWrite;
+
+ for (std::unique_lock dummy(lockStream_); it != itEnd;)
+ {
+ assert(!eof_ && !errorWrite_);
+ /* => can't use InterruptibleThread's interruptibleWait() :(
+ -> AsyncStreamBuffer is used for input and output streaming
+ => both AsyncStreamBuffer::write()/read() would have to implement interruptibleWait()
+ => one of these usually called from main thread
+ => but interruptibleWait() cannot be called from main thread! */
+ conditionBytesRead_.wait(dummy, [this] { return errorRead_ || ringBuf_.size() < bufSize_; });
+
+ if (errorRead_)
+ std::rethrow_exception(errorRead_); //throw <read error>
+
+ const size_t junkSize = std::min(static_cast<size_t>(itEnd - it), bufSize_ - ringBuf_.size());
+ ringBuf_.insert_back(it, it + junkSize);
+ it += junkSize;
+
+ conditionBytesWritten_.notify_all();
+ }
+ }
+
+ //context of output thread
+ void closeStream()
+ {
+ {
+ std::lock_guard dummy(lockStream_);
+ assert(!eof_ && !errorWrite_);
+ eof_ = true;
+ }
+ conditionBytesWritten_.notify_all();
+ }
+
+ //context of input thread
+ void setReadError(const std::exception_ptr& error)
+ {
+ {
+ std::lock_guard dummy(lockStream_);
+ assert(!errorRead_);
+ if (!errorRead_)
+ errorRead_ = error;
+ }
+ conditionBytesRead_.notify_all();
+ }
+
+ //context of output thread
+ void setWriteError(const std::exception_ptr& error)
+ {
+ {
+ std::lock_guard dummy(lockStream_);
+ assert(!errorWrite_);
+ if (!errorWrite_)
+ errorWrite_ = error;
+ }
+ conditionBytesWritten_.notify_all();
+ }
+
+ //context of *output* thread
+ void checkReadErrors() //throw <read error>
+ {
+ std::lock_guard dummy(lockStream_);
+ if (errorRead_)
+ std::rethrow_exception(errorRead_); //throw <read error>
+ }
+
+#if 0 //function not needed: when EOF is reached (without errors), reading is done => no further error can occur!
+ void checkWriteErrors() //throw <write error>
+ {
+ std::lock_guard dummy(lockStream_);
+ if (errorWrite_)
+ std::rethrow_exception(errorWrite_); //throw <write error>
+ }
+#endif
+
+ uint64_t getTotalBytesWritten() const { return totalBytesWritten_; }
+ uint64_t getTotalBytesRead () const { return totalBytesRead_; }
+
+private:
+ AsyncStreamBuffer (const AsyncStreamBuffer&) = delete;
+ AsyncStreamBuffer& operator=(const AsyncStreamBuffer&) = delete;
+
+ const size_t bufSize_;
+ std::mutex lockStream_;
+ RingBuffer<std::byte> ringBuf_; //prefetch/output buffer
+ bool eof_ = false;
+ std::exception_ptr errorWrite_;
+ std::exception_ptr errorRead_;
+ std::condition_variable conditionBytesWritten_;
+ std::condition_variable conditionBytesRead_;
+
+ std::atomic<uint64_t> totalBytesWritten_{0}; //std:atomic is uninitialized by default!
+ std::atomic<uint64_t> totalBytesRead_ {0}; //
+};
+}
+
+#endif //STREAM_BUFFER_H_08492572089560298
bgstack15