Commit be9a2e11 authored by jan.koester's avatar jan.koester
Browse files

store problem solved

parent 5f66cf56
Loading
Loading
Loading
Loading
+12 −0
Original line number Diff line number Diff line
mediadb (20260419+37) unstable; urgency=critical

  * Prevent stale index overwrite: replicate_index() now compares local
    store count against cluster store count and refuses to push a smaller
    index. This prevents nodes that haven't synced yet from overwriting
    a freshly imported index with their empty/stale version. delete_store
    uses force=true to bypass the guard when a store was intentionally
    removed. repair_replication only re-replicates the index if the
    local node actually has stores.

 -- Jan Koester <jan.koester@tuxist.de>  Sat, 19 Apr 2026 00:00:00 +0200

mediadb (20260419+36) unstable; urgency=high

  * Streaming cluster import: each media blob is now written directly to
+36 −11
Original line number Diff line number Diff line
@@ -2522,7 +2522,7 @@ bool ClusterMediaBackend::delete_store(const std::string& id) {
    // Record tombstone so other nodes also delete this store on sync
    tombstones_.insert(id);
    replicate_tombstones();
    replicate_index();
    replicate_index(true);  // force: store count decreased intentionally
    return true;
}

@@ -3259,12 +3259,33 @@ void ClusterMediaBackend::repair_index() {
    replicate_index();
}

void ClusterMediaBackend::replicate_index() {
void ClusterMediaBackend::replicate_index(bool force) {
    if (!cluster_.isRunning()) return;
    auto buf = local_.save_index_to_buffer();
    if (buf.empty()) {
    if (buf.empty()) return;

    auto local_count = local_.store_ids().size();

    if (!force) {
        // Safety: never push a local index that has fewer stores than the
        // cluster index — that would wipe out recently imported stores on
        // other nodes.
        if (local_count == 0) return;  // never push empty index

        std::vector<uint8_t> cluster_idx;
        if (cluster_.fetch("index", cluster_idx) && cluster_idx.size() >= 8) {
            // Parse store count from cluster index: offset 4 = num_stores (u32 LE)
            std::uint32_t cluster_stores = 0;
            std::memcpy(&cluster_stores, cluster_idx.data() + 4, 4);
            if (local_count < cluster_stores) {
                std::cerr << "[CLUSTER] skipping index replicate: local has "
                          << local_count << " stores, cluster has "
                          << cluster_stores << "\n";
                return;
            }
        }
    }

    cluster_.replicate("index", buf.data(), buf.size());
}

@@ -3359,8 +3380,8 @@ void ClusterMediaBackend::repair_replication() {
    if (online_count < required_shards) return; // not enough nodes online to repair

    // Always check index replication — even before initial_sync_ok_.
    // If this node has local data, it can push the index to the cluster
    // to unblock other nodes that are stuck fetching.
    // But ONLY push if this node actually has stores. An empty index
    // must never overwrite a non-empty one in the cluster.
    uint64_t index_gid = cluster_group_id("index");
    int index_shards = 0;
    for (size_t i = 0; i < peer_groups.size(); ++i) {
@@ -3368,13 +3389,17 @@ void ClusterMediaBackend::repair_replication() {
            ++index_shards;
    }
    if (index_shards < static_cast<int>(required_shards)) {
        auto sids = local_.store_ids();
        if (!sids.empty()) {
            auto buf = local_.save_index_to_buffer();
            if (!buf.empty()) {
                std::cerr << "[CLUSTER-REPAIR] re-replicating index ("
                      << index_shards << "/" << required_shards << ")\n";
                          << index_shards << "/" << required_shards
                          << ", " << sids.size() << " local stores)\n";
                cluster_.replicate("index", buf.data(), buf.size());
            }
        }
    }

    // Everything below requires a successful initial sync
    if (!initial_sync_ok_.load()) return;
+1 −1
Original line number Diff line number Diff line
@@ -328,7 +328,7 @@ public:
    BinDb& local();

private:
    void replicate_index();
    void replicate_index(bool force = false);
    void replicate_store(const std::string& store_id);
    void replicate_tombstones();
    void load_tombstones();