Commit 0fc7656d authored by jan.koester's avatar jan.koester
Browse files

rebalance

parent cd3cb821
Loading
Loading
Loading
Loading
+11 −0
Original line number Diff line number Diff line
@@ -111,6 +111,17 @@ public:
    // Returns number of nodes that completed vacuum successfully.
    size_t vacuum_all_nodes();

    // Rebalance block placement across all nodes.
    // For each group: retrieve data, delete the group, re-store so blocks land
    // on their correct nodes (block_index % n == node_index).
    // Returns a rebalance_result holding the number of groups that were
    // rebalanced, that failed, and that were already correctly placed.
    // Per-run counters reported by rebalance(). Every group that rebalance()
    // examines is counted in exactly one of the three fields.
    struct rebalance_result {
        // Groups that were retrieved, removed and stored again.
        size_t rebalanced{0};
        // Groups whose retrieve came back empty or whose handling threw.
        size_t failed{0};
        // Groups whose blocks were already on the expected nodes.
        size_t already_ok{0};
    };
    // Checks block placement of every group and re-stores misplaced ones.
    rebalance_result rebalance();

    // Mark a node as dead (skip it for the cooldown period)
    void mark_node_dead(size_t node_index);

+81 −0
Original line number Diff line number Diff line
@@ -10,6 +10,7 @@
#include <chrono>
#include <future>
#include <mutex>
#include <set>

#ifdef NDEBUG
#define DBG_LOG(x) do {} while(0)
@@ -1316,6 +1317,86 @@ size_t client::vacuum_all_nodes() {
    return ok_count;
}

// Rebalances block placement for every group reported by any node.
//
// A group counts as correctly placed when each of its blocks satisfies
// block_index % n == node_index (with n = k_ + m_) and the total number of
// blocks is a non-zero multiple of n. Every other group is retrieved, removed
// and stored again so that store() lays its blocks out afresh.
//
// Returns the number of groups re-stored, failed, and already correctly
// placed. A std::exception thrown while retrieving/removing/storing a single
// group is caught, logged and counted as a failure; the remaining groups are
// still processed. If at least one group was re-stored, all nodes are
// vacuumed afterwards.
client::rebalance_result client::rebalance() {
    rebalance_result result;
    const size_t n = k_ + m_;

    // The placement rule below computes block_index % n; with n == 0 that is
    // undefined behaviour, so refuse to run on such a configuration.
    if (n == 0) {
        std::cerr << "[PARITY] rebalance: k + m is 0, cannot check placement" << std::endl;
        return result;
    }

    // Step 1: Collect all unique group IDs across all nodes (unlocked for listing).
    // Nodes whose listing fails are skipped; their groups are only seen if
    // another node reports them too.
    std::set<uint64_t> all_groups;
    for (size_t i = 0; i < nodes_.size(); ++i) {
        std::vector<uint64_t> node_groups;
        if (list_groups_on_node(i, node_groups)) {
            all_groups.insert(node_groups.begin(), node_groups.end());
        }
    }

    if (all_groups.empty())
        return result;

    std::cerr << "[PARITY] rebalance: " << all_groups.size() << " group(s) to check" << std::endl;

    // Step 2: For each group, check block placement and re-store if misplaced
    for (uint64_t gid : all_groups) {
        // List blocks on each node for this group
        auto node_blocks = list(gid);  // acquires mutex internally

        // Check if blocks are correctly placed: block_index % n == node_index.
        // One misplaced block is enough to decide, so stop scanning at the
        // first hit (total_blocks is only consulted when nothing is misplaced).
        bool misplaced = false;
        size_t total_blocks = 0;
        for (const auto& [node_idx, blocks] : node_blocks) {
            for (uint32_t bidx : blocks) {
                ++total_blocks;
                const size_t expected_node = bidx % n;
                if (expected_node != node_idx) {
                    misplaced = true;
                    break;
                }
            }
            if (misplaced)
                break;
        }

        // A group with no blocks at all, or with a block count that is not a
        // multiple of n, is treated as needing a re-store as well.
        if (!misplaced && total_blocks > 0 && total_blocks % n == 0) {
            ++result.already_ok;
            continue;
        }

        // Group needs rebalancing: retrieve → delete → re-store.
        // Tracks whether remove() has already run, so that a later failure can
        // be reported as potentially destructive rather than as a plain skip.
        bool old_blocks_removed = false;
        try {
            // Retrieve full data (parity decode handles misplaced blocks)
            auto data = retrieve(gid);
            if (data.empty()) {
                std::cerr << "[PARITY] rebalance: gid=" << gid
                          << " retrieve returned empty, skipping" << std::endl;
                ++result.failed;
                continue;
            }

            // Delete old (misplaced) blocks.
            // NOTE(review): the results of remove() and store() are ignored
            // here; if they signal failure through a return value rather than
            // an exception, that failure goes unnoticed — confirm.
            remove(gid);
            old_blocks_removed = true;

            // Re-store with correct placement
            store(gid, data);

            ++result.rebalanced;
            std::cerr << "[PARITY] rebalance: gid=" << gid
                      << " re-stored (" << data.size() << " bytes)" << std::endl;
        } catch (const std::exception& e) {
            std::cerr << "[PARITY] rebalance: gid=" << gid
                      << " failed: " << e.what();
            if (old_blocks_removed) {
                // The exception came after remove(): the group was not
                // written back, so make that visible in the log.
                std::cerr << " [after remove: group not re-stored]";
            }
            std::cerr << std::endl;
            ++result.failed;
        }
    }

    // Step 3: Vacuum all nodes to reclaim space from deleted blocks
    if (result.rebalanced > 0)
        vacuum_all_nodes();

    std::cerr << "[PARITY] rebalance done: " << result.rebalanced << " rebalanced, "
              << result.already_ok << " ok, " << result.failed << " failed" << std::endl;

    return result;
}

void client::mark_node_dead(size_t node_index) {
    std::lock_guard<std::mutex> guard(mutex_);
    if (node_index < connections_.size()) {