Commit 5f66cf56 authored by jan.koester's avatar jan.koester
Browse files

test

parent 43e15a53
Loading
Loading
Loading
Loading
+19 −0
Original line number Diff line number Diff line
mediadb (20260419+36) unstable; urgency=high

  * Streaming cluster import: each media blob is now written directly to
    the cluster as it is parsed from the import stream, instead of buffering
    all blobs in RAM first. Drastically reduces memory usage during import.
    Post-import verification ensures all keys are fully replicated.

 -- Jan Koester <jan.koester@tuxist.de>  Sat, 19 Apr 2026 00:00:00 +0200

mediadb (20260419+35) unstable; urgency=high

  * Import verification: after replicating all blobs to the cluster, a new
    Phase 3 checks shard counts for every key (index, store metadata, media
    blobs). Under-replicated keys are re-replicated immediately, ensuring
    all nodes can serve imported media right away instead of waiting for the
    periodic repair cycle.

 -- Jan Koester <jan.koester@tuxist.de>  Sat, 19 Apr 2026 00:00:00 +0200

mediadb (20260419+34) unstable; urgency=high

  * Fix index replication deadlock: repair_replication() no longer requires
+75 −43
Original line number Diff line number Diff line
@@ -2878,57 +2878,89 @@ bool ClusterMediaBackend::import_db_from_buffer(const std::uint8_t* data, std::s
    // cluster state.  Use collect_fn so BinDb::mutex_ is NOT held during
    // network I/O (import_from_stream releases it before we replicate).
    std::unique_lock<std::shared_mutex> cguard(cluster_op_mutex_);
    DBG_LOG("[CLUSTER-IMPORT] acquired cluster_op_mutex_\n");

    // Phase 1: Parse and update local state (BinDb::mutex_ held internally, fast).
    // Collect replication buffers instead of calling cluster_.replicate() under
    // BinDb::mutex_ — that would block all reads for the entire network duration.
    std::vector<std::pair<std::string, std::vector<uint8_t>>> repl_queue;
    auto collect_fn = [&repl_queue](const std::string& key, const uint8_t* d, size_t l) {
        DBG_LOG("[CLUSTER-IMPORT] collect key=" << key << " size=" << l << "\n");
        repl_queue.emplace_back(key, std::vector<uint8_t>(d, d + l));
    };
    DBG_LOG("[CLUSTER-IMPORT] Phase 1: parsing...\n");
    bool parse_ok = local_.import_db_from_buffer(data, len, collect_fn);
    DBG_LOG("[CLUSTER-IMPORT] Phase 1 done, ok=" << parse_ok
              << " queued=" << repl_queue.size() << "\n");
    if (!parse_ok) { importing_.store(false); return false; }

    // Phase 2: Replicate collected buffers.  BinDb::mutex_ is released
    // (import_from_stream returned), so reads to local_ are not blocked.
    // cluster_op_mutex_ is still held so ClusterMediaBackend reads wait,
    // but sync cannot interfere.
    DBG_LOG("[CLUSTER-IMPORT] Phase 2: replicating " << repl_queue.size() << " buffers\n");
    int repl_idx = 0;
    for (auto& [key, buf] : repl_queue) {
        ++repl_idx;
        DBG_LOG("[CLUSTER-IMPORT]   replicate " << repl_idx << "/" << repl_queue.size()
                  << " key=" << key << " size=" << buf.size() << "\n");
        
        bool ok = false;

    // Stream-Import: each entry is written to the cluster immediately
    // as it is parsed, keeping RAM usage minimal.
    int repl_count = 0;
    int repl_fail = 0;
    auto stream_fn = [&](const std::string& key, const uint8_t* d, size_t l) {
        ++repl_count;
        if (repl_count % 50 == 1)
            std::cerr << "[CLUSTER-IMPORT] replicate #" << repl_count
                      << " key=" << key << " size=" << l << "\n";
        constexpr int MAX_RETRIES = 3;
        for (int attempt = 1; attempt <= MAX_RETRIES; ++attempt) {
            if (cluster_.replicate(key, buf.data(), buf.size())) {
                ok = true;
                break;
            if (cluster_.replicate(key, d, l)) return;
            std::cerr << "[CLUSTER-IMPORT] retry " << attempt << " key=" << key << "\n";
            std::this_thread::sleep_for(std::chrono::seconds(attempt));
        }
            DBG_LOG("[CLUSTER-IMPORT]   retry attempt " << attempt << " for key=" << key << "\n");
            std::this_thread::sleep_for(std::chrono::seconds(attempt * 2));
        std::cerr << "[CLUSTER-IMPORT] FAILED key=" << key << " after retries\n";
        ++repl_fail;
    };

    bool parse_ok = local_.import_db_from_buffer(data, len, stream_fn);
    std::cerr << "[CLUSTER-IMPORT] parse done, ok=" << parse_ok
              << " replicated=" << repl_count << " failed=" << repl_fail << "\n";
    if (!parse_ok || repl_fail > 0) { importing_.store(false); return false; }

    // Verify replication: ensure all keys have enough shards on all nodes.
    std::cerr << "[CLUSTER-IMPORT] verifying replication\n";
    {
        const auto& cfg = cluster_.getConfig();
        size_t required = cfg.data_blocks + cfg.parity_blocks;
        auto peer_groups = cluster_.list_peer_groups();
        std::vector<std::unordered_set<uint64_t>> node_sets(peer_groups.size());
        for (size_t i = 0; i < peer_groups.size(); ++i)
            node_sets[i].insert(peer_groups[i].groups.begin(),
                                peer_groups[i].groups.end());

        // Re-replicate index if under-replicated
        uint64_t idx_gid = cluster_group_id("index");
        int idx_shards = 0;
        for (size_t i = 0; i < peer_groups.size(); ++i)
            if (peer_groups[i].online && node_sets[i].count(idx_gid)) ++idx_shards;
        if (idx_shards < static_cast<int>(required)) {
            std::cerr << "[CLUSTER-IMPORT] index under-replicated (" << idx_shards
                      << "/" << required << "), re-replicating\n";
            replicate_index();
        }

        if (!ok) {
            DBG_LOG("[CLUSTER-IMPORT]   FAILED key=" << key << " after retries. Aborting import.\n");
            importing_.store(false);
            return false;
        // Re-replicate store metadata if under-replicated
        auto sids = local_.store_ids();
        for (const auto& sid : sids) {
            uint64_t gid = cluster_group_id("store:" + sid);
            int shards = 0;
            for (size_t i = 0; i < peer_groups.size(); ++i)
                if (peer_groups[i].online && node_sets[i].count(gid)) ++shards;
            if (shards < static_cast<int>(required)) {
                std::cerr << "[CLUSTER-IMPORT] store:" << sid << " under-replicated ("
                          << shards << "/" << required << "), re-replicating\n";
                replicate_store(sid);
            }
        }

        DBG_LOG("[CLUSTER-IMPORT]   OK key=" << key << "\n");
        buf.clear();
        buf.shrink_to_fit();
        // Re-replicate media blobs if under-replicated
        auto mids = local_.media_ids();
        int fixed = 0;
        for (const auto& mid : mids) {
            uint64_t gid = cluster_group_id("media:" + mid);
            int shards = 0;
            for (size_t i = 0; i < peer_groups.size(); ++i)
                if (peer_groups[i].online && node_sets[i].count(gid)) ++shards;
            if (shards < static_cast<int>(required)) {
                std::vector<uint8_t> data_buf;
                if (cluster_.fetch("media:" + mid, data_buf) && !data_buf.empty()) {
                    cluster_.replicate("media:" + mid, data_buf.data(), data_buf.size());
                    ++fixed;
                }
            }
        }
        if (fixed > 0)
            std::cerr << "[CLUSTER-IMPORT] re-replicated " << fixed
                      << " under-replicated media blobs\n";
    }
    repl_queue.clear();

    DBG_LOG("[CLUSTER-IMPORT] complete, success\n");
    std::cerr << "[CLUSTER-IMPORT] complete, success\n";
    importing_.store(false);
    return true;
}