Commit cd3cb821 authored by jan.koester's avatar jan.koester
Browse files

test

parent 042c2af9
Loading
Loading
Loading
Loading
+9 −0
Original line number Diff line number Diff line
libparitypp (20260411+37) unstable; urgency=medium

  * Fix client global mutex blocking: release mutex during retrieve_stripe
    retry backoff sleep so other operations are not serialized behind retries
  * Change retrieve/retrieve_range from lock_guard to unique_lock to allow
    unlock/relock around retry sleeps

 -- Jan Koester <jan.koester@tuxist.de>  Sat, 11 Apr 2026 18:00:00 +0200

libparitypp (20260411+36) unstable; urgency=medium

  * Add per-record CRC32 checksums to file_block_store (record header 17→21 bytes)
+4 −1
Original line number Diff line number Diff line
@@ -200,7 +200,10 @@ private:

    // Retrieve and reconstruct one stripe from nodes.
    // Retries with backoff if the cluster is rebuilding.
    bool retrieve_stripe(uint64_t group_id, uint32_t stripe_index,
    // The caller's unique_lock is released during retry sleeps so that other
    // client operations are not blocked by long backoff waits.
    bool retrieve_stripe(std::unique_lock<std::mutex>& lk,
                         uint64_t group_id, uint32_t stripe_index,
                         std::vector<uint8_t>& out);

    // Single-attempt stripe retrieval (internal).
+13 −8
Original line number Diff line number Diff line
@@ -554,7 +554,8 @@ void client::store_stripe(uint64_t group_id, uint32_t stripe_index,
    }
}

bool client::retrieve_stripe(uint64_t group_id, uint32_t stripe_index,
bool client::retrieve_stripe(std::unique_lock<std::mutex>& lk,
                              uint64_t group_id, uint32_t stripe_index,
                              std::vector<uint8_t>& out) {
    // Retry with backoff when the cluster is rebuilding after a full restart.
    // Without this, retrieve returns false immediately if too many nodes
@@ -564,9 +565,13 @@ bool client::retrieve_stripe(uint64_t group_id, uint32_t stripe_index,

    for (int attempt = 0; attempt <= MAX_RETRIES; ++attempt) {
        if (attempt > 0) {
            // Exponential backoff: 500, 1000, 2000, 4000, 8000, 16000 ms
            // Release the global mutex during the backoff sleep so that other
            // client operations (retrieves for other groups, stores, etc.)
            // are not blocked.
            int wait_ms = INITIAL_WAIT_MS * (1 << (attempt - 1));
            lk.unlock();
            std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
            lk.lock();
        }

        bool should_retry = false;
@@ -957,11 +962,11 @@ std::unique_ptr<store_session> client::begin_store(uint64_t group_id, size_t tot
// ---- Stripe-based retrieve ----

std::vector<uint8_t> client::retrieve(uint64_t group_id, size_t original_size) {
    std::lock_guard<std::mutex> guard(mutex_);
    std::unique_lock<std::mutex> guard(mutex_);
    std::vector<uint8_t> result;
    for (uint32_t s = 0; ; ++s) {
        std::vector<uint8_t> stripe;
        if (!retrieve_stripe(group_id, s, stripe)) break;
        if (!retrieve_stripe(guard, group_id, s, stripe)) break;
        result.insert(result.end(), stripe.begin(), stripe.end());
    }

@@ -971,13 +976,13 @@ std::vector<uint8_t> client::retrieve(uint64_t group_id, size_t original_size) {
}

std::vector<uint8_t> client::retrieve(uint64_t group_id) {
    std::lock_guard<std::mutex> guard(mutex_);
    std::unique_lock<std::mutex> guard(mutex_);
    std::vector<uint8_t> result;
    result.reserve(STRIPE_SIZE * 4);

    for (uint32_t s = 0; ; ++s) {
        std::vector<uint8_t> stripe;
        if (!retrieve_stripe(group_id, s, stripe)) break;
        if (!retrieve_stripe(guard, group_id, s, stripe)) break;
        result.insert(result.end(), stripe.begin(), stripe.end());
    }

@@ -998,7 +1003,7 @@ std::vector<uint8_t> client::retrieve(uint64_t group_id) {

std::vector<uint8_t> client::retrieve_range(uint64_t group_id,
                                              uint64_t offset, uint64_t length) {
    std::lock_guard<std::mutex> guard(mutex_);
    std::unique_lock<std::mutex> guard(mutex_);
    if (length == 0) return {};

    // The stored framed data is: [8-byte length header][raw data]
@@ -1016,7 +1021,7 @@ std::vector<uint8_t> client::retrieve_range(uint64_t group_id,

    for (uint32_t s = first_stripe; s <= last_stripe; ++s) {
        std::vector<uint8_t> stripe;
        if (!retrieve_stripe(group_id, s, stripe)) {
        if (!retrieve_stripe(guard, group_id, s, stripe)) {
            throw std::runtime_error("retrieve_range: stripe " + std::to_string(s) + " not found");
        }
        buf.insert(buf.end(), stripe.begin(), stripe.end());