summaryrefslogtreecommitdiff
path: root/zen/http.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zen/http.cpp')
-rw-r--r--zen/http.cpp124
1 files changed, 77 insertions, 47 deletions
diff --git a/zen/http.cpp b/zen/http.cpp
index a26bb3a5..ee71e5b3 100644
--- a/zen/http.cpp
+++ b/zen/http.cpp
@@ -8,12 +8,18 @@
#include <libcurl/curl_wrap.h> //DON'T include <curl/curl.h> directly!
#include "stream_buffer.h"
- #include "thread.h"
using namespace zen;
+
const int HTTP_ACCESS_TIME_OUT_SEC = 20; //unit: [s]; passed as last argument to httpSession.perform()
+const size_t HTTP_BLOCK_SIZE_DOWNLOAD = 64 * 1024; //unit: [byte]; value returned by Impl::getBlockSize(); rationale:
+//- InternetReadFile() is buffered + prefetching
+//- libcurl returns blocks of only 16 kB as returned by recv() even if we request larger blocks via CURLOPT_BUFFERSIZE
+ const size_t HTTP_STREAM_BUFFER_SIZE = 1024 * 1024; //unit: [byte]; constructor argument of the AsyncStreamBuffer held in Impl::asyncStreamIn_
+ //stream buffer should be big enough to facilitate prefetching during alternating read/write operations => e.g. see serialize.h::unbufferedStreamCopy()
+
@@ -23,16 +29,14 @@ public:
Impl(const Zstring& url,
const std::string* postBuf, //issue POST if bound, GET otherwise
const std::string& contentType, //required for POST
+ const IoCallback& onPostBytesSent /*throw X*/,
bool disableGetCache, //not relevant for POST (= never cached)
const Zstring& userAgent,
- const Zstring& caCertFilePath, //optional: enable certificate validation
- const IoCallback& notifyUnbufferedIO /*throw X*/) : //throw SysError, X
- notifyUnbufferedIO_(notifyUnbufferedIO)
+ const Zstring& caCertFilePath /*optional: enable certificate validation*/) //throw SysError, X
{
ZEN_ON_SCOPE_FAIL(cleanup()); //destructor call would lead to member double clean-up!!!
- //may be sending large POST: call back first
- if (notifyUnbufferedIO_) notifyUnbufferedIO_(0); //throw X
+ assert(postBuf || !onPostBytesSent);
const Zstring urlFmt = afterFirst(url, Zstr("://"), IfNotFoundReturn::none);
const Zstring server = beforeFirst(urlFmt, Zstr('/'), IfNotFoundReturn::all);
@@ -61,12 +65,14 @@ public:
auto promiseHeader = std::make_shared<std::promise<std::string>>();
std::future<std::string> futHeader = promiseHeader->get_future();
- worker_ = InterruptibleThread([asyncStreamOut = this->asyncStreamIn_, promiseHeader, headers = std::move(headers),
+ auto postBytesSent = std::make_shared<std::atomic<int64_t>>(0);
+
+ worker_ = InterruptibleThread([asyncStreamOut = this->asyncStreamIn_, promiseHeader, headers = std::move(headers), postBytesSent,
server, useTls, caCertFilePath, userAgent = utfTo<std::string>(userAgent),
postBuf = postBuf ? std::optional<std::string>(*postBuf) : std::nullopt, //[!] life-time!
serverRelPath = utfTo<std::string>(page)]
{
- setCurrentThreadName(Zstr("HttpInputStream ") + server);
+ setCurrentThreadName(Zstr("Istream ") + server);
bool headerReceived = false;
try
@@ -77,13 +83,22 @@ public:
std::vector<CurlOption> extraOptions {{CURLOPT_USERAGENT, userAgent.c_str()}};
//CURLOPT_FOLLOWLOCATION already off by default :)
+
+
+ std::function<size_t(std::span<char> buf)> readRequest;
if (postBuf)
{
- extraOptions.emplace_back(CURLOPT_POSTFIELDS, postBuf->c_str());
- extraOptions.emplace_back(CURLOPT_POSTFIELDSIZE_LARGE, postBuf->size()); //postBuf not necessarily null-terminated!
+ readRequest = [&, postBufStream{MemoryStreamIn(*postBuf)}](std::span<char> buf) mutable
+ {
+ const size_t bytesRead = postBufStream.read(buf.data(), buf.size());
+ *postBytesSent += bytesRead;
+ return bytesRead;
+ };
+ extraOptions.emplace_back(CURLOPT_POST, 1);
+ extraOptions.emplace_back(CURLOPT_POSTFIELDSIZE_LARGE, postBuf->size()); //avoid HTTP chunked transfer encoding?
}
- //carefully with these callbacks! First receive HTTP header without blocking,
+ //careful with these callbacks! First receive HTTP header without blocking,
//and only then allow AsyncStreamBuffer::write() which can block!
std::string headerBuf;
@@ -109,13 +124,13 @@ public:
if (!headerReceived)
throw SysError(L"Received HTTP body without header.");
- return asyncStreamOut->write(buf.data(), buf.size()); //throw ThreadStopRequest
+ asyncStreamOut->write(buf.data(), buf.size()); //throw ThreadStopRequest
};
httpSession.perform(serverRelPath,
curlHeaders, extraOptions,
writeResponse /*throw ThreadStopRequest*/,
- nullptr /*readRequest*/,
+ readRequest,
onHeaderData /*throw SysError*/,
HTTP_ACCESS_TIME_OUT_SEC); //throw SysError, ThreadStopRequest
@@ -133,6 +148,19 @@ public:
}
});
+ //------------------------------------------------------------------------------------
+ if (postBuf && onPostBytesSent)
+ {
+ int64_t bytesReported = 0;
+ while (futHeader.wait_for(std::chrono::milliseconds(50)) == std::future_status::timeout)
+ {
+ const int64_t bytesDelta = *postBytesSent /*atomic shared access!*/- bytesReported;
+ bytesReported += bytesDelta;
+ onPostBytesSent(bytesDelta); //throw X
+ }
+ }
+ //------------------------------------------------------------------------------------
+
const std::string headBuf = futHeader.get(); //throw SysError
//parse header: https://www.w3.org/Protocols/HTTP/1.0/spec.html#Request-Line
const std::string& statusBuf = beforeFirst(headBuf, "\r\n", IfNotFoundReturn::all);
@@ -151,9 +179,6 @@ public:
/* let's NOT consider "Content-Length" header:
- may be unavailable ("Transfer-Encoding: chunked")
- may refer to compressed data size ("Content-Encoding: gzip") */
-
- //let's not get too finicky: at least report the logical amount of bytes sent/received (excluding HTTP headers)
- if (notifyUnbufferedIO_) notifyUnbufferedIO_(postBuf ? postBuf->size() : 0); //throw X
}
~Impl() { cleanup(); }
@@ -166,39 +191,28 @@ public:
return it != responseHeaders_.end() ? &it->second : nullptr;
}
- size_t read(void* buffer, size_t bytesToRead) //throw SysError, X; return "bytesToRead" bytes unless end of stream!
+ size_t getBlockSize() const { return HTTP_BLOCK_SIZE_DOWNLOAD; } //block size that HttpInputStream::readAll() hands to unbufferedLoad()
+
+ size_t tryRead(void* buffer, size_t bytesToRead) //throw SysError; may return short; only 0 means EOF! CONTRACT: bytesToRead > 0!
{
- const size_t bytesRead = asyncStreamIn_->read(buffer, bytesToRead); //throw SysError
- reportBytesProcessed(); //throw X
- return bytesRead;
+ return asyncStreamIn_->tryRead(buffer, bytesToRead); //throw SysError; data is supplied by the worker thread via asyncStreamOut->write() on the same buffer
//no need for asyncStreamIn_->checkWriteErrors(): once end of stream is reached, asyncStreamOut->closeStream() was called => no errors occurred
}
- size_t getBlockSize() const { return 64 * 1024; }
-
private:
Impl (const Impl&) = delete;
Impl& operator=(const Impl&) = delete;
- void reportBytesProcessed() //throw X
- {
- const int64_t totalBytesDownloaded = asyncStreamIn_->getTotalBytesWritten();
- if (notifyUnbufferedIO_) notifyUnbufferedIO_(totalBytesDownloaded - totalBytesReported_); //throw X
- totalBytesReported_ = totalBytesDownloaded;
- }
-
void cleanup() //called by ~Impl() and, via ZEN_ON_SCOPE_FAIL, when the constructor fails
{
asyncStreamIn_->setReadError(std::make_exception_ptr(ThreadStopRequest())); //presumably makes a worker blocked in asyncStreamOut->write() fail with ThreadStopRequest - confirm against AsyncStreamBuffer
+ warn_static("log on error!") //NOTE(review): looks like a build-time to-do marker left by the author - confirm
}
- std::shared_ptr<AsyncStreamBuffer> asyncStreamIn_ = std::make_shared<AsyncStreamBuffer>(512 * 1024);
+ std::shared_ptr<AsyncStreamBuffer> asyncStreamIn_ = std::make_shared<AsyncStreamBuffer>(HTTP_STREAM_BUFFER_SIZE);
InterruptibleThread worker_;
- int64_t totalBytesReported_ = 0;
int statusCode_ = 0;
std::unordered_map<std::string, std::string, StringHashAsciiNoCase, StringEqualAsciiNoCase> responseHeaders_;
-
- const IoCallback notifyUnbufferedIO_; //throw X
};
@@ -206,11 +220,20 @@ HttpInputStream::HttpInputStream(std::unique_ptr<Impl>&& pimpl) : pimpl_(std::mo
HttpInputStream::~HttpInputStream() {}
-size_t HttpInputStream::read(void* buffer, size_t bytesToRead) { return pimpl_->read(buffer, bytesToRead); } //throw SysError, X; return "bytesToRead" bytes unless end of stream!
+size_t HttpInputStream::tryRead(void* buffer, size_t bytesToRead) { return pimpl_->tryRead(buffer, bytesToRead); }
size_t HttpInputStream::getBlockSize() const { return pimpl_->getBlockSize(); }
-std::string HttpInputStream::readAll() { return bufferedLoad<std::string>(*pimpl_); } //throw SysError, X
+std::string HttpInputStream::readAll(const IoCallback& notifyUnbufferedIO /*throw X*/) //throw SysError, X; returns what unbufferedLoad() collects from pimpl_->tryRead()
+{
+ return unbufferedLoad<std::string>([&](void* buffer, size_t bytesToRead)
+ {
+ const size_t bytesRead = pimpl_->tryRead(buffer, bytesToRead); //throw SysError; may return short, only 0 means EOF! => CONTRACT: bytesToRead > 0!
+ if (notifyUnbufferedIO) notifyUnbufferedIO(bytesRead); //throw X!; callback is optional: only invoked when bound, receives the size of the chunk just read
+ return bytesRead;
+ },
+ pimpl_->getBlockSize()); //throw SysError, X
+}
namespace
@@ -218,15 +241,16 @@ namespace
std::unique_ptr<HttpInputStream::Impl> sendHttpRequestImpl(const Zstring& url,
const std::string* postBuf /*issue POST if bound, GET otherwise*/,
const std::string& contentType, //required for POST
+ const IoCallback& onPostBytesSent /*throw X*/,
const Zstring& userAgent,
- const Zstring& caCertFilePath /*optional: enable certificate validation*/,
- const IoCallback& notifyUnbufferedIO) //throw SysError, X
+ const Zstring& caCertFilePath /*optional: enable certificate validation*/) //throw SysError, X
{
Zstring urlRed = url;
//"A user agent should not automatically redirect a request more than five times, since such redirections usually indicate an infinite loop."
for (int redirects = 0; redirects < 6; ++redirects)
{
- auto response = std::make_unique<HttpInputStream::Impl>(urlRed, postBuf, contentType, false /*disableGetCache*/, userAgent, caCertFilePath, notifyUnbufferedIO); //throw SysError, X
+ auto response = std::make_unique<HttpInputStream::Impl>(urlRed, postBuf, contentType, onPostBytesSent, false /*disableGetCache*/,
+ userAgent, caCertFilePath); //throw SysError, X
//https://en.wikipedia.org/wiki/List_of_HTTP_status_codes#3xx_Redirection
const int httpStatus = response->getStatusCode();
@@ -319,24 +343,30 @@ std::vector<std::pair<std::string, std::string>> zen::xWwwFormUrlDecode(const st
}
-HttpInputStream zen::sendHttpGet(const Zstring& url, const Zstring& userAgent, const Zstring& caCertFilePath, const IoCallback& notifyUnbufferedIO) //throw SysError, X
+HttpInputStream zen::sendHttpGet(const Zstring& url, const Zstring& userAgent, const Zstring& caCertFilePath) //throw SysError; GET request: no POST buffer and no upload callback are bound
{
- return sendHttpRequestImpl(url, nullptr /*postBuf*/, "" /*contentType*/, userAgent, caCertFilePath, notifyUnbufferedIO); //throw SysError, X, X
+ return sendHttpRequestImpl(url, nullptr /*postBuf*/, "" /*contentType*/, nullptr /*onPostBytesSent*/, userAgent, caCertFilePath); //throw SysError
}
HttpInputStream zen::sendHttpPost(const Zstring& url, const std::vector<std::pair<std::string, std::string>>& postParams,
- const Zstring& userAgent, const Zstring& caCertFilePath, const IoCallback& notifyUnbufferedIO) //throw SysError, X
+ const IoCallback& notifyUnbufferedIO /*throw X*/,
+ const Zstring& userAgent,
+ const Zstring& caCertFilePath) //throw SysError, X; convenience overload: encodes postParams with xWwwFormUrlEncode() and delegates to the raw-buffer overload
{
- return sendHttpPost(url, xWwwFormUrlEncode(postParams), "application/x-www-form-urlencoded", userAgent, caCertFilePath, notifyUnbufferedIO); //throw SysError, X
+ return sendHttpPost(url, xWwwFormUrlEncode(postParams), "application/x-www-form-urlencoded", notifyUnbufferedIO, userAgent, caCertFilePath); //throw SysError, X
}
-HttpInputStream zen::sendHttpPost(const Zstring& url, const std::string& postBuf, const std::string& contentType,
- const Zstring& userAgent, const Zstring& caCertFilePath, const IoCallback& notifyUnbufferedIO) //throw SysError, X
+HttpInputStream zen::sendHttpPost(const Zstring& url,
+ const std::string& postBuf,
+ const std::string& contentType,
+ const IoCallback& notifyUnbufferedIO /*throw X*/, //handed to sendHttpRequestImpl() in the "onPostBytesSent" position
+ const Zstring& userAgent,
+ const Zstring& caCertFilePath) //throw SysError, X
{
- return sendHttpRequestImpl(url, &postBuf, contentType, userAgent, caCertFilePath, notifyUnbufferedIO); //throw SysError, X
+ return sendHttpRequestImpl(url, &postBuf, contentType, notifyUnbufferedIO, userAgent, caCertFilePath); //throw SysError, X; binding postBuf selects POST instead of GET
}
@@ -347,10 +377,10 @@ bool zen::internetIsAlive() //noexcept
auto response = std::make_unique<HttpInputStream::Impl>(Zstr("https://www.google.com/"), //https more appropriate than http for testing? (different ports!)
nullptr /*postParams*/,
"" /*contentType*/,
+ nullptr /*onPostBytesSent*/,
true /*disableGetCache*/,
Zstr("FreeFileSync"),
- Zstring() /*caCertFilePath*/,
- nullptr /*notifyUnbufferedIO*/); //throw SysError
+ Zstring() /*caCertFilePath*/); //throw SysError
const int statusCode = response->getStatusCode();
//attention: google.com might redirect to https://consent.google.com => don't follow, just return "true"!!!
bgstack15