Commit c8a9780c authored by jan.koester's avatar jan.koester
Browse files

test

parent a7af54f1
Loading
Loading
Loading
Loading
+340 −0
Original line number Diff line number Diff line
@@ -640,6 +640,346 @@ std::vector<char> libhttppp::HttpClient::_h1ReadResponse(const std::string &labe
    return ret;
}

// ── Streaming API ──────────────────────────────────────────────────────

bool libhttppp::HttpClient::isStreaming() const {
    return _streamMode != STREAM_NONE;
}

libhttppp::HttpResponse libhttppp::HttpClient::GetStream(libhttppp::HttpRequest &nreq) {
    try {
        _ensureConnected();

        // Reset any previous streaming state
        _streamMode = STREAM_NONE;
        _streamBuf.clear();
        _streamBufPos = 0;
        _streamRemaining = 0;
        _streamChunkDone = false;
        _streamChunkRemaining = 0;

        std::stringstream host;
        host << _url.getHost() << ":" << _url.getPort();

        nreq.setHeaderData("host")->push_back(host.str());
        nreq.setRequestType(GETREQUEST);

        if (nreq.getRequestURL().empty())
            nreq.setRequestURL(_url.getPath());

        if (nreq.getRequestVersion().empty())
            nreq.setRequestVersion(HTTPVERSION(1.1));

        // Send request header
        {
            std::string header;
            nreq.printHeader(header);
            if (!_cltsock || _cltsock->fd() < 0)
                resetConnection();
            _sendAll(header);
        }

        // Read until we have full response header (\r\n\r\n)
        std::vector<char> raw;
        raw.reserve(16384);
        size_t header_end = std::string::npos;

        for (;;) {
            if (raw.size() >= 4) {
                for (size_t i = 0; i + 3 < raw.size(); ++i) {
                    if (raw[i] == '\r' && raw[i+1] == '\n' &&
                        raw[i+2] == '\r' && raw[i+3] == '\n')
                    {
                        header_end = i + 4;
                        break;
                    }
                }
            }
            if (header_end != std::string::npos)
                break;

            netplus::buffer buf(BLOCKSIZE);
            size_t n = _recvBlocking(buf);
            if (n == 0) {
                netplus::NetException ne;
                ne[netplus::NetException::Error] << "HTTP GetStream: EOF while reading header";
                throw ne;
            }
            raw.insert(raw.end(), buf.data.buf, buf.data.buf + n);
        }

        // Parse header
        libhttppp::HttpResponse res;
        size_t parsed_hsize = res.parse(raw.data(), raw.size());

        if (parsed_hsize == 0 || parsed_hsize > raw.size()) {
            libhttppp::HTTPException he;
            he[libhttppp::HTTPException::Error] << "HTTP GetStream: response header parse failed";
            throw he;
        }

        // Save leftover body data that was read together with headers
        size_t body_off = parsed_hsize;
        if (raw.size() > body_off) {
            _streamBuf.assign(raw.begin() + (ptrdiff_t)body_off, raw.end());
        }
        _streamBufPos = 0;

        // Determine transfer mode
        bool chunked = false;
        ptrdiff_t content_len = -1;

        try {
            for (libhttppp::HttpHeader::HeaderData::Values *v = res.getTransferEncoding();
                 v; v = v->nextvalue())
            {
                std::string val = tolower_copy(v->getvalue());
                if (val == "chunked") {
                    chunked = true;
                    break;
                }
            }
        } catch (...) {}

        if (!chunked) {
            try {
                content_len = res.getContentLength();
                if (content_len < 0) content_len = -1;
            } catch (...) {
                content_len = -1;
            }
        }

        if (chunked) {
            _streamMode = STREAM_CHUNKED;
            _streamChunkDone = false;
            _streamChunkRemaining = 0;
        } else if (content_len >= 0) {
            _streamMode = STREAM_CONTENT_LENGTH;
            _streamRemaining = (size_t)content_len;
            // Subtract what's already in the leftover buffer
            size_t have = _streamBuf.size() - _streamBufPos;
            if (have > _streamRemaining) {
                // More data than content-length (shouldn't happen, but be safe)
                _streamBuf.resize(_streamBufPos + _streamRemaining);
            }
        } else {
            _streamMode = STREAM_EOF;
        }

        return res;

    } catch (netplus::NetException &e) {
        libhttppp::HTTPException ee;
        ee[libhttppp::HTTPException::Error] << e.what();
        throw ee;
    }
}

size_t libhttppp::HttpClient::readBodyChunk(char *buf, size_t bufsize) {
    if (_streamMode == STREAM_NONE || bufsize == 0)
        return 0;

    // Helper: drain from internal leftover buffer
    auto drainBuf = [&](size_t maxBytes) -> size_t {
        size_t have = _streamBuf.size() - _streamBufPos;
        if (have == 0) return 0;
        size_t take = (std::min)(have, maxBytes);
        std::memcpy(buf, _streamBuf.data() + _streamBufPos, take);
        _streamBufPos += take;
        // Compact buffer when fully consumed
        if (_streamBufPos == _streamBuf.size()) {
            _streamBuf.clear();
            _streamBufPos = 0;
        }
        return take;
    };

    // Helper: receive more data into internal buffer
    auto recvMore = [&]() -> size_t {
        netplus::buffer nb(BLOCKSIZE);
        size_t n = _recvBlocking(nb);
        if (n > 0) {
            if (_streamBufPos > 0 && _streamBufPos < _streamBuf.size()) {
                _streamBuf.erase(_streamBuf.begin(),
                                 _streamBuf.begin() + (ptrdiff_t)_streamBufPos);
                _streamBufPos = 0;
            }
            _streamBuf.insert(_streamBuf.end(), nb.data.buf, nb.data.buf + n);
        }
        return n;
    };

    // ── Content-Length mode ──
    if (_streamMode == STREAM_CONTENT_LENGTH) {
        if (_streamRemaining == 0) {
            _streamMode = STREAM_NONE;
            return 0;
        }

        size_t want = (std::min)(bufsize, _streamRemaining);

        // First try leftover buffer
        size_t got = drainBuf(want);
        if (got > 0) {
            _streamRemaining -= got;
            if (_streamRemaining == 0)
                _streamMode = STREAM_NONE;
            return got;
        }

        // Read from socket
        netplus::buffer nb(BLOCKSIZE);
        size_t n = _recvBlocking(nb);
        if (n == 0) {
            _streamMode = STREAM_NONE;
            return 0;
        }
        size_t take = (std::min)({n, want, bufsize});
        std::memcpy(buf, nb.data.buf, take);
        _streamRemaining -= take;

        // Save excess data for next call
        if (take < n) {
            _streamBuf.assign(nb.data.buf + take, nb.data.buf + n);
            _streamBufPos = 0;
        }

        if (_streamRemaining == 0)
            _streamMode = STREAM_NONE;
        return take;
    }

    // ── Chunked mode ──
    if (_streamMode == STREAM_CHUNKED) {
        if (_streamChunkDone) {
            _streamMode = STREAM_NONE;
            return 0;
        }

        // Helper: read a line from the stream buffer (refilling as needed)
        auto readLine = [&]() -> std::string {
            for (;;) {
                size_t have = _streamBuf.size() - _streamBufPos;
                for (size_t i = _streamBufPos; i + 1 < _streamBuf.size(); ++i) {
                    if (_streamBuf[i] == '\r' && _streamBuf[i+1] == '\n') {
                        std::string line(_streamBuf.begin() + (ptrdiff_t)_streamBufPos,
                                         _streamBuf.begin() + (ptrdiff_t)i);
                        _streamBufPos = i + 2;
                        return line;
                    }
                }
                // Need more data
                if (recvMore() == 0) {
                    _streamChunkDone = true;
                    return {};
                }
            }
        };

        // If we're in the middle of a chunk, continue reading chunk data
        if (_streamChunkRemaining > 0) {
            size_t want = (std::min)(bufsize, _streamChunkRemaining);
            size_t got = drainBuf(want);
            if (got > 0) {
                _streamChunkRemaining -= got;
                if (_streamChunkRemaining == 0) {
                    // Skip chunk terminator \r\n
                    while (_streamBuf.size() - _streamBufPos < 2)
                        recvMore();
                    _streamBufPos += 2;
                }
                return got;
            }
            // Read from socket directly
            netplus::buffer nb(BLOCKSIZE);
            size_t n = _recvBlocking(nb);
            if (n == 0) {
                _streamChunkDone = true;
                _streamMode = STREAM_NONE;
                return 0;
            }
            size_t take = (std::min)({n, want, bufsize});
            std::memcpy(buf, nb.data.buf, take);
            _streamChunkRemaining -= take;
            if (take < n) {
                _streamBuf.assign(nb.data.buf + take, nb.data.buf + n);
                _streamBufPos = 0;
            }
            if (_streamChunkRemaining == 0) {
                // Skip chunk terminator \r\n
                while (_streamBuf.size() - _streamBufPos < 2)
                    recvMore();
                _streamBufPos += 2;
            }
            return take;
        }

        // Read next chunk size line
        std::string szline = readLine();
        if (szline.empty() && _streamChunkDone) {
            _streamMode = STREAM_NONE;
            return 0;
        }
        auto sem = szline.find(';');
        if (sem != std::string::npos) szline.resize(sem);

        size_t chunk_size = 0;
        try {
            chunk_size = (size_t)std::stoul(szline, nullptr, 16);
        } catch (...) {
            _streamChunkDone = true;
            _streamMode = STREAM_NONE;
            return 0;
        }

        if (chunk_size == 0) {
            // Final chunk — consume trailers
            for (;;) {
                std::string tline = readLine();
                if (tline.empty()) break;
            }
            _streamChunkDone = true;
            _streamMode = STREAM_NONE;
            return 0;
        }

        _streamChunkRemaining = chunk_size;
        // Recurse to actually read data from this chunk
        return readBodyChunk(buf, bufsize);
    }

    // ── EOF mode (no content-length, not chunked) ──
    if (_streamMode == STREAM_EOF) {
        // First drain leftover buffer
        size_t got = drainBuf(bufsize);
        if (got > 0) return got;

        // Read from socket; EOF signals end of body
        netplus::buffer nb(BLOCKSIZE);
        size_t n = 0;
        try {
            n = _recvBlocking(nb);
        } catch (...) {
            _streamMode = STREAM_NONE;
            return 0;
        }
        if (n == 0) {
            _streamMode = STREAM_NONE;
            return 0;
        }
        size_t take = (std::min)(n, bufsize);
        std::memcpy(buf, nb.data.buf, take);
        if (take < n) {
            _streamBuf.assign(nb.data.buf + take, nb.data.buf + n);
            _streamBufPos = 0;
        }
        return take;
    }

    return 0;
}

static bool is_redirect_status(int code) {
  return code == 301 || code == 302 || code == 303 || code == 307 || code == 308;
}
+22 −0
Original line number Diff line number Diff line
@@ -92,6 +92,17 @@ namespace libhttppp {
      const std::vector<char> Put(HttpRequest &nreq,const std::vector<char> &put);
      const std::vector<char> Delete(HttpRequest &nreq);

      // Streaming API: send request, return parsed response headers only.
      // After this call, use readBodyChunk() to read body data incrementally.
      HttpResponse GetStream(HttpRequest &nreq);

      // Read the next chunk of body data (up to bufsize bytes).
      // Returns number of bytes written to buf, 0 when body is complete.
      size_t readBodyChunk(char *buf, size_t bufsize);

      // True while a streaming read is in progress.
      bool isStreaming() const;

      // Shared TLS session cache — enables abbreviated TLS 1.2 handshakes
      // across reconnects to the same host.  One cache per process.
      static netplus::TlsSessionCache& tlsSessionCache();
@@ -123,6 +134,17 @@ namespace libhttppp {
      const std::vector<char> _h3Request(const std::string &method,
                  HttpRequest &nreq,
                  const std::vector<char> *postBody = nullptr);

      // Streaming state
      enum StreamMode { STREAM_NONE, STREAM_CONTENT_LENGTH, STREAM_CHUNKED, STREAM_EOF };
      StreamMode _streamMode = STREAM_NONE;
      size_t _streamRemaining = 0;        // bytes left for content-length mode
      std::vector<char> _streamBuf;       // leftover data from header read
      size_t _streamBufPos = 0;
      // Chunked streaming sub-state
      bool _streamChunkDone = false;      // true after final 0-length chunk
      size_t _streamChunkRemaining = 0;   // bytes left in current chunk

  private:
      HttpUrl _url;
      std::unique_ptr<netplus::socket> _cltsock;