Loading src/httpd.cpp +78 −8 Original line number Diff line number Diff line Loading @@ -974,8 +974,12 @@ void libhttppp::HttpEvent::Http3StreamEvent(netplus::socket *sock, tempreq.setHeaderData("content-length")->push_back(std::to_string(bodyData.size())); } // Assign a thread-safe tid via round-robin so H3 requests // don't all collide on tid=0 resources (PG connections, etc.) int h3tid = _h3NextTid.fetch_add(1, std::memory_order_relaxed) % std::max(threads, 1); // Call user's RequestEvent RequestEvent(tempreq, 0, 0); RequestEvent(tempreq, h3tid, 0); // Read structured response info from :res-* pseudo-headers auto *resValid = tempreq.getHeaderData(":res-valid"); Loading Loading @@ -1012,29 +1016,95 @@ void libhttppp::HttpEvent::Http3StreamEvent(netplus::socket *sock, bool is_streaming = (content_length > 0 && body.size() < content_length); // If streaming, collect all remaining data // If streaming, send HEADERS immediately then push DATA // frames incrementally so the client receives data while // we are still reading from upstream (same as H2 path). if (is_streaming) { // Collect extra :res-* headers before we start sending std::vector<libhttppp::qpack::HeaderField> hdr_extra; for (auto *hd = tempreq.getfirstHeaderData(); hd; hd = hd->nextHeaderData()) { const std::string &ekey = hd->getkey(); if (ekey.substr(0, 5) != ":res-") continue; if (ekey == ":res-valid" || ekey == ":res-status" || ekey == ":res-content-type" || ekey == ":res-content-length") continue; std::string realKey = ekey.substr(5); std::string joined; for (auto *v = hd->getfirstValue(); v; v = v->nextvalue()) { if (!joined.empty()) joined += "; "; joined += v->getvalue(); } if (!joined.empty()) hdr_extra.push_back({realKey, joined}); } // Build and send H3 HEADERS frame immediately auto hdr_block = libhttppp::qpack::Encoder::encodeResponseHeaders( status_code, content_type.empty() ? "text/html" : content_type, content_length, hdr_extra); // H3 frame: type=0x01 (HEADERS), then varint length, then payload std::vector<uint8_t> h3hdr; h3hdr.push_back(0x01); // HEADERS frame type uint8_t lenbuf[8]; size_t lb = h3EncodeVarInt(hdr_block.size(), lenbuf); h3hdr.insert(h3hdr.end(), lenbuf, lenbuf + lb); h3hdr.insert(h3hdr.end(), hdr_block.begin(), hdr_block.end()); // Send initial body as DATA frame if any if (!body.empty()) { h3hdr.push_back(0x00); // DATA frame type lb = h3EncodeVarInt(body.size(), lenbuf); h3hdr.insert(h3hdr.end(), lenbuf, lenbuf + lb); h3hdr.insert(h3hdr.end(), body.begin(), body.end()); } q->sendStreamData(stream_id, h3hdr, false); size_t total_sent = body.size(); body.clear(); body.shrink_to_fit(); tempreq.SendData.pos = 0; size_t max_iter = content_length / BLOCKSIZE + 4096; size_t empty_streak = 0; const size_t max_empty = 600; // ~60s at 100ms waits const size_t max_empty = 600; for (size_t i = 0; i < max_iter && body.size() < content_length; i < max_iter && total_sent < content_length; ++i) { ResponseEvent(tempreq, 0, 0); ResponseEvent(tempreq, h3tid, 0); size_t sa = tempreq.SendData.size(); if (sa > 0) { body.append(tempreq.SendData.data(), sa); // Build H3 DATA frame and send incrementally std::vector<uint8_t> data_frame; data_frame.push_back(0x00); // DATA frame type lb = h3EncodeVarInt(sa, lenbuf); data_frame.insert(data_frame.end(), lenbuf, lenbuf + lb); data_frame.insert(data_frame.end(), tempreq.SendData.data(), tempreq.SendData.data() + sa); tempreq.SendData.clear(); total_sent += sa; bool last = (total_sent >= content_length); q->sendStreamData(stream_id, data_frame, last); empty_streak = 0; } else { ++empty_streak; if (empty_streak >= max_empty) break; // Yield briefly so upstream data can arrive std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } // If we didn't finish, close the stream anyway if (total_sent < content_length) { q->sendStreamData(stream_id, {}, true); } // Clean up :res-* pseudo-headers tempreq.deldata(":res-valid"); tempreq.deldata(":res-status"); tempreq.deldata(":res-content-type"); tempreq.deldata(":res-content-length"); return; } // Collect extra :res-* headers (e.g. set-cookie) Loading src/httpd.h +2 −0 Original line number Diff line number Diff line Loading @@ -27,6 +27,7 @@ #include <netplus/socket.h> #include <netplus/eventapi.h> #include <atomic> #include <cstdint> #include <map> #include <memory> Loading Loading @@ -79,6 +80,7 @@ namespace libhttppp { std::mutex _h3BufferMutex; std::map<uint64_t, H3StreamBuffer> _h3StreamBuffers; std::set<netplus::socket*> _h3ControlStreamsSent; std::atomic<int> _h3NextTid{0}; void _initH3ControlStreams(netplus::quic *q); void _dispatchH2Stream(HttpRequest &cureq, std::string &out, uint32_t sid, Loading Loading
src/httpd.cpp +78 −8 Original line number Diff line number Diff line Loading @@ -974,8 +974,12 @@ void libhttppp::HttpEvent::Http3StreamEvent(netplus::socket *sock, tempreq.setHeaderData("content-length")->push_back(std::to_string(bodyData.size())); } // Assign a thread-safe tid via round-robin so H3 requests // don't all collide on tid=0 resources (PG connections, etc.) int h3tid = _h3NextTid.fetch_add(1, std::memory_order_relaxed) % std::max(threads, 1); // Call user's RequestEvent RequestEvent(tempreq, 0, 0); RequestEvent(tempreq, h3tid, 0); // Read structured response info from :res-* pseudo-headers auto *resValid = tempreq.getHeaderData(":res-valid"); Loading Loading @@ -1012,29 +1016,95 @@ void libhttppp::HttpEvent::Http3StreamEvent(netplus::socket *sock, bool is_streaming = (content_length > 0 && body.size() < content_length); // If streaming, collect all remaining data // If streaming, send HEADERS immediately then push DATA // frames incrementally so the client receives data while // we are still reading from upstream (same as H2 path). if (is_streaming) { // Collect extra :res-* headers before we start sending std::vector<libhttppp::qpack::HeaderField> hdr_extra; for (auto *hd = tempreq.getfirstHeaderData(); hd; hd = hd->nextHeaderData()) { const std::string &ekey = hd->getkey(); if (ekey.substr(0, 5) != ":res-") continue; if (ekey == ":res-valid" || ekey == ":res-status" || ekey == ":res-content-type" || ekey == ":res-content-length") continue; std::string realKey = ekey.substr(5); std::string joined; for (auto *v = hd->getfirstValue(); v; v = v->nextvalue()) { if (!joined.empty()) joined += "; "; joined += v->getvalue(); } if (!joined.empty()) hdr_extra.push_back({realKey, joined}); } // Build and send H3 HEADERS frame immediately auto hdr_block = libhttppp::qpack::Encoder::encodeResponseHeaders( status_code, content_type.empty() ? "text/html" : content_type, content_length, hdr_extra); // H3 frame: type=0x01 (HEADERS), then varint length, then payload std::vector<uint8_t> h3hdr; h3hdr.push_back(0x01); // HEADERS frame type uint8_t lenbuf[8]; size_t lb = h3EncodeVarInt(hdr_block.size(), lenbuf); h3hdr.insert(h3hdr.end(), lenbuf, lenbuf + lb); h3hdr.insert(h3hdr.end(), hdr_block.begin(), hdr_block.end()); // Send initial body as DATA frame if any if (!body.empty()) { h3hdr.push_back(0x00); // DATA frame type lb = h3EncodeVarInt(body.size(), lenbuf); h3hdr.insert(h3hdr.end(), lenbuf, lenbuf + lb); h3hdr.insert(h3hdr.end(), body.begin(), body.end()); } q->sendStreamData(stream_id, h3hdr, false); size_t total_sent = body.size(); body.clear(); body.shrink_to_fit(); tempreq.SendData.pos = 0; size_t max_iter = content_length / BLOCKSIZE + 4096; size_t empty_streak = 0; const size_t max_empty = 600; // ~60s at 100ms waits const size_t max_empty = 600; for (size_t i = 0; i < max_iter && body.size() < content_length; i < max_iter && total_sent < content_length; ++i) { ResponseEvent(tempreq, 0, 0); ResponseEvent(tempreq, h3tid, 0); size_t sa = tempreq.SendData.size(); if (sa > 0) { body.append(tempreq.SendData.data(), sa); // Build H3 DATA frame and send incrementally std::vector<uint8_t> data_frame; data_frame.push_back(0x00); // DATA frame type lb = h3EncodeVarInt(sa, lenbuf); data_frame.insert(data_frame.end(), lenbuf, lenbuf + lb); data_frame.insert(data_frame.end(), tempreq.SendData.data(), tempreq.SendData.data() + sa); tempreq.SendData.clear(); total_sent += sa; bool last = (total_sent >= content_length); q->sendStreamData(stream_id, data_frame, last); empty_streak = 0; } else { ++empty_streak; if (empty_streak >= max_empty) break; // Yield briefly so upstream data can arrive std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } // If we didn't finish, close the stream anyway if (total_sent < content_length) { q->sendStreamData(stream_id, {}, true); } // Clean up :res-* pseudo-headers tempreq.deldata(":res-valid"); tempreq.deldata(":res-status"); tempreq.deldata(":res-content-type"); tempreq.deldata(":res-content-length"); return; } // Collect extra :res-* headers (e.g. set-cookie) Loading
src/httpd.h +2 −0 Original line number Diff line number Diff line Loading @@ -27,6 +27,7 @@ #include <netplus/socket.h> #include <netplus/eventapi.h> #include <atomic> #include <cstdint> #include <map> #include <memory> Loading Loading @@ -79,6 +80,7 @@ namespace libhttppp { std::mutex _h3BufferMutex; std::map<uint64_t, H3StreamBuffer> _h3StreamBuffers; std::set<netplus::socket*> _h3ControlStreamsSent; std::atomic<int> _h3NextTid{0}; void _initH3ControlStreams(netplus::quic *q); void _dispatchH2Stream(HttpRequest &cureq, std::string &out, uint32_t sid, Loading