Commit 99ad3689 authored by jan.koester's avatar jan.koester
Browse files

test

parent f9268141
Loading
Loading
Loading
Loading
Loading
+27 −24
Original line number Diff line number Diff line
@@ -762,6 +762,13 @@ namespace authdb {
            // Don't try to push while critical
            if (critical_) continue;

            // Drain ALL pending session pushes FIRST (more time-critical)
            std::vector<PendingPush> sess_batch;
            while (!push_queue_.empty()) {
                sess_batch.push_back(std::move(push_queue_.front()));
                push_queue_.pop_front();
            }

            // Drain ALL pending data pushes
            std::vector<PendingDataPush> data_batch;
            while (!data_push_queue_.empty()) {
@@ -769,32 +776,9 @@ namespace authdb {
                data_push_queue_.pop_front();
            }

            // Drain ALL pending session pushes
            std::vector<PendingPush> sess_batch;
            while (!push_queue_.empty()) {
                sess_batch.push_back(std::move(push_queue_.front()));
                push_queue_.pop_front();
            }

            lk.unlock();

            // Push all data items sequentially (each is a per-entity payload or remove)
            for (auto &dp : data_batch) {
                try {
                    if (dp.is_remove)
                        pclient_->remove(dp.dgid);
                    else
                        pclient_->store(dp.dgid, dp.buf.data(), dp.buf.size());
                } catch (const std::exception &e) {
                    std::cerr << "[CLUSTER] push_worker: data push failed: " << e.what() << std::endl;
                    if (!dp.is_remove) {
                        std::lock_guard<std::mutex> pguard(push_mutex_);
                        data_push_queue_.push_back(std::move(dp));
                    }
                }
            }

            // Push all sessions sequentially (each needs its own group_id for lookup)
            // Push sessions first
            for (auto &item : sess_batch) {
                try {
                    session_client_->store(item.sid_gid, item.data.data(), item.data.size());
@@ -809,6 +793,25 @@ namespace authdb {
                    }
                }
            }

            // Push data items (manifests) — drop after 3 retries to avoid infinite loop
            for (auto &dp : data_batch) {
                try {
                    if (dp.is_remove)
                        pclient_->remove(dp.dgid);
                    else
                        pclient_->store(dp.dgid, dp.buf.data(), dp.buf.size());
                } catch (const std::exception &e) {
                    dp.retries++;
                    if (!dp.is_remove && dp.retries < 3) {
                        std::lock_guard<std::mutex> pguard(push_mutex_);
                        data_push_queue_.push_back(std::move(dp));
                    } else {
                        std::cerr << "[CLUSTER] push_worker: dropped data push after "
                                  << dp.retries << " retries: " << e.what() << std::endl;
                    }
                }
            }
        }
    }

+1 −0
Original line number Diff line number Diff line
@@ -261,6 +261,7 @@ namespace authdb {
            uint64_t dgid;
            std::vector<uint8_t> buf;
            bool is_remove{false};
            int retries{0};
        };
        std::deque<PendingDataPush> data_push_queue_;