Commit 2cee9ddd authored by jan.koester's avatar jan.koester
Browse files

test

parent 9fc4d5e8
Loading
Loading
Loading
Loading
+10 −0
Original line number Diff line number Diff line
libparitypp (20260405+24) unstable; urgency=medium

  * Pipeline retrieve_stripe: send FETCH_BLOCK to all nodes simultaneously,
    collect responses with waitReadMulti (parallelize block fetches per stripe)
  * Auto-retry in client::store on stale QUIC connections: reset all
    connections and retry once on first stripe failure
  * Gate all debug logging behind NDEBUG macro (no output in Release builds)

 -- Jan Koester <jan.koester@tuxist.de>  Sun, 05 Apr 2026 00:00:00 +0000

libparitypp (20260405+23) unstable; urgency=medium

  * Increase STRIPE_SIZE from 16KB to 4MB to reduce roundtrips per store
+231 −48
Original line number Diff line number Diff line
@@ -8,6 +8,14 @@
#include <iostream>
#include <thread>
#include <chrono>
#include <future>
#include <mutex>

#ifdef NDEBUG
#define DBG_LOG(x) do {} while(0)
#else
#define DBG_LOG(x) do { std::cerr << x; } while(0)
#endif

namespace paritypp {

@@ -460,7 +468,7 @@ void client::store_stripe(uint64_t group_id, uint32_t stripe_index,
    }

    if (failed > m_) {
        std::cerr << "[PARITY] store_stripe FAILED: " << diag << "\n";
        DBG_LOG("[PARITY] store_stripe FAILED: " << diag << "\n");
        throw std::runtime_error("store_stripe: too many node failures ("
                                 + std::to_string(failed) + " > " + std::to_string(m_)
                                 + "): " + diag);
@@ -470,49 +478,202 @@ void client::store_stripe(uint64_t group_id, uint32_t stripe_index,
bool client::retrieve_stripe(uint64_t group_id, uint32_t stripe_index,
                              std::vector<uint8_t>& out) {
    size_t n = k_ + m_;
    std::vector<std::vector<uint8_t>> fetched;
    std::vector<size_t> indices;
    bool any_found = false;

    for (size_t i = 0; i < n && fetched.size() < k_; i++) {
        try {
    // Pipelined fetch: send requests to all nodes in parallel,
    // then collect responses using waitReadMulti.
    struct Pending {
        size_t node_index;
        uint64_t stream = 0;
        bool local = false;
        bool sent = false;
        bool ok = false;
        std::vector<uint8_t> block;
            uint32_t bidx = stripe_index * static_cast<uint32_t>(k_ + m_) + static_cast<uint32_t>(i);
            if (fetch_block_from_node(i, group_id, bidx, block) && !block.empty()) {
                fetched.push_back(std::move(block));
                indices.push_back(i);
                any_found = true;
    };
    std::vector<Pending> pending(n);
    size_t local_ok_count = 0;

    // Phase 1: send FETCH_BLOCK to all nodes (or fetch from local store)
    for (size_t i = 0; i < n; ++i) {
        uint32_t bidx = stripe_index * static_cast<uint32_t>(n) + static_cast<uint32_t>(i);
        pending[i].node_index = i;

        if (static_cast<int>(i) == local_node_index_ && local_store_) {
            pending[i].local = true;
            pending[i].ok = local_store_->fetch(group_id, bidx, pending[i].block)
                            && !pending[i].block.empty();
            if (pending[i].ok) ++local_ok_count;
            continue;
        }
        } catch (const std::exception& e) {

        try {
            ensure_connected(i);
            if (!connections_[i].connected || !connections_[i].socket) continue;
            auto& conn = *connections_[i].socket;
            auto payload = protocol::make_fetch_block(group_id, bidx);
            auto msg = protocol::encode_message(msg_type::FETCH_BLOCK, payload);
            uint64_t stream = conn.openStream(true);
            pending[i].stream = stream;

            for (int sw = 0; sw < 150; ++sw) {
                if (wait_.waitWrite(conn, 100)) {
                    conn.sendStreamData(stream, msg, true);
                    conn.pumpNetwork(MSG_DONTWAIT);
                    pending[i].sent = true;
                    break;
                }
                conn.pumpNetwork(MSG_DONTWAIT);
            }
            if (!pending[i].sent) {
                try { conn.closeStream(stream); } catch (...) {}
            }
        } catch (...) {
            connections_[i].socket.reset();
            connections_[i].connected = false;
        }
    }

    if (!any_found) return false;
    // Phase 2: collect responses using waitReadMulti
    size_t remote_sent = 0;
    for (auto& p : pending) if (p.sent && !p.local) ++remote_sent;

    // Pump all connections once
    for (auto& p : pending) {
        if (!p.sent || p.local || p.ok) continue;
        if (!connections_[p.node_index].socket) continue;
        try { connections_[p.node_index].socket->pumpNetwork(MSG_DONTWAIT); } catch (...) {}
    }

    size_t remaining = remote_sent;
    size_t total_ok = local_ok_count;

    for (int attempt = 0; attempt < 300 && remaining > 0 && total_ok < k_; ++attempt) {
        // Check buffered data
        for (auto& p : pending) {
            if (!p.sent || p.local || p.ok) continue;
            if (!connections_[p.node_index].socket) {
                --remaining;
                p.sent = false;
                continue;
            }
            auto& conn = *connections_[p.node_index].socket;
            if (!conn.hasStreamData(p.stream)) continue;
            try {
                uint8_t hdr[protocol::HEADER_SIZE];
                size_t r = conn.recvStreamData(p.stream, hdr, protocol::HEADER_SIZE);
                if (r >= protocol::HEADER_SIZE) {
                    msg_type resp_type;
                    uint32_t resp_len;
                    if (protocol::decode_header(hdr, r, resp_type, resp_len)) {
                        if (resp_type == msg_type::BLOCK_DATA && resp_len >= 12) {
                            std::vector<uint8_t> body(resp_len);
                            size_t total_read = 0;
                            int body_att = 0;
                            while (total_read < resp_len && body_att++ < 150) {
                                if (!wait_.waitRead(conn, 100)) continue;
                                conn.pumpNetwork(MSG_DONTWAIT);
                                total_read += conn.recvStreamData(p.stream,
                                    body.data() + total_read, resp_len - total_read);
                            }
                            if (total_read >= resp_len) {
                                const uint8_t* data;
                                size_t data_len;
                                uint64_t gid;
                                uint32_t bidx;
                                if (protocol::parse_block_data(body.data(), resp_len,
                                                                gid, bidx, data, data_len)) {
                                    p.block.assign(data, data + data_len);
                                    p.ok = true;
                                    ++total_ok;
                                }
                            }
                        } else if (resp_len > 0) {
                            // Drain non-BLOCK_DATA response body
                            std::vector<uint8_t> discard(resp_len);
                            size_t total_read = 0;
                            int body_att = 0;
                            while (total_read < resp_len && body_att++ < 150) {
                                if (!wait_.waitRead(conn, 100)) continue;
                                conn.pumpNetwork(MSG_DONTWAIT);
                                total_read += conn.recvStreamData(p.stream,
                                    discard.data() + total_read, resp_len - total_read);
                            }
                        }
                    }
                    conn.closeStream(p.stream);
                    --remaining;
                }
            } catch (...) {
                connections_[p.node_index].socket.reset();
                connections_[p.node_index].connected = false;
                --remaining;
            }
        }
        if (remaining == 0 || total_ok >= k_) break;

        // Wait on all pending sockets
        std::vector<netplus::socket*> wait_socks;
        std::vector<size_t> wait_idx;
        for (size_t i = 0; i < pending.size(); ++i) {
            auto& p = pending[i];
            if (!p.sent || p.local || p.ok) continue;
            if (!connections_[p.node_index].socket) continue;
            wait_socks.push_back(connections_[p.node_index].socket.get());
            wait_idx.push_back(i);
        }
        if (wait_socks.empty()) break;

        std::vector<bool> ready;
        wait_.waitReadMulti(wait_socks, ready, 50);

        for (size_t wi = 0; wi < wait_socks.size(); ++wi) {
            if (!ready[wi]) continue;
            auto& p = pending[wait_idx[wi]];
            if (!connections_[p.node_index].socket) continue;
            try { connections_[p.node_index].socket->pumpNetwork(MSG_DONTWAIT); } catch (...) {}
        }
    }

    // Close remaining streams
    for (auto& p : pending) {
        if (p.sent && !p.local && !p.ok && connections_[p.node_index].socket) {
            try { connections_[p.node_index].socket->closeStream(p.stream); } catch (...) {}
        }
    }

    // Collect successful blocks
    std::vector<std::vector<uint8_t>> fetched;
    std::vector<size_t> fetch_indices;
    for (auto& p : pending) {
        if (p.ok && !p.block.empty()) {
            fetch_indices.push_back(p.node_index);
            fetched.push_back(std::move(p.block));
        }
    }

    if (fetched.empty()) return false;
    if (fetched.size() < k_) {
        throw std::runtime_error("retrieve_stripe: not enough blocks ("
                                 + std::to_string(fetched.size()) + " < " + std::to_string(k_) + ")");
    }

    // Check if all data blocks present
    // Check if all data blocks present (first k nodes)
    bool all_data = true;
    for (size_t i = 0; i < k_; i++) {
        bool found = false;
        for (auto idx : indices) { if (idx == i) { found = true; break; } }
        for (auto idx : fetch_indices) { if (idx == i) { found = true; break; } }
        if (!found) { all_data = false; break; }
    }

    std::vector<std::vector<uint8_t>> data_blocks;
    if (all_data) {
        data_blocks.resize(k_);
        for (size_t j = 0; j < indices.size(); j++) {
            if (indices[j] < k_)
                data_blocks[indices[j]] = std::move(fetched[j]);
        for (size_t j = 0; j < fetch_indices.size(); j++) {
            if (fetch_indices[j] < k_)
                data_blocks[fetch_indices[j]] = std::move(fetched[j]);
        }
    } else {
        size_t block_size = fetched[0].size();
        data_blocks = parity_engine_.decode(fetched, indices, block_size);
        data_blocks = parity_engine_.decode(fetched, fetch_indices, block_size);
    }

    out = parity::join(data_blocks, STRIPE_SIZE);
@@ -532,11 +693,16 @@ void client::store(uint64_t group_id, const uint8_t* data, size_t data_len) {

    // Combine header + data into a logical stream, process in stripes
    size_t framed_len = 8 + data_len;
    uint32_t total_stripes = static_cast<uint32_t>((framed_len + STRIPE_SIZE - 1) / STRIPE_SIZE);
    DBG_LOG("[PARITY] store gid=" << group_id << " data_len=" << data_len
              << " stripes=" << total_stripes << "\n");

    // Retry once on failure: stale QUIC connections from a previous operation
    // cause the first stripe to fail; resetting connections fixes it.
    for (int attempt = 0; attempt < 2; ++attempt) {
        try {
            uint32_t stripe_idx = 0;
            size_t offset = 0;
    uint32_t total_stripes = static_cast<uint32_t>((framed_len + STRIPE_SIZE - 1) / STRIPE_SIZE);
    std::cerr << "[PARITY] store gid=" << group_id << " data_len=" << data_len
              << " stripes=" << total_stripes << "\n";

            while (offset < framed_len) {
                std::vector<uint8_t> stripe(STRIPE_SIZE, 0);
@@ -559,12 +725,27 @@ void client::store(uint64_t group_id, const uint8_t* data, size_t data_len) {

                store_stripe(group_id, stripe_idx, stripe.data(), STRIPE_SIZE);
                if (stripe_idx % 100 == 0 || stripe_idx + 1 == total_stripes) {
            std::cerr << "[PARITY]   stripe " << (stripe_idx+1) << "/" << total_stripes << "\n";
                    DBG_LOG("[PARITY]   stripe " << (stripe_idx+1) << "/" << total_stripes << "\n");
                }
                ++stripe_idx;
                offset += STRIPE_SIZE;
            }
    std::cerr << "[PARITY] store gid=" << group_id << " complete\n";
            DBG_LOG("[PARITY] store gid=" << group_id << " complete\n");
            return; // success
        } catch (const std::exception& e) {
            if (attempt == 0) {
                DBG_LOG("[PARITY] store attempt 1 failed: " << e.what()
                          << ", resetting connections and retrying\n");
                // Reset all remote connections so ensure_connected() opens fresh ones
                for (size_t i = 0; i < connections_.size(); ++i) {
                    connections_[i].socket.reset();
                    connections_[i].connected = false;
                }
            } else {
                throw; // second attempt failed, propagate
            }
        }
    }
}

void client::store(uint64_t group_id, const std::vector<uint8_t>& data) {
@@ -660,6 +841,8 @@ std::vector<uint8_t> client::retrieve(uint64_t group_id, size_t original_size) {

std::vector<uint8_t> client::retrieve(uint64_t group_id) {
    std::vector<uint8_t> result;
    result.reserve(STRIPE_SIZE * 4);

    for (uint32_t s = 0; ; ++s) {
        std::vector<uint8_t> stripe;
        if (!retrieve_stripe(group_id, s, stripe)) break;