Loading src/backends/cluster.cpp +20 −13 Original line number Diff line number Diff line Loading @@ -210,12 +210,15 @@ namespace authdb { _Buffer.begin() + sizeof(AuthHeader), _Buffer.end()); g_Cluster->queueDataPush(domainGroupId(), std::move(manifest)); Cluster *cluster = g_Cluster; if (cluster) cluster->queueDataPush(domainGroupId(), std::move(manifest)); } void ClusterBackend::pushManifestSync(const std::set<EntityKey> &keys) { if (!g_Cluster || !g_Cluster->isRunning()) return; auto &client = g_Cluster->getClient(); Cluster *cluster = g_Cluster; if (!cluster || !cluster->isRunning()) return; auto &client = cluster->getClient(); if (!client) return; AuthHeader head; Loading @@ -226,7 +229,7 @@ namespace authdb { // This prevents a node with stale/empty data from wiping out freshly // imported or updated domain data on other nodes. try { auto &read_cli = g_Cluster->getReadClient(); auto &read_cli = cluster->getReadClient(); if (read_cli) { auto cluster_manifest = read_cli->retrieve(domainGroupId()); if (cluster_manifest.size() >= sizeof(AuthHeader)) { Loading Loading @@ -275,12 +278,13 @@ namespace authdb { } void ClusterBackend::fetchFromCluster() { if (!g_Cluster || !g_Cluster->isRunning()) Cluster *cluster = g_Cluster; if (!cluster || !cluster->isRunning()) return; // While scrub/rebuild is running, blocks may not be on all nodes // yet — skip the expensive network fetch and use cached data. if (g_Cluster->isScrubRunning()) { if (cluster->isScrubRunning()) { auto cache = getDomainCache(); if (!cache->buffer.empty() && _Buffer.size() <= sizeof(AuthHeader)) { _Buffer = cache->buffer; Loading @@ -295,7 +299,7 @@ namespace authdb { // Fast path: check if cached data is fresh bool do_peer_fetch = (now - cache->last_fetch) >= std::chrono::seconds(10); uint64_t cur_epoch = g_Cluster->getRecoveryEpoch(); uint64_t cur_epoch = cluster->getRecoveryEpoch(); if (cur_epoch != cache->recovery_epoch) do_peer_fetch = true; Loading Loading @@ -324,7 +328,7 @@ namespace authdb { DBG_LOG("[CLUSTER-BE] fetchFromCluster domain=" << _Domain << " buf_size=" << _Buffer.size() << " dirty=" << _Dirty.load() << "\n"); auto &client = g_Cluster->getReadClient(); auto &client = cluster->getReadClient(); if (!client) return; // Single retrieve with timeout: manifest contains header + magic + entity keys + full record data Loading Loading @@ -523,7 +527,8 @@ namespace authdb { : _Domain(domain), _Pos(0), _Dirty(false) { // Try the shared domain cache first — no network I/O needed. if (g_Cluster && g_Cluster->isRunning()) { Cluster *cluster = g_Cluster; if (cluster && cluster->isRunning()) { auto cache = getDomainCache(); if (!cache->buffer.empty()) { _Buffer = cache->buffer; Loading @@ -531,7 +536,7 @@ namespace authdb { } else { // No cached data yet — check local block store (no network) uint64_t dgid = domainGroupId(); auto &store = g_Cluster->getStore(); auto &store = cluster->getStore(); auto blocks = store.list_blocks(dgid); if (!blocks.empty()) _ClusterDataExists.store(true); Loading Loading @@ -616,7 +621,8 @@ namespace authdb { if (_Dirty) { newRevesion(); if (g_Cluster && g_Cluster->isRunning()) { Cluster *cluster = g_Cluster; if (cluster && cluster->isRunning()) { // Check if any entity actually changed via hash comparison auto entities = parseEntities(); std::set<EntityKey> current_keys; Loading Loading @@ -690,10 +696,11 @@ namespace authdb { } void ClusterBackend::purge() { if (!g_Cluster || !g_Cluster->isRunning()) Cluster *cluster = g_Cluster; if (!cluster || !cluster->isRunning()) return; auto &client = g_Cluster->getClient(); auto &client = cluster->getClient(); if (client) { try { client->remove(domainGroupId()); Loading Loading
src/backends/cluster.cpp +20 −13 Original line number Diff line number Diff line Loading @@ -210,12 +210,15 @@ namespace authdb { _Buffer.begin() + sizeof(AuthHeader), _Buffer.end()); g_Cluster->queueDataPush(domainGroupId(), std::move(manifest)); Cluster *cluster = g_Cluster; if (cluster) cluster->queueDataPush(domainGroupId(), std::move(manifest)); } void ClusterBackend::pushManifestSync(const std::set<EntityKey> &keys) { if (!g_Cluster || !g_Cluster->isRunning()) return; auto &client = g_Cluster->getClient(); Cluster *cluster = g_Cluster; if (!cluster || !cluster->isRunning()) return; auto &client = cluster->getClient(); if (!client) return; AuthHeader head; Loading @@ -226,7 +229,7 @@ namespace authdb { // This prevents a node with stale/empty data from wiping out freshly // imported or updated domain data on other nodes. try { auto &read_cli = g_Cluster->getReadClient(); auto &read_cli = cluster->getReadClient(); if (read_cli) { auto cluster_manifest = read_cli->retrieve(domainGroupId()); if (cluster_manifest.size() >= sizeof(AuthHeader)) { Loading Loading @@ -275,12 +278,13 @@ namespace authdb { } void ClusterBackend::fetchFromCluster() { if (!g_Cluster || !g_Cluster->isRunning()) Cluster *cluster = g_Cluster; if (!cluster || !cluster->isRunning()) return; // While scrub/rebuild is running, blocks may not be on all nodes // yet — skip the expensive network fetch and use cached data. if (g_Cluster->isScrubRunning()) { if (cluster->isScrubRunning()) { auto cache = getDomainCache(); if (!cache->buffer.empty() && _Buffer.size() <= sizeof(AuthHeader)) { _Buffer = cache->buffer; Loading @@ -295,7 +299,7 @@ namespace authdb { // Fast path: check if cached data is fresh bool do_peer_fetch = (now - cache->last_fetch) >= std::chrono::seconds(10); uint64_t cur_epoch = g_Cluster->getRecoveryEpoch(); uint64_t cur_epoch = cluster->getRecoveryEpoch(); if (cur_epoch != cache->recovery_epoch) do_peer_fetch = true; Loading Loading @@ -324,7 +328,7 @@ namespace authdb { DBG_LOG("[CLUSTER-BE] fetchFromCluster domain=" << _Domain << " buf_size=" << _Buffer.size() << " dirty=" << _Dirty.load() << "\n"); auto &client = g_Cluster->getReadClient(); auto &client = cluster->getReadClient(); if (!client) return; // Single retrieve with timeout: manifest contains header + magic + entity keys + full record data Loading Loading @@ -523,7 +527,8 @@ namespace authdb { : _Domain(domain), _Pos(0), _Dirty(false) { // Try the shared domain cache first — no network I/O needed. if (g_Cluster && g_Cluster->isRunning()) { Cluster *cluster = g_Cluster; if (cluster && cluster->isRunning()) { auto cache = getDomainCache(); if (!cache->buffer.empty()) { _Buffer = cache->buffer; Loading @@ -531,7 +536,7 @@ namespace authdb { } else { // No cached data yet — check local block store (no network) uint64_t dgid = domainGroupId(); auto &store = g_Cluster->getStore(); auto &store = cluster->getStore(); auto blocks = store.list_blocks(dgid); if (!blocks.empty()) _ClusterDataExists.store(true); Loading Loading @@ -616,7 +621,8 @@ namespace authdb { if (_Dirty) { newRevesion(); if (g_Cluster && g_Cluster->isRunning()) { Cluster *cluster = g_Cluster; if (cluster && cluster->isRunning()) { // Check if any entity actually changed via hash comparison auto entities = parseEntities(); std::set<EntityKey> current_keys; Loading Loading @@ -690,10 +696,11 @@ namespace authdb { } void ClusterBackend::purge() { if (!g_Cluster || !g_Cluster->isRunning()) Cluster *cluster = g_Cluster; if (!cluster || !cluster->isRunning()) return; auto &client = g_Cluster->getClient(); auto &client = cluster->getClient(); if (client) { try { client->remove(domainGroupId()); Loading