Commit 2107ddaf authored by jan.koester's avatar jan.koester
Browse files

test

parent 006f90ef
Loading
Loading
Loading
Loading
+8 −0
Original line number Diff line number Diff line
libparitypp (20260405+21) unstable; urgency=medium

  * Replace per-socket waitRead polling loop in store_stripe Phase 2 with
    waitReadMulti: single epoll/kqueue/poll call waits on all pending nodes
    at once, eliminating CPU-burning spin when nodes are slow to respond

 -- Jan Koester <jan.koester@tuxist.de>  Sun, 05 Apr 2026 00:00:00 +0000

libparitypp (20260405+20) unstable; urgency=medium

  * Add VACUUM protocol command (0x07) for remote block store compaction
+56 −34
Original line number Diff line number Diff line
@@ -335,13 +335,21 @@ void client::store_stripe(uint64_t group_id, uint32_t stripe_index,
        }
    }

    // Phase 2: collect responses using proper waitRead
    // Phase 2: collect responses using waitReadMulti
    size_t remaining = 0;
    for (auto& p : pending) {
        if (p.sent && !p.local) ++remaining;
    }

    for (int attempt = 0; attempt < 1500 && remaining > 0; ++attempt) {
    // Pump all connections once — data may already be buffered from Phase 1
    for (auto& p : pending) {
        if (!p.sent || p.local || p.ok) continue;
        if (!connections_[p.node_index].socket) continue;
        try { connections_[p.node_index].socket->pumpNetwork(MSG_DONTWAIT); } catch (...) {}
    }

    for (int attempt = 0; attempt < 300 && remaining > 0; ++attempt) {
        // Check buffered data first before blocking
        for (auto& p : pending) {
            if (!p.sent || p.local || p.ok) continue;
            if (!connections_[p.node_index].socket) {
@@ -351,23 +359,14 @@ void client::store_stripe(uint64_t group_id, uint32_t stripe_index,
                continue;
            }
            auto& conn = *connections_[p.node_index].socket;
            try {
                // Pump first — data may already be buffered from Phase 1
                conn.pumpNetwork(MSG_DONTWAIT);
                if (!conn.hasStreamData(p.stream)) {
                    // No buffered data yet — wait for socket to become readable
                    if (!wait_.waitRead(conn, 10)) continue;
                    conn.pumpNetwork(MSG_DONTWAIT);
            if (!conn.hasStreamData(p.stream)) continue;
                }
                {
            try {
                uint8_t hdr[protocol::HEADER_SIZE];
                size_t r = conn.recvStreamData(p.stream, hdr, protocol::HEADER_SIZE);
                if (r >= protocol::HEADER_SIZE) {
                    msg_type resp_type;
                    uint32_t resp_len;
                    if (protocol::decode_header(hdr, r, resp_type, resp_len)) {
                            // Drain any response body
                        if (resp_len > 0) {
                            std::vector<uint8_t> discard(resp_len);
                            size_t total = 0;
@@ -380,14 +379,11 @@ void client::store_stripe(uint64_t group_id, uint32_t stripe_index,
                            }
                        }
                        p.ok = (resp_type == msg_type::OK);
                            if (!p.ok) {
                                p.fail_reason = "server returned non-OK response";
                            }
                        if (!p.ok) p.fail_reason = "server returned non-OK response";
                        conn.closeStream(p.stream);
                        --remaining;
                    }
                }
                }
            } catch (const netplus::NetException& e) {
                p.fail_reason = std::string("Phase 2 net: ") + e.what();
                connections_[p.node_index].socket.reset();
@@ -405,6 +401,32 @@ void client::store_stripe(uint64_t group_id, uint32_t stripe_index,
                --remaining;
            }
        }
        if (remaining == 0) break;

        // Build socket list for waitReadMulti
        std::vector<netplus::socket*> wait_socks;
        std::vector<size_t> wait_idx; // maps wait_socks position -> pending index
        for (size_t i = 0; i < pending.size(); ++i) {
            auto& p = pending[i];
            if (!p.sent || p.local || p.ok) continue;
            if (!connections_[p.node_index].socket) continue;
            wait_socks.push_back(connections_[p.node_index].socket.get());
            wait_idx.push_back(i);
        }
        if (wait_socks.empty()) break;

        std::vector<bool> ready;
        wait_.waitReadMulti(wait_socks, ready, 50);

        // Pump only sockets that are actually ready
        for (size_t wi = 0; wi < wait_socks.size(); ++wi) {
            if (!ready[wi]) continue;
            auto& p = pending[wait_idx[wi]];
            if (!connections_[p.node_index].socket) continue;
            try {
                connections_[p.node_index].socket->pumpNetwork(MSG_DONTWAIT);
            } catch (...) {}
        }
    }

    // Mark response timeout for nodes that never responded