Loading src/http.cpp +267 −14 Original line number Diff line number Diff line Loading @@ -657,6 +657,13 @@ libhttppp::HttpResponse libhttppp::HttpClient::GetStream(libhttppp::HttpRequest _streamRemaining = 0; _streamChunkDone = false; _streamChunkRemaining = 0; _streamH2Sid = 0; _streamH2EndStream = false; _streamH2Raw.clear(); _streamH3Sid = 0; _streamH3EndStream = false; _streamH3Raw.clear(); _streamH3Body.clear(); std::stringstream host; host << _url.getHost() << ":" << _url.getPort(); Loading @@ -667,30 +674,276 @@ libhttppp::HttpResponse libhttppp::HttpClient::GetStream(libhttppp::HttpRequest if (nreq.getRequestURL().empty()) nreq.setRequestURL(_url.getPath()); // HTTP/3 (QUIC) or HTTP/2: use full request, buffer response for streaming if (dynamic_cast<netplus::quic*>(_cltsock.get())) { auto data = _h3Request("GET", nreq); // ── HTTP/3 (QUIC) streaming path ── if (auto *q = dynamic_cast<netplus::quic*>(_cltsock.get())) { // Send HTTP/3 control streams once per connection if (!_h3ControlSent) { uint64_t ctrl_id = q->openStream(false); std::vector<uint8_t> ctrl_payload; ctrl_payload.push_back(0x00); ctrl_payload.push_back(0x04); ctrl_payload.push_back(0x00); q->sendStreamData(ctrl_id, ctrl_payload, false); uint64_t enc_id = q->openStream(false); std::vector<uint8_t> enc_payload = {0x02}; q->sendStreamData(enc_id, enc_payload, false); uint64_t dec_id = q->openStream(false); std::vector<uint8_t> dec_payload = {0x03}; q->sendStreamData(dec_id, dec_payload, false); _h3ControlSent = true; } std::string path = nreq.getRequestURL(); if (path.empty()) path = _url.getPath(); std::string scheme = (_url.getProtocol() == HttpUrl::HTTP) ? "http" : "https"; std::stringstream authority; authority << _url.getHost() << ":" << _url.getPort(); std::vector<qpack::HeaderField> extra; for (HttpHeader::HeaderData *hd = nreq.getfirstHeaderData(); hd; hd = hd->nextHeaderData()) { const std::string &key = hd->getkey(); if (key.empty() || key[0] == ':') continue; if (key == "host" || key == "connection") continue; for (HttpHeader::HeaderData::Values *v = hd->getfirstValue(); v; v = v->nextvalue()) extra.push_back({key, v->getvalue(), false}); } std::vector<uint8_t> headers = qpack::Encoder::encodeRequestHeaders( "GET", scheme, authority.str(), path, extra, false, 0); std::vector<uint8_t> stream_payload; h3AppendFrame(stream_payload, 0x01, headers); uint64_t stream_id = q->openStream(true); q->sendStreamData(stream_id, stream_payload, true); // Read until we have the HEADERS frame libhttppp::HttpResponse res; res.setContentLength(data.size()); _streamBuf = std::move(data); _streamBufPos = 0; _streamMode = STREAM_CONTENT_LENGTH; _streamRemaining = _streamBuf.size(); bool got_headers = false; _streamH3Sid = stream_id; auto h3_pump = [&]() { for (int pump_i = 0; pump_i < 256; ++pump_i) { try { q->pumpNetwork(MSG_DONTWAIT); } catch (netplus::NetException &e) { if (e.getErrorType() != netplus::NetException::Note) throw; break; } } }; while (!got_headers) { uint8_t rbuf[4096]; size_t n = q->recvStreamData(stream_id, rbuf, sizeof(rbuf)); if (n == 0) { h3_pump(); n = q->recvStreamData(stream_id, rbuf, sizeof(rbuf)); if (n == 0) { _sw.waitRead(*_cltsock, 100); continue; } } _streamH3Raw.insert(_streamH3Raw.end(), rbuf, rbuf + n); // Try to parse frames size_t pos = 0; while (pos < _streamH3Raw.size()) { size_t frame_start = pos; size_t bytes = 0; uint64_t frame_type = h3DecodeVarInt(_streamH3Raw.data() + pos, _streamH3Raw.size() - pos, bytes); if (bytes == 0) break; pos += bytes; if (pos >= _streamH3Raw.size()) { pos = frame_start; break; } uint64_t frame_len = h3DecodeVarInt(_streamH3Raw.data() + pos, _streamH3Raw.size() - pos, bytes); if (bytes == 0) { pos = frame_start; break; } pos += bytes; if (pos + frame_len > _streamH3Raw.size()) { pos = frame_start; break; } if (frame_type == 0x01) { // HEADERS auto decoded = qpack::Decoder::decode(_streamH3Raw.data() + pos, frame_len); for (const auto &h : decoded) { if (h.name == ":status") { try { int code = std::stoi(h.value); res.setState(std::string("HTTP/3 ") + h.value); } catch (...) {} } else if (h.name == "content-length") { try { res.setContentLength(static_cast<size_t>(std::stoul(h.value))); _streamRemaining = static_cast<size_t>(std::stoul(h.value)); } catch (...) {} } else if (h.name == "content-type") { res.setContentType(h.value); } else { res.setHeaderData(h.name.c_str())->push_back(h.value); } } got_headers = true; } else if (frame_type == 0x00) { // DATA — came with headers _streamH3Body.insert(_streamH3Body.end(), _streamH3Raw.begin() + static_cast<ptrdiff_t>(pos), _streamH3Raw.begin() + static_cast<ptrdiff_t>(pos + frame_len)); } pos += frame_len; } if (pos > 0) _streamH3Raw.erase(_streamH3Raw.begin(), _streamH3Raw.begin() + static_cast<ptrdiff_t>(pos)); } _streamMode = STREAM_H3; _streamH3EndStream = q->isStreamComplete(stream_id) && _streamH3Raw.empty(); return res; } // ── HTTP/2 streaming path ── if (_isH2) { auto data = _h2Request("GET", nreq); if (!_h2PrefaceSent) { std::string preface(H2C_CLIENT_PREFACE, H2C_CLIENT_PREFACE_LEN); preface += h2cBuildSettings(); _sendAll(preface); _h2PrefaceSent = true; } std::string path = nreq.getRequestURL(); if (path.empty()) path = _url.getPath(); std::string scheme = (_url.getProtocol() == HttpUrl::HTTPS) ? "https" : "http"; std::stringstream auth; auth << _url.getHost() << ":" << _url.getPort(); std::vector<hpack::HeaderField> extra; for (HttpHeader::HeaderData *hd = nreq.getfirstHeaderData(); hd; hd = hd->nextHeaderData()) { const std::string &key = hd->getkey(); if (key.empty() || key[0] == ':') continue; if (key == "host") continue; for (HttpHeader::HeaderData::Values *v = hd->getfirstValue(); v; v = v->nextvalue()) extra.push_back({key, v->getvalue(), false}); } std::string hpack_block = hpack::Encoder::encodeRequestHeaders( "GET", path, scheme, auth.str(), extra); uint32_t stream_id = _h2NextStreamId; _h2NextStreamId += 2; uint8_t hdr_flags = H2C_FLAG_END_HEADERS | H2C_FLAG_END_STREAM; std::string headers_frame = h2cBuildFrame(H2C_FRAME_HEADERS, hdr_flags, stream_id, hpack_block); _sendAll(headers_frame); if (!_h2Decoder) _h2Decoder = std::make_unique<hpack::Decoder>(); // Read until we get response HEADERS for our stream libhttppp::HttpResponse res; res.setContentLength(data.size()); _streamBuf = std::move(data); bool got_headers = false; _streamH2Sid = stream_id; _streamH2EndStream = false; auto h2_ensure_bytes = [&](size_t need) { while (_streamH2Raw.size() < need) { netplus::buffer buf(BLOCKSIZE); size_t n = _recvBlocking(buf); if (n == 0) { HTTPException he; he[HTTPException::Error] << "HTTP/2 GetStream: EOF while reading frames"; throw he; } _streamH2Raw.insert(_streamH2Raw.end(), reinterpret_cast<uint8_t*>(buf.data.buf), reinterpret_cast<uint8_t*>(buf.data.buf) + n); } }; while (!got_headers) { h2_ensure_bytes(H2C_FRAME_HEADER_LEN); uint32_t frame_len = (static_cast<uint32_t>(_streamH2Raw[0]) << 16) | (static_cast<uint32_t>(_streamH2Raw[1]) << 8) | static_cast<uint32_t>(_streamH2Raw[2]); uint8_t frame_type = _streamH2Raw[3]; uint8_t frame_flags = _streamH2Raw[4]; uint32_t frame_stream = ((static_cast<uint32_t>(_streamH2Raw[5]) & 0x7f) << 24) | (static_cast<uint32_t>(_streamH2Raw[6]) << 16) | (static_cast<uint32_t>(_streamH2Raw[7]) << 8) | static_cast<uint32_t>(_streamH2Raw[8]); h2_ensure_bytes(H2C_FRAME_HEADER_LEN + frame_len); const uint8_t *payload = _streamH2Raw.data() + H2C_FRAME_HEADER_LEN; switch (frame_type) { case H2C_FRAME_SETTINGS: { if (!(frame_flags & H2C_FLAG_ACK)) _sendAll(h2cBuildSettingsAck()); break; } case H2C_FRAME_HEADERS: { if (frame_stream == stream_id) { auto headers = _h2Decoder->decode(payload, frame_len); for (const auto &h : headers) { if (h.name == ":status") { try { res.setState(std::string("HTTP/2 ") + h.value); } catch (...) {} } else if (h.name == "content-length") { try { res.setContentLength(static_cast<size_t>(std::stoul(h.value))); _streamRemaining = static_cast<size_t>(std::stoul(h.value)); } catch (...) {} } else if (h.name == "content-type") { res.setContentType(h.value); } else { res.setHeaderData(h.name.c_str())->push_back(h.value); } } got_headers = true; if (frame_flags & H2C_FLAG_END_STREAM) _streamH2EndStream = true; } break; } case H2C_FRAME_DATA: { if (frame_stream == stream_id) { // Body data arrived with headers — save for readBodyChunk _streamBuf.insert(_streamBuf.end(), payload, payload + frame_len); if (frame_len > 0) { _sendAll(h2cBuildWindowUpdate(0, frame_len)); _sendAll(h2cBuildWindowUpdate(stream_id, frame_len)); } if (frame_flags & H2C_FLAG_END_STREAM) _streamH2EndStream = true; } break; } case H2C_FRAME_PING: { if (!(frame_flags & H2C_FLAG_ACK)) { std::string ping_payload(reinterpret_cast<const char*>(payload), frame_len); _sendAll(h2cBuildFrame(H2C_FRAME_PING, H2C_FLAG_ACK, 0, ping_payload)); } break; } case H2C_FRAME_GOAWAY: _streamH2EndStream = true; got_headers = true; break; default: break; } _streamH2Raw.erase(_streamH2Raw.begin(), _streamH2Raw.begin() + H2C_FRAME_HEADER_LEN + frame_len); } _streamMode = STREAM_H2; _streamBufPos = 0; _streamMode = STREAM_CONTENT_LENGTH; _streamRemaining = _streamBuf.size(); return res; } // HTTP/1.x streaming path // ── HTTP/1.x streaming path ── if (nreq.getRequestVersion().empty()) nreq.setRequestVersion(HTTPVERSION(1.1)); Loading Loading
src/http.cpp +267 −14 Original line number Diff line number Diff line Loading @@ -657,6 +657,13 @@ libhttppp::HttpResponse libhttppp::HttpClient::GetStream(libhttppp::HttpRequest _streamRemaining = 0; _streamChunkDone = false; _streamChunkRemaining = 0; _streamH2Sid = 0; _streamH2EndStream = false; _streamH2Raw.clear(); _streamH3Sid = 0; _streamH3EndStream = false; _streamH3Raw.clear(); _streamH3Body.clear(); std::stringstream host; host << _url.getHost() << ":" << _url.getPort(); Loading @@ -667,30 +674,276 @@ libhttppp::HttpResponse libhttppp::HttpClient::GetStream(libhttppp::HttpRequest if (nreq.getRequestURL().empty()) nreq.setRequestURL(_url.getPath()); // HTTP/3 (QUIC) or HTTP/2: use full request, buffer response for streaming if (dynamic_cast<netplus::quic*>(_cltsock.get())) { auto data = _h3Request("GET", nreq); // ── HTTP/3 (QUIC) streaming path ── if (auto *q = dynamic_cast<netplus::quic*>(_cltsock.get())) { // Send HTTP/3 control streams once per connection if (!_h3ControlSent) { uint64_t ctrl_id = q->openStream(false); std::vector<uint8_t> ctrl_payload; ctrl_payload.push_back(0x00); ctrl_payload.push_back(0x04); ctrl_payload.push_back(0x00); q->sendStreamData(ctrl_id, ctrl_payload, false); uint64_t enc_id = q->openStream(false); std::vector<uint8_t> enc_payload = {0x02}; q->sendStreamData(enc_id, enc_payload, false); uint64_t dec_id = q->openStream(false); std::vector<uint8_t> dec_payload = {0x03}; q->sendStreamData(dec_id, dec_payload, false); _h3ControlSent = true; } std::string path = nreq.getRequestURL(); if (path.empty()) path = _url.getPath(); std::string scheme = (_url.getProtocol() == HttpUrl::HTTP) ? "http" : "https"; std::stringstream authority; authority << _url.getHost() << ":" << _url.getPort(); std::vector<qpack::HeaderField> extra; for (HttpHeader::HeaderData *hd = nreq.getfirstHeaderData(); hd; hd = hd->nextHeaderData()) { const std::string &key = hd->getkey(); if (key.empty() || key[0] == ':') continue; if (key == "host" || key == "connection") continue; for (HttpHeader::HeaderData::Values *v = hd->getfirstValue(); v; v = v->nextvalue()) extra.push_back({key, v->getvalue(), false}); } std::vector<uint8_t> headers = qpack::Encoder::encodeRequestHeaders( "GET", scheme, authority.str(), path, extra, false, 0); std::vector<uint8_t> stream_payload; h3AppendFrame(stream_payload, 0x01, headers); uint64_t stream_id = q->openStream(true); q->sendStreamData(stream_id, stream_payload, true); // Read until we have the HEADERS frame libhttppp::HttpResponse res; res.setContentLength(data.size()); _streamBuf = std::move(data); _streamBufPos = 0; _streamMode = STREAM_CONTENT_LENGTH; _streamRemaining = _streamBuf.size(); bool got_headers = false; _streamH3Sid = stream_id; auto h3_pump = [&]() { for (int pump_i = 0; pump_i < 256; ++pump_i) { try { q->pumpNetwork(MSG_DONTWAIT); } catch (netplus::NetException &e) { if (e.getErrorType() != netplus::NetException::Note) throw; break; } } }; while (!got_headers) { uint8_t rbuf[4096]; size_t n = q->recvStreamData(stream_id, rbuf, sizeof(rbuf)); if (n == 0) { h3_pump(); n = q->recvStreamData(stream_id, rbuf, sizeof(rbuf)); if (n == 0) { _sw.waitRead(*_cltsock, 100); continue; } } _streamH3Raw.insert(_streamH3Raw.end(), rbuf, rbuf + n); // Try to parse frames size_t pos = 0; while (pos < _streamH3Raw.size()) { size_t frame_start = pos; size_t bytes = 0; uint64_t frame_type = h3DecodeVarInt(_streamH3Raw.data() + pos, _streamH3Raw.size() - pos, bytes); if (bytes == 0) break; pos += bytes; if (pos >= _streamH3Raw.size()) { pos = frame_start; break; } uint64_t frame_len = h3DecodeVarInt(_streamH3Raw.data() + pos, _streamH3Raw.size() - pos, bytes); if (bytes == 0) { pos = frame_start; break; } pos += bytes; if (pos + frame_len > _streamH3Raw.size()) { pos = frame_start; break; } if (frame_type == 0x01) { // HEADERS auto decoded = qpack::Decoder::decode(_streamH3Raw.data() + pos, frame_len); for (const auto &h : decoded) { if (h.name == ":status") { try { int code = std::stoi(h.value); res.setState(std::string("HTTP/3 ") + h.value); } catch (...) {} } else if (h.name == "content-length") { try { res.setContentLength(static_cast<size_t>(std::stoul(h.value))); _streamRemaining = static_cast<size_t>(std::stoul(h.value)); } catch (...) {} } else if (h.name == "content-type") { res.setContentType(h.value); } else { res.setHeaderData(h.name.c_str())->push_back(h.value); } } got_headers = true; } else if (frame_type == 0x00) { // DATA — came with headers _streamH3Body.insert(_streamH3Body.end(), _streamH3Raw.begin() + static_cast<ptrdiff_t>(pos), _streamH3Raw.begin() + static_cast<ptrdiff_t>(pos + frame_len)); } pos += frame_len; } if (pos > 0) _streamH3Raw.erase(_streamH3Raw.begin(), _streamH3Raw.begin() + static_cast<ptrdiff_t>(pos)); } _streamMode = STREAM_H3; _streamH3EndStream = q->isStreamComplete(stream_id) && _streamH3Raw.empty(); return res; } // ── HTTP/2 streaming path ── if (_isH2) { auto data = _h2Request("GET", nreq); if (!_h2PrefaceSent) { std::string preface(H2C_CLIENT_PREFACE, H2C_CLIENT_PREFACE_LEN); preface += h2cBuildSettings(); _sendAll(preface); _h2PrefaceSent = true; } std::string path = nreq.getRequestURL(); if (path.empty()) path = _url.getPath(); std::string scheme = (_url.getProtocol() == HttpUrl::HTTPS) ? "https" : "http"; std::stringstream auth; auth << _url.getHost() << ":" << _url.getPort(); std::vector<hpack::HeaderField> extra; for (HttpHeader::HeaderData *hd = nreq.getfirstHeaderData(); hd; hd = hd->nextHeaderData()) { const std::string &key = hd->getkey(); if (key.empty() || key[0] == ':') continue; if (key == "host") continue; for (HttpHeader::HeaderData::Values *v = hd->getfirstValue(); v; v = v->nextvalue()) extra.push_back({key, v->getvalue(), false}); } std::string hpack_block = hpack::Encoder::encodeRequestHeaders( "GET", path, scheme, auth.str(), extra); uint32_t stream_id = _h2NextStreamId; _h2NextStreamId += 2; uint8_t hdr_flags = H2C_FLAG_END_HEADERS | H2C_FLAG_END_STREAM; std::string headers_frame = h2cBuildFrame(H2C_FRAME_HEADERS, hdr_flags, stream_id, hpack_block); _sendAll(headers_frame); if (!_h2Decoder) _h2Decoder = std::make_unique<hpack::Decoder>(); // Read until we get response HEADERS for our stream libhttppp::HttpResponse res; res.setContentLength(data.size()); _streamBuf = std::move(data); bool got_headers = false; _streamH2Sid = stream_id; _streamH2EndStream = false; auto h2_ensure_bytes = [&](size_t need) { while (_streamH2Raw.size() < need) { netplus::buffer buf(BLOCKSIZE); size_t n = _recvBlocking(buf); if (n == 0) { HTTPException he; he[HTTPException::Error] << "HTTP/2 GetStream: EOF while reading frames"; throw he; } _streamH2Raw.insert(_streamH2Raw.end(), reinterpret_cast<uint8_t*>(buf.data.buf), reinterpret_cast<uint8_t*>(buf.data.buf) + n); } }; while (!got_headers) { h2_ensure_bytes(H2C_FRAME_HEADER_LEN); uint32_t frame_len = (static_cast<uint32_t>(_streamH2Raw[0]) << 16) | (static_cast<uint32_t>(_streamH2Raw[1]) << 8) | static_cast<uint32_t>(_streamH2Raw[2]); uint8_t frame_type = _streamH2Raw[3]; uint8_t frame_flags = _streamH2Raw[4]; uint32_t frame_stream = ((static_cast<uint32_t>(_streamH2Raw[5]) & 0x7f) << 24) | (static_cast<uint32_t>(_streamH2Raw[6]) << 16) | (static_cast<uint32_t>(_streamH2Raw[7]) << 8) | static_cast<uint32_t>(_streamH2Raw[8]); h2_ensure_bytes(H2C_FRAME_HEADER_LEN + frame_len); const uint8_t *payload = _streamH2Raw.data() + H2C_FRAME_HEADER_LEN; switch (frame_type) { case H2C_FRAME_SETTINGS: { if (!(frame_flags & H2C_FLAG_ACK)) _sendAll(h2cBuildSettingsAck()); break; } case H2C_FRAME_HEADERS: { if (frame_stream == stream_id) { auto headers = _h2Decoder->decode(payload, frame_len); for (const auto &h : headers) { if (h.name == ":status") { try { res.setState(std::string("HTTP/2 ") + h.value); } catch (...) {} } else if (h.name == "content-length") { try { res.setContentLength(static_cast<size_t>(std::stoul(h.value))); _streamRemaining = static_cast<size_t>(std::stoul(h.value)); } catch (...) {} } else if (h.name == "content-type") { res.setContentType(h.value); } else { res.setHeaderData(h.name.c_str())->push_back(h.value); } } got_headers = true; if (frame_flags & H2C_FLAG_END_STREAM) _streamH2EndStream = true; } break; } case H2C_FRAME_DATA: { if (frame_stream == stream_id) { // Body data arrived with headers — save for readBodyChunk _streamBuf.insert(_streamBuf.end(), payload, payload + frame_len); if (frame_len > 0) { _sendAll(h2cBuildWindowUpdate(0, frame_len)); _sendAll(h2cBuildWindowUpdate(stream_id, frame_len)); } if (frame_flags & H2C_FLAG_END_STREAM) _streamH2EndStream = true; } break; } case H2C_FRAME_PING: { if (!(frame_flags & H2C_FLAG_ACK)) { std::string ping_payload(reinterpret_cast<const char*>(payload), frame_len); _sendAll(h2cBuildFrame(H2C_FRAME_PING, H2C_FLAG_ACK, 0, ping_payload)); } break; } case H2C_FRAME_GOAWAY: _streamH2EndStream = true; got_headers = true; break; default: break; } _streamH2Raw.erase(_streamH2Raw.begin(), _streamH2Raw.begin() + H2C_FRAME_HEADER_LEN + frame_len); } _streamMode = STREAM_H2; _streamBufPos = 0; _streamMode = STREAM_CONTENT_LENGTH; _streamRemaining = _streamBuf.size(); return res; } // HTTP/1.x streaming path // ── HTTP/1.x streaming path ── if (nreq.getRequestVersion().empty()) nreq.setRequestVersion(HTTPVERSION(1.1)); Loading