summaryrefslogtreecommitdiff
path: root/zen/serialize.h
diff options
context:
space:
mode:
Diffstat (limited to 'zen/serialize.h')
-rw-r--r--zen/serialize.h300
1 files changed, 230 insertions, 70 deletions
diff --git a/zen/serialize.h b/zen/serialize.h
index 26202d96..53a6fc62 100644
--- a/zen/serialize.h
+++ b/zen/serialize.h
@@ -7,6 +7,7 @@
#ifndef SERIALIZE_H_839405783574356
#define SERIALIZE_H_839405783574356
+//#include <bit>
#include <functional>
#include "sys_error.h"
//keep header clean from specific stream implementations! (e.g.file_io.h)! used by abstract.h!
@@ -22,38 +23,44 @@ namespace zen
binary container for data storage: must support "basic" std::vector interface (e.g. std::vector<std::byte>, std::string, Zbase<char>)
---------------------------------
- | Buffered Input Stream Concept |
+ | Unbuffered Input Stream Concept |
---------------------------------
- struct BufferedInputStream
- {
- size_t read(void* buffer, size_t bytesToRead); //throw X; return "bytesToRead" bytes unless end of stream!
+ size_t getBlockSize(); //throw X
+ size_t tryRead(void* buffer, size_t bytesToRead); //throw X; may return short; only 0 means EOF! CONTRACT: bytesToRead > 0!
+
+ ----------------------------------
+ | Unbuffered Output Stream Concept |
+ ----------------------------------
+ size_t getBlockSize(); //throw X
+ size_t tryWrite(const void* buffer, size_t bytesToWrite); //throw X; may return short! CONTRACT: bytesToWrite > 0
+
+ ===============================================================================================
- Optional: support stream-copying
- --------------------------------
- size_t getBlockSize() const;
- const IoCallback& notifyUnbufferedIO
- };
+ ---------------------------------
+ | Buffered Input Stream Concept |
+ ---------------------------------
+ size_t read(void* buffer, size_t bytesToRead); //throw X; return "bytesToRead" bytes unless end of stream!
----------------------------------
| Buffered Output Stream Concept |
----------------------------------
- struct BufferedOutputStream
- {
- void write(const void* buffer, size_t bytesToWrite); //throw X
+ void write(const void* buffer, size_t bytesToWrite); //throw X */
- Optional: support stream-copying
- --------------------------------
- const IoCallback& notifyUnbufferedIO
- }; */
using IoCallback = std::function<void(int64_t bytesDelta)>; //throw X
-//functions based on buffered stream abstraction
-template <class BufferedInputStream, class BufferedOutputStream>
-void bufferedStreamCopy(BufferedInputStream& streamIn, BufferedOutputStream& streamOut); //throw X
+template <class BinContainer, class Function>
+BinContainer unbufferedLoad(Function tryRead/*(void* buffer, size_t bytesToRead) throw X; may return short; only 0 means EOF*/,
+ size_t blockSize); //throw X
+
+template <class BinContainer, class Function>
+void unbufferedSave(const BinContainer& cont, Function tryWrite /*(const void* buffer, size_t bytesToWrite) throw X; may return short*/,
+ size_t blockSize); //throw X
+
+template <class Function1, class Function2>
+void unbufferedStreamCopy(Function1 tryRead /*(void* buffer, size_t bytesToRead) throw X; may return short; only 0 means EOF*/, size_t blockSizeIn,
+ Function2 tryWrite /*(const void* buffer, size_t bytesToWrite) throw X; may return short*/, size_t blockSizeOut); //throw X
-template <class BinContainer, class BufferedInputStream> BinContainer
-bufferedLoad(BufferedInputStream& streamIn); //throw X
template <class N, class BufferedOutputStream> void writeNumber (BufferedOutputStream& stream, const N& num); //
template <class C, class BufferedOutputStream> void writeContainer(BufferedOutputStream& stream, const C& str); //noexcept
@@ -71,124 +78,277 @@ template < class BufferedInputStream> void readArray (BufferedInputSt
struct IOCallbackDivider
{
- IOCallbackDivider(const IoCallback& notifyUnbufferedIO, int64_t& totalUnbufferedIO) : totalUnbufferedIO_(totalUnbufferedIO), notifyUnbufferedIO_(notifyUnbufferedIO) {}
+ IOCallbackDivider(const IoCallback& notifyUnbufferedIO, int64_t& totalBytesNotified) :
+ totalBytesNotified_(totalBytesNotified),
+ notifyUnbufferedIO_(notifyUnbufferedIO) { assert(totalBytesNotified == 0); }
- void operator()(int64_t bytesDelta)
+ void operator()(int64_t bytesDelta) //throw X!
{
- if (notifyUnbufferedIO_) notifyUnbufferedIO_((totalUnbufferedIO_ - totalUnbufferedIO_ / 2 * 2 + bytesDelta) / 2); //throw X!
- totalUnbufferedIO_ += bytesDelta;
+ if (notifyUnbufferedIO_) notifyUnbufferedIO_((totalBytesNotified_ + bytesDelta) / 2 - totalBytesNotified_ / 2); //throw X!
+ totalBytesNotified_ += bytesDelta;
}
private:
- int64_t& totalUnbufferedIO_;
+ int64_t& totalBytesNotified_;
const IoCallback& notifyUnbufferedIO_;
};
+//-------------------------------------------------------------------------------------
//buffered input/output stream reference implementations:
-template <class BinContainer>
struct MemoryStreamIn
{
- explicit MemoryStreamIn(const BinContainer& cont) : buffer_(cont) {} //this better be cheap!
+ explicit MemoryStreamIn(const std::string_view& stream) : memRef_(stream) {}
+
+ MemoryStreamIn(std::string&&) = delete; //careful: do NOT store reference to a temporary!
size_t read(void* buffer, size_t bytesToRead) //return "bytesToRead" bytes unless end of stream!
{
- using Byte = typename BinContainer::value_type;
- static_assert(sizeof(Byte) == 1);
- const size_t bytesRead = std::min(bytesToRead, buffer_.size() - pos_);
- auto itFirst = buffer_.begin() + pos_;
- std::copy(itFirst, itFirst + bytesRead, static_cast<Byte*>(buffer));
- pos_ += bytesRead;
- return bytesRead;
+ const size_t junkSize = std::min(bytesToRead, memRef_.size() - pos_);
+ std::memcpy(buffer, memRef_.data() + pos_, junkSize);
+ pos_ += junkSize;
+ return junkSize;
}
size_t pos() const { return pos_; }
private:
- MemoryStreamIn (const MemoryStreamIn&) = delete;
+ //MemoryStreamIn (const MemoryStreamIn&) = delete; -> why not allow copying?
MemoryStreamIn& operator=(const MemoryStreamIn&) = delete;
- const BinContainer buffer_;
+ const std::string_view memRef_;
size_t pos_ = 0;
};
-template <class BinContainer>
struct MemoryStreamOut
{
MemoryStreamOut() = default;
void write(const void* buffer, size_t bytesToWrite)
{
- using Byte = typename BinContainer::value_type;
- static_assert(sizeof(Byte) == 1);
- buffer_.resize(buffer_.size() + bytesToWrite);
- const auto it = static_cast<const Byte*>(buffer);
- std::copy(it, it + bytesToWrite, buffer_.end() - bytesToWrite);
+ memBuf_.append(static_cast<const char*>(buffer), bytesToWrite);
}
- const BinContainer& ref() const { return buffer_; }
- /**/ BinContainer& ref() { return buffer_; }
+ const std::string& ref() const { return memBuf_; }
+ /**/ std::string& ref() { return memBuf_; }
private:
MemoryStreamOut (const MemoryStreamOut&) = delete;
MemoryStreamOut& operator=(const MemoryStreamOut&) = delete;
- BinContainer buffer_;
+ std::string memBuf_;
};
+//-------------------------------------------------------------------------------------
+template <class Function>
+struct BufferedInputStream
+{
+ BufferedInputStream(Function tryRead /*(void* buffer, size_t bytesToRead) throw X; may return short; only 0 means EOF*/,
+ size_t blockSize) :
+ tryRead_(tryRead), blockSize_(blockSize) {}
+ size_t read(void* buffer, size_t bytesToRead) //throw X; return "bytesToRead" bytes unless end of stream!
+ {
+ assert(memBuf_.size() >= blockSize_);
+ assert(bufPos_ <= bufPosEnd_ && bufPosEnd_ <= memBuf_.size());
+ const auto bufStart = buffer;
+ for (;;)
+ {
+ const size_t junkSize = std::min(bytesToRead, bufPosEnd_ - bufPos_);
+ std::memcpy(buffer, memBuf_.data() + bufPos_ /*caveat: vector debug checks*/, junkSize);
+ bufPos_ += junkSize;
+ buffer = static_cast<std::byte*>(buffer) + junkSize;
+ bytesToRead -= junkSize;
+
+ if (bytesToRead == 0)
+ break;
+ //--------------------------------------------------------------------
+ const size_t bytesRead = tryRead_(memBuf_.data(), blockSize_); //throw X; may return short, only 0 means EOF! => CONTRACT: bytesToRead > 0
+ bufPos_ = 0;
+ bufPosEnd_ = bytesRead;
+
+ if (bytesRead == 0) //end of file
+ break;
+ }
+ return static_cast<std::byte*>(buffer) -
+ static_cast<std::byte*>(bufStart);
+ }
+
+private:
+ BufferedInputStream (const BufferedInputStream&) = delete;
+ BufferedInputStream& operator=(const BufferedInputStream&) = delete;
+ Function tryRead_;
+ const size_t blockSize_;
-//-----------------------implementation-------------------------------
-template <class BufferedInputStream, class BufferedOutputStream> inline
-void bufferedStreamCopy(BufferedInputStream& streamIn, //throw X
- BufferedOutputStream& streamOut) //
+ size_t bufPos_ = 0;
+ size_t bufPosEnd_= 0;
+ std::vector<std::byte> memBuf_{blockSize_};
+};
+
+
+template <class Function>
+struct BufferedOutputStream
{
- const size_t blockSize = streamIn.getBlockSize();
+ BufferedOutputStream(Function tryWrite /*(const void* buffer, size_t bytesToWrite) throw X; may return short*/,
+ size_t blockSize) :
+ tryWrite_(tryWrite), blockSize_(blockSize) {}
+
+ ~BufferedOutputStream()
+ {
+ }
+
+ void write(const void* buffer, size_t bytesToWrite) //throw X
+ {
+ assert(memBuf_.size() >= blockSize_);
+ assert(bufPos_ <= bufPosEnd_ && bufPosEnd_ <= memBuf_.size());
+
+ for (;;)
+ {
+ const size_t junkSize = std::min(bytesToWrite, blockSize_ - (bufPosEnd_ - bufPos_));
+ std::memcpy(memBuf_.data() + bufPosEnd_, buffer, junkSize);
+ bufPosEnd_ += junkSize;
+ buffer = static_cast<const std::byte*>(buffer) + junkSize;
+ bytesToWrite -= junkSize;
+
+ if (bytesToWrite == 0)
+ return;
+ //--------------------------------------------------------------------
+ bufPos_ += tryWrite_(memBuf_.data() + bufPos_, blockSize_); //throw X; may return short
+
+ if (memBuf_.size() - bufPos_ < blockSize_ || //support memBuf_.size() > blockSize to avoid memmove()s
+ bufPos_ == bufPosEnd_)
+ {
+ std::memmove(memBuf_.data(), memBuf_.data() + bufPos_, bufPosEnd_ - bufPos_);
+ bufPosEnd_ -= bufPos_;
+ bufPos_ = 0;
+ }
+ }
+ }
+
+ void flushBuffer() //throw X
+ {
+ assert(bufPosEnd_ - bufPos_ <= blockSize_);
+ assert(bufPos_ <= bufPosEnd_ && bufPosEnd_ <= memBuf_.size());
+ while (bufPos_ != bufPosEnd_)
+ bufPos_ += tryWrite_(memBuf_.data() + bufPos_, bufPosEnd_ - bufPos_); //throw X
+ }
+
+private:
+ BufferedOutputStream (const BufferedOutputStream&) = delete;
+ BufferedOutputStream& operator=(const BufferedOutputStream&) = delete;
+
+ Function tryWrite_;
+ const size_t blockSize_;
+
+ size_t bufPos_ = 0;
+ size_t bufPosEnd_ = 0;
+ std::vector<std::byte> memBuf_{2 * /*=> mitigate memmove()*/ blockSize_}; //throw FileError
+};
+
+//-------------------------------------------------------------------------------------
+
+template <class BinContainer, class Function> inline
+BinContainer unbufferedLoad(Function tryRead /*(void* buffer, size_t bytesToRead) throw X; may return short; only 0 means EOF*/,
+ size_t blockSize) //throw X
+{
+ static_assert(sizeof(typename BinContainer::value_type) == 1); //expect: bytes
if (blockSize == 0)
throw std::logic_error("Contract violation! " + std::string(__FILE__) + ':' + numberTo<std::string>(__LINE__));
- std::vector<std::byte> buffer(blockSize);
+ BinContainer buf;
for (;;)
{
- const size_t bytesRead = streamIn.read(&buffer[0], blockSize); //throw X; return "bytesToRead" bytes unless end of stream!
- streamOut.write(&buffer[0], bytesRead); //throw X
+ warn_static("don't need zero-initialization!")
+ buf.resize(buf.size() + blockSize);
+ const size_t bytesRead = tryRead(buf.data() + buf.size() - blockSize, blockSize); //throw X; may return short; only 0 means EOF
+ buf.resize(buf.size() - blockSize + bytesRead); //caveat: unsigned arithmetics
+
+ if (bytesRead == 0) //end of file
+ {
+ //caveat: memory consumption of returned string!
+ if (buf.capacity() > buf.size() * 3 / 2) //reference: in worst case, std::vector with growth factor 1.5 "wastes" 50% of its size as unused capacity
+ buf.shrink_to_fit(); //=> shrink if buffer is wasting more than that!
- if (bytesRead < blockSize) //end of file
- break;
+ return buf;
+ }
}
}
-template <class BinContainer, class BufferedInputStream> inline
-BinContainer bufferedLoad(BufferedInputStream& streamIn) //throw X
+template <class BinContainer, class Function> inline
+void unbufferedSave(const BinContainer& cont,
+ Function tryWrite /*(const void* buffer, size_t bytesToWrite) throw X; may return short*/,
+ size_t blockSize) //throw X
{
static_assert(sizeof(typename BinContainer::value_type) == 1); //expect: bytes
-
- const size_t blockSize = streamIn.getBlockSize();
if (blockSize == 0)
throw std::logic_error("Contract violation! " + std::string(__FILE__) + ':' + numberTo<std::string>(__LINE__));
- BinContainer buffer;
+ const size_t bufPosEnd = cont.size();
+ size_t bufPos = 0;
+
+ while (bufPos < bufPosEnd)
+ bufPos += tryWrite(cont.data() + bufPos, std::min(bufPosEnd - bufPos, blockSize)); //throw X
+}
+
+
+template <class Function1, class Function2> inline
+void unbufferedStreamCopy(Function1 tryRead /*(void* buffer, size_t bytesToRead) throw X; may return short; only 0 means EOF*/,
+ size_t blockSizeIn,
+ Function2 tryWrite /*(const void* buffer, size_t bytesToWrite) throw X; may return short*/,
+ size_t blockSizeOut) //throw X
+{
+ /* caveat: buffer block sizes might not be power of 2:
+ - f_iosize for network share on macOS
+ - libssh2 uses weird packet sizes like MAX_SFTP_OUTGOING_SIZE (30000), and will send incomplete packages if block size is not an exact multiple :(
+ => that's a problem because we want input/output sizes to be multiples of each other to help avoid the std::memmove() below */
+#if 0
+ blockSizeIn = std::bit_ceil(blockSizeIn);
+ blockSizeOut = std::bit_ceil(blockSizeOut);
+#endif
+ if (blockSizeIn <= 1 || blockSizeOut <= 1)
+ throw std::logic_error("Contract violation! " + std::string(__FILE__) + ':' + numberTo<std::string>(__LINE__));
+
+ const size_t bufCapacity = blockSizeOut - 1 + blockSizeIn;
+ const size_t alignment = ::sysconf(_SC_PAGESIZE); //-1 on error => posix_memalign() will fail
+ assert(alignment >= sizeof(void*) && std::has_single_bit(alignment)); //required by posix_memalign()
+ std::byte* buf = nullptr;
+ errno = ::posix_memalign(reinterpret_cast<void**>(&buf), alignment, bufCapacity);
+ ZEN_ON_SCOPE_EXIT(::free(buf));
+
+ size_t bufPosEnd = 0;
for (;;)
{
- buffer.resize(buffer.size() + blockSize);
- const size_t bytesRead = streamIn.read(&*(buffer.end() - blockSize), blockSize); //throw X; return "blockSize" bytes unless end of stream!
- if (bytesRead < blockSize) //end of file
+ const size_t bytesRead = tryRead(buf + bufPosEnd, blockSizeIn); //throw X; may return short; only 0 means EOF
+
+ if (bytesRead == 0) //end of file
{
- buffer.resize(buffer.size() - (blockSize - bytesRead)); //caveat: unsigned arithmetics
+ size_t bufPos = 0;
+ while (bufPos < bufPosEnd)
+ bufPos += tryWrite(buf + bufPos, bufPosEnd - bufPos); //throw X; may return short
+ return;
+ }
+ else
+ {
+ bufPosEnd += bytesRead;
- //caveat: memory consumption of returned string!
- if (buffer.capacity() > buffer.size() * 3 / 2) //reference: in worst case, std::vector with growth factor 1.5 "wastes" 50% of its size as unused capacity
- buffer.shrink_to_fit(); //=> shrink if buffer is wasting more than that!
+ size_t bufPos = 0;
+ while (bufPosEnd - bufPos >= blockSizeOut)
+ bufPos += tryWrite(buf + bufPos, blockSizeOut); //throw X; may return short
- return buffer;
+ if (bufPos > 0)
+ {
+ bufPosEnd -= bufPos;
+ std::memmove(buf, buf + bufPos, bufPosEnd);
+ }
}
}
}
+//-------------------------------------------------------------------------------------
template <class BufferedOutputStream> inline
void writeArray(BufferedOutputStream& stream, const void* buffer, size_t len)
@@ -232,7 +392,7 @@ template <class N, class BufferedInputStream> inline
N readNumber(BufferedInputStream& stream) //throw SysErrorUnexpectedEos
{
static_assert(isArithmetic<N> || std::is_same_v<N, bool> || std::is_enum_v<N>);
- N num{};
+ N num; //uninitialized
readArray(stream, &num, sizeof(N)); //throw SysErrorUnexpectedEos
return num;
}
bgstack15