diff options
Diffstat (limited to 'zen/stream_buffer.h')
-rw-r--r-- | zen/stream_buffer.h | 147 |
1 files changed, 97 insertions, 50 deletions
diff --git a/zen/stream_buffer.h b/zen/stream_buffer.h index 8b8cd0d7..ee9e18fd 100644 --- a/zen/stream_buffer.h +++ b/zen/stream_buffer.h @@ -10,81 +10,78 @@ #include <condition_variable> #include "ring_buffer.h" #include "string_tools.h" +#include "thread.h" namespace zen { -/* implement streaming API on top of libcurl's icky callback-based design +/* implement streaming API on top of libcurl's icky callback-based design + + curl uses READBUFFER_SIZE download buffer size, but returns via a retarded sendf.c::chop_write() writing in small junks of CURL_MAX_WRITE_SIZE (16 kB) => support copying arbitrarily-large files: https://freefilesync.org/forum/viewtopic.php?t=4471 => maximum performance through async processing (prefetching + output buffer!) => cost per worker thread creation ~ 1/20 ms */ class AsyncStreamBuffer { public: - explicit AsyncStreamBuffer(size_t bufferSize) : bufSize_(bufferSize) { ringBuf_.reserve(bufferSize); } + explicit AsyncStreamBuffer(size_t capacity) { ringBuf_.reserve(capacity); } //context of input thread, blocking - //return "bytesToRead" bytes unless end of stream! - size_t read(void* buffer, size_t bytesToRead) //throw <write error> + size_t read(void* buffer, size_t bytesToRead) //throw <write error>; return "bytesToRead" bytes unless end of stream! { - if (bytesToRead == 0) //"read() with a count of 0 returns zero" => indistinguishable from end of file! => check! - throw std::logic_error("Contract violation! " + std::string(__FILE__) + ':' + numberTo<std::string>(__LINE__)); + std::unique_lock dummy(lockStream_); + const auto bufStart = buffer; - auto it = static_cast<std::byte*>(buffer); - const auto itEnd = it + bytesToRead; - - for (std::unique_lock dummy(lockStream_); it != itEnd;) + while (bytesToRead > 0) { - assert(!errorRead_); - conditionBytesWritten_.wait(dummy, [this] { return errorWrite_ || !ringBuf_.empty() || eof_; }); - - if (errorWrite_) - std::rethrow_exception(errorWrite_); //throw <write error> - - const size_t junkSize = std::min(static_cast<size_t>(itEnd - it), ringBuf_.size()); - ringBuf_.extract_front(it, it + junkSize); - it += junkSize; - - conditionBytesRead_.notify_all(); - - if (eof_) //end of file + const size_t bytesRead = tryReadImpl(dummy, buffer, bytesToRead); //throw <write error> + if (bytesRead == 0) //end of file break; + conditionBytesRead_.notify_all(); + buffer = static_cast<std::byte*>(buffer) + bytesRead; + bytesToRead -= bytesRead; } + return static_cast<std::byte*>(buffer) - + static_cast<std::byte*>(bufStart); + } - const size_t bytesRead = it - static_cast<std::byte*>(buffer); - totalBytesRead_ += bytesRead; + //context of input thread, blocking + size_t tryRead(void* buffer, size_t bytesToRead) //throw <write error>; may return short; only 0 means EOF! CONTRACT: bytesToRead > 0! + { + size_t bytesRead = 0; + { + std::unique_lock dummy(lockStream_); + bytesRead = tryReadImpl(dummy, buffer, bytesToRead); + } + if (bytesRead > 0) + conditionBytesRead_.notify_all(); //...*outside* the lock return bytesRead; } //context of output thread, blocking void write(const void* buffer, size_t bytesToWrite) //throw <read error> { - totalBytesWritten_ += bytesToWrite; //bytes already processed as far as raw FTP access is concerned - - auto it = static_cast<const std::byte*>(buffer); - const auto itEnd = it + bytesToWrite; - - for (std::unique_lock dummy(lockStream_); it != itEnd;) + std::unique_lock dummy(lockStream_); + while (bytesToWrite > 0) { - assert(!eof_ && !errorWrite_); - /* => can't use InterruptibleThread's interruptibleWait() :( - -> AsyncStreamBuffer is used for input and output streaming - => both AsyncStreamBuffer::write()/read() would have to implement interruptibleWait() - => one of these usually called from main thread - => but interruptibleWait() cannot be called from main thread! */ - conditionBytesRead_.wait(dummy, [this] { return errorRead_ || ringBuf_.size() < bufSize_; }); - - if (errorRead_) - std::rethrow_exception(errorRead_); //throw <read error> - - const size_t junkSize = std::min(static_cast<size_t>(itEnd - it), bufSize_ - ringBuf_.size()); - ringBuf_.insert_back(it, it + junkSize); - it += junkSize; - + const size_t bytesWritten = tryWriteWhileImpl(dummy, buffer, bytesToWrite); //throw <read error> conditionBytesWritten_.notify_all(); + buffer = static_cast<const std::byte*>(buffer) + bytesWritten; + bytesToWrite -= bytesWritten; } } + //context of output thread, blocking + size_t tryWrite(const void* buffer, size_t bytesToWrite) //throw <read error>; may return short! CONTRACT: bytesToWrite > 0 + { + size_t bytesWritten = 0; + { + std::unique_lock dummy(lockStream_); + bytesWritten = tryWriteWhileImpl(dummy, buffer, bytesToWrite); + } + conditionBytesWritten_.notify_all(); //...*outside* the lock + return bytesWritten; + } + //context of output thread void closeStream() { @@ -101,7 +98,7 @@ public: { { std::lock_guard dummy(lockStream_); - assert(!errorRead_); + assert(error && !errorRead_); if (!errorRead_) errorRead_ = error; } @@ -113,13 +110,16 @@ public: { { std::lock_guard dummy(lockStream_); - assert(!errorWrite_); + assert(error && !errorWrite_); if (!errorWrite_) errorWrite_ = error; } conditionBytesWritten_.notify_all(); } +#if 0 + //function not needed: when writing is completed successfully, no further error can occur! + // => caveat: writing is NOT done (yet) when closeStream() is called! //context of *output* thread void checkReadErrors() //throw <read error> { @@ -128,7 +128,8 @@ public: std::rethrow_exception(errorRead_); //throw <read error> } -#if 0 //function not needed: when EOF is reached (without errors), reading is done => no further error can occur! + //function not needed: when EOF is reached (without errors), reading is done => no further error can occur! + //context of *input* thread void checkWriteErrors() //throw <write error> { std::lock_guard dummy(lockStream_); @@ -144,7 +145,53 @@ private: AsyncStreamBuffer (const AsyncStreamBuffer&) = delete; AsyncStreamBuffer& operator=(const AsyncStreamBuffer&) = delete; - const size_t bufSize_; + //context of input thread, blocking + size_t tryReadImpl(std::unique_lock<std::mutex>& ul, void* buffer, size_t bytesToRead) //throw <write error>; may return short; only 0 means EOF! CONTRACT: bytesToRead > 0! + { + if (bytesToRead == 0) //"read() with a count of 0 returns zero" => indistinguishable from end of file! => check! + throw std::logic_error("Contract violation! " + std::string(__FILE__) + ':' + numberTo<std::string>(__LINE__)); + + assert(isLocked(lockStream_)); + assert(!errorRead_); + + conditionBytesWritten_.wait(ul, [this] { return errorWrite_ || !ringBuf_.empty() || eof_; }); + + if (errorWrite_) + std::rethrow_exception(errorWrite_); //throw <write error> + + const size_t junkSize = std::min(bytesToRead, ringBuf_.size()); + ringBuf_.extract_front(static_cast<std::byte*>(buffer), + static_cast<std::byte*>(buffer)+ junkSize); + totalBytesRead_ += junkSize; + return junkSize; + } + + //context of output thread, blocking + size_t tryWriteWhileImpl(std::unique_lock<std::mutex>& ul, const void* buffer, size_t bytesToWrite) //throw <read error>; may return short! CONTRACT: bytesToWrite > 0 + { + if (bytesToWrite == 0) + throw std::logic_error("Contract violation! " + std::string(__FILE__) + ':' + numberTo<std::string>(__LINE__)); + + assert(isLocked(lockStream_)); + assert(!eof_ && !errorWrite_); + /* => can't use InterruptibleThread's interruptibleWait() :( + -> AsyncStreamBuffer is used for input and output streaming + => both AsyncStreamBuffer::write()/read() would have to implement interruptibleWait() + => one of these usually called from main thread + => but interruptibleWait() cannot be called from main thread! */ + conditionBytesRead_.wait(ul, [this] { return errorRead_ || ringBuf_.size() < ringBuf_.capacity(); }); + + if (errorRead_) + std::rethrow_exception(errorRead_); //throw <read error> + + const size_t junkSize = std::min(bytesToWrite, ringBuf_.capacity() - ringBuf_.size()); + + ringBuf_.insert_back(static_cast<const std::byte*>(buffer), + static_cast<const std::byte*>(buffer) + junkSize); + totalBytesWritten_ += junkSize; + return junkSize; + } + std::mutex lockStream_; RingBuffer<std::byte> ringBuf_; //prefetch/output buffer bool eof_ = false; |