Loading src/backends/cluster.cpp +6 −23 Original line number Diff line number Diff line Loading @@ -105,18 +105,14 @@ namespace authdb { } if (!do_peer_fetch) return; // Nothing to do — don't touch client_mutex_ return; // Nothing to do std::vector<uint8_t> peer_data; auto &client = g_Cluster->getClient(); if (client) { try { auto &cmtx = g_Cluster->getClientMutex(); std::unique_lock<std::timed_mutex> lock(cmtx, std::chrono::milliseconds(200)); if (lock.owns_lock()) { peer_data = client->retrieve(dgid); _LastPeerFetch = now; } } catch (const std::exception &e) { std::cerr << "ClusterBackend::fetchFromCluster: retrieve failed: " << e.what() << std::endl; Loading Loading @@ -264,17 +260,8 @@ namespace authdb { auto &client = g_Cluster->getClient(); if (client) { try { auto &cmtx = g_Cluster->getClientMutex(); for (int attempt = 0; attempt < 3; ++attempt) { std::unique_lock<std::timed_mutex> lock(cmtx, std::chrono::seconds(1)); if (lock.owns_lock()) { client->store(dgid, buf.data(), buf.size()); return true; } if (attempt == 2) { std::cerr << "ClusterBackend::pushToCluster: replication failed after retries" << std::endl; } } } catch (const netplus::NetException &e) { std::cerr << "Cluster replicate failed: " << e.what() << std::endl; } catch (const std::exception &e) { Loading Loading @@ -327,11 +314,7 @@ namespace authdb { auto &client = g_Cluster->getClient(); if (client) { try { auto &cmtx = g_Cluster->getClientMutex(); std::unique_lock<std::timed_mutex> lock(cmtx, std::chrono::seconds(3)); if (lock.owns_lock()) { client->remove(dgid); } } catch (const std::exception &e) { std::cerr << "ClusterBackend::purge: peer remove failed: " << e.what() << std::endl; } Loading src/cluster.cpp +8 −59 Original line number Diff line number Diff line Loading @@ -424,8 +424,7 @@ namespace authdb { } // Register peer status callback so GET_STATUS reports peer counts. // This callback runs on the server thread when a remote node queries us, // so it must NOT acquire client_mutex_ or do network I/O (deadlock risk). // This callback runs on the server thread when a remote node queries us. // peer_count is based on actual filtered node list (after self-exclusion). server_->set_peer_status_callback([peer_count]() -> std::pair<uint32_t, uint32_t> { return {static_cast<uint32_t>(peer_count), static_cast<uint32_t>(peer_count)}; Loading Loading @@ -528,11 +527,6 @@ namespace authdb { first_run = false; if (!running_ || !pclient_) continue; try { // Always use try_to_lock — health probes must never starve // session pushes/fetches. If we can't get the lock, skip // this cycle; the next one (3s or 10s later) will retry. std::unique_lock<std::timed_mutex> lock(client_mutex_, std::try_to_lock); if (!lock.owns_lock()) continue; auto health = pclient_->get_cluster_status(); bool was_degraded = degraded_; bool was_critical = critical_; Loading Loading @@ -591,24 +585,16 @@ namespace authdb { << " sgid=" << sgid << " sid_gid=" << sid_gid << " data_size=" << data.size() << std::endl; // Store each key in a separate lock acquisition so other operations // (fetch, remove) can interleave instead of waiting 2× store time. // Store each key — client is internally thread-safe auto store_one = [&](uint64_t gid, const char *label) { for (int attempt = 0; attempt < 5; ++attempt) { std::unique_lock<std::timed_mutex> lock(session_client_mutex_, std::chrono::milliseconds(200)); if (!lock.owns_lock()) { std::cerr << "[CLUSTER-SESSION] pushSession " << label << ": lock attempt " << attempt + 1 << "/5 failed, retrying..." << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(100)); continue; } for (int attempt = 0; attempt < 3; ++attempt) { try { session_client_->store(gid, data.data(), data.size()); return true; } catch (const std::exception &e) { std::cerr << "[CLUSTER-SESSION] pushSession " << label << ": store attempt " << attempt + 1 << "/5 FAILED: " << e.what() << std::endl; if (attempt < 4) { << attempt + 1 << "/3 FAILED: " << e.what() << std::endl; if (attempt < 2) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); continue; } Loading @@ -618,9 +604,9 @@ namespace authdb { }; if (!store_one(sgid, "sgid")) throw AuthBackendError("cluster busy — session push (sgid) timeout after 5 retries"); throw AuthBackendError("cluster busy — session push (sgid) failed after 3 retries"); if (!store_one(sid_gid, "sid_gid")) throw AuthBackendError("cluster busy — session push (sid_gid) timeout after 5 retries"); throw AuthBackendError("cluster busy — session push (sid_gid) failed after 3 retries"); } void Cluster::queuePush(const SessionData &sess) { Loading Loading @@ -686,13 +672,7 @@ namespace authdb { if (have_dp) { data_future = std::async(std::launch::async, [this, dp = std::move(dp_item)]() mutable { try { std::unique_lock<std::timed_mutex> lock(client_mutex_, std::chrono::seconds(1)); if (lock.owns_lock()) { pclient_->store(dp.dgid, dp.buf.data(), dp.buf.size()); } else { std::lock_guard<std::mutex> pguard(push_mutex_); data_push_queue_.push_back(std::move(dp)); } } catch (const std::exception &e) { std::cerr << "[CLUSTER] push_worker: data push failed: " << e.what() << std::endl; std::lock_guard<std::mutex> pguard(push_mutex_); Loading @@ -705,17 +685,6 @@ namespace authdb { if (have_sess) { sess_future = std::async(std::launch::async, [this, item = std::move(sess_item)]() mutable { try { std::unique_lock<std::timed_mutex> lock(session_client_mutex_, std::chrono::seconds(1)); if (!lock.owns_lock()) { item.retries++; if (item.retries < 5) { std::lock_guard<std::mutex> pguard(push_mutex_); push_queue_.push_back(std::move(item)); } else { std::cerr << "[CLUSTER] push_worker: dropped session after 5 retries" << std::endl; } return; } session_client_->store(item.sgid, item.data.data(), item.data.size()); session_client_->store(item.sid_gid, item.data.data(), item.data.size()); } catch (const std::exception &e) { Loading Loading @@ -746,8 +715,6 @@ namespace authdb { std::vector<uint8_t> data; if (session_client_) { try { std::unique_lock<std::timed_mutex> lock(session_client_mutex_, std::chrono::seconds(2)); if (!lock.owns_lock()) return false; data = session_client_->retrieve(sgid); if (!data.empty()) { uuid::uuid d_uid, d_did; Loading @@ -769,8 +736,6 @@ namespace authdb { // Fetch session to get the SID for removing the SID-keyed entry if (session_client_) { try { std::unique_lock<std::timed_mutex> lock(session_client_mutex_, std::chrono::seconds(2)); if (!lock.owns_lock()) return; std::vector<uint8_t> data = session_client_->retrieve(sgid); if (!data.empty()) { uuid::uuid sid, d_uid, d_did; Loading @@ -793,8 +758,6 @@ namespace authdb { // Fetch session data to get uid+did key if (session_client_) { try { std::unique_lock<std::timed_mutex> lock(session_client_mutex_, std::chrono::seconds(2)); if (!lock.owns_lock()) return; std::vector<uint8_t> data = session_client_->retrieve(sid_gid); if (!data.empty()) { uuid::uuid sid, uid, did; Loading @@ -816,8 +779,6 @@ namespace authdb { if (session_client_) { try { std::unique_lock<std::timed_mutex> lock(session_client_mutex_, std::chrono::seconds(2)); if (!lock.owns_lock()) return false; auto data = session_client_->retrieve(sgid); if (!data.empty()) return true; } catch (...) {} Loading @@ -834,8 +795,6 @@ namespace authdb { // No local store — retrieve via erasure coding uint64_t sid_gid = sidGroupId(session_id); if (session_client_) { std::unique_lock<std::timed_mutex> lock(session_client_mutex_, std::chrono::seconds(2)); if (!lock.owns_lock()) return false; auto data = session_client_->retrieve(sid_gid); if (!data.empty()) { return SessionBlock::deserialize(data, sid, uid, did, members, username, gpo_results); Loading @@ -849,8 +808,6 @@ namespace authdb { if (session_client_) { try { std::unique_lock<std::timed_mutex> lock(session_client_mutex_, std::chrono::seconds(2)); if (!lock.owns_lock()) return false; auto data = session_client_->retrieve(sid_gid); if (!data.empty()) return true; } catch (...) {} Loading @@ -871,11 +828,6 @@ namespace authdb { if (session_client_) { try { std::unique_lock<std::timed_mutex> lock(session_client_mutex_, std::chrono::seconds(2)); if (!lock.owns_lock()) { std::cerr << "[CLUSTER-SESSION] fetchBySid: session_client_mutex_ timeout" << std::endl; return false; } auto data = session_client_->retrieve(sid_gid); std::cerr << "[CLUSTER-SESSION] fetchBySid: retrieved " << data.size() << " bytes" << std::endl; if (!data.empty()) { Loading Loading @@ -904,8 +856,6 @@ namespace authdb { std::set<uint64_t> all_groups; try { std::unique_lock<std::timed_mutex> lock(session_client_mutex_, std::chrono::seconds(2)); if (!lock.owns_lock()) return; size_t n = session_client_->get_cluster_status().nodes_total; for (size_t i = 0; i < n; i++) { std::vector<uint64_t> node_groups; Loading Loading @@ -1063,7 +1013,6 @@ namespace authdb { // Distribute erasure-coded shards to peer nodes try { std::unique_lock<std::timed_mutex> lock(client_mutex_); pclient_->store(localId, data.data(), data.size()); } catch (const netplus::NetException &e) { std::cerr << "Cluster replicate failed: " << e.what() << std::endl; Loading Loading
src/backends/cluster.cpp +6 −23 Original line number Diff line number Diff line Loading @@ -105,18 +105,14 @@ namespace authdb { } if (!do_peer_fetch) return; // Nothing to do — don't touch client_mutex_ return; // Nothing to do std::vector<uint8_t> peer_data; auto &client = g_Cluster->getClient(); if (client) { try { auto &cmtx = g_Cluster->getClientMutex(); std::unique_lock<std::timed_mutex> lock(cmtx, std::chrono::milliseconds(200)); if (lock.owns_lock()) { peer_data = client->retrieve(dgid); _LastPeerFetch = now; } } catch (const std::exception &e) { std::cerr << "ClusterBackend::fetchFromCluster: retrieve failed: " << e.what() << std::endl; Loading Loading @@ -264,17 +260,8 @@ namespace authdb { auto &client = g_Cluster->getClient(); if (client) { try { auto &cmtx = g_Cluster->getClientMutex(); for (int attempt = 0; attempt < 3; ++attempt) { std::unique_lock<std::timed_mutex> lock(cmtx, std::chrono::seconds(1)); if (lock.owns_lock()) { client->store(dgid, buf.data(), buf.size()); return true; } if (attempt == 2) { std::cerr << "ClusterBackend::pushToCluster: replication failed after retries" << std::endl; } } } catch (const netplus::NetException &e) { std::cerr << "Cluster replicate failed: " << e.what() << std::endl; } catch (const std::exception &e) { Loading Loading @@ -327,11 +314,7 @@ namespace authdb { auto &client = g_Cluster->getClient(); if (client) { try { auto &cmtx = g_Cluster->getClientMutex(); std::unique_lock<std::timed_mutex> lock(cmtx, std::chrono::seconds(3)); if (lock.owns_lock()) { client->remove(dgid); } } catch (const std::exception &e) { std::cerr << "ClusterBackend::purge: peer remove failed: " << e.what() << std::endl; } Loading
src/cluster.cpp +8 −59 Original line number Diff line number Diff line Loading @@ -424,8 +424,7 @@ namespace authdb { } // Register peer status callback so GET_STATUS reports peer counts. // This callback runs on the server thread when a remote node queries us, // so it must NOT acquire client_mutex_ or do network I/O (deadlock risk). // This callback runs on the server thread when a remote node queries us. // peer_count is based on actual filtered node list (after self-exclusion). server_->set_peer_status_callback([peer_count]() -> std::pair<uint32_t, uint32_t> { return {static_cast<uint32_t>(peer_count), static_cast<uint32_t>(peer_count)}; Loading Loading @@ -528,11 +527,6 @@ namespace authdb { first_run = false; if (!running_ || !pclient_) continue; try { // Always use try_to_lock — health probes must never starve // session pushes/fetches. If we can't get the lock, skip // this cycle; the next one (3s or 10s later) will retry. std::unique_lock<std::timed_mutex> lock(client_mutex_, std::try_to_lock); if (!lock.owns_lock()) continue; auto health = pclient_->get_cluster_status(); bool was_degraded = degraded_; bool was_critical = critical_; Loading Loading @@ -591,24 +585,16 @@ namespace authdb { << " sgid=" << sgid << " sid_gid=" << sid_gid << " data_size=" << data.size() << std::endl; // Store each key in a separate lock acquisition so other operations // (fetch, remove) can interleave instead of waiting 2× store time. // Store each key — client is internally thread-safe auto store_one = [&](uint64_t gid, const char *label) { for (int attempt = 0; attempt < 5; ++attempt) { std::unique_lock<std::timed_mutex> lock(session_client_mutex_, std::chrono::milliseconds(200)); if (!lock.owns_lock()) { std::cerr << "[CLUSTER-SESSION] pushSession " << label << ": lock attempt " << attempt + 1 << "/5 failed, retrying..." << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(100)); continue; } for (int attempt = 0; attempt < 3; ++attempt) { try { session_client_->store(gid, data.data(), data.size()); return true; } catch (const std::exception &e) { std::cerr << "[CLUSTER-SESSION] pushSession " << label << ": store attempt " << attempt + 1 << "/5 FAILED: " << e.what() << std::endl; if (attempt < 4) { << attempt + 1 << "/3 FAILED: " << e.what() << std::endl; if (attempt < 2) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); continue; } Loading @@ -618,9 +604,9 @@ namespace authdb { }; if (!store_one(sgid, "sgid")) throw AuthBackendError("cluster busy — session push (sgid) timeout after 5 retries"); throw AuthBackendError("cluster busy — session push (sgid) failed after 3 retries"); if (!store_one(sid_gid, "sid_gid")) throw AuthBackendError("cluster busy — session push (sid_gid) timeout after 5 retries"); throw AuthBackendError("cluster busy — session push (sid_gid) failed after 3 retries"); } void Cluster::queuePush(const SessionData &sess) { Loading Loading @@ -686,13 +672,7 @@ namespace authdb { if (have_dp) { data_future = std::async(std::launch::async, [this, dp = std::move(dp_item)]() mutable { try { std::unique_lock<std::timed_mutex> lock(client_mutex_, std::chrono::seconds(1)); if (lock.owns_lock()) { pclient_->store(dp.dgid, dp.buf.data(), dp.buf.size()); } else { std::lock_guard<std::mutex> pguard(push_mutex_); data_push_queue_.push_back(std::move(dp)); } } catch (const std::exception &e) { std::cerr << "[CLUSTER] push_worker: data push failed: " << e.what() << std::endl; std::lock_guard<std::mutex> pguard(push_mutex_); Loading @@ -705,17 +685,6 @@ namespace authdb { if (have_sess) { sess_future = std::async(std::launch::async, [this, item = std::move(sess_item)]() mutable { try { std::unique_lock<std::timed_mutex> lock(session_client_mutex_, std::chrono::seconds(1)); if (!lock.owns_lock()) { item.retries++; if (item.retries < 5) { std::lock_guard<std::mutex> pguard(push_mutex_); push_queue_.push_back(std::move(item)); } else { std::cerr << "[CLUSTER] push_worker: dropped session after 5 retries" << std::endl; } return; } session_client_->store(item.sgid, item.data.data(), item.data.size()); session_client_->store(item.sid_gid, item.data.data(), item.data.size()); } catch (const std::exception &e) { Loading Loading @@ -746,8 +715,6 @@ namespace authdb { std::vector<uint8_t> data; if (session_client_) { try { std::unique_lock<std::timed_mutex> lock(session_client_mutex_, std::chrono::seconds(2)); if (!lock.owns_lock()) return false; data = session_client_->retrieve(sgid); if (!data.empty()) { uuid::uuid d_uid, d_did; Loading @@ -769,8 +736,6 @@ namespace authdb { // Fetch session to get the SID for removing the SID-keyed entry if (session_client_) { try { std::unique_lock<std::timed_mutex> lock(session_client_mutex_, std::chrono::seconds(2)); if (!lock.owns_lock()) return; std::vector<uint8_t> data = session_client_->retrieve(sgid); if (!data.empty()) { uuid::uuid sid, d_uid, d_did; Loading @@ -793,8 +758,6 @@ namespace authdb { // Fetch session data to get uid+did key if (session_client_) { try { std::unique_lock<std::timed_mutex> lock(session_client_mutex_, std::chrono::seconds(2)); if (!lock.owns_lock()) return; std::vector<uint8_t> data = session_client_->retrieve(sid_gid); if (!data.empty()) { uuid::uuid sid, uid, did; Loading @@ -816,8 +779,6 @@ namespace authdb { if (session_client_) { try { std::unique_lock<std::timed_mutex> lock(session_client_mutex_, std::chrono::seconds(2)); if (!lock.owns_lock()) return false; auto data = session_client_->retrieve(sgid); if (!data.empty()) return true; } catch (...) {} Loading @@ -834,8 +795,6 @@ namespace authdb { // No local store — retrieve via erasure coding uint64_t sid_gid = sidGroupId(session_id); if (session_client_) { std::unique_lock<std::timed_mutex> lock(session_client_mutex_, std::chrono::seconds(2)); if (!lock.owns_lock()) return false; auto data = session_client_->retrieve(sid_gid); if (!data.empty()) { return SessionBlock::deserialize(data, sid, uid, did, members, username, gpo_results); Loading @@ -849,8 +808,6 @@ namespace authdb { if (session_client_) { try { std::unique_lock<std::timed_mutex> lock(session_client_mutex_, std::chrono::seconds(2)); if (!lock.owns_lock()) return false; auto data = session_client_->retrieve(sid_gid); if (!data.empty()) return true; } catch (...) {} Loading @@ -871,11 +828,6 @@ namespace authdb { if (session_client_) { try { std::unique_lock<std::timed_mutex> lock(session_client_mutex_, std::chrono::seconds(2)); if (!lock.owns_lock()) { std::cerr << "[CLUSTER-SESSION] fetchBySid: session_client_mutex_ timeout" << std::endl; return false; } auto data = session_client_->retrieve(sid_gid); std::cerr << "[CLUSTER-SESSION] fetchBySid: retrieved " << data.size() << " bytes" << std::endl; if (!data.empty()) { Loading Loading @@ -904,8 +856,6 @@ namespace authdb { std::set<uint64_t> all_groups; try { std::unique_lock<std::timed_mutex> lock(session_client_mutex_, std::chrono::seconds(2)); if (!lock.owns_lock()) return; size_t n = session_client_->get_cluster_status().nodes_total; for (size_t i = 0; i < n; i++) { std::vector<uint64_t> node_groups; Loading Loading @@ -1063,7 +1013,6 @@ namespace authdb { // Distribute erasure-coded shards to peer nodes try { std::unique_lock<std::timed_mutex> lock(client_mutex_); pclient_->store(localId, data.data(), data.size()); } catch (const netplus::NetException &e) { std::cerr << "Cluster replicate failed: " << e.what() << std::endl; Loading