diff options
Diffstat (limited to 'zen/serialize.h')
-rw-r--r-- | zen/serialize.h | 300 |
1 files changed, 230 insertions, 70 deletions
diff --git a/zen/serialize.h b/zen/serialize.h
index 26202d96..53a6fc62 100644
--- a/zen/serialize.h
+++ b/zen/serialize.h
@@ -7,6 +7,7 @@
 #ifndef SERIALIZE_H_839405783574356
 #define SERIALIZE_H_839405783574356
 
+//#include <bit>
 #include <functional>
 #include "sys_error.h"
 //keep header clean from specific stream implementations! (e.g.file_io.h)! used by abstract.h!
@@ -22,38 +23,44 @@ namespace zen
     binary container for data storage: must support "basic" std::vector interface (e.g. std::vector<std::byte>, std::string, Zbase<char>)
 
     ---------------------------------
-    | Buffered Input Stream Concept |
+    | Unbuffered Input Stream Concept |
     ---------------------------------
-    struct BufferedInputStream
-    {
-        size_t read(void* buffer, size_t bytesToRead); //throw X; return "bytesToRead" bytes unless end of stream!
+    size_t getBlockSize(); //throw X
+    size_t tryRead(void* buffer, size_t bytesToRead); //throw X; may return short; only 0 means EOF! CONTRACT: bytesToRead > 0!
+
+    ----------------------------------
+    | Unbuffered Output Stream Concept |
+    ----------------------------------
+    size_t getBlockSize(); //throw X
+    size_t tryWrite(const void* buffer, size_t bytesToWrite); //throw X; may return short! CONTRACT: bytesToWrite > 0
+
+    ===============================================================================================
 
-    Optional: support stream-copying
-    --------------------------------
-        size_t getBlockSize() const;
-        const IoCallback& notifyUnbufferedIO
-    };
+    ---------------------------------
+    | Buffered Input Stream Concept |
+    ---------------------------------
+    size_t read(void* buffer, size_t bytesToRead); //throw X; return "bytesToRead" bytes unless end of stream!
 
     ----------------------------------
     | Buffered Output Stream Concept |
     ----------------------------------
-    struct BufferedOutputStream
-    {
-        void write(const void* buffer, size_t bytesToWrite); //throw X
+    void write(const void* buffer, size_t bytesToWrite); //throw X */
 
-    Optional: support stream-copying
-    --------------------------------
-        const IoCallback& notifyUnbufferedIO
-    }; */
 using IoCallback = std::function<void(int64_t bytesDelta)>; //throw X
 
 
-//functions based on buffered stream abstraction
-template <class BufferedInputStream, class BufferedOutputStream>
-void bufferedStreamCopy(BufferedInputStream& streamIn, BufferedOutputStream& streamOut); //throw X
+template <class BinContainer, class Function>
+BinContainer unbufferedLoad(Function tryRead/*(void* buffer, size_t bytesToRead) throw X; may return short; only 0 means EOF*/,
+                            size_t blockSize); //throw X
+
+template <class BinContainer, class Function>
+void unbufferedSave(const BinContainer& cont, Function tryWrite /*(const void* buffer, size_t bytesToWrite) throw X; may return short*/,
+                    size_t blockSize); //throw X
+
+template <class Function1, class Function2>
+void unbufferedStreamCopy(Function1 tryRead /*(void* buffer, size_t bytesToRead) throw X; may return short; only 0 means EOF*/, size_t blockSizeIn,
+                          Function2 tryWrite /*(const void* buffer, size_t bytesToWrite) throw X; may return short*/, size_t blockSizeOut); //throw X
 
-template <class BinContainer, class BufferedInputStream> BinContainer
-bufferedLoad(BufferedInputStream& streamIn); //throw X
 
 template <class N, class BufferedOutputStream> void writeNumber (BufferedOutputStream& stream, const N& num); //
 template <class C, class BufferedOutputStream> void writeContainer(BufferedOutputStream& stream, const C& str); //noexcept
@@ -71,124 +78,277 @@ template < class BufferedInputStream> void readArray (BufferedInputSt
 
 struct IOCallbackDivider
 {
-    IOCallbackDivider(const IoCallback& notifyUnbufferedIO, int64_t& totalUnbufferedIO) : totalUnbufferedIO_(totalUnbufferedIO), notifyUnbufferedIO_(notifyUnbufferedIO) {}
+    IOCallbackDivider(const IoCallback& notifyUnbufferedIO, int64_t& totalBytesNotified) :
+        totalBytesNotified_(totalBytesNotified),
+        notifyUnbufferedIO_(notifyUnbufferedIO) { assert(totalBytesNotified == 0); }
 
-    void operator()(int64_t bytesDelta)
+    void operator()(int64_t bytesDelta) //throw X!
     {
-        if (notifyUnbufferedIO_) notifyUnbufferedIO_((totalUnbufferedIO_ - totalUnbufferedIO_ / 2 * 2 + bytesDelta) / 2); //throw X!
-        totalUnbufferedIO_ += bytesDelta;
+        if (notifyUnbufferedIO_) notifyUnbufferedIO_((totalBytesNotified_ + bytesDelta) / 2 - totalBytesNotified_ / 2); //throw X!
+        totalBytesNotified_ += bytesDelta;
     }
 
 private:
-    int64_t& totalUnbufferedIO_;
+    int64_t& totalBytesNotified_;
     const IoCallback& notifyUnbufferedIO_;
 };
 
 
+//-------------------------------------------------------------------------------------
 //buffered input/output stream reference implementations:
-template <class BinContainer>
 struct MemoryStreamIn
 {
-    explicit MemoryStreamIn(const BinContainer& cont) : buffer_(cont) {} //this better be cheap!
+    explicit MemoryStreamIn(const std::string_view& stream) : memRef_(stream) {}
+
+    MemoryStreamIn(std::string&&) = delete; //careful: do NOT store reference to a temporary!
 
     size_t read(void* buffer, size_t bytesToRead) //return "bytesToRead" bytes unless end of stream!
     {
-        using Byte = typename BinContainer::value_type;
-        static_assert(sizeof(Byte) == 1);
-        const size_t bytesRead = std::min(bytesToRead, buffer_.size() - pos_);
-        auto itFirst = buffer_.begin() + pos_;
-        std::copy(itFirst, itFirst + bytesRead, static_cast<Byte*>(buffer));
-        pos_ += bytesRead;
-        return bytesRead;
+        const size_t junkSize = std::min(bytesToRead, memRef_.size() - pos_);
+        std::memcpy(buffer, memRef_.data() + pos_, junkSize);
+        pos_ += junkSize;
+        return junkSize;
     }
 
     size_t pos() const { return pos_; }
 
 private:
-    MemoryStreamIn (const MemoryStreamIn&) = delete;
+    //MemoryStreamIn (const MemoryStreamIn&) = delete; -> why not allow copying?
     MemoryStreamIn& operator=(const MemoryStreamIn&) = delete;
 
-    const BinContainer buffer_;
+    const std::string_view memRef_;
     size_t pos_ = 0;
 };
 
-template <class BinContainer>
 struct MemoryStreamOut
 {
     MemoryStreamOut() = default;
 
     void write(const void* buffer, size_t bytesToWrite)
     {
-        using Byte = typename BinContainer::value_type;
-        static_assert(sizeof(Byte) == 1);
-        buffer_.resize(buffer_.size() + bytesToWrite);
-        const auto it = static_cast<const Byte*>(buffer);
-        std::copy(it, it + bytesToWrite, buffer_.end() - bytesToWrite);
+        memBuf_.append(static_cast<const char*>(buffer), bytesToWrite);
     }
 
-    const BinContainer& ref() const { return buffer_; }
-    /**/ BinContainer& ref() { return buffer_; }
+    const std::string& ref() const { return memBuf_; }
+    /**/ std::string& ref() { return memBuf_; }
 
 private:
     MemoryStreamOut (const MemoryStreamOut&) = delete;
     MemoryStreamOut& operator=(const MemoryStreamOut&) = delete;
 
-    BinContainer buffer_;
+    std::string memBuf_;
 };
 
+//-------------------------------------------------------------------------------------
 
+template <class Function>
+struct BufferedInputStream
+{
+    BufferedInputStream(Function tryRead /*(void* buffer, size_t bytesToRead) throw X; may return short; only 0 means EOF*/,
+                        size_t blockSize) :
+        tryRead_(tryRead), blockSize_(blockSize) {}
 
+    size_t read(void* buffer, size_t bytesToRead) //throw X; return "bytesToRead" bytes unless end of stream!
+    {
+        assert(memBuf_.size() >= blockSize_);
+        assert(bufPos_ <= bufPosEnd_ && bufPosEnd_ <= memBuf_.size());
 
+        const auto bufStart = buffer;
+        for (;;)
+        {
+            const size_t junkSize = std::min(bytesToRead, bufPosEnd_ - bufPos_);
+            std::memcpy(buffer, memBuf_.data() + bufPos_ /*caveat: vector debug checks*/, junkSize);
+            bufPos_ += junkSize;
+            buffer = static_cast<std::byte*>(buffer) + junkSize;
+            bytesToRead -= junkSize;
+
+            if (bytesToRead == 0)
+                break;
+            //--------------------------------------------------------------------
+            const size_t bytesRead = tryRead_(memBuf_.data(), blockSize_); //throw X; may return short, only 0 means EOF! => CONTRACT: bytesToRead > 0
+            bufPos_ = 0;
+            bufPosEnd_ = bytesRead;
+
+            if (bytesRead == 0) //end of file
+                break;
+        }
+        return static_cast<std::byte*>(buffer) -
+               static_cast<std::byte*>(bufStart);
+    }
+
+private:
+    BufferedInputStream (const BufferedInputStream&) = delete;
+    BufferedInputStream& operator=(const BufferedInputStream&) = delete;
 
+    Function tryRead_;
+    const size_t blockSize_;
 
-//-----------------------implementation-------------------------------
-template <class BufferedInputStream, class BufferedOutputStream> inline
-void bufferedStreamCopy(BufferedInputStream& streamIn, //throw X
-                        BufferedOutputStream& streamOut) //
+    size_t bufPos_ = 0;
+    size_t bufPosEnd_= 0;
+    std::vector<std::byte> memBuf_{blockSize_};
+};
+
+
+template <class Function>
+struct BufferedOutputStream
 {
-    const size_t blockSize = streamIn.getBlockSize();
+    BufferedOutputStream(Function tryWrite /*(const void* buffer, size_t bytesToWrite) throw X; may return short*/,
+                         size_t blockSize) :
+        tryWrite_(tryWrite), blockSize_(blockSize) {}
+
+    ~BufferedOutputStream()
+    {
+    }
+
+    void write(const void* buffer, size_t bytesToWrite) //throw X
+    {
+        assert(memBuf_.size() >= blockSize_);
+        assert(bufPos_ <= bufPosEnd_ && bufPosEnd_ <= memBuf_.size());
+
+        for (;;)
+        {
+            const size_t junkSize = std::min(bytesToWrite, blockSize_ - (bufPosEnd_ - bufPos_));
+            std::memcpy(memBuf_.data() + bufPosEnd_, buffer, junkSize);
+            bufPosEnd_ += junkSize;
+            buffer = static_cast<const std::byte*>(buffer) + junkSize;
+            bytesToWrite -= junkSize;
+
+            if (bytesToWrite == 0)
+                return;
+            //--------------------------------------------------------------------
+            bufPos_ += tryWrite_(memBuf_.data() + bufPos_, blockSize_); //throw X; may return short
+
+            if (memBuf_.size() - bufPos_ < blockSize_ || //support memBuf_.size() > blockSize to avoid memmove()s
+                bufPos_ == bufPosEnd_)
+            {
+                std::memmove(memBuf_.data(), memBuf_.data() + bufPos_, bufPosEnd_ - bufPos_);
+                bufPosEnd_ -= bufPos_;
+                bufPos_ = 0;
+            }
+        }
+    }
+
+    void flushBuffer() //throw X
+    {
+        assert(bufPosEnd_ - bufPos_ <= blockSize_);
+        assert(bufPos_ <= bufPosEnd_ && bufPosEnd_ <= memBuf_.size());
+        while (bufPos_ != bufPosEnd_)
+            bufPos_ += tryWrite_(memBuf_.data() + bufPos_, bufPosEnd_ - bufPos_); //throw X
+    }
+
+private:
+    BufferedOutputStream (const BufferedOutputStream&) = delete;
+    BufferedOutputStream& operator=(const BufferedOutputStream&) = delete;
+
+    Function tryWrite_;
+    const size_t blockSize_;
+
+    size_t bufPos_ = 0;
+    size_t bufPosEnd_ = 0;
+    std::vector<std::byte> memBuf_{2 * /*=> mitigate memmove()*/ blockSize_}; //throw FileError
+};
+
+//-------------------------------------------------------------------------------------
+
+template <class BinContainer, class Function> inline
+BinContainer unbufferedLoad(Function tryRead /*(void* buffer, size_t bytesToRead) throw X; may return short; only 0 means EOF*/,
+                            size_t blockSize) //throw X
+{
+    static_assert(sizeof(typename BinContainer::value_type) == 1); //expect: bytes
     if (blockSize == 0)
         throw std::logic_error("Contract violation! " + std::string(__FILE__) + ':' + numberTo<std::string>(__LINE__));
 
-    std::vector<std::byte> buffer(blockSize);
+    BinContainer buf;
     for (;;)
     {
-        const size_t bytesRead = streamIn.read(&buffer[0], blockSize); //throw X; return "bytesToRead" bytes unless end of stream!
-        streamOut.write(&buffer[0], bytesRead); //throw X
+        warn_static("don't need zero-initialization!")
+        buf.resize(buf.size() + blockSize);
+        const size_t bytesRead = tryRead(buf.data() + buf.size() - blockSize, blockSize); //throw X; may return short; only 0 means EOF
+        buf.resize(buf.size() - blockSize + bytesRead); //caveat: unsigned arithmetics
+
+        if (bytesRead == 0) //end of file
+        {
+            //caveat: memory consumption of returned string!
+            if (buf.capacity() > buf.size() * 3 / 2) //reference: in worst case, std::vector with growth factor 1.5 "wastes" 50% of its size as unused capacity
+                buf.shrink_to_fit(); //=> shrink if buffer is wasting more than that!
 
-        if (bytesRead < blockSize) //end of file
-            break;
+            return buf;
+        }
     }
 }
 
 
-template <class BinContainer, class BufferedInputStream> inline
-BinContainer bufferedLoad(BufferedInputStream& streamIn) //throw X
+template <class BinContainer, class Function> inline
+void unbufferedSave(const BinContainer& cont,
+                    Function tryWrite /*(const void* buffer, size_t bytesToWrite) throw X; may return short*/,
+                    size_t blockSize) //throw X
 {
     static_assert(sizeof(typename BinContainer::value_type) == 1); //expect: bytes
-
-    const size_t blockSize = streamIn.getBlockSize();
     if (blockSize == 0)
         throw std::logic_error("Contract violation! " + std::string(__FILE__) + ':' + numberTo<std::string>(__LINE__));
 
-    BinContainer buffer;
+    const size_t bufPosEnd = cont.size();
+    size_t bufPos = 0;
+
+    while (bufPos < bufPosEnd)
+        bufPos += tryWrite(cont.data() + bufPos, std::min(bufPosEnd - bufPos, blockSize)); //throw X
+}
+
+
+template <class Function1, class Function2> inline
+void unbufferedStreamCopy(Function1 tryRead /*(void* buffer, size_t bytesToRead) throw X; may return short; only 0 means EOF*/,
+                          size_t blockSizeIn,
+                          Function2 tryWrite /*(const void* buffer, size_t bytesToWrite) throw X; may return short*/,
+                          size_t blockSizeOut) //throw X
+{
+    /* caveat: buffer block sizes might not be power of 2:
+        - f_iosize for network share on macOS
+        - libssh2 uses weird packet sizes like MAX_SFTP_OUTGOING_SIZE (30000), and will send incomplete packages if block size is not an exact multiple :(
+       => that's a problem because we want input/output sizes to be multiples of each other to help avoid the std::memmove() below */
+#if 0
+    blockSizeIn = std::bit_ceil(blockSizeIn);
+    blockSizeOut = std::bit_ceil(blockSizeOut);
+#endif
+    if (blockSizeIn <= 1 || blockSizeOut <= 1)
+        throw std::logic_error("Contract violation! " + std::string(__FILE__) + ':' + numberTo<std::string>(__LINE__));
+
+    const size_t bufCapacity = blockSizeOut - 1 + blockSizeIn;
+    const size_t alignment = ::sysconf(_SC_PAGESIZE); //-1 on error => posix_memalign() will fail
+    assert(alignment >= sizeof(void*) && std::has_single_bit(alignment)); //required by posix_memalign()
+    std::byte* buf = nullptr;
+    errno = ::posix_memalign(reinterpret_cast<void**>(&buf), alignment, bufCapacity);
+    ZEN_ON_SCOPE_EXIT(::free(buf));
+
+    size_t bufPosEnd = 0;
     for (;;)
     {
-        buffer.resize(buffer.size() + blockSize);
-        const size_t bytesRead = streamIn.read(&*(buffer.end() - blockSize), blockSize); //throw X; return "blockSize" bytes unless end of stream!
-        if (bytesRead < blockSize) //end of file
+        const size_t bytesRead = tryRead(buf + bufPosEnd, blockSizeIn); //throw X; may return short; only 0 means EOF
+
+        if (bytesRead == 0) //end of file
         {
-            buffer.resize(buffer.size() - (blockSize - bytesRead)); //caveat: unsigned arithmetics
+            size_t bufPos = 0;
+            while (bufPos < bufPosEnd)
+                bufPos += tryWrite(buf + bufPos, bufPosEnd - bufPos); //throw X; may return short
+            return;
+        }
+        else
+        {
+            bufPosEnd += bytesRead;
 
-            //caveat: memory consumption of returned string!
-            if (buffer.capacity() > buffer.size() * 3 / 2) //reference: in worst case, std::vector with growth factor 1.5 "wastes" 50% of its size as unused capacity
-                buffer.shrink_to_fit(); //=> shrink if buffer is wasting more than that!
+            size_t bufPos = 0;
+            while (bufPosEnd - bufPos >= blockSizeOut)
+                bufPos += tryWrite(buf + bufPos, blockSizeOut); //throw X; may return short
 
-            return buffer;
+            if (bufPos > 0)
+            {
+                bufPosEnd -= bufPos;
+                std::memmove(buf, buf + bufPos, bufPosEnd);
+            }
         }
     }
 }
 
+//-------------------------------------------------------------------------------------
 
 template <class BufferedOutputStream> inline
 void writeArray(BufferedOutputStream& stream, const void* buffer, size_t len)
@@ -232,7 +392,7 @@ template <class N, class BufferedInputStream> inline
 N readNumber(BufferedInputStream& stream) //throw SysErrorUnexpectedEos
 {
     static_assert(isArithmetic<N> || std::is_same_v<N, bool> || std::is_enum_v<N>);
-    N num{};
+    N num; //uninitialized
     readArray(stream, &num, sizeof(N)); //throw SysErrorUnexpectedEos
     return num;
 }