Commit 694e833b authored by jan.koester's avatar jan.koester
Browse files

test

parent 71c1592f
Loading
Loading
Loading
Loading
+7 −1
Original line number Diff line number Diff line
@@ -198,10 +198,16 @@ private:
    void store_stripe(uint64_t group_id, uint32_t stripe_index,
                      const uint8_t* stripe_data, size_t stripe_len);

    // Retrieve and reconstruct one stripe from nodes
    // Retrieve and reconstruct one stripe from nodes.
    // Retries with backoff if the cluster is rebuilding.
    bool retrieve_stripe(uint64_t group_id, uint32_t stripe_index,
                         std::vector<uint8_t>& out);

    // Single-attempt stripe retrieval (internal).
    // Sets should_retry=true when some blocks were found but < k.
    bool retrieve_stripe_once(uint64_t group_id, uint32_t stripe_index,
                               std::vector<uint8_t>& out, bool& should_retry);

    uint64_t generate_group_id();
};

+5 −0
Original line number Diff line number Diff line
@@ -171,6 +171,11 @@ public:
        std::string key_file;     // Private key DER path
        int max_connections = 128;
        bool require_auth = false; // Enable client authentication
        // On startup, wait up to this many seconds for at least one peer
        // to become reachable before accepting client requests.
        // Prevents data-unavailable errors during full cluster restarts.
        // 0 = don't wait (legacy behaviour).
        int startup_peer_wait_secs = 0;
    };

    // Peer node info for auto-rebuild
+31 −1
Original line number Diff line number Diff line
@@ -556,6 +556,34 @@ void client::store_stripe(uint64_t group_id, uint32_t stripe_index,

bool client::retrieve_stripe(uint64_t group_id, uint32_t stripe_index,
                              std::vector<uint8_t>& out) {
    // Retry with backoff when the cluster is rebuilding after a full restart.
    // Without this, retrieve returns false immediately if too many nodes
    // haven't synced the block yet, even though the data exists on peers.
    static constexpr int MAX_RETRIES = 6;           // up to ~30s total
    static constexpr int INITIAL_WAIT_MS = 500;     // first retry after 500ms

    for (int attempt = 0; attempt <= MAX_RETRIES; ++attempt) {
        if (attempt > 0) {
            // Exponential backoff: 500, 1000, 2000, 4000, 8000, 16000 ms
            int wait_ms = INITIAL_WAIT_MS * (1 << (attempt - 1));
            std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
        }

        bool should_retry = false;
        if (!retrieve_stripe_once(group_id, stripe_index, out, should_retry))  {
            if (!should_retry || attempt == MAX_RETRIES)
                return false;
            // Some blocks were found but not enough — cluster may be rebuilding
            continue;
        }
        return true;
    }
    return false;
}

bool client::retrieve_stripe_once(uint64_t group_id, uint32_t stripe_index,
                                   std::vector<uint8_t>& out, bool& should_retry) {
    should_retry = false;
    size_t n = k_ + m_;

    auto retr_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
@@ -756,7 +784,9 @@ bool client::retrieve_stripe(uint64_t group_id, uint32_t stripe_index,

    if (fetched.empty()) return false;
    if (fetched.size() < k_) {
        return false;  // partial blocks — data not fully replicated yet
        // Some blocks found but not enough — the cluster might be rebuilding
        should_retry = true;
        return false;
    }

    // Check if all data blocks present (first k nodes)
+71 −1
Original line number Diff line number Diff line
@@ -832,6 +832,44 @@ void server::run() {

    running_ = true;

    // Socket wait instance (shared between startup probe and accept loop)
    netplus::socketwait wait;

    // Wait for at least one peer to become reachable before accepting
    // client requests. This prevents data-unavailable errors during
    // a full cluster restart where all nodes come up simultaneously.
    if (!peers_.empty() && cfg_.startup_peer_wait_secs > 0) {
        auto deadline = std::chrono::steady_clock::now()
                        + std::chrono::seconds(cfg_.startup_peer_wait_secs);
        bool peer_found = false;

        while (!peer_found && std::chrono::steady_clock::now() < deadline) {
            // Accept incoming connections while waiting (peers may connect to us)
            try {
                wait.waitRead(*socket_, 500);
                std::unique_ptr<netplus::socket> csock;
                socket_->accept(csock, true);
            } catch (...) {}

            // Probe each peer
            std::vector<client::node_info> nodes;
            for (const auto& p : peers_)
                nodes.push_back({p.address, p.port});
            try {
                client::credentials creds;
                creds.client_name = peer_creds_.client_name;
                creds.key = peer_creds_.key;
                client probe(1, 0, nodes, creds);
                for (size_t i = 0; i < nodes.size() && !peer_found; ++i) {
                    status_info info;
                    if (probe.get_node_status(i, info)) {
                        peer_found = true;
                    }
                }
            } catch (...) {}
        }
    }

    // Auto-rebuild: sync missing blocks from peers on startup
    if (!peers_.empty()) {
        trigger_rebuild();
@@ -839,7 +877,6 @@ void server::run() {

    // Simple accept loop — QUIC processes datagrams and fires
    // stream callbacks inside accept().
    netplus::socketwait wait;
    auto last_resync = std::chrono::steady_clock::now();
    while (running_) {
        try {
@@ -1017,6 +1054,39 @@ void server::process_message(netplus::quic& conn, uint64_t stream_id,
                auto resp = protocol::make_block_data(group_id, block_index,
                                                       block_data.data(), block_data.size());
                send_response(conn, stream_id, msg_type::BLOCK_DATA, resp);
            } else if (!peers_.empty()) {
                // Block not local — try forward-fetching from a peer.
                // This handles the full-cluster-restart scenario where rebuild
                // hasn't synced this block yet but a peer already has it.
                bool forwarded = false;
                try {
                    std::vector<client::node_info> nodes;
                    nodes.reserve(peers_.size());
                    for (const auto& p : peers_)
                        nodes.push_back({p.address, p.port});

                    client::credentials creds;
                    creds.client_name = peer_creds_.client_name;
                    creds.key = peer_creds_.key;

                    client peer_client(1, 0, nodes, creds);
                    if (peer_client.fetch_from_peers(group_id, block_index, block_data)) {
                        // Cache locally for future requests
                        store_->store(group_id, block_index,
                                      block_data.data(), block_data.size());
                        auto resp = protocol::make_block_data(group_id, block_index,
                                                               block_data.data(), block_data.size());
                        send_response(conn, stream_id, msg_type::BLOCK_DATA, resp);
                        forwarded = true;
                    }
                } catch (...) {}

                if (!forwarded) {
                    std::vector<uint8_t> not_found(12);
                    protocol::encode_u64(not_found.data(), group_id);
                    protocol::encode_u32(not_found.data() + 8, block_index);
                    send_response(conn, stream_id, msg_type::BLOCK_NOT_FOUND, not_found);
                }
            } else {
                std::vector<uint8_t> not_found(12);
                protocol::encode_u64(not_found.data(), group_id);