Loading src/admin.cpp +1 −0 Original line number Diff line number Diff line Loading @@ -655,6 +655,7 @@ namespace authdb { json_object_object_add(jn, "groups", json_object_new_int(info.group_count)); json_object_object_add(jn, "peers_total", json_object_new_int(info.peers_total)); json_object_object_add(jn, "peers_online", json_object_new_int(info.peers_online)); json_object_object_add(jn, "scrub_running", json_object_new_boolean(info.scrub_running)); json_object_array_add(jnodes, jn); } json_object_object_add(jobj, "nodes", jnodes); Loading src/cluster.cpp +27 −13 Original line number Diff line number Diff line Loading @@ -463,6 +463,13 @@ namespace authdb { if (self_idx != SIZE_MAX) { session_read_client_->set_local_node(self_idx, session_store_); } // Dedicated scrub/rebalance client — avoids blocking pclient_ // (push_worker + health monitor) during long scrub operations scrub_client_ = std::make_unique<paritypp::client>( cfg_.data_blocks, cfg_.parity_blocks, nodes, pcreds); if (self_idx != SIZE_MAX) { scrub_client_->set_local_node(self_idx, store_); } // Pre-establish QUIC connections so first requests don't pay // handshake + auth latency (2-3s per node) Loading Loading @@ -630,6 +637,8 @@ namespace authdb { session_read_client_->mark_node_dead(i); if (read_client_) read_client_->mark_node_dead(i); if (scrub_client_) scrub_client_->mark_node_dead(i); } } } Loading Loading @@ -689,18 +698,20 @@ namespace authdb { std::thread([this](){ std::unique_lock<std::mutex> lk(scrub_mutex_, std::try_to_lock); if (!lk.owns_lock()) return; if (!pclient_->try_scrub_lock()) return; if (!scrub_client_ || !scrub_client_->try_scrub_lock()) return; scrub_running_ = true; if (server_) server_->set_rebuilding(true); try { auto rb = pclient_->rebalance(); auto rb = scrub_client_->rebalance(); if (rb.rebalanced > 0) { std::cerr << "Cluster: periodic rebalance moved " << rb.rebalanced << " group(s)" << std::endl; pclient_->vacuum_all_nodes(); scrub_client_->vacuum_all_nodes(); } } catch (...) {} scrub_running_ = false; pclient_->release_scrub_lock(); if (server_) server_->set_rebuilding(false); scrub_client_->release_scrub_lock(); }).detach(); } } else if (health.nodes_online >= k) { Loading Loading @@ -1120,7 +1131,7 @@ namespace authdb { Cluster::ScrubResult Cluster::scrub() { ScrubResult result; if (!pclient_) return result; if (!scrub_client_) return result; // Only one scrub/rebalance at a time std::unique_lock<std::mutex> scrub_lk(scrub_mutex_, std::try_to_lock); Loading @@ -1129,11 +1140,13 @@ namespace authdb { return result; } scrub_running_ = true; if (server_) server_->set_rebuilding(true); // Cluster-wide lock: prevent concurrent scrub across nodes if (!pclient_->try_scrub_lock()) { if (!scrub_client_->try_scrub_lock()) { std::cerr << "[SCRUB] skipped — another node is scrubbing" << std::endl; scrub_running_ = false; if (server_) server_->set_rebuilding(false); return result; } Loading Loading @@ -1164,7 +1177,7 @@ namespace authdb { std::set<uint64_t> orphaned_groups; for (size_t ni = 0; ni < n; ++ni) { std::vector<uint64_t> node_groups; if (pclient_->list_groups_on_node(ni, node_groups)) { if (scrub_client_->list_groups_on_node(ni, node_groups)) { for (uint64_t gid : node_groups) { if (gid == lock_gid) continue; // skip lock group if (session_gids.count(gid)) Loading @@ -1187,7 +1200,7 @@ namespace authdb { << " orphaned groups" << std::endl; for (uint64_t gid : orphaned_groups) { try { pclient_->remove(gid); scrub_client_->remove(gid); } catch (...) {} } } else if (!orphaned_groups.empty()) { Loading Loading @@ -1216,7 +1229,7 @@ namespace authdb { << " nodes — repairing" << std::endl; try { auto data = pclient_->retrieve(gid); auto data = scrub_client_->retrieve(gid); if (data.empty()) { std::cerr << "[SCRUB] group " << gid << " — retrieve returned empty, skipping (NOT removing)" << std::endl; Loading Loading @@ -1246,8 +1259,8 @@ namespace authdb { } // Data is valid — safe to remove and re-store for correct placement pclient_->remove(gid); pclient_->store(gid, data.data(), data.size()); scrub_client_->remove(gid); scrub_client_->store(gid, data.data(), data.size()); result.groups_repaired++; std::cerr << "[SCRUB] group " << gid << " — repaired" << std::endl; Loading @@ -1260,7 +1273,7 @@ namespace authdb { // 3. Vacuum all nodes to reclaim space from overwritten / deleted blocks if (result.groups_repaired > 0) { pclient_->vacuum_all_nodes(); scrub_client_->vacuum_all_nodes(); recovery_epoch_.fetch_add(1); } Loading @@ -1269,7 +1282,8 @@ namespace authdb { << " failed=" << result.groups_failed << std::endl; scrub_running_ = false; pclient_->release_scrub_lock(); if (server_) server_->set_rebuilding(false); scrub_client_->release_scrub_lock(); return result; } Loading src/cluster.h +4 −0 Original line number Diff line number Diff line Loading @@ -291,6 +291,10 @@ namespace authdb { std::unique_ptr<paritypp::client> pclient_; // Dedicated client for scrub/rebalance — avoids blocking pclient_ // (used by push_worker and health monitor) during long scrub operations std::unique_ptr<paritypp::client> scrub_client_; // Dedicated client for domain data reads — avoids contention with // pclient_ which is used by push_worker (store) and health monitor std::unique_ptr<paritypp::client> read_client_; Loading Loading
src/admin.cpp +1 −0 Original line number Diff line number Diff line Loading @@ -655,6 +655,7 @@ namespace authdb { json_object_object_add(jn, "groups", json_object_new_int(info.group_count)); json_object_object_add(jn, "peers_total", json_object_new_int(info.peers_total)); json_object_object_add(jn, "peers_online", json_object_new_int(info.peers_online)); json_object_object_add(jn, "scrub_running", json_object_new_boolean(info.scrub_running)); json_object_array_add(jnodes, jn); } json_object_object_add(jobj, "nodes", jnodes); Loading
src/cluster.cpp +27 −13 Original line number Diff line number Diff line Loading @@ -463,6 +463,13 @@ namespace authdb { if (self_idx != SIZE_MAX) { session_read_client_->set_local_node(self_idx, session_store_); } // Dedicated scrub/rebalance client — avoids blocking pclient_ // (push_worker + health monitor) during long scrub operations scrub_client_ = std::make_unique<paritypp::client>( cfg_.data_blocks, cfg_.parity_blocks, nodes, pcreds); if (self_idx != SIZE_MAX) { scrub_client_->set_local_node(self_idx, store_); } // Pre-establish QUIC connections so first requests don't pay // handshake + auth latency (2-3s per node) Loading Loading @@ -630,6 +637,8 @@ namespace authdb { session_read_client_->mark_node_dead(i); if (read_client_) read_client_->mark_node_dead(i); if (scrub_client_) scrub_client_->mark_node_dead(i); } } } Loading Loading @@ -689,18 +698,20 @@ namespace authdb { std::thread([this](){ std::unique_lock<std::mutex> lk(scrub_mutex_, std::try_to_lock); if (!lk.owns_lock()) return; if (!pclient_->try_scrub_lock()) return; if (!scrub_client_ || !scrub_client_->try_scrub_lock()) return; scrub_running_ = true; if (server_) server_->set_rebuilding(true); try { auto rb = pclient_->rebalance(); auto rb = scrub_client_->rebalance(); if (rb.rebalanced > 0) { std::cerr << "Cluster: periodic rebalance moved " << rb.rebalanced << " group(s)" << std::endl; pclient_->vacuum_all_nodes(); scrub_client_->vacuum_all_nodes(); } } catch (...) {} scrub_running_ = false; pclient_->release_scrub_lock(); if (server_) server_->set_rebuilding(false); scrub_client_->release_scrub_lock(); }).detach(); } } else if (health.nodes_online >= k) { Loading Loading @@ -1120,7 +1131,7 @@ namespace authdb { Cluster::ScrubResult Cluster::scrub() { ScrubResult result; if (!pclient_) return result; if (!scrub_client_) return result; // Only one scrub/rebalance at a time std::unique_lock<std::mutex> scrub_lk(scrub_mutex_, std::try_to_lock); Loading @@ -1129,11 +1140,13 @@ namespace authdb { return result; } scrub_running_ = true; if (server_) server_->set_rebuilding(true); // Cluster-wide lock: prevent concurrent scrub across nodes if (!pclient_->try_scrub_lock()) { if (!scrub_client_->try_scrub_lock()) { std::cerr << "[SCRUB] skipped — another node is scrubbing" << std::endl; scrub_running_ = false; if (server_) server_->set_rebuilding(false); return result; } Loading Loading @@ -1164,7 +1177,7 @@ namespace authdb { std::set<uint64_t> orphaned_groups; for (size_t ni = 0; ni < n; ++ni) { std::vector<uint64_t> node_groups; if (pclient_->list_groups_on_node(ni, node_groups)) { if (scrub_client_->list_groups_on_node(ni, node_groups)) { for (uint64_t gid : node_groups) { if (gid == lock_gid) continue; // skip lock group if (session_gids.count(gid)) Loading @@ -1187,7 +1200,7 @@ namespace authdb { << " orphaned groups" << std::endl; for (uint64_t gid : orphaned_groups) { try { pclient_->remove(gid); scrub_client_->remove(gid); } catch (...) {} } } else if (!orphaned_groups.empty()) { Loading Loading @@ -1216,7 +1229,7 @@ namespace authdb { << " nodes — repairing" << std::endl; try { auto data = pclient_->retrieve(gid); auto data = scrub_client_->retrieve(gid); if (data.empty()) { std::cerr << "[SCRUB] group " << gid << " — retrieve returned empty, skipping (NOT removing)" << std::endl; Loading Loading @@ -1246,8 +1259,8 @@ namespace authdb { } // Data is valid — safe to remove and re-store for correct placement pclient_->remove(gid); pclient_->store(gid, data.data(), data.size()); scrub_client_->remove(gid); scrub_client_->store(gid, data.data(), data.size()); result.groups_repaired++; std::cerr << "[SCRUB] group " << gid << " — repaired" << std::endl; Loading @@ -1260,7 +1273,7 @@ namespace authdb { // 3. Vacuum all nodes to reclaim space from overwritten / deleted blocks if (result.groups_repaired > 0) { pclient_->vacuum_all_nodes(); scrub_client_->vacuum_all_nodes(); recovery_epoch_.fetch_add(1); } Loading @@ -1269,7 +1282,8 @@ namespace authdb { << " failed=" << result.groups_failed << std::endl; scrub_running_ = false; pclient_->release_scrub_lock(); if (server_) server_->set_rebuilding(false); scrub_client_->release_scrub_lock(); return result; } Loading
src/cluster.h +4 −0 Original line number Diff line number Diff line Loading @@ -291,6 +291,10 @@ namespace authdb { std::unique_ptr<paritypp::client> pclient_; // Dedicated client for scrub/rebalance — avoids blocking pclient_ // (used by push_worker and health monitor) during long scrub operations std::unique_ptr<paritypp::client> scrub_client_; // Dedicated client for domain data reads — avoids contention with // pclient_ which is used by push_worker (store) and health monitor std::unique_ptr<paritypp::client> read_client_; Loading