Commit c46f9cdb authored by jan.koester's avatar jan.koester
Browse files

speed

parent 1a7a4297
Loading
Loading
Loading
Loading
+14 −0
Original line number Diff line number Diff line
mediadb (20260419+39) unstable; urgency=critical

  * Fix import hang: removed the post-import verification phase that fetched
    every media blob from the cluster to check shard counts. This blocked
    all API requests (including the status page) because it held cluster_op_mutex_
    exclusively during potentially slow network I/O. The periodic repair_replication()
    handles under-replicated keys instead.
  * Fix stuck importing_ flag: now uses RAII guard so importing_ is always
    cleared on exit, even on exceptions.
  * Simplify replicate_index() guard: removed cluster_.fetch("index") call
    that could itself hang. Now uses simple initial_sync_ok_ check instead.

 -- Jan Koester <jan.koester@tuxist.de>  Sat, 19 Apr 2026 00:00:00 +0200

mediadb (20260419+38) unstable; urgency=critical

  * Fix import + tombstone conflict: importing stores that were previously
+15 −86
Original line number Diff line number Diff line
@@ -2865,18 +2865,19 @@ bool ClusterMediaBackend::import_db_from_buffer(const std::vector<std::uint8_t>&
}

bool ClusterMediaBackend::import_db_from_buffer(const std::uint8_t* data, std::size_t len) {
    DBG_LOG("[CLUSTER-IMPORT] start, input_size=" << len << "\n");
    std::cerr << "[CLUSTER-IMPORT] start, input_size=" << len << "\n";
    if (!cluster_.isRunning()) {
        DBG_LOG("[CLUSTER-IMPORT] cluster not running!\n");
        std::cerr << "[CLUSTER-IMPORT] cluster not running!\n";
        return false;
    }

    importing_.store(true);
    // RAII guard: always clear importing_ on exit, even on exceptions
    struct ImportGuard {
        std::atomic<bool>& flag;
        ~ImportGuard() { flag.store(false); }
    } import_guard{importing_};

    // Hold cluster_op_mutex_ for the entire import to prevent
    // sync_from_cluster from overwriting newly imported data with stale
    // cluster state.  Use collect_fn so BinDb::mutex_ is NOT held during
    // network I/O (import_from_stream releases it before we replicate).
    std::unique_lock<std::shared_mutex> cguard(cluster_op_mutex_);

    // Stream-Import: each entry is written to the cluster immediately
@@ -2901,11 +2902,9 @@ bool ClusterMediaBackend::import_db_from_buffer(const std::uint8_t* data, std::s
    bool parse_ok = local_.import_db_from_buffer(data, len, stream_fn);
    std::cerr << "[CLUSTER-IMPORT] parse done, ok=" << parse_ok
              << " replicated=" << repl_count << " failed=" << repl_fail << "\n";
    if (!parse_ok || repl_fail > 0) { importing_.store(false); return false; }
    if (!parse_ok || repl_fail > 0) return false;

    // Clear tombstones for any stores that were just imported.
    // Without this, a previous delete_store tombstone would cause
    // sync_from_cluster to immediately remove the reimported store.
    {
        auto sids = local_.store_ids();
        bool tombstone_changed = false;
@@ -2919,65 +2918,9 @@ bool ClusterMediaBackend::import_db_from_buffer(const std::uint8_t* data, std::s
        }
    }

    // Verify replication: ensure all keys have enough shards on all nodes.
    std::cerr << "[CLUSTER-IMPORT] verifying replication\n";
    {
        const auto& cfg = cluster_.getConfig();
        size_t required = cfg.data_blocks + cfg.parity_blocks;
        auto peer_groups = cluster_.list_peer_groups();
        std::vector<std::unordered_set<uint64_t>> node_sets(peer_groups.size());
        for (size_t i = 0; i < peer_groups.size(); ++i)
            node_sets[i].insert(peer_groups[i].groups.begin(),
                                peer_groups[i].groups.end());

        // Re-replicate index if under-replicated
        uint64_t idx_gid = cluster_group_id("index");
        int idx_shards = 0;
        for (size_t i = 0; i < peer_groups.size(); ++i)
            if (peer_groups[i].online && node_sets[i].count(idx_gid)) ++idx_shards;
        if (idx_shards < static_cast<int>(required)) {
            std::cerr << "[CLUSTER-IMPORT] index under-replicated (" << idx_shards
                      << "/" << required << "), re-replicating\n";
            replicate_index();
        }

        // Re-replicate store metadata if under-replicated
        auto sids = local_.store_ids();
        for (const auto& sid : sids) {
            uint64_t gid = cluster_group_id("store:" + sid);
            int shards = 0;
            for (size_t i = 0; i < peer_groups.size(); ++i)
                if (peer_groups[i].online && node_sets[i].count(gid)) ++shards;
            if (shards < static_cast<int>(required)) {
                std::cerr << "[CLUSTER-IMPORT] store:" << sid << " under-replicated ("
                          << shards << "/" << required << "), re-replicating\n";
                replicate_store(sid);
            }
        }

        // Re-replicate media blobs if under-replicated
        auto mids = local_.media_ids();
        int fixed = 0;
        for (const auto& mid : mids) {
            uint64_t gid = cluster_group_id("media:" + mid);
            int shards = 0;
            for (size_t i = 0; i < peer_groups.size(); ++i)
                if (peer_groups[i].online && node_sets[i].count(gid)) ++shards;
            if (shards < static_cast<int>(required)) {
                std::vector<uint8_t> data_buf;
                if (cluster_.fetch("media:" + mid, data_buf) && !data_buf.empty()) {
                    cluster_.replicate("media:" + mid, data_buf.data(), data_buf.size());
                    ++fixed;
                }
            }
        }
        if (fixed > 0)
            std::cerr << "[CLUSTER-IMPORT] re-replicated " << fixed
                      << " under-replicated media blobs\n";
    }

    std::cerr << "[CLUSTER-IMPORT] complete, success\n";
    importing_.store(false);
    // importing_ is cleared by ImportGuard destructor
    // Replication verification is handled by periodic repair_replication()
    return true;
}

@@ -3280,26 +3223,12 @@ void ClusterMediaBackend::replicate_index(bool force) {
    auto buf = local_.save_index_to_buffer();
    if (buf.empty()) return;

    auto local_count = local_.store_ids().size();

    if (!force) {
        // Safety: never push a local index that has fewer stores than the
        // cluster index — that would wipe out recently imported stores on
        // other nodes.
        if (local_count == 0) return;  // never push empty index

        std::vector<uint8_t> cluster_idx;
        if (cluster_.fetch("index", cluster_idx) && cluster_idx.size() >= 8) {
            // Parse store count from cluster index: offset 4 = num_stores (u32 LE)
            std::uint32_t cluster_stores = 0;
            std::memcpy(&cluster_stores, cluster_idx.data() + 4, 4);
            if (local_count < cluster_stores) {
                std::cerr << "[CLUSTER] skipping index replicate: local has "
                          << local_count << " stores, cluster has "
                          << cluster_stores << "\n";
                return;
            }
        }
        // Safety: don't push an empty index, and don't push if this node
        // hasn't completed initial sync yet (could overwrite a good index).
        auto local_count = local_.store_ids().size();
        if (local_count == 0) return;
        if (!initial_sync_ok_.load()) return;
    }

    cluster_.replicate("index", buf.data(), buf.size());