Loading src/backends/cluster.cpp +49 −27 Original line number Diff line number Diff line Loading @@ -189,6 +189,12 @@ namespace authdb { std::memcpy(manifest.data() + pos + 16, &t, sizeof(int32_t)); } // Append full record data so fetch only needs 1 retrieve call if (_Buffer.size() > sizeof(AuthHeader)) manifest.insert(manifest.end(), _Buffer.begin() + sizeof(AuthHeader), _Buffer.end()); g_Cluster->queueDataPush(domainGroupId(), std::move(manifest)); } Loading Loading @@ -219,6 +225,12 @@ namespace authdb { std::memcpy(manifest.data() + pos + 16, &t, sizeof(int32_t)); } // Append full record data if (_Buffer.size() > sizeof(AuthHeader)) manifest.insert(manifest.end(), _Buffer.begin() + sizeof(AuthHeader), _Buffer.end()); try { client->store(domainGroupId(), manifest.data(), manifest.size()); } catch (const std::exception &e) { Loading @@ -233,7 +245,7 @@ namespace authdb { auto cache = getDomainCache(); auto now = std::chrono::steady_clock::now(); // Fast path: check if cached data is fresh (no lock needed for read) // Fast path: check if cached data is fresh bool do_peer_fetch = (now - cache->last_fetch) >= std::chrono::seconds(30); uint64_t cur_epoch = g_Cluster->getRecoveryEpoch(); Loading @@ -241,7 +253,6 @@ namespace authdb { do_peer_fetch = true; if (!do_peer_fetch) { // Use cached data if this instance has no data yet if (_Buffer.size() <= sizeof(AuthHeader) && !cache->buffer.empty()) { _Buffer = cache->buffer; _ClusterDataExists.store(_Buffer.size() > sizeof(AuthHeader)); Loading @@ -251,8 +262,7 @@ namespace authdb { return; } // Serialize fetches per domain — if another thread is already fetching, // wait for its result instead of hammering the cluster. // Serialize fetches per domain std::unique_lock<std::mutex> flk(cache->fetch_mutex, std::try_to_lock); if (!flk.owns_lock()) { std::lock_guard<std::mutex> wait_lk(cache->fetch_mutex); Loading @@ -270,14 +280,14 @@ namespace authdb { auto &client = g_Cluster->getClient(); if (!client) return; // 1. Fetch manifest from domain group // Single retrieve: manifest contains header + magic + entity keys + full record data std::vector<uint8_t> manifest; try { manifest = client->retrieve(dgid); cache->last_fetch = now; cache->recovery_epoch = cur_epoch; } catch (const std::exception &e) { std::cerr << "ClusterBackend::fetchFromCluster: manifest retrieve failed: " std::cerr << "ClusterBackend::fetchFromCluster: retrieve failed: " << e.what() << std::endl; _ClusterDataExists.store(true); return; Loading @@ -292,7 +302,7 @@ namespace authdb { return; } // 2. Parse header and verify magic // Parse header AuthHeader head; std::memcpy(&head, manifest.data(), sizeof(AuthHeader)); Loading @@ -312,11 +322,15 @@ namespace authdb { std::memcpy(&entity_count, manifest.data() + moff, sizeof(uint32_t)); moff += sizeof(uint32_t); // 3. Parse entity keys from manifest // Skip entity key list size_t key_section = entity_count * (16 + sizeof(int32_t)); if (moff + key_section > manifest.size()) return; // Parse entity keys for hash tracking entity_hashes_.clear(); std::vector<EntityKey> keys; keys.reserve(entity_count); for (uint32_t i = 0; i < entity_count; i++) { if (moff + 16 + sizeof(int32_t) > manifest.size()) break; EntityKey ek; std::memcpy(ek.ruid, manifest.data() + moff, 16); moff += 16; Loading @@ -327,31 +341,36 @@ namespace authdb { keys.push_back(ek); } // 4. Fetch each entity and rebuild _Buffer // Rebuild _Buffer: header + record data that follows the entity key list _Buffer.resize(sizeof(AuthHeader)); std::memcpy(_Buffer.data(), &head, sizeof(AuthHeader)); entity_hashes_.clear(); for (auto &ek : keys) { uint64_t egid = entityGroupId(ek); try { auto entity_data = client->retrieve(egid); if (!entity_data.empty()) { _Buffer.insert(_Buffer.end(), entity_data.begin(), entity_data.end()); entity_hashes_[ek] = fnv1a_hash(entity_data); } } catch (const std::exception &e) { std::cerr << "[CLUSTER-BE] failed to fetch entity: " << e.what() << std::endl; if (moff < manifest.size()) { _Buffer.insert(_Buffer.end(), manifest.begin() + moff, manifest.end()); } // Validate buffer integrity if (!bufferValid(_Buffer)) { std::cerr << "[CLUSTER-BE] domain=" << _Domain << " manifest record data is corrupt, discarding" << std::endl; _Buffer.resize(sizeof(AuthHeader)); } _ClusterDataExists.store(_Buffer.size() > sizeof(AuthHeader)); // Build entity hashes from records auto entities = parseEntities(); for (auto &[ek, data] : entities) { entity_hashes_[ek] = fnv1a_hash(data); } // Update shared cache cache->buffer = _Buffer; DBG_LOG("[CLUSTER-BE] fetchFromCluster rebuilt buf_size=" << _Buffer.size() << " entities=" << keys.size() << "\n"); << " entities=" << entity_hashes_.size() << "\n"); } void ClusterBackend::pushToCluster() { Loading Loading @@ -471,7 +490,8 @@ namespace authdb { if (g_Cluster && g_Cluster->isRunning()) { auto entities = parseEntities(); std::set<EntityKey> current_keys; bool manifest_changed = false; bool structure_changed = false; bool data_changed = false; for (auto &[ek, data] : entities) { current_keys.insert(ek); Loading @@ -479,11 +499,12 @@ namespace authdb { auto it = entity_hashes_.find(ek); if (it == entity_hashes_.end()) { // New entity manifest_changed = true; structure_changed = true; g_Cluster->queueDataPush(entityGroupId(ek), std::move(data)); entity_hashes_[ek] = hash; } else if (it->second != hash) { // Changed entity data_changed = true; g_Cluster->queueDataPush(entityGroupId(ek), std::move(data)); it->second = hash; } Loading @@ -494,13 +515,14 @@ namespace authdb { if (current_keys.find(it->first) == current_keys.end()) { g_Cluster->queueDataRemove(entityGroupId(it->first)); it = entity_hashes_.erase(it); manifest_changed = true; structure_changed = true; } else { ++it; } } if (manifest_changed) // Always push manifest when data changed (contains full buffer) if (structure_changed || data_changed) pushManifest(current_keys); // Update shared cache so other instances see the latest data Loading Loading
src/backends/cluster.cpp +49 −27 Original line number Diff line number Diff line Loading @@ -189,6 +189,12 @@ namespace authdb { std::memcpy(manifest.data() + pos + 16, &t, sizeof(int32_t)); } // Append full record data so fetch only needs 1 retrieve call if (_Buffer.size() > sizeof(AuthHeader)) manifest.insert(manifest.end(), _Buffer.begin() + sizeof(AuthHeader), _Buffer.end()); g_Cluster->queueDataPush(domainGroupId(), std::move(manifest)); } Loading Loading @@ -219,6 +225,12 @@ namespace authdb { std::memcpy(manifest.data() + pos + 16, &t, sizeof(int32_t)); } // Append full record data if (_Buffer.size() > sizeof(AuthHeader)) manifest.insert(manifest.end(), _Buffer.begin() + sizeof(AuthHeader), _Buffer.end()); try { client->store(domainGroupId(), manifest.data(), manifest.size()); } catch (const std::exception &e) { Loading @@ -233,7 +245,7 @@ namespace authdb { auto cache = getDomainCache(); auto now = std::chrono::steady_clock::now(); // Fast path: check if cached data is fresh (no lock needed for read) // Fast path: check if cached data is fresh bool do_peer_fetch = (now - cache->last_fetch) >= std::chrono::seconds(30); uint64_t cur_epoch = g_Cluster->getRecoveryEpoch(); Loading @@ -241,7 +253,6 @@ namespace authdb { do_peer_fetch = true; if (!do_peer_fetch) { // Use cached data if this instance has no data yet if (_Buffer.size() <= sizeof(AuthHeader) && !cache->buffer.empty()) { _Buffer = cache->buffer; _ClusterDataExists.store(_Buffer.size() > sizeof(AuthHeader)); Loading @@ -251,8 +262,7 @@ namespace authdb { return; } // Serialize fetches per domain — if another thread is already fetching, // wait for its result instead of hammering the cluster. // Serialize fetches per domain std::unique_lock<std::mutex> flk(cache->fetch_mutex, std::try_to_lock); if (!flk.owns_lock()) { std::lock_guard<std::mutex> wait_lk(cache->fetch_mutex); Loading @@ -270,14 +280,14 @@ namespace authdb { auto &client = g_Cluster->getClient(); if (!client) return; // 1. Fetch manifest from domain group // Single retrieve: manifest contains header + magic + entity keys + full record data std::vector<uint8_t> manifest; try { manifest = client->retrieve(dgid); cache->last_fetch = now; cache->recovery_epoch = cur_epoch; } catch (const std::exception &e) { std::cerr << "ClusterBackend::fetchFromCluster: manifest retrieve failed: " std::cerr << "ClusterBackend::fetchFromCluster: retrieve failed: " << e.what() << std::endl; _ClusterDataExists.store(true); return; Loading @@ -292,7 +302,7 @@ namespace authdb { return; } // 2. Parse header and verify magic // Parse header AuthHeader head; std::memcpy(&head, manifest.data(), sizeof(AuthHeader)); Loading @@ -312,11 +322,15 @@ namespace authdb { std::memcpy(&entity_count, manifest.data() + moff, sizeof(uint32_t)); moff += sizeof(uint32_t); // 3. Parse entity keys from manifest // Skip entity key list size_t key_section = entity_count * (16 + sizeof(int32_t)); if (moff + key_section > manifest.size()) return; // Parse entity keys for hash tracking entity_hashes_.clear(); std::vector<EntityKey> keys; keys.reserve(entity_count); for (uint32_t i = 0; i < entity_count; i++) { if (moff + 16 + sizeof(int32_t) > manifest.size()) break; EntityKey ek; std::memcpy(ek.ruid, manifest.data() + moff, 16); moff += 16; Loading @@ -327,31 +341,36 @@ namespace authdb { keys.push_back(ek); } // 4. Fetch each entity and rebuild _Buffer // Rebuild _Buffer: header + record data that follows the entity key list _Buffer.resize(sizeof(AuthHeader)); std::memcpy(_Buffer.data(), &head, sizeof(AuthHeader)); entity_hashes_.clear(); for (auto &ek : keys) { uint64_t egid = entityGroupId(ek); try { auto entity_data = client->retrieve(egid); if (!entity_data.empty()) { _Buffer.insert(_Buffer.end(), entity_data.begin(), entity_data.end()); entity_hashes_[ek] = fnv1a_hash(entity_data); } } catch (const std::exception &e) { std::cerr << "[CLUSTER-BE] failed to fetch entity: " << e.what() << std::endl; if (moff < manifest.size()) { _Buffer.insert(_Buffer.end(), manifest.begin() + moff, manifest.end()); } // Validate buffer integrity if (!bufferValid(_Buffer)) { std::cerr << "[CLUSTER-BE] domain=" << _Domain << " manifest record data is corrupt, discarding" << std::endl; _Buffer.resize(sizeof(AuthHeader)); } _ClusterDataExists.store(_Buffer.size() > sizeof(AuthHeader)); // Build entity hashes from records auto entities = parseEntities(); for (auto &[ek, data] : entities) { entity_hashes_[ek] = fnv1a_hash(data); } // Update shared cache cache->buffer = _Buffer; DBG_LOG("[CLUSTER-BE] fetchFromCluster rebuilt buf_size=" << _Buffer.size() << " entities=" << keys.size() << "\n"); << " entities=" << entity_hashes_.size() << "\n"); } void ClusterBackend::pushToCluster() { Loading Loading @@ -471,7 +490,8 @@ namespace authdb { if (g_Cluster && g_Cluster->isRunning()) { auto entities = parseEntities(); std::set<EntityKey> current_keys; bool manifest_changed = false; bool structure_changed = false; bool data_changed = false; for (auto &[ek, data] : entities) { current_keys.insert(ek); Loading @@ -479,11 +499,12 @@ namespace authdb { auto it = entity_hashes_.find(ek); if (it == entity_hashes_.end()) { // New entity manifest_changed = true; structure_changed = true; g_Cluster->queueDataPush(entityGroupId(ek), std::move(data)); entity_hashes_[ek] = hash; } else if (it->second != hash) { // Changed entity data_changed = true; g_Cluster->queueDataPush(entityGroupId(ek), std::move(data)); it->second = hash; } Loading @@ -494,13 +515,14 @@ namespace authdb { if (current_keys.find(it->first) == current_keys.end()) { g_Cluster->queueDataRemove(entityGroupId(it->first)); it = entity_hashes_.erase(it); manifest_changed = true; structure_changed = true; } else { ++it; } } if (manifest_changed) // Always push manifest when data changed (contains full buffer) if (structure_changed || data_changed) pushManifest(current_keys); // Update shared cache so other instances see the latest data Loading