summaryrefslogtreecommitdiff
path: root/zen/stream_buffer.h
diff options
context:
space:
mode:
Diffstat (limited to 'zen/stream_buffer.h')
-rw-r--r--zen/stream_buffer.h147
1 files changed, 97 insertions, 50 deletions
diff --git a/zen/stream_buffer.h b/zen/stream_buffer.h
index 8b8cd0d7..ee9e18fd 100644
--- a/zen/stream_buffer.h
+++ b/zen/stream_buffer.h
@@ -10,81 +10,78 @@
#include <condition_variable>
#include "ring_buffer.h"
#include "string_tools.h"
+#include "thread.h"
namespace zen
{
-/* implement streaming API on top of libcurl's icky callback-based design
+/* implement streaming API on top of libcurl's icky callback-based design
+ + curl uses READBUFFER_SIZE download buffer size, but returns via a retarded sendf.c::chop_write() writing in small junks of CURL_MAX_WRITE_SIZE (16 kB)
=> support copying arbitrarily-large files: https://freefilesync.org/forum/viewtopic.php?t=4471
=> maximum performance through async processing (prefetching + output buffer!)
=> cost per worker thread creation ~ 1/20 ms */
class AsyncStreamBuffer
{
public:
- explicit AsyncStreamBuffer(size_t bufferSize) : bufSize_(bufferSize) { ringBuf_.reserve(bufferSize); }
+ explicit AsyncStreamBuffer(size_t capacity) { ringBuf_.reserve(capacity); }
//context of input thread, blocking
- //return "bytesToRead" bytes unless end of stream!
- size_t read(void* buffer, size_t bytesToRead) //throw <write error>
+ size_t read(void* buffer, size_t bytesToRead) //throw <write error>; return "bytesToRead" bytes unless end of stream!
{
- if (bytesToRead == 0) //"read() with a count of 0 returns zero" => indistinguishable from end of file! => check!
- throw std::logic_error("Contract violation! " + std::string(__FILE__) + ':' + numberTo<std::string>(__LINE__));
+ std::unique_lock dummy(lockStream_);
+ const auto bufStart = buffer;
- auto it = static_cast<std::byte*>(buffer);
- const auto itEnd = it + bytesToRead;
-
- for (std::unique_lock dummy(lockStream_); it != itEnd;)
+ while (bytesToRead > 0)
{
- assert(!errorRead_);
- conditionBytesWritten_.wait(dummy, [this] { return errorWrite_ || !ringBuf_.empty() || eof_; });
-
- if (errorWrite_)
- std::rethrow_exception(errorWrite_); //throw <write error>
-
- const size_t junkSize = std::min(static_cast<size_t>(itEnd - it), ringBuf_.size());
- ringBuf_.extract_front(it, it + junkSize);
- it += junkSize;
-
- conditionBytesRead_.notify_all();
-
- if (eof_) //end of file
+ const size_t bytesRead = tryReadImpl(dummy, buffer, bytesToRead); //throw <write error>
+ if (bytesRead == 0) //end of file
break;
+ conditionBytesRead_.notify_all();
+ buffer = static_cast<std::byte*>(buffer) + bytesRead;
+ bytesToRead -= bytesRead;
}
+ return static_cast<std::byte*>(buffer) -
+ static_cast<std::byte*>(bufStart);
+ }
- const size_t bytesRead = it - static_cast<std::byte*>(buffer);
- totalBytesRead_ += bytesRead;
+ //context of input thread, blocking
+ size_t tryRead(void* buffer, size_t bytesToRead) //throw <write error>; may return short; only 0 means EOF! CONTRACT: bytesToRead > 0!
+ {
+ size_t bytesRead = 0;
+ {
+ std::unique_lock dummy(lockStream_);
+ bytesRead = tryReadImpl(dummy, buffer, bytesToRead);
+ }
+ if (bytesRead > 0)
+ conditionBytesRead_.notify_all(); //...*outside* the lock
return bytesRead;
}
//context of output thread, blocking
void write(const void* buffer, size_t bytesToWrite) //throw <read error>
{
- totalBytesWritten_ += bytesToWrite; //bytes already processed as far as raw FTP access is concerned
-
- auto it = static_cast<const std::byte*>(buffer);
- const auto itEnd = it + bytesToWrite;
-
- for (std::unique_lock dummy(lockStream_); it != itEnd;)
+ std::unique_lock dummy(lockStream_);
+ while (bytesToWrite > 0)
{
- assert(!eof_ && !errorWrite_);
- /* => can't use InterruptibleThread's interruptibleWait() :(
- -> AsyncStreamBuffer is used for input and output streaming
- => both AsyncStreamBuffer::write()/read() would have to implement interruptibleWait()
- => one of these usually called from main thread
- => but interruptibleWait() cannot be called from main thread! */
- conditionBytesRead_.wait(dummy, [this] { return errorRead_ || ringBuf_.size() < bufSize_; });
-
- if (errorRead_)
- std::rethrow_exception(errorRead_); //throw <read error>
-
- const size_t junkSize = std::min(static_cast<size_t>(itEnd - it), bufSize_ - ringBuf_.size());
- ringBuf_.insert_back(it, it + junkSize);
- it += junkSize;
-
+ const size_t bytesWritten = tryWriteWhileImpl(dummy, buffer, bytesToWrite); //throw <read error>
conditionBytesWritten_.notify_all();
+ buffer = static_cast<const std::byte*>(buffer) + bytesWritten;
+ bytesToWrite -= bytesWritten;
}
}
+ //context of output thread, blocking
+ size_t tryWrite(const void* buffer, size_t bytesToWrite) //throw <read error>; may return short! CONTRACT: bytesToWrite > 0
+ {
+ size_t bytesWritten = 0;
+ {
+ std::unique_lock dummy(lockStream_);
+ bytesWritten = tryWriteWhileImpl(dummy, buffer, bytesToWrite);
+ }
+ conditionBytesWritten_.notify_all(); //...*outside* the lock
+ return bytesWritten;
+ }
+
//context of output thread
void closeStream()
{
@@ -101,7 +98,7 @@ public:
{
{
std::lock_guard dummy(lockStream_);
- assert(!errorRead_);
+ assert(error && !errorRead_);
if (!errorRead_)
errorRead_ = error;
}
@@ -113,13 +110,16 @@ public:
{
{
std::lock_guard dummy(lockStream_);
- assert(!errorWrite_);
+ assert(error && !errorWrite_);
if (!errorWrite_)
errorWrite_ = error;
}
conditionBytesWritten_.notify_all();
}
+#if 0
+ //function not needed: when writing is completed successfully, no further error can occur!
+ // => caveat: writing is NOT done (yet) when closeStream() is called!
//context of *output* thread
void checkReadErrors() //throw <read error>
{
@@ -128,7 +128,8 @@ public:
std::rethrow_exception(errorRead_); //throw <read error>
}
-#if 0 //function not needed: when EOF is reached (without errors), reading is done => no further error can occur!
+ //function not needed: when EOF is reached (without errors), reading is done => no further error can occur!
+ //context of *input* thread
void checkWriteErrors() //throw <write error>
{
std::lock_guard dummy(lockStream_);
@@ -144,7 +145,53 @@ private:
AsyncStreamBuffer (const AsyncStreamBuffer&) = delete;
AsyncStreamBuffer& operator=(const AsyncStreamBuffer&) = delete;
- const size_t bufSize_;
+ //context of input thread, blocking
+ size_t tryReadImpl(std::unique_lock<std::mutex>& ul, void* buffer, size_t bytesToRead) //throw <write error>; may return short; only 0 means EOF! CONTRACT: bytesToRead > 0!
+ {
+ if (bytesToRead == 0) //"read() with a count of 0 returns zero" => indistinguishable from end of file! => check!
+ throw std::logic_error("Contract violation! " + std::string(__FILE__) + ':' + numberTo<std::string>(__LINE__));
+
+ assert(isLocked(lockStream_));
+ assert(!errorRead_);
+
+ conditionBytesWritten_.wait(ul, [this] { return errorWrite_ || !ringBuf_.empty() || eof_; });
+
+ if (errorWrite_)
+ std::rethrow_exception(errorWrite_); //throw <write error>
+
+ const size_t junkSize = std::min(bytesToRead, ringBuf_.size());
+ ringBuf_.extract_front(static_cast<std::byte*>(buffer),
+ static_cast<std::byte*>(buffer)+ junkSize);
+ totalBytesRead_ += junkSize;
+ return junkSize;
+ }
+
+ //context of output thread, blocking
+ size_t tryWriteWhileImpl(std::unique_lock<std::mutex>& ul, const void* buffer, size_t bytesToWrite) //throw <read error>; may return short! CONTRACT: bytesToWrite > 0
+ {
+ if (bytesToWrite == 0)
+ throw std::logic_error("Contract violation! " + std::string(__FILE__) + ':' + numberTo<std::string>(__LINE__));
+
+ assert(isLocked(lockStream_));
+ assert(!eof_ && !errorWrite_);
+ /* => can't use InterruptibleThread's interruptibleWait() :(
+ -> AsyncStreamBuffer is used for input and output streaming
+ => both AsyncStreamBuffer::write()/read() would have to implement interruptibleWait()
+ => one of these usually called from main thread
+ => but interruptibleWait() cannot be called from main thread! */
+ conditionBytesRead_.wait(ul, [this] { return errorRead_ || ringBuf_.size() < ringBuf_.capacity(); });
+
+ if (errorRead_)
+ std::rethrow_exception(errorRead_); //throw <read error>
+
+ const size_t junkSize = std::min(bytesToWrite, ringBuf_.capacity() - ringBuf_.size());
+
+ ringBuf_.insert_back(static_cast<const std::byte*>(buffer),
+ static_cast<const std::byte*>(buffer) + junkSize);
+ totalBytesWritten_ += junkSize;
+ return junkSize;
+ }
+
std::mutex lockStream_;
RingBuffer<std::byte> ringBuf_; //prefetch/output buffer
bool eof_ = false;
bgstack15