diff options
Diffstat (limited to 'zen/http.cpp')
-rw-r--r-- | zen/http.cpp | 124 |
1 files changed, 77 insertions, 47 deletions
diff --git a/zen/http.cpp b/zen/http.cpp index a26bb3a5..ee71e5b3 100644 --- a/zen/http.cpp +++ b/zen/http.cpp @@ -8,12 +8,18 @@ #include <libcurl/curl_wrap.h> //DON'T include <curl/curl.h> directly! #include "stream_buffer.h" - #include "thread.h" using namespace zen; + const int HTTP_ACCESS_TIME_OUT_SEC = 20; +const size_t HTTP_BLOCK_SIZE_DOWNLOAD = 64 * 1024; //libcurl returns blocks of only 16 kB as returned by recv() even if we request larger blocks via CURLOPT_BUFFERSIZE +//- InternetReadFile() is buffered + prefetching +//- libcurl returns blocks of only 16 kB as returned by recv() even if we request larger blocks via CURLOPT_BUFFERSIZE + const size_t HTTP_STREAM_BUFFER_SIZE = 1024 * 1024; //unit: [byte] + //stream buffer should be big enough to facilitate prefetching during alternating read/write operations => e.g. see serialize.h::unbufferedStreamCopy() + @@ -23,16 +29,14 @@ public: Impl(const Zstring& url, const std::string* postBuf, //issue POST if bound, GET otherwise const std::string& contentType, //required for POST + const IoCallback& onPostBytesSent /*throw X*/, bool disableGetCache, //not relevant for POST (= never cached) const Zstring& userAgent, - const Zstring& caCertFilePath, //optional: enable certificate validation - const IoCallback& notifyUnbufferedIO /*throw X*/) : //throw SysError, X - notifyUnbufferedIO_(notifyUnbufferedIO) + const Zstring& caCertFilePath /*optional: enable certificate validation*/) //throw SysError, X { ZEN_ON_SCOPE_FAIL(cleanup()); //destructor call would lead to member double clean-up!!! - //may be sending large POST: call back first - if (notifyUnbufferedIO_) notifyUnbufferedIO_(0); //throw X + assert(postBuf || !onPostBytesSent); const Zstring urlFmt = afterFirst(url, Zstr("://"), IfNotFoundReturn::none); const Zstring server = beforeFirst(urlFmt, Zstr('/'), IfNotFoundReturn::all); @@ -61,12 +65,14 @@ public: auto promiseHeader = std::make_shared<std::promise<std::string>>(); std::future<std::string> futHeader = promiseHeader->get_future(); - worker_ = InterruptibleThread([asyncStreamOut = this->asyncStreamIn_, promiseHeader, headers = std::move(headers), + auto postBytesSent = std::make_shared<std::atomic<int64_t>>(0); + + worker_ = InterruptibleThread([asyncStreamOut = this->asyncStreamIn_, promiseHeader, headers = std::move(headers), postBytesSent, server, useTls, caCertFilePath, userAgent = utfTo<std::string>(userAgent), postBuf = postBuf ? std::optional<std::string>(*postBuf) : std::nullopt, //[!] life-time! serverRelPath = utfTo<std::string>(page)] { - setCurrentThreadName(Zstr("HttpInputStream ") + server); + setCurrentThreadName(Zstr("Istream ") + server); bool headerReceived = false; try @@ -77,13 +83,22 @@ public: std::vector<CurlOption> extraOptions {{CURLOPT_USERAGENT, userAgent.c_str()}}; //CURLOPT_FOLLOWLOCATION already off by default :) + + + std::function<size_t(std::span<char> buf)> readRequest; if (postBuf) { - extraOptions.emplace_back(CURLOPT_POSTFIELDS, postBuf->c_str()); - extraOptions.emplace_back(CURLOPT_POSTFIELDSIZE_LARGE, postBuf->size()); //postBuf not necessarily null-terminated! + readRequest = [&, postBufStream{MemoryStreamIn(*postBuf)}](std::span<char> buf) mutable + { + const size_t bytesRead = postBufStream.read(buf.data(), buf.size()); + *postBytesSent += bytesRead; + return bytesRead; + }; + extraOptions.emplace_back(CURLOPT_POST, 1); + extraOptions.emplace_back(CURLOPT_POSTFIELDSIZE_LARGE, postBuf->size()); //avoid HTTP chunked transfer encoding? } - //carefully with these callbacks! First receive HTTP header without blocking, + //careful with these callbacks! First receive HTTP header without blocking, //and only then allow AsyncStreamBuffer::write() which can block! std::string headerBuf; @@ -109,13 +124,13 @@ public: if (!headerReceived) throw SysError(L"Received HTTP body without header."); - return asyncStreamOut->write(buf.data(), buf.size()); //throw ThreadStopRequest + asyncStreamOut->write(buf.data(), buf.size()); //throw ThreadStopRequest }; httpSession.perform(serverRelPath, curlHeaders, extraOptions, writeResponse /*throw ThreadStopRequest*/, - nullptr /*readRequest*/, + readRequest, onHeaderData /*throw SysError*/, HTTP_ACCESS_TIME_OUT_SEC); //throw SysError, ThreadStopRequest @@ -133,6 +148,19 @@ public: } }); + //------------------------------------------------------------------------------------ + if (postBuf && onPostBytesSent) + { + int64_t bytesReported = 0; + while (futHeader.wait_for(std::chrono::milliseconds(50)) == std::future_status::timeout) + { + const int64_t bytesDelta = *postBytesSent /*atomic shared access!*/- bytesReported; + bytesReported += bytesDelta; + onPostBytesSent(bytesDelta); //throw X + } + } + //------------------------------------------------------------------------------------ + const std::string headBuf = futHeader.get(); //throw SysError //parse header: https://www.w3.org/Protocols/HTTP/1.0/spec.html#Request-Line const std::string& statusBuf = beforeFirst(headBuf, "\r\n", IfNotFoundReturn::all); @@ -151,9 +179,6 @@ public: /* let's NOT consider "Content-Length" header: - may be unavailable ("Transfer-Encoding: chunked") - may refer to compressed data size ("Content-Encoding: gzip") */ - - //let's not get too finicky: at least report the logical amount of bytes sent/received (excluding HTTP headers) - if (notifyUnbufferedIO_) notifyUnbufferedIO_(postBuf ? postBuf->size() : 0); //throw X } ~Impl() { cleanup(); } @@ -166,39 +191,28 @@ public: return it != responseHeaders_.end() ? &it->second : nullptr; } - size_t read(void* buffer, size_t bytesToRead) //throw SysError, X; return "bytesToRead" bytes unless end of stream! + size_t getBlockSize() const { return HTTP_BLOCK_SIZE_DOWNLOAD; } + + size_t tryRead(void* buffer, size_t bytesToRead) //throw SysError; may return short; only 0 means EOF! CONTRACT: bytesToRead > 0! { - const size_t bytesRead = asyncStreamIn_->read(buffer, bytesToRead); //throw SysError - reportBytesProcessed(); //throw X - return bytesRead; + return asyncStreamIn_->tryRead(buffer, bytesToRead); //throw SysError //no need for asyncStreamIn_->checkWriteErrors(): once end of stream is reached, asyncStreamOut->closeStream() was called => no errors occured } - size_t getBlockSize() const { return 64 * 1024; } - private: Impl (const Impl&) = delete; Impl& operator=(const Impl&) = delete; - void reportBytesProcessed() //throw X - { - const int64_t totalBytesDownloaded = asyncStreamIn_->getTotalBytesWritten(); - if (notifyUnbufferedIO_) notifyUnbufferedIO_(totalBytesDownloaded - totalBytesReported_); //throw X - totalBytesReported_ = totalBytesDownloaded; - } - void cleanup() { asyncStreamIn_->setReadError(std::make_exception_ptr(ThreadStopRequest())); + warn_static("log on error!") } - std::shared_ptr<AsyncStreamBuffer> asyncStreamIn_ = std::make_shared<AsyncStreamBuffer>(512 * 1024); + std::shared_ptr<AsyncStreamBuffer> asyncStreamIn_ = std::make_shared<AsyncStreamBuffer>(HTTP_STREAM_BUFFER_SIZE); InterruptibleThread worker_; - int64_t totalBytesReported_ = 0; int statusCode_ = 0; std::unordered_map<std::string, std::string, StringHashAsciiNoCase, StringEqualAsciiNoCase> responseHeaders_; - - const IoCallback notifyUnbufferedIO_; //throw X }; @@ -206,11 +220,20 @@ HttpInputStream::HttpInputStream(std::unique_ptr<Impl>&& pimpl) : pimpl_(std::mo HttpInputStream::~HttpInputStream() {} -size_t HttpInputStream::read(void* buffer, size_t bytesToRead) { return pimpl_->read(buffer, bytesToRead); } //throw SysError, X; return "bytesToRead" bytes unless end of stream! +size_t HttpInputStream::tryRead(void* buffer, size_t bytesToRead) { return pimpl_->tryRead(buffer, bytesToRead); } size_t HttpInputStream::getBlockSize() const { return pimpl_->getBlockSize(); } -std::string HttpInputStream::readAll() { return bufferedLoad<std::string>(*pimpl_); } //throw SysError, X +std::string HttpInputStream::readAll(const IoCallback& notifyUnbufferedIO /*throw X*/) //throw SysError, X +{ + return unbufferedLoad<std::string>([&](void* buffer, size_t bytesToRead) + { + const size_t bytesRead = pimpl_->tryRead(buffer, bytesToRead); //throw SysError; may return short, only 0 means EOF! => CONTRACT: bytesToRead > 0! + if (notifyUnbufferedIO) notifyUnbufferedIO(bytesRead); //throw X! + return bytesRead; + }, + pimpl_->getBlockSize()); //throw SysError, X +} namespace @@ -218,15 +241,16 @@ namespace std::unique_ptr<HttpInputStream::Impl> sendHttpRequestImpl(const Zstring& url, const std::string* postBuf /*issue POST if bound, GET otherwise*/, const std::string& contentType, //required for POST + const IoCallback& onPostBytesSent /*throw X*/, const Zstring& userAgent, - const Zstring& caCertFilePath /*optional: enable certificate validation*/, - const IoCallback& notifyUnbufferedIO) //throw SysError, X + const Zstring& caCertFilePath /*optional: enable certificate validation*/) //throw SysError, X { Zstring urlRed = url; //"A user agent should not automatically redirect a request more than five times, since such redirections usually indicate an infinite loop." for (int redirects = 0; redirects < 6; ++redirects) { - auto response = std::make_unique<HttpInputStream::Impl>(urlRed, postBuf, contentType, false /*disableGetCache*/, userAgent, caCertFilePath, notifyUnbufferedIO); //throw SysError, X + auto response = std::make_unique<HttpInputStream::Impl>(urlRed, postBuf, contentType, onPostBytesSent, false /*disableGetCache*/, + userAgent, caCertFilePath); //throw SysError, X //https://en.wikipedia.org/wiki/List_of_HTTP_status_codes#3xx_Redirection const int httpStatus = response->getStatusCode(); @@ -319,24 +343,30 @@ std::vector<std::pair<std::string, std::string>> zen::xWwwFormUrlDecode(const st } -HttpInputStream zen::sendHttpGet(const Zstring& url, const Zstring& userAgent, const Zstring& caCertFilePath, const IoCallback& notifyUnbufferedIO) //throw SysError, X +HttpInputStream zen::sendHttpGet(const Zstring& url, const Zstring& userAgent, const Zstring& caCertFilePath) //throw SysError { - return sendHttpRequestImpl(url, nullptr /*postBuf*/, "" /*contentType*/, userAgent, caCertFilePath, notifyUnbufferedIO); //throw SysError, X, X + return sendHttpRequestImpl(url, nullptr /*postBuf*/, "" /*contentType*/, nullptr /*onPostBytesSent*/, userAgent, caCertFilePath); //throw SysError } HttpInputStream zen::sendHttpPost(const Zstring& url, const std::vector<std::pair<std::string, std::string>>& postParams, - const Zstring& userAgent, const Zstring& caCertFilePath, const IoCallback& notifyUnbufferedIO) //throw SysError, X + const IoCallback& notifyUnbufferedIO /*throw X*/, + const Zstring& userAgent, + const Zstring& caCertFilePath) //throw SysError, X { - return sendHttpPost(url, xWwwFormUrlEncode(postParams), "application/x-www-form-urlencoded", userAgent, caCertFilePath, notifyUnbufferedIO); //throw SysError, X + return sendHttpPost(url, xWwwFormUrlEncode(postParams), "application/x-www-form-urlencoded", notifyUnbufferedIO, userAgent, caCertFilePath); //throw SysError, X } -HttpInputStream zen::sendHttpPost(const Zstring& url, const std::string& postBuf, const std::string& contentType, - const Zstring& userAgent, const Zstring& caCertFilePath, const IoCallback& notifyUnbufferedIO) //throw SysError, X +HttpInputStream zen::sendHttpPost(const Zstring& url, + const std::string& postBuf, + const std::string& contentType, + const IoCallback& notifyUnbufferedIO /*throw X*/, + const Zstring& userAgent, + const Zstring& caCertFilePath) //throw SysError, X { - return sendHttpRequestImpl(url, &postBuf, contentType, userAgent, caCertFilePath, notifyUnbufferedIO); //throw SysError, X + return sendHttpRequestImpl(url, &postBuf, contentType, notifyUnbufferedIO, userAgent, caCertFilePath); //throw SysError, X } @@ -347,10 +377,10 @@ bool zen::internetIsAlive() //noexcept auto response = std::make_unique<HttpInputStream::Impl>(Zstr("https://www.google.com/"), //https more appropriate than http for testing? (different ports!) nullptr /*postParams*/, "" /*contentType*/, + nullptr /*onPostBytesSent*/, true /*disableGetCache*/, Zstr("FreeFileSync"), - Zstring() /*caCertFilePath*/, - nullptr /*notifyUnbufferedIO*/); //throw SysError + Zstring() /*caCertFilePath*/); //throw SysError const int statusCode = response->getStatusCode(); //attention: google.com might redirect to https://consent.google.com => don't follow, just return "true"!!! |