From cab22f2dc3c5f41b5163f74cbb233e390edff6ff Mon Sep 17 00:00:00 2001 From: "B. Stack" Date: Tue, 11 Oct 2022 11:16:39 -0400 Subject: add upstream 11.26 --- zen/stream_buffer.h | 147 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 97 insertions(+), 50 deletions(-) (limited to 'zen/stream_buffer.h') diff --git a/zen/stream_buffer.h b/zen/stream_buffer.h index 8b8cd0d7..ee9e18fd 100644 --- a/zen/stream_buffer.h +++ b/zen/stream_buffer.h @@ -10,81 +10,78 @@ #include #include "ring_buffer.h" #include "string_tools.h" +#include "thread.h" namespace zen { -/* implement streaming API on top of libcurl's icky callback-based design +/* implement streaming API on top of libcurl's icky callback-based design + + curl uses READBUFFER_SIZE download buffer size, but returns via a retarded sendf.c::chop_write() writing in small junks of CURL_MAX_WRITE_SIZE (16 kB) => support copying arbitrarily-large files: https://freefilesync.org/forum/viewtopic.php?t=4471 => maximum performance through async processing (prefetching + output buffer!) => cost per worker thread creation ~ 1/20 ms */ class AsyncStreamBuffer { public: - explicit AsyncStreamBuffer(size_t bufferSize) : bufSize_(bufferSize) { ringBuf_.reserve(bufferSize); } + explicit AsyncStreamBuffer(size_t capacity) { ringBuf_.reserve(capacity); } //context of input thread, blocking - //return "bytesToRead" bytes unless end of stream! - size_t read(void* buffer, size_t bytesToRead) //throw + size_t read(void* buffer, size_t bytesToRead) //throw ; return "bytesToRead" bytes unless end of stream! { - if (bytesToRead == 0) //"read() with a count of 0 returns zero" => indistinguishable from end of file! => check! - throw std::logic_error("Contract violation! " + std::string(__FILE__) + ':' + numberTo(__LINE__)); + std::unique_lock dummy(lockStream_); + const auto bufStart = buffer; - auto it = static_cast(buffer); - const auto itEnd = it + bytesToRead; - - for (std::unique_lock dummy(lockStream_); it != itEnd;) + while (bytesToRead > 0) { - assert(!errorRead_); - conditionBytesWritten_.wait(dummy, [this] { return errorWrite_ || !ringBuf_.empty() || eof_; }); - - if (errorWrite_) - std::rethrow_exception(errorWrite_); //throw - - const size_t junkSize = std::min(static_cast(itEnd - it), ringBuf_.size()); - ringBuf_.extract_front(it, it + junkSize); - it += junkSize; - - conditionBytesRead_.notify_all(); - - if (eof_) //end of file + const size_t bytesRead = tryReadImpl(dummy, buffer, bytesToRead); //throw + if (bytesRead == 0) //end of file break; + conditionBytesRead_.notify_all(); + buffer = static_cast(buffer) + bytesRead; + bytesToRead -= bytesRead; } + return static_cast(buffer) - + static_cast(bufStart); + } - const size_t bytesRead = it - static_cast(buffer); - totalBytesRead_ += bytesRead; + //context of input thread, blocking + size_t tryRead(void* buffer, size_t bytesToRead) //throw ; may return short; only 0 means EOF! CONTRACT: bytesToRead > 0! + { + size_t bytesRead = 0; + { + std::unique_lock dummy(lockStream_); + bytesRead = tryReadImpl(dummy, buffer, bytesToRead); + } + if (bytesRead > 0) + conditionBytesRead_.notify_all(); //...*outside* the lock return bytesRead; } //context of output thread, blocking void write(const void* buffer, size_t bytesToWrite) //throw { - totalBytesWritten_ += bytesToWrite; //bytes already processed as far as raw FTP access is concerned - - auto it = static_cast(buffer); - const auto itEnd = it + bytesToWrite; - - for (std::unique_lock dummy(lockStream_); it != itEnd;) + std::unique_lock dummy(lockStream_); + while (bytesToWrite > 0) { - assert(!eof_ && !errorWrite_); - /* => can't use InterruptibleThread's interruptibleWait() :( - -> AsyncStreamBuffer is used for input and output streaming - => both AsyncStreamBuffer::write()/read() would have to implement interruptibleWait() - => one of these usually called from main thread - => but interruptibleWait() cannot be called from main thread! */ - conditionBytesRead_.wait(dummy, [this] { return errorRead_ || ringBuf_.size() < bufSize_; }); - - if (errorRead_) - std::rethrow_exception(errorRead_); //throw - - const size_t junkSize = std::min(static_cast(itEnd - it), bufSize_ - ringBuf_.size()); - ringBuf_.insert_back(it, it + junkSize); - it += junkSize; - + const size_t bytesWritten = tryWriteWhileImpl(dummy, buffer, bytesToWrite); //throw conditionBytesWritten_.notify_all(); + buffer = static_cast(buffer) + bytesWritten; + bytesToWrite -= bytesWritten; } } + //context of output thread, blocking + size_t tryWrite(const void* buffer, size_t bytesToWrite) //throw ; may return short! CONTRACT: bytesToWrite > 0 + { + size_t bytesWritten = 0; + { + std::unique_lock dummy(lockStream_); + bytesWritten = tryWriteWhileImpl(dummy, buffer, bytesToWrite); + } + conditionBytesWritten_.notify_all(); //...*outside* the lock + return bytesWritten; + } + //context of output thread void closeStream() { @@ -101,7 +98,7 @@ public: { { std::lock_guard dummy(lockStream_); - assert(!errorRead_); + assert(error && !errorRead_); if (!errorRead_) errorRead_ = error; } @@ -113,13 +110,16 @@ public: { { std::lock_guard dummy(lockStream_); - assert(!errorWrite_); + assert(error && !errorWrite_); if (!errorWrite_) errorWrite_ = error; } conditionBytesWritten_.notify_all(); } +#if 0 + //function not needed: when writing is completed successfully, no further error can occur! + // => caveat: writing is NOT done (yet) when closeStream() is called! //context of *output* thread void checkReadErrors() //throw { @@ -128,7 +128,8 @@ public: std::rethrow_exception(errorRead_); //throw } -#if 0 //function not needed: when EOF is reached (without errors), reading is done => no further error can occur! + //function not needed: when EOF is reached (without errors), reading is done => no further error can occur! + //context of *input* thread void checkWriteErrors() //throw { std::lock_guard dummy(lockStream_); @@ -144,7 +145,53 @@ private: AsyncStreamBuffer (const AsyncStreamBuffer&) = delete; AsyncStreamBuffer& operator=(const AsyncStreamBuffer&) = delete; - const size_t bufSize_; + //context of input thread, blocking + size_t tryReadImpl(std::unique_lock& ul, void* buffer, size_t bytesToRead) //throw ; may return short; only 0 means EOF! CONTRACT: bytesToRead > 0! + { + if (bytesToRead == 0) //"read() with a count of 0 returns zero" => indistinguishable from end of file! => check! + throw std::logic_error("Contract violation! " + std::string(__FILE__) + ':' + numberTo(__LINE__)); + + assert(isLocked(lockStream_)); + assert(!errorRead_); + + conditionBytesWritten_.wait(ul, [this] { return errorWrite_ || !ringBuf_.empty() || eof_; }); + + if (errorWrite_) + std::rethrow_exception(errorWrite_); //throw + + const size_t junkSize = std::min(bytesToRead, ringBuf_.size()); + ringBuf_.extract_front(static_cast(buffer), + static_cast(buffer)+ junkSize); + totalBytesRead_ += junkSize; + return junkSize; + } + + //context of output thread, blocking + size_t tryWriteWhileImpl(std::unique_lock& ul, const void* buffer, size_t bytesToWrite) //throw ; may return short! CONTRACT: bytesToWrite > 0 + { + if (bytesToWrite == 0) + throw std::logic_error("Contract violation! " + std::string(__FILE__) + ':' + numberTo(__LINE__)); + + assert(isLocked(lockStream_)); + assert(!eof_ && !errorWrite_); + /* => can't use InterruptibleThread's interruptibleWait() :( + -> AsyncStreamBuffer is used for input and output streaming + => both AsyncStreamBuffer::write()/read() would have to implement interruptibleWait() + => one of these usually called from main thread + => but interruptibleWait() cannot be called from main thread! */ + conditionBytesRead_.wait(ul, [this] { return errorRead_ || ringBuf_.size() < ringBuf_.capacity(); }); + + if (errorRead_) + std::rethrow_exception(errorRead_); //throw + + const size_t junkSize = std::min(bytesToWrite, ringBuf_.capacity() - ringBuf_.size()); + + ringBuf_.insert_back(static_cast(buffer), + static_cast(buffer) + junkSize); + totalBytesWritten_ += junkSize; + return junkSize; + } + std::mutex lockStream_; RingBuffer ringBuf_; //prefetch/output buffer bool eof_ = false; -- cgit