Commit 084a6b8f authored by jan.koester's avatar jan.koester
Browse files

test

parent 7f7a2069
Loading
Loading
Loading
Loading
Loading
+34 −18
Original line number Diff line number Diff line
@@ -451,12 +451,23 @@ namespace authdb {
                if (self_idx != SIZE_MAX) {
                    read_client_->set_local_node(self_idx, store_);
                }
                // Dedicated session client — separate connections, no contention with domain fetches
                // Dedicated session write client — used only by push_worker
                session_client_ = std::make_unique<paritypp::client>(
                    cfg_.data_blocks, cfg_.parity_blocks, nodes, pcreds);
                if (self_idx != SIZE_MAX) {
                    session_client_->set_local_node(self_idx, session_store_);
                }
                // Dedicated session read client — used by request threads
                session_read_client_ = std::make_unique<paritypp::client>(
                    cfg_.data_blocks, cfg_.parity_blocks, nodes, pcreds);
                if (self_idx != SIZE_MAX) {
                    session_read_client_->set_local_node(self_idx, session_store_);
                }

                // Pre-establish QUIC connections so first requests don't pay
                // handshake + auth latency (2-3s per node)
                read_client_->warmup();
                session_read_client_->warmup();
            } else {
                std::cerr << "Cluster: no peers configured" << std::endl;
            }
@@ -603,14 +614,19 @@ namespace authdb {
            try {
                auto health = pclient_->get_cluster_status();

                // Propagate dead node info to session_client_
                // Propagate dead node info to session clients and read clients
                if (session_client_) {
                    std::set<size_t> online_nodes;
                    for (const auto &ns : health.node_statuses)
                        online_nodes.insert(ns.first);
                    for (size_t i = 0; i < n; ++i) {
                        if (online_nodes.find(i) == online_nodes.end())
                        if (online_nodes.find(i) == online_nodes.end()) {
                            session_client_->mark_node_dead(i);
                            if (session_read_client_)
                                session_read_client_->mark_node_dead(i);
                            if (read_client_)
                                read_client_->mark_node_dead(i);
                        }
                    }
                }

@@ -838,17 +854,17 @@ namespace authdb {
                std::this_thread::sleep_for(std::chrono::seconds(1));
            if (!running_) break;
            if (critical_) continue;
            if (!session_client_) continue;
            if (!session_read_client_) continue;

            try {
                int64_t now = std::chrono::duration_cast<std::chrono::seconds>(
                    std::chrono::system_clock::now().time_since_epoch()).count();

                std::set<uint64_t> all_groups;
                size_t n = session_client_->get_cluster_status().nodes_total;
                size_t n = session_read_client_->get_cluster_status().nodes_total;
                for (size_t i = 0; i < n; i++) {
                    std::vector<uint64_t> node_groups;
                    if (session_client_->list_groups_on_node(i, node_groups))
                    if (session_read_client_->list_groups_on_node(i, node_groups))
                        all_groups.insert(node_groups.begin(), node_groups.end());
                }

@@ -857,7 +873,7 @@ namespace authdb {
                    // Register this group as a session so the interceptor routes it correctly
                    if (interceptor_) interceptor_->mark_session_group(gid);
                    try {
                        auto data = session_client_->retrieve(gid);
                        auto data = session_read_client_->retrieve(gid);
                        if (data.empty()) continue;
                        uuid::uuid sid, uid, did;
                        std::vector<uuid::uuid> members;
@@ -888,10 +904,10 @@ namespace authdb {
                                          std::vector<uuid::uuid> &members,
                                          std::string &username,
                                          std::vector<std::pair<uuid::uuid, bool>> &gpo_results) {
        // No local store — retrieve via erasure coding
        // Retrieve via dedicated read client — never blocked by push_worker stores
        uint64_t sid_gid = sidGroupId(session_id);
        if (session_client_) {
            auto data = session_client_->retrieve(sid_gid);
        if (session_read_client_) {
            auto data = session_read_client_->retrieve(sid_gid);
            if (!data.empty()) {
                int64_t created_at;
                return SessionBlock::deserialize(data, sid, uid, did, members, username, gpo_results, created_at);
@@ -903,9 +919,9 @@ namespace authdb {
    bool Cluster::hasSessionBySid(const uuid::uuid &session_id) {
        uint64_t sid_gid = sidGroupId(session_id);

        if (session_client_) {
        if (session_read_client_) {
            try {
                auto data = session_client_->retrieve(sid_gid);
                auto data = session_read_client_->retrieve(sid_gid);
                if (!data.empty()) return true;
            } catch (...) {}
        }
@@ -920,9 +936,9 @@ namespace authdb {
                                     std::vector<std::pair<uuid::uuid, bool>> &gpo_results) {
        uint64_t sid_gid = sidGroupId(session_id);

        if (session_client_) {
        if (session_read_client_) {
            try {
                auto data = session_client_->retrieve(sid_gid);
                auto data = session_read_client_->retrieve(sid_gid);
                if (!data.empty()) {
                    int64_t created_at;
                    if (SessionBlock::deserialize(data, sid, uid, did, members, username, gpo_results, created_at)) {
@@ -941,15 +957,15 @@ namespace authdb {

    void Cluster::listAllSessions(std::vector<SessionInfo> &out) {
        out.clear();
        if (!session_client_) return;
        if (!session_read_client_) return;
        std::set<std::string> seen_sids;

        std::set<uint64_t> all_groups;
        try {
            size_t n = session_client_->get_cluster_status().nodes_total;
            size_t n = session_read_client_->get_cluster_status().nodes_total;
            for (size_t i = 0; i < n; i++) {
                std::vector<uint64_t> node_groups;
                if (session_client_->list_groups_on_node(i, node_groups)) {
                if (session_read_client_->list_groups_on_node(i, node_groups)) {
                    all_groups.insert(node_groups.begin(), node_groups.end());
                }
            }
@@ -957,7 +973,7 @@ namespace authdb {
            // Try to retrieve and deserialize each group as a session
            for (uint64_t gid : all_groups) {
                try {
                    auto data = session_client_->retrieve(gid);
                    auto data = session_read_client_->retrieve(gid);
                    if (data.empty()) continue;
                    SessionInfo info;
                    if (SessionBlock::deserialize(data, info.sid, info.uid, info.did,
+6 −1
Original line number Diff line number Diff line
@@ -159,6 +159,7 @@ namespace authdb {
        paritypp::block_store &getStore() { return *store_; }
        std::unique_ptr<paritypp::client> &getClient() { return pclient_; }
        std::unique_ptr<paritypp::client> &getReadClient() { return read_client_; }
        std::unique_ptr<paritypp::client> &getSessionReadClient() { return session_read_client_; }

        // Push a session to the cluster (after local addSession)
        void pushSession(const SessionData &sess);
@@ -278,9 +279,13 @@ namespace authdb {
        // pclient_ which is used by push_worker (store) and health monitor
        std::unique_ptr<paritypp::client> read_client_;

        // Dedicated client for session ops — avoids contention with domain data fetches
        // Dedicated client for session writes — used by push_worker only
        std::unique_ptr<paritypp::client> session_client_;

        // Dedicated client for session reads — avoids contention with
        // push_worker holding session_client_->mutex_ during store()
        std::unique_ptr<paritypp::client> session_read_client_;

        void server_loop();
        void health_monitor_loop();
        void push_worker_loop();