Loading src/cluster.cpp +16 −125 Original line number Diff line number Diff line Loading @@ -231,25 +231,6 @@ namespace authdb { return k; } uint64_t sessionGroupId(const uuid::uuid &uid, const uuid::uuid &did) { uint64_t ckey = simpleCacheKey(uid.value, 16, did.value, 16); { std::lock_guard<std::mutex> g(g_sgidCacheMtx); auto it = g_sgidCache.find(ckey); if (it != g_sgidCache.end()) return it->second; } uint8_t input[32]; std::memcpy(input, uid.value, 16); std::memcpy(input + 16, did.value, 16); uint64_t id = hashUuids(input, 32); { std::lock_guard<std::mutex> g(g_sgidCacheMtx); g_sgidCache[ckey] = id; } return id; } uint64_t sidGroupId(const uuid::uuid &sid) { static const uint8_t prefix = 0xFF; uint64_t ckey = simpleCacheKey(&prefix, 1, sid.value, 16); Loading Loading @@ -573,52 +554,35 @@ namespace authdb { auto data = SessionBlock::serialize(sess); uuid::uuid uid, did, sid; uuid::uuid sid; sess.getSid(sid); sess.getUid(uid); sess.getDid(did); uint64_t sgid = sessionGroupId(uid, did); uint64_t sid_gid = sidGroupId(sid); std::cerr << "[CLUSTER-SESSION] pushSession: sid=" << sid.c_str() << " sgid=" << sgid << " sid_gid=" << sid_gid << " sid_gid=" << sid_gid << " data_size=" << data.size() << std::endl; // Store each key — client is internally thread-safe auto store_one = [&](uint64_t gid, const char *label) { for (int attempt = 0; attempt < 3; ++attempt) { try { session_client_->store(gid, data.data(), data.size()); return true; session_client_->store(sid_gid, data.data(), data.size()); return; } catch (const std::exception &e) { std::cerr << "[CLUSTER-SESSION] pushSession " << label << ": store attempt " std::cerr << "[CLUSTER-SESSION] pushSession: store attempt " << attempt + 1 << "/3 FAILED: " << e.what() << std::endl; if (attempt < 2) { if (attempt < 2) std::this_thread::sleep_for(std::chrono::milliseconds(200)); continue; } } } return false; }; if (!store_one(sgid, "sgid")) throw AuthBackendError("cluster busy — session push (sgid) failed after 3 retries"); if (!store_one(sid_gid, "sid_gid")) throw AuthBackendError("cluster busy — session push (sid_gid) failed after 3 retries"); throw AuthBackendError("cluster busy — session push failed after 3 retries"); } void Cluster::queuePush(const SessionData &sess) { if (!pclient_) return; if (!session_client_) return; auto data = SessionBlock::serialize(sess); uuid::uuid uid, did, sid; uuid::uuid sid; sess.getSid(sid); sess.getUid(uid); sess.getDid(did); uint64_t sgid = sessionGroupId(uid, did); uint64_t sid_gid = sidGroupId(sid); PendingPush pp{data, sgid, sid_gid, 0}; PendingPush pp{data, sid_gid, 0}; std::lock_guard<std::mutex> pguard(push_mutex_); push_queue_.push_back(std::move(pp)); push_cv_.notify_one(); Loading Loading @@ -685,7 +649,6 @@ namespace authdb { if (have_sess) { sess_future = std::async(std::launch::async, [this, item = std::move(sess_item)]() mutable { try { session_client_->store(item.sgid, item.data.data(), item.data.size()); session_client_->store(item.sid_gid, item.data.data(), item.data.size()); } catch (const std::exception &e) { item.retries++; Loading @@ -706,87 +669,15 @@ namespace authdb { } } bool Cluster::fetchSession(const uuid::uuid &uid, const uuid::uuid &did, uuid::uuid &sid, std::vector<uuid::uuid> &members, std::string &username, std::vector<std::pair<uuid::uuid, bool>> &gpo_results) { uint64_t sgid = sessionGroupId(uid, did); std::vector<uint8_t> data; if (session_client_) { try { data = session_client_->retrieve(sgid); if (!data.empty()) { uuid::uuid d_uid, d_did; if (SessionBlock::deserialize(data, sid, d_uid, d_did, members, username, gpo_results)) { return true; } } } catch (...) { return false; } } return false; } void Cluster::removeSession(const uuid::uuid &uid, const uuid::uuid &did) { uint64_t sgid = sessionGroupId(uid, did); // Fetch session to get the SID for removing the SID-keyed entry if (session_client_) { try { std::vector<uint8_t> data = session_client_->retrieve(sgid); if (!data.empty()) { uuid::uuid sid, d_uid, d_did; std::vector<uuid::uuid> members; std::string username; std::vector<std::pair<uuid::uuid, bool>> gpo_results; if (SessionBlock::deserialize(data, sid, d_uid, d_did, members, username, gpo_results)) { uint64_t sid_gid = sidGroupId(sid); session_client_->remove(sid_gid); } } session_client_->remove(sgid); } catch (...) {} } } void Cluster::removeSessionBySid(const uuid::uuid &session_id) { uint64_t sid_gid = sidGroupId(session_id); // Fetch session data to get uid+did key if (session_client_) { try { std::vector<uint8_t> data = session_client_->retrieve(sid_gid); if (!data.empty()) { uuid::uuid sid, uid, did; std::vector<uuid::uuid> members; std::string username; std::vector<std::pair<uuid::uuid, bool>> gpo_results; if (SessionBlock::deserialize(data, sid, uid, did, members, username, gpo_results)) { uint64_t sgid = sessionGroupId(uid, did); session_client_->remove(sgid); } } session_client_->remove(sid_gid); } catch (...) {} } } bool Cluster::hasSession(const uuid::uuid &uid, const uuid::uuid &did) { uint64_t sgid = sessionGroupId(uid, did); if (session_client_) { try { auto data = session_client_->retrieve(sgid); if (!data.empty()) return true; } catch (...) {} } return false; } bool Cluster::fetchSessionLocalBySid(const uuid::uuid &session_id, uuid::uuid &sid, uuid::uuid &uid, uuid::uuid &did, std::vector<uuid::uuid> &members, Loading src/cluster.h +0 −15 Original line number Diff line number Diff line Loading @@ -101,7 +101,6 @@ namespace authdb { std::vector<std::pair<uuid::uuid, bool>> &gpo_results); }; uint64_t sessionGroupId(const uuid::uuid &uid, const uuid::uuid &did); uint64_t sidGroupId(const uuid::uuid &sid); // --- Data operation replication --- Loading Loading @@ -163,22 +162,9 @@ namespace authdb { // Queue a backend data push for background replication void queueDataPush(uint64_t dgid, std::vector<uint8_t> buf); // Try to fetch a session from the cluster by uid+did // Returns true if found, fills out parameters bool fetchSession(const uuid::uuid &uid, const uuid::uuid &did, uuid::uuid &sid, std::vector<uuid::uuid> &members, std::string &username, std::vector<std::pair<uuid::uuid, bool>> &gpo_results); // Remove a session from the cluster void removeSession(const uuid::uuid &uid, const uuid::uuid &did); // Remove a session from the cluster by SID void removeSessionBySid(const uuid::uuid &session_id); // Check if a session already exists in the cluster for uid+did bool hasSession(const uuid::uuid &uid, const uuid::uuid &did); // Check if a session still exists in the cluster by SID bool hasSessionBySid(const uuid::uuid &session_id); Loading Loading @@ -232,7 +218,6 @@ namespace authdb { struct PendingPush { std::vector<uint8_t> data; uint64_t sgid; uint64_t sid_gid; int retries{0}; }; Loading src/session.cpp +0 −24 Original line number Diff line number Diff line Loading @@ -428,30 +428,6 @@ const authdb::SessionData *authdb::ClusterSession::addSession(AuthBackend &backe if (g_Cluster->isCritical()) throw AuthBackendError("cluster critical — too few nodes online, please try again later"); // Check cluster for existing session (same user+domain) { uuid::uuid cluster_sid; std::vector<uuid::uuid> cluster_members; std::string cluster_username; std::vector<std::pair<uuid::uuid, bool>> cluster_gpo; if (g_Cluster->fetchSession(userid, domainid, cluster_sid, cluster_members, cluster_username, cluster_gpo)) { std::cerr << "[SESSION] addSession: found existing session sid=" << cluster_sid.c_str() << " via uid+did lookup, ensuring sid_gid entry exists" << std::endl; SessionData *imported = new SessionData(cluster_sid, userid, domainid, cluster_members); imported->_username = cluster_username; imported->setGPOResults(cluster_gpo); // Ensure sid_gid entry exists — queue async to avoid blocking on dead nodes g_Cluster->queuePush(*imported); std::lock_guard<std::mutex> guard(_lock); removeLocal(cluster_sid); appendLocal(imported); return imported; } } // Create new session std::cerr << "[SESSION] addSession: calling createSessionData..." << std::endl; SessionData *newSession = nullptr; Loading Loading
src/cluster.cpp +16 −125 Original line number Diff line number Diff line Loading @@ -231,25 +231,6 @@ namespace authdb { return k; } uint64_t sessionGroupId(const uuid::uuid &uid, const uuid::uuid &did) { uint64_t ckey = simpleCacheKey(uid.value, 16, did.value, 16); { std::lock_guard<std::mutex> g(g_sgidCacheMtx); auto it = g_sgidCache.find(ckey); if (it != g_sgidCache.end()) return it->second; } uint8_t input[32]; std::memcpy(input, uid.value, 16); std::memcpy(input + 16, did.value, 16); uint64_t id = hashUuids(input, 32); { std::lock_guard<std::mutex> g(g_sgidCacheMtx); g_sgidCache[ckey] = id; } return id; } uint64_t sidGroupId(const uuid::uuid &sid) { static const uint8_t prefix = 0xFF; uint64_t ckey = simpleCacheKey(&prefix, 1, sid.value, 16); Loading Loading @@ -573,52 +554,35 @@ namespace authdb { auto data = SessionBlock::serialize(sess); uuid::uuid uid, did, sid; uuid::uuid sid; sess.getSid(sid); sess.getUid(uid); sess.getDid(did); uint64_t sgid = sessionGroupId(uid, did); uint64_t sid_gid = sidGroupId(sid); std::cerr << "[CLUSTER-SESSION] pushSession: sid=" << sid.c_str() << " sgid=" << sgid << " sid_gid=" << sid_gid << " sid_gid=" << sid_gid << " data_size=" << data.size() << std::endl; // Store each key — client is internally thread-safe auto store_one = [&](uint64_t gid, const char *label) { for (int attempt = 0; attempt < 3; ++attempt) { try { session_client_->store(gid, data.data(), data.size()); return true; session_client_->store(sid_gid, data.data(), data.size()); return; } catch (const std::exception &e) { std::cerr << "[CLUSTER-SESSION] pushSession " << label << ": store attempt " std::cerr << "[CLUSTER-SESSION] pushSession: store attempt " << attempt + 1 << "/3 FAILED: " << e.what() << std::endl; if (attempt < 2) { if (attempt < 2) std::this_thread::sleep_for(std::chrono::milliseconds(200)); continue; } } } return false; }; if (!store_one(sgid, "sgid")) throw AuthBackendError("cluster busy — session push (sgid) failed after 3 retries"); if (!store_one(sid_gid, "sid_gid")) throw AuthBackendError("cluster busy — session push (sid_gid) failed after 3 retries"); throw AuthBackendError("cluster busy — session push failed after 3 retries"); } void Cluster::queuePush(const SessionData &sess) { if (!pclient_) return; if (!session_client_) return; auto data = SessionBlock::serialize(sess); uuid::uuid uid, did, sid; uuid::uuid sid; sess.getSid(sid); sess.getUid(uid); sess.getDid(did); uint64_t sgid = sessionGroupId(uid, did); uint64_t sid_gid = sidGroupId(sid); PendingPush pp{data, sgid, sid_gid, 0}; PendingPush pp{data, sid_gid, 0}; std::lock_guard<std::mutex> pguard(push_mutex_); push_queue_.push_back(std::move(pp)); push_cv_.notify_one(); Loading Loading @@ -685,7 +649,6 @@ namespace authdb { if (have_sess) { sess_future = std::async(std::launch::async, [this, item = std::move(sess_item)]() mutable { try { session_client_->store(item.sgid, item.data.data(), item.data.size()); session_client_->store(item.sid_gid, item.data.data(), item.data.size()); } catch (const std::exception &e) { item.retries++; Loading @@ -706,87 +669,15 @@ namespace authdb { } } bool Cluster::fetchSession(const uuid::uuid &uid, const uuid::uuid &did, uuid::uuid &sid, std::vector<uuid::uuid> &members, std::string &username, std::vector<std::pair<uuid::uuid, bool>> &gpo_results) { uint64_t sgid = sessionGroupId(uid, did); std::vector<uint8_t> data; if (session_client_) { try { data = session_client_->retrieve(sgid); if (!data.empty()) { uuid::uuid d_uid, d_did; if (SessionBlock::deserialize(data, sid, d_uid, d_did, members, username, gpo_results)) { return true; } } } catch (...) { return false; } } return false; } void Cluster::removeSession(const uuid::uuid &uid, const uuid::uuid &did) { uint64_t sgid = sessionGroupId(uid, did); // Fetch session to get the SID for removing the SID-keyed entry if (session_client_) { try { std::vector<uint8_t> data = session_client_->retrieve(sgid); if (!data.empty()) { uuid::uuid sid, d_uid, d_did; std::vector<uuid::uuid> members; std::string username; std::vector<std::pair<uuid::uuid, bool>> gpo_results; if (SessionBlock::deserialize(data, sid, d_uid, d_did, members, username, gpo_results)) { uint64_t sid_gid = sidGroupId(sid); session_client_->remove(sid_gid); } } session_client_->remove(sgid); } catch (...) {} } } void Cluster::removeSessionBySid(const uuid::uuid &session_id) { uint64_t sid_gid = sidGroupId(session_id); // Fetch session data to get uid+did key if (session_client_) { try { std::vector<uint8_t> data = session_client_->retrieve(sid_gid); if (!data.empty()) { uuid::uuid sid, uid, did; std::vector<uuid::uuid> members; std::string username; std::vector<std::pair<uuid::uuid, bool>> gpo_results; if (SessionBlock::deserialize(data, sid, uid, did, members, username, gpo_results)) { uint64_t sgid = sessionGroupId(uid, did); session_client_->remove(sgid); } } session_client_->remove(sid_gid); } catch (...) {} } } bool Cluster::hasSession(const uuid::uuid &uid, const uuid::uuid &did) { uint64_t sgid = sessionGroupId(uid, did); if (session_client_) { try { auto data = session_client_->retrieve(sgid); if (!data.empty()) return true; } catch (...) {} } return false; } bool Cluster::fetchSessionLocalBySid(const uuid::uuid &session_id, uuid::uuid &sid, uuid::uuid &uid, uuid::uuid &did, std::vector<uuid::uuid> &members, Loading
src/cluster.h +0 −15 Original line number Diff line number Diff line Loading @@ -101,7 +101,6 @@ namespace authdb { std::vector<std::pair<uuid::uuid, bool>> &gpo_results); }; uint64_t sessionGroupId(const uuid::uuid &uid, const uuid::uuid &did); uint64_t sidGroupId(const uuid::uuid &sid); // --- Data operation replication --- Loading Loading @@ -163,22 +162,9 @@ namespace authdb { // Queue a backend data push for background replication void queueDataPush(uint64_t dgid, std::vector<uint8_t> buf); // Try to fetch a session from the cluster by uid+did // Returns true if found, fills out parameters bool fetchSession(const uuid::uuid &uid, const uuid::uuid &did, uuid::uuid &sid, std::vector<uuid::uuid> &members, std::string &username, std::vector<std::pair<uuid::uuid, bool>> &gpo_results); // Remove a session from the cluster void removeSession(const uuid::uuid &uid, const uuid::uuid &did); // Remove a session from the cluster by SID void removeSessionBySid(const uuid::uuid &session_id); // Check if a session already exists in the cluster for uid+did bool hasSession(const uuid::uuid &uid, const uuid::uuid &did); // Check if a session still exists in the cluster by SID bool hasSessionBySid(const uuid::uuid &session_id); Loading Loading @@ -232,7 +218,6 @@ namespace authdb { struct PendingPush { std::vector<uint8_t> data; uint64_t sgid; uint64_t sid_gid; int retries{0}; }; Loading
src/session.cpp +0 −24 Original line number Diff line number Diff line Loading @@ -428,30 +428,6 @@ const authdb::SessionData *authdb::ClusterSession::addSession(AuthBackend &backe if (g_Cluster->isCritical()) throw AuthBackendError("cluster critical — too few nodes online, please try again later"); // Check cluster for existing session (same user+domain) { uuid::uuid cluster_sid; std::vector<uuid::uuid> cluster_members; std::string cluster_username; std::vector<std::pair<uuid::uuid, bool>> cluster_gpo; if (g_Cluster->fetchSession(userid, domainid, cluster_sid, cluster_members, cluster_username, cluster_gpo)) { std::cerr << "[SESSION] addSession: found existing session sid=" << cluster_sid.c_str() << " via uid+did lookup, ensuring sid_gid entry exists" << std::endl; SessionData *imported = new SessionData(cluster_sid, userid, domainid, cluster_members); imported->_username = cluster_username; imported->setGPOResults(cluster_gpo); // Ensure sid_gid entry exists — queue async to avoid blocking on dead nodes g_Cluster->queuePush(*imported); std::lock_guard<std::mutex> guard(_lock); removeLocal(cluster_sid); appendLocal(imported); return imported; } } // Create new session std::cerr << "[SESSION] addSession: calling createSessionData..." << std::endl; SessionData *newSession = nullptr; Loading