Loading src/backends/cluster.cpp +14 −0 Original line number Diff line number Diff line Loading @@ -106,6 +106,20 @@ namespace authdb { cache->last_fetch = {}; } std::set<uint64_t> ClusterBackend::knownDomainGroupIds() { std::shared_lock<std::shared_mutex> rl(s_cache_mutex_); std::set<uint64_t> ids; for (auto &[domain, _] : s_domain_cache_) { std::vector<uint8_t> input(domain.begin(), domain.end()); auto hash = netplus::sha256_hash(input); uint64_t id = 0; for (int i = 0; i < 8; i++) id = (id << 8) | hash[i]; ids.insert(id); } return ids; } static uint64_t fnv1a_hash(const std::vector<uint8_t> &data) { uint64_t hash = 14695981039346656037ULL; for (auto b : data) { Loading src/backends/cluster.h +3 −0 Original line number Diff line number Diff line Loading @@ -85,6 +85,9 @@ namespace authdb { // Force next fetchFromCluster to bypass the 30s throttle void forceRefresh(); // Returns the set of domain group IDs from all known cached domains static std::set<uint64_t> knownDomainGroupIds(); private: struct EntityKey { unsigned char ruid[16] = {0}; Loading src/cluster.cpp +21 −2 Original line number Diff line number Diff line Loading @@ -33,6 +33,7 @@ #include "cluster.h" #include "exception.h" #include "backends/cluster.h" #include <netplus/exception.h> #include <crypto/sha.h> Loading Loading @@ -1121,7 +1122,9 @@ namespace authdb { // 1. Collect all group IDs from every node uint64_t lock_gid = paritypp::client::scrub_lock_group_id(); auto known_domain_gids = ClusterBackend::knownDomainGroupIds(); std::map<uint64_t, std::set<size_t>> group_nodes; // group_id -> set of node indices that have it std::set<uint64_t> orphaned_groups; for (size_t ni = 0; ni < n; ++ni) { std::vector<uint64_t> node_groups; if (pclient_->list_groups_on_node(ni, node_groups)) { Loading @@ -1129,14 +1132,30 @@ namespace authdb { if (gid == lock_gid) continue; // skip lock group if (session_gids.count(gid)) continue; // skip session groups if (known_domain_gids.count(gid)) { group_nodes[gid].insert(ni); } else { orphaned_groups.insert(gid); } } } } // Remove orphaned groups (old per-entity groups or stale data) if (!orphaned_groups.empty()) { std::cerr << "[SCRUB] removing " << orphaned_groups.size() << " orphaned groups" << std::endl; for (uint64_t gid : orphaned_groups) { try { pclient_->remove(gid); } catch (...) {} } } std::cerr << "[SCRUB] found " << group_nodes.size() << " data groups across " << n << " nodes" << " (skipped " << session_gids.size() << " session groups)" << std::endl; << " (skipped " << session_gids.size() << " session groups" << ", removed " << orphaned_groups.size() << " orphaned)" << std::endl; // 2. Repair under-replicated AND rebalance misplaced in one pass for (auto &[gid, present_nodes] : group_nodes) { Loading Loading
src/backends/cluster.cpp +14 −0 Original line number Diff line number Diff line Loading @@ -106,6 +106,20 @@ namespace authdb { cache->last_fetch = {}; } std::set<uint64_t> ClusterBackend::knownDomainGroupIds() { std::shared_lock<std::shared_mutex> rl(s_cache_mutex_); std::set<uint64_t> ids; for (auto &[domain, _] : s_domain_cache_) { std::vector<uint8_t> input(domain.begin(), domain.end()); auto hash = netplus::sha256_hash(input); uint64_t id = 0; for (int i = 0; i < 8; i++) id = (id << 8) | hash[i]; ids.insert(id); } return ids; } static uint64_t fnv1a_hash(const std::vector<uint8_t> &data) { uint64_t hash = 14695981039346656037ULL; for (auto b : data) { Loading
src/backends/cluster.h +3 −0 Original line number Diff line number Diff line Loading @@ -85,6 +85,9 @@ namespace authdb { // Force next fetchFromCluster to bypass the 30s throttle void forceRefresh(); // Returns the set of domain group IDs from all known cached domains static std::set<uint64_t> knownDomainGroupIds(); private: struct EntityKey { unsigned char ruid[16] = {0}; Loading
src/cluster.cpp +21 −2 Original line number Diff line number Diff line Loading @@ -33,6 +33,7 @@ #include "cluster.h" #include "exception.h" #include "backends/cluster.h" #include <netplus/exception.h> #include <crypto/sha.h> Loading Loading @@ -1121,7 +1122,9 @@ namespace authdb { // 1. Collect all group IDs from every node uint64_t lock_gid = paritypp::client::scrub_lock_group_id(); auto known_domain_gids = ClusterBackend::knownDomainGroupIds(); std::map<uint64_t, std::set<size_t>> group_nodes; // group_id -> set of node indices that have it std::set<uint64_t> orphaned_groups; for (size_t ni = 0; ni < n; ++ni) { std::vector<uint64_t> node_groups; if (pclient_->list_groups_on_node(ni, node_groups)) { Loading @@ -1129,14 +1132,30 @@ namespace authdb { if (gid == lock_gid) continue; // skip lock group if (session_gids.count(gid)) continue; // skip session groups if (known_domain_gids.count(gid)) { group_nodes[gid].insert(ni); } else { orphaned_groups.insert(gid); } } } } // Remove orphaned groups (old per-entity groups or stale data) if (!orphaned_groups.empty()) { std::cerr << "[SCRUB] removing " << orphaned_groups.size() << " orphaned groups" << std::endl; for (uint64_t gid : orphaned_groups) { try { pclient_->remove(gid); } catch (...) {} } } std::cerr << "[SCRUB] found " << group_nodes.size() << " data groups across " << n << " nodes" << " (skipped " << session_gids.size() << " session groups)" << std::endl; << " (skipped " << session_gids.size() << " session groups" << ", removed " << orphaned_groups.size() << " orphaned)" << std::endl; // 2. Repair under-replicated AND rebalance misplaced in one pass for (auto &[gid, present_nodes] : group_nodes) { Loading