Loading src/backends/cluster.cpp +10 −36 Original line number Diff line number Diff line Loading @@ -374,26 +374,14 @@ namespace authdb { } void ClusterBackend::pushToCluster() { // Sync push for destructor — push all dirty entities directly // Sync push for destructor — single manifest contains full buffer if (!_Dirty) return; newRevesion(); if (!g_Cluster || !g_Cluster->isRunning()) return; auto &client = g_Cluster->getClient(); if (!client) return; auto entities = parseEntities(); std::set<EntityKey> keys; for (auto &[ek, data] : entities) { for (auto &[ek, data] : entities) keys.insert(ek); try { client->store(entityGroupId(ek), data.data(), data.size()); } catch (const std::exception &e) { std::cerr << "ClusterBackend::pushToCluster: entity push failed: " << e.what() << std::endl; } } pushManifestSync(keys); _Dirty = false; Loading Loading @@ -488,41 +476,36 @@ namespace authdb { newRevesion(); if (g_Cluster && g_Cluster->isRunning()) { // Check if any entity actually changed via hash comparison auto entities = parseEntities(); std::set<EntityKey> current_keys; bool structure_changed = false; bool data_changed = false; bool changed = false; for (auto &[ek, data] : entities) { current_keys.insert(ek); uint64_t hash = fnv1a_hash(data); auto it = entity_hashes_.find(ek); if (it == entity_hashes_.end()) { // New entity structure_changed = true; g_Cluster->queueDataPush(entityGroupId(ek), std::move(data)); entity_hashes_[ek] = hash; changed = true; } else if (it->second != hash) { // Changed entity data_changed = true; g_Cluster->queueDataPush(entityGroupId(ek), std::move(data)); it->second = hash; changed = true; } } // Detect deleted entities for (auto it = entity_hashes_.begin(); it != entity_hashes_.end(); ) { if (current_keys.find(it->first) == current_keys.end()) { g_Cluster->queueDataRemove(entityGroupId(it->first)); it = entity_hashes_.erase(it); structure_changed = true; changed = true; } else { ++it; } } // Always push manifest when data changed (contains full buffer) if (structure_changed || data_changed) // Single manifest push contains full buffer — no per-entity stores needed if (changed) pushManifest(current_keys); // Update shared cache so other instances see the latest data Loading Loading @@ -572,19 +555,10 @@ namespace authdb { auto &client = g_Cluster->getClient(); if (client) { // Remove all entity groups for (auto &[ek, _] : entity_hashes_) { try { client->remove(entityGroupId(ek)); } catch (const std::exception &e) { std::cerr << "ClusterBackend::purge: entity remove failed: " << e.what() << std::endl; } } // Remove manifest try { client->remove(domainGroupId()); } catch (const std::exception &e) { std::cerr << "ClusterBackend::purge: manifest remove failed: " << e.what() << std::endl; std::cerr << "ClusterBackend::purge: remove failed: " << e.what() << std::endl; } } Loading Loading
src/backends/cluster.cpp +10 −36 Original line number Diff line number Diff line Loading @@ -374,26 +374,14 @@ namespace authdb { } void ClusterBackend::pushToCluster() { // Sync push for destructor — push all dirty entities directly // Sync push for destructor — single manifest contains full buffer if (!_Dirty) return; newRevesion(); if (!g_Cluster || !g_Cluster->isRunning()) return; auto &client = g_Cluster->getClient(); if (!client) return; auto entities = parseEntities(); std::set<EntityKey> keys; for (auto &[ek, data] : entities) { for (auto &[ek, data] : entities) keys.insert(ek); try { client->store(entityGroupId(ek), data.data(), data.size()); } catch (const std::exception &e) { std::cerr << "ClusterBackend::pushToCluster: entity push failed: " << e.what() << std::endl; } } pushManifestSync(keys); _Dirty = false; Loading Loading @@ -488,41 +476,36 @@ namespace authdb { newRevesion(); if (g_Cluster && g_Cluster->isRunning()) { // Check if any entity actually changed via hash comparison auto entities = parseEntities(); std::set<EntityKey> current_keys; bool structure_changed = false; bool data_changed = false; bool changed = false; for (auto &[ek, data] : entities) { current_keys.insert(ek); uint64_t hash = fnv1a_hash(data); auto it = entity_hashes_.find(ek); if (it == entity_hashes_.end()) { // New entity structure_changed = true; g_Cluster->queueDataPush(entityGroupId(ek), std::move(data)); entity_hashes_[ek] = hash; changed = true; } else if (it->second != hash) { // Changed entity data_changed = true; g_Cluster->queueDataPush(entityGroupId(ek), std::move(data)); it->second = hash; changed = true; } } // Detect deleted entities for (auto it = entity_hashes_.begin(); it != entity_hashes_.end(); ) { if (current_keys.find(it->first) == current_keys.end()) { g_Cluster->queueDataRemove(entityGroupId(it->first)); it = entity_hashes_.erase(it); structure_changed = true; changed = true; } else { ++it; } } // Always push manifest when data changed (contains full buffer) if (structure_changed || data_changed) // Single manifest push contains full buffer — no per-entity stores needed if (changed) pushManifest(current_keys); // Update shared cache so other instances see the latest data Loading Loading @@ -572,19 +555,10 @@ namespace authdb { auto &client = g_Cluster->getClient(); if (client) { // Remove all entity groups for (auto &[ek, _] : entity_hashes_) { try { client->remove(entityGroupId(ek)); } catch (const std::exception &e) { std::cerr << "ClusterBackend::purge: entity remove failed: " << e.what() << std::endl; } } // Remove manifest try { client->remove(domainGroupId()); } catch (const std::exception &e) { std::cerr << "ClusterBackend::purge: manifest remove failed: " << e.what() << std::endl; std::cerr << "ClusterBackend::purge: remove failed: " << e.what() << std::endl; } } Loading