Loading src/http.cpp +194 −0 Original line number Diff line number Diff line Loading @@ -1254,6 +1254,200 @@ size_t libhttppp::HttpClient::readBodyChunk(char *buf, size_t bufsize) { return take; } // ── HTTP/2 streaming mode ── if (_streamMode == STREAM_H2) { // First drain any body data buffered during GetStream size_t got = drainBuf(bufsize); if (got > 0) return got; if (_streamH2EndStream) { _streamMode = STREAM_NONE; return 0; } // Read more H2 frames until we get DATA for our stream auto h2_ensure_bytes = [&](size_t need) { while (_streamH2Raw.size() < need) { netplus::buffer fbuf(BLOCKSIZE); size_t n = _recvBlocking(fbuf); if (n == 0) { _streamH2EndStream = true; return; } _streamH2Raw.insert(_streamH2Raw.end(), reinterpret_cast<uint8_t*>(fbuf.data.buf), reinterpret_cast<uint8_t*>(fbuf.data.buf) + n); } }; while (_streamBuf.empty() && !_streamH2EndStream) { h2_ensure_bytes(H2C_FRAME_HEADER_LEN); if (_streamH2EndStream) break; uint32_t frame_len = (static_cast<uint32_t>(_streamH2Raw[0]) << 16) | (static_cast<uint32_t>(_streamH2Raw[1]) << 8) | static_cast<uint32_t>(_streamH2Raw[2]); uint8_t frame_type = _streamH2Raw[3]; uint8_t frame_flags = _streamH2Raw[4]; uint32_t frame_stream = ((static_cast<uint32_t>(_streamH2Raw[5]) & 0x7f) << 24) | (static_cast<uint32_t>(_streamH2Raw[6]) << 16) | (static_cast<uint32_t>(_streamH2Raw[7]) << 8) | static_cast<uint32_t>(_streamH2Raw[8]); h2_ensure_bytes(H2C_FRAME_HEADER_LEN + frame_len); if (_streamH2EndStream) break; const uint8_t *payload = _streamH2Raw.data() + H2C_FRAME_HEADER_LEN; switch (frame_type) { case H2C_FRAME_SETTINGS: { if (!(frame_flags & H2C_FLAG_ACK)) _sendAll(h2cBuildSettingsAck()); break; } case H2C_FRAME_DATA: { if (frame_stream == _streamH2Sid) { _streamBuf.insert(_streamBuf.end(), payload, payload + frame_len); _streamBufPos = 0; if (frame_len > 0) { _sendAll(h2cBuildWindowUpdate(0, frame_len)); _sendAll(h2cBuildWindowUpdate(_streamH2Sid, frame_len)); } if (frame_flags & H2C_FLAG_END_STREAM) _streamH2EndStream = true; } break; } case H2C_FRAME_HEADERS: { // Trailing headers if (frame_stream == _streamH2Sid && (frame_flags & H2C_FLAG_END_STREAM)) _streamH2EndStream = true; break; } case H2C_FRAME_PING: { if (!(frame_flags & H2C_FLAG_ACK)) { std::string ping_payload(reinterpret_cast<const char*>(payload), frame_len); _sendAll(h2cBuildFrame(H2C_FRAME_PING, H2C_FLAG_ACK, 0, ping_payload)); } break; } case H2C_FRAME_GOAWAY: _streamH2EndStream = true; break; default: break; } _streamH2Raw.erase(_streamH2Raw.begin(), _streamH2Raw.begin() + H2C_FRAME_HEADER_LEN + frame_len); } // Drain any newly received body data got = drainBuf(bufsize); if (got > 0) return got; if (_streamH2EndStream) { _streamMode = STREAM_NONE; } return 0; } // ── HTTP/3 streaming mode ── if (_streamMode == STREAM_H3) { // First drain any body data buffered during GetStream / previous reads if (!_streamH3Body.empty()) { size_t have = _streamH3Body.size(); size_t take = (std::min)(have, bufsize); std::memcpy(buf, _streamH3Body.data(), take); _streamH3Body.erase(_streamH3Body.begin(), _streamH3Body.begin() + (ptrdiff_t)take); return take; } if (_streamH3EndStream) { _streamMode = STREAM_NONE; return 0; } auto *q = dynamic_cast<netplus::quic*>(_cltsock.get()); if (!q) { _streamMode = STREAM_NONE; return 0; } // Pump network and read stream data auto h3_pump = [&]() { for (int pump_i = 0; pump_i < 256; ++pump_i) { try { q->pumpNetwork(MSG_DONTWAIT); } catch (netplus::NetException &e) { if (e.getErrorType() != netplus::NetException::Note) throw; break; } } }; // Try reading data (with pump if needed) uint8_t rbuf[4096]; size_t n = q->recvStreamData(_streamH3Sid, rbuf, sizeof(rbuf)); if (n == 0) { h3_pump(); n = q->recvStreamData(_streamH3Sid, rbuf, sizeof(rbuf)); if (n == 0) { if (q->isStreamComplete(_streamH3Sid) && _streamH3Raw.empty()) { _streamH3EndStream = true; _streamMode = STREAM_NONE; } return 0; } } _streamH3Raw.insert(_streamH3Raw.end(), rbuf, rbuf + n); // Parse H3 frames size_t pos = 0; while (pos < _streamH3Raw.size()) { size_t frame_start = pos; size_t bytes = 0; uint64_t frame_type = h3DecodeVarInt(_streamH3Raw.data() + pos, _streamH3Raw.size() - pos, bytes); if (bytes == 0) break; pos += bytes; if (pos >= _streamH3Raw.size()) { pos = frame_start; break; } uint64_t frame_len = h3DecodeVarInt(_streamH3Raw.data() + pos, _streamH3Raw.size() - pos, bytes); if (bytes == 0) { pos = frame_start; break; } pos += bytes; if (pos + frame_len > _streamH3Raw.size()) { pos = frame_start; break; } if (frame_type == 0x00) { // DATA _streamH3Body.insert(_streamH3Body.end(), _streamH3Raw.begin() + static_cast<ptrdiff_t>(pos), _streamH3Raw.begin() + static_cast<ptrdiff_t>(pos + frame_len)); } // Skip HEADERS and other frames pos += frame_len; } if (pos > 0) _streamH3Raw.erase(_streamH3Raw.begin(), _streamH3Raw.begin() + static_cast<ptrdiff_t>(pos)); // Check stream completion if (q->isStreamComplete(_streamH3Sid) && _streamH3Raw.empty() && _streamH3Body.empty()) _streamH3EndStream = true; // Return decoded body data if (!_streamH3Body.empty()) { size_t have = _streamH3Body.size(); size_t take = (std::min)(have, bufsize); std::memcpy(buf, _streamH3Body.data(), take); _streamH3Body.erase(_streamH3Body.begin(), _streamH3Body.begin() + (ptrdiff_t)take); return take; } if (_streamH3EndStream) { _streamMode = STREAM_NONE; } return 0; } return 0; } Loading Loading
src/http.cpp +194 −0 Original line number Diff line number Diff line Loading @@ -1254,6 +1254,200 @@ size_t libhttppp::HttpClient::readBodyChunk(char *buf, size_t bufsize) { return take; } // ── HTTP/2 streaming mode ── if (_streamMode == STREAM_H2) { // First drain any body data buffered during GetStream size_t got = drainBuf(bufsize); if (got > 0) return got; if (_streamH2EndStream) { _streamMode = STREAM_NONE; return 0; } // Read more H2 frames until we get DATA for our stream auto h2_ensure_bytes = [&](size_t need) { while (_streamH2Raw.size() < need) { netplus::buffer fbuf(BLOCKSIZE); size_t n = _recvBlocking(fbuf); if (n == 0) { _streamH2EndStream = true; return; } _streamH2Raw.insert(_streamH2Raw.end(), reinterpret_cast<uint8_t*>(fbuf.data.buf), reinterpret_cast<uint8_t*>(fbuf.data.buf) + n); } }; while (_streamBuf.empty() && !_streamH2EndStream) { h2_ensure_bytes(H2C_FRAME_HEADER_LEN); if (_streamH2EndStream) break; uint32_t frame_len = (static_cast<uint32_t>(_streamH2Raw[0]) << 16) | (static_cast<uint32_t>(_streamH2Raw[1]) << 8) | static_cast<uint32_t>(_streamH2Raw[2]); uint8_t frame_type = _streamH2Raw[3]; uint8_t frame_flags = _streamH2Raw[4]; uint32_t frame_stream = ((static_cast<uint32_t>(_streamH2Raw[5]) & 0x7f) << 24) | (static_cast<uint32_t>(_streamH2Raw[6]) << 16) | (static_cast<uint32_t>(_streamH2Raw[7]) << 8) | static_cast<uint32_t>(_streamH2Raw[8]); h2_ensure_bytes(H2C_FRAME_HEADER_LEN + frame_len); if (_streamH2EndStream) break; const uint8_t *payload = _streamH2Raw.data() + H2C_FRAME_HEADER_LEN; switch (frame_type) { case H2C_FRAME_SETTINGS: { if (!(frame_flags & H2C_FLAG_ACK)) _sendAll(h2cBuildSettingsAck()); break; } case H2C_FRAME_DATA: { if (frame_stream == _streamH2Sid) { _streamBuf.insert(_streamBuf.end(), payload, payload + frame_len); _streamBufPos = 0; if (frame_len > 0) { _sendAll(h2cBuildWindowUpdate(0, frame_len)); _sendAll(h2cBuildWindowUpdate(_streamH2Sid, frame_len)); } if (frame_flags & H2C_FLAG_END_STREAM) _streamH2EndStream = true; } break; } case H2C_FRAME_HEADERS: { // Trailing headers if (frame_stream == _streamH2Sid && (frame_flags & H2C_FLAG_END_STREAM)) _streamH2EndStream = true; break; } case H2C_FRAME_PING: { if (!(frame_flags & H2C_FLAG_ACK)) { std::string ping_payload(reinterpret_cast<const char*>(payload), frame_len); _sendAll(h2cBuildFrame(H2C_FRAME_PING, H2C_FLAG_ACK, 0, ping_payload)); } break; } case H2C_FRAME_GOAWAY: _streamH2EndStream = true; break; default: break; } _streamH2Raw.erase(_streamH2Raw.begin(), _streamH2Raw.begin() + H2C_FRAME_HEADER_LEN + frame_len); } // Drain any newly received body data got = drainBuf(bufsize); if (got > 0) return got; if (_streamH2EndStream) { _streamMode = STREAM_NONE; } return 0; } // ── HTTP/3 streaming mode ── if (_streamMode == STREAM_H3) { // First drain any body data buffered during GetStream / previous reads if (!_streamH3Body.empty()) { size_t have = _streamH3Body.size(); size_t take = (std::min)(have, bufsize); std::memcpy(buf, _streamH3Body.data(), take); _streamH3Body.erase(_streamH3Body.begin(), _streamH3Body.begin() + (ptrdiff_t)take); return take; } if (_streamH3EndStream) { _streamMode = STREAM_NONE; return 0; } auto *q = dynamic_cast<netplus::quic*>(_cltsock.get()); if (!q) { _streamMode = STREAM_NONE; return 0; } // Pump network and read stream data auto h3_pump = [&]() { for (int pump_i = 0; pump_i < 256; ++pump_i) { try { q->pumpNetwork(MSG_DONTWAIT); } catch (netplus::NetException &e) { if (e.getErrorType() != netplus::NetException::Note) throw; break; } } }; // Try reading data (with pump if needed) uint8_t rbuf[4096]; size_t n = q->recvStreamData(_streamH3Sid, rbuf, sizeof(rbuf)); if (n == 0) { h3_pump(); n = q->recvStreamData(_streamH3Sid, rbuf, sizeof(rbuf)); if (n == 0) { if (q->isStreamComplete(_streamH3Sid) && _streamH3Raw.empty()) { _streamH3EndStream = true; _streamMode = STREAM_NONE; } return 0; } } _streamH3Raw.insert(_streamH3Raw.end(), rbuf, rbuf + n); // Parse H3 frames size_t pos = 0; while (pos < _streamH3Raw.size()) { size_t frame_start = pos; size_t bytes = 0; uint64_t frame_type = h3DecodeVarInt(_streamH3Raw.data() + pos, _streamH3Raw.size() - pos, bytes); if (bytes == 0) break; pos += bytes; if (pos >= _streamH3Raw.size()) { pos = frame_start; break; } uint64_t frame_len = h3DecodeVarInt(_streamH3Raw.data() + pos, _streamH3Raw.size() - pos, bytes); if (bytes == 0) { pos = frame_start; break; } pos += bytes; if (pos + frame_len > _streamH3Raw.size()) { pos = frame_start; break; } if (frame_type == 0x00) { // DATA _streamH3Body.insert(_streamH3Body.end(), _streamH3Raw.begin() + static_cast<ptrdiff_t>(pos), _streamH3Raw.begin() + static_cast<ptrdiff_t>(pos + frame_len)); } // Skip HEADERS and other frames pos += frame_len; } if (pos > 0) _streamH3Raw.erase(_streamH3Raw.begin(), _streamH3Raw.begin() + static_cast<ptrdiff_t>(pos)); // Check stream completion if (q->isStreamComplete(_streamH3Sid) && _streamH3Raw.empty() && _streamH3Body.empty()) _streamH3EndStream = true; // Return decoded body data if (!_streamH3Body.empty()) { size_t have = _streamH3Body.size(); size_t take = (std::min)(have, bufsize); std::memcpy(buf, _streamH3Body.data(), take); _streamH3Body.erase(_streamH3Body.begin(), _streamH3Body.begin() + (ptrdiff_t)take); return take; } if (_streamH3EndStream) { _streamMode = STREAM_NONE; } return 0; } return 0; } Loading