Commit fb98e5ca authored by jan.koester's avatar jan.koester
Browse files

http2 streaming

parent 58660cdf
Loading
Loading
Loading
Loading
+70 −4
Original line number Diff line number Diff line
@@ -414,21 +414,80 @@ void libhttppp::HttpEvent::_dispatchH2Stream(HttpRequest &cureq,
                             body.size() < content_length);

        if (is_streaming) {
            // Streaming response: collect all data via ResponseEvent
            // Streaming response: send HEADERS immediately, then push
            // DATA frames to the real connection incrementally so the
            // client receives data while we are still collecting it
            // from the upstream backend (via ResponseEvent on tempreq).

            tempreq.h2state().expectedContentLength = content_length;
            tempreq.SendData.pos = 0;

            // Collect extra :res-* headers before we start
            std::vector<libhttppp::hpack::HeaderField> extra;
            if (!_altSvcH3.empty())
                extra.push_back({"alt-svc", _altSvcH3});
            for (auto *hd = tempreq.getfirstHeaderData(); hd; hd = hd->nextHeaderData()) {
                const std::string &key = hd->getkey();
                if (key.substr(0, 5) != ":res-") continue;
                if (key == ":res-valid" || key == ":res-status" ||
                    key == ":res-content-type" || key == ":res-content-length") continue;
                std::string realKey = key.substr(5);
                std::string joined;
                for (auto *v = hd->getfirstValue(); v; v = v->nextvalue()) {
                    if (!joined.empty()) joined += "; ";
                    joined += v->getvalue();
                }
                if (!joined.empty())
                    extra.push_back({realKey, joined});
            }

            // Send HEADERS frame right away
            std::string hpack_out = libhttppp::hpack::Encoder::encodeResponseHeaders(
                status_code, content_type, content_length, extra);
            out += h2BuildFrame(H2_FRAME_HEADERS, H2_FLAG_END_HEADERS, sid, hpack_out);

            // Flush any body data we already have as DATA frames
            if (!body.empty()) {
                h2AppendDataFrames(out, sid, body, false);
            }

            // Flush to real connection so client starts receiving
            cureq.SendData.append(out.data(), out.size());
            out.clear();

            size_t total_sent = body.size();
            body.clear();
            body.shrink_to_fit();

            size_t max_iterations = content_length / BLOCKSIZE + 4096;
            size_t empty_count = 0;
            unsigned int backoff_ms = 1;
            for (size_t i = 0;
                 i < max_iterations && body.size() < content_length;
                 i < max_iterations && total_sent < content_length;
                 ++i) {
                ResponseEvent(tempreq, tid, args);
                size_t sa = tempreq.SendData.size();
                if (sa > 0) {
                    body.append(tempreq.SendData.data(), sa);
                    // Build DATA frames and flush directly to the
                    // real connection so epoll can drain to the client.
                    bool last = (total_sent + sa >= content_length);
                    std::string chunk(tempreq.SendData.data(), sa);
                    tempreq.SendData.clear();

                    std::string frames;
                    frames.reserve(sa + (sa / H2_MAX_FRAME_SIZE + 1) * H2_FRAME_HEADER_LEN);
                    // Split into H2-sized DATA frames
                    size_t off = 0;
                    while (off < chunk.size()) {
                        size_t frag = std::min(chunk.size() - off, H2_MAX_FRAME_SIZE);
                        bool end = last && (off + frag >= chunk.size());
                        uint8_t flags = end ? H2_FLAG_END_STREAM : 0;
                        frames += h2BuildFrame(H2_FRAME_DATA, flags, sid,
                                               std::string(chunk.data() + off, frag));
                        off += frag;
                    }
                    cureq.SendData.append(frames.data(), frames.size());
                    total_sent += sa;
                    empty_count = 0;
                    backoff_ms = 1;
                } else {
@@ -436,11 +495,18 @@ void libhttppp::HttpEvent::_dispatchH2Stream(HttpRequest &cureq,
                    if (empty_count > 500) {
                        break; // Genuine stall — give up
                    }
                    // No data yet — exponential backoff to reduce CPU usage
                    std::this_thread::sleep_for(std::chrono::milliseconds(backoff_ms));
                    if (backoff_ms < 32) backoff_ms *= 2;
                }
            }

            // If we exited without finishing, send END_STREAM on an empty DATA frame
            if (total_sent < content_length) {
                std::string end_frame = h2BuildFrame(H2_FRAME_DATA, H2_FLAG_END_STREAM, sid, "");
                cureq.SendData.append(end_frame.data(), end_frame.size());
            }

            return;  // Already handled — skip the normal encoding below
        }

        // Collect extra :res-* headers (skip handled ones)