Commit 3f08b3ed authored by jan.koester's avatar jan.koester
Browse files

sppedup

parent c0eb970c
Loading
Loading
Loading
Loading
Loading
+259 −76
Original line number Diff line number Diff line
@@ -83,6 +83,126 @@ namespace authdb {
        return true;
    }

    static uint64_t fnv1a_hash(const std::vector<uint8_t> &data) {
        uint64_t hash = 14695981039346656037ULL;
        for (auto b : data) {
            hash ^= b;
            hash *= 1099511628211ULL;
        }
        return hash;
    }

    static constexpr uint32_t MANIFEST_MAGIC = 0x454E5459; // "ENTY"

    uint64_t ClusterBackend::entityGroupId(const EntityKey &ek) const {
        std::vector<uint8_t> input(_Domain.begin(), _Domain.end());
        input.insert(input.end(), ek.ruid, ek.ruid + 16);
        int32_t t = ek.type;
        input.insert(input.end(),
                     reinterpret_cast<uint8_t*>(&t),
                     reinterpret_cast<uint8_t*>(&t) + sizeof(int32_t));
        auto hash = netplus::sha256_hash(input);
        uint64_t id = 0;
        for (int i = 0; i < 8; i++)
            id = (id << 8) | hash[i];
        return id;
    }

    std::map<ClusterBackend::EntityKey, std::vector<uint8_t>>
    ClusterBackend::parseEntities() const {
        std::map<EntityKey, std::vector<uint8_t>> result;

        size_t rd = sizeof(AuthHeader);
        while (rd + sizeof(AuthData::Record) <= _Buffer.size()) {
            AuthData::Record rec;
            std::memcpy(&rec, _Buffer.data() + rd, sizeof(AuthData::Record));
            size_t record_start = rd;
            rd += sizeof(AuthData::Record);

            if (rec.type == DataType::EmptyData) {
                rd += rec.datasize;
                continue;
            }

            if (rd + rec.datasize > _Buffer.size())
                break;

            EntityKey ek;
            std::memcpy(ek.ruid, rec.ruid, 16);
            ek.type = rec.type;

            size_t total = sizeof(AuthData::Record) + rec.datasize;
            auto &entity_buf = result[ek];
            size_t old_size = entity_buf.size();
            entity_buf.resize(old_size + total);
            std::memcpy(entity_buf.data() + old_size, _Buffer.data() + record_start, total);

            rd += rec.datasize;
        }

        return result;
    }

    void ClusterBackend::pushManifest(const std::set<EntityKey> &keys) {
        AuthHeader head;
        if (_Buffer.size() >= sizeof(AuthHeader))
            std::memcpy(&head, _Buffer.data(), sizeof(AuthHeader));

        uint32_t magic = MANIFEST_MAGIC;
        uint32_t count = static_cast<uint32_t>(keys.size());

        std::vector<uint8_t> manifest;
        manifest.resize(sizeof(AuthHeader) + sizeof(uint32_t) + sizeof(uint32_t));
        size_t off = 0;
        std::memcpy(manifest.data() + off, &head, sizeof(AuthHeader)); off += sizeof(AuthHeader);
        std::memcpy(manifest.data() + off, &magic, sizeof(uint32_t));  off += sizeof(uint32_t);
        std::memcpy(manifest.data() + off, &count, sizeof(uint32_t));  off += sizeof(uint32_t);

        for (auto &ek : keys) {
            size_t pos = manifest.size();
            manifest.resize(pos + 16 + sizeof(int32_t));
            std::memcpy(manifest.data() + pos, ek.ruid, 16);
            int32_t t = ek.type;
            std::memcpy(manifest.data() + pos + 16, &t, sizeof(int32_t));
        }

        g_Cluster->queueDataPush(domainGroupId(), std::move(manifest));
    }

    void ClusterBackend::pushManifestSync(const std::set<EntityKey> &keys) {
        if (!g_Cluster || !g_Cluster->isRunning()) return;
        auto &client = g_Cluster->getClient();
        if (!client) return;

        AuthHeader head;
        if (_Buffer.size() >= sizeof(AuthHeader))
            std::memcpy(&head, _Buffer.data(), sizeof(AuthHeader));

        uint32_t magic = MANIFEST_MAGIC;
        uint32_t count = static_cast<uint32_t>(keys.size());

        std::vector<uint8_t> manifest;
        manifest.resize(sizeof(AuthHeader) + sizeof(uint32_t) + sizeof(uint32_t));
        size_t off = 0;
        std::memcpy(manifest.data() + off, &head, sizeof(AuthHeader)); off += sizeof(AuthHeader);
        std::memcpy(manifest.data() + off, &magic, sizeof(uint32_t));  off += sizeof(uint32_t);
        std::memcpy(manifest.data() + off, &count, sizeof(uint32_t));  off += sizeof(uint32_t);

        for (auto &ek : keys) {
            size_t pos = manifest.size();
            manifest.resize(pos + 16 + sizeof(int32_t));
            std::memcpy(manifest.data() + pos, ek.ruid, 16);
            int32_t t = ek.type;
            std::memcpy(manifest.data() + pos + 16, &t, sizeof(int32_t));
        }

        try {
            client->store(domainGroupId(), manifest.data(), manifest.size());
        } catch (const std::exception &e) {
            std::cerr << "ClusterBackend::pushManifestSync failed: " << e.what() << std::endl;
        }
    }

    void ClusterBackend::fetchFromCluster() {
        if (!g_Cluster || !g_Cluster->isRunning())
            return;
@@ -91,12 +211,9 @@ namespace authdb {
        DBG_LOG("[CLUSTER-BE] fetchFromCluster domain=" << _Domain
                  << " buf_size=" << _Buffer.size() << " dirty=" << _Dirty.load() << "\n");

        // Only fetch from peers every 30 seconds — auth data changes rarely
        // Check BEFORE acquiring the mutex to avoid contention with session ops
        auto now = std::chrono::steady_clock::now();
        bool do_peer_fetch = (now - _LastPeerFetch) >= std::chrono::seconds(30);

        // Force immediate fetch if cluster recovered (new nodes available)
        uint64_t cur_epoch = g_Cluster->getRecoveryEpoch();
        if (cur_epoch != _lastRecoveryEpoch) {
            do_peer_fetch = true;
@@ -105,68 +222,114 @@ namespace authdb {
        }

        if (!do_peer_fetch)
            return;  // Nothing to do
            return;

        std::vector<uint8_t> peer_data;
        auto &client = g_Cluster->getClient();
        if (client) {
        if (!client) return;

        // 1. Fetch manifest from domain group
        std::vector<uint8_t> manifest;
        try {
                peer_data = client->retrieve(dgid);
            manifest = client->retrieve(dgid);
            _LastPeerFetch = now;
        } catch (const std::exception &e) {
                std::cerr << "ClusterBackend::fetchFromCluster: retrieve failed: "
            std::cerr << "ClusterBackend::fetchFromCluster: manifest retrieve failed: "
                      << e.what() << std::endl;
                // Mark data as existing when the error indicates shards are present
                // but not enough peers are available
            _ClusterDataExists.store(true);
            return;
        } catch (...) {
                std::cerr << "ClusterBackend::fetchFromCluster: retrieve failed (unknown)" << std::endl;
            _ClusterDataExists.store(true);
            }
            return;
        }

        if (!bufferValid(peer_data))
            peer_data.clear();

        size_t peer_rev = bufferRevision(peer_data);
        size_t cur_rev  = bufferRevision(_Buffer);
        if (manifest.size() < sizeof(AuthHeader) + sizeof(uint32_t) + sizeof(uint32_t)) {
            if (!manifest.empty())
                _ClusterDataExists.store(true);
            return;
        }

        bool peer_has_records = (!peer_data.empty() && peer_data.size() > sizeof(AuthHeader));
        // 2. Parse header and verify magic
        AuthHeader head;
        std::memcpy(&head, manifest.data(), sizeof(AuthHeader));

        if (!peer_has_records)
            return;
        size_t moff = sizeof(AuthHeader);
        uint32_t magic = 0;
        std::memcpy(&magic, manifest.data() + moff, sizeof(uint32_t));
        moff += sizeof(uint32_t);

        // Ensure _Buffer has at least a valid header as merge base
        if (_Buffer.size() < sizeof(AuthHeader)) {
            if (peer_data.size() >= sizeof(AuthHeader))
                _Buffer.assign(peer_data.begin(),
                               peer_data.begin() + sizeof(AuthHeader));
            else
        if (magic != MANIFEST_MAGIC) {
            // Old format or corrupted — needs re-import
            _ClusterDataExists.store(true);
            std::cerr << "[CLUSTER-BE] domain=" << _Domain
                      << " manifest has wrong magic (legacy format?), needs re-import" << std::endl;
            return;
        }

        // Append records from peer data into _Buffer.
        // vacuum() will deduplicate afterwards.
        _Buffer.insert(_Buffer.end(),
                       peer_data.begin() + sizeof(AuthHeader),
                       peer_data.end());
        uint32_t entity_count = 0;
        std::memcpy(&entity_count, manifest.data() + moff, sizeof(uint32_t));
        moff += sizeof(uint32_t);

        size_t max_rev = std::max(cur_rev, peer_rev);
        AuthHeader head;
        std::memcpy(&head, _Buffer.data(), sizeof(AuthHeader));
        head.Revesion = max_rev;
        // 3. Parse entity keys
        std::vector<EntityKey> keys;
        keys.reserve(entity_count);
        for (uint32_t i = 0; i < entity_count; i++) {
            if (moff + 16 + sizeof(int32_t) > manifest.size()) break;
            EntityKey ek;
            std::memcpy(ek.ruid, manifest.data() + moff, 16);
            moff += 16;
            int32_t t = 0;
            std::memcpy(&t, manifest.data() + moff, sizeof(int32_t));
            moff += sizeof(int32_t);
            ek.type = t;
            keys.push_back(ek);
        }

        // 4. Fetch each entity and rebuild _Buffer
        _Buffer.resize(sizeof(AuthHeader));
        std::memcpy(_Buffer.data(), &head, sizeof(AuthHeader));
        entity_hashes_.clear();

        for (auto &ek : keys) {
            uint64_t egid = entityGroupId(ek);
            try {
                auto entity_data = client->retrieve(egid);
                if (!entity_data.empty()) {
                    _Buffer.insert(_Buffer.end(), entity_data.begin(), entity_data.end());
                    entity_hashes_[ek] = fnv1a_hash(entity_data);
                }
            } catch (const std::exception &e) {
                std::cerr << "[CLUSTER-BE] failed to fetch entity: " << e.what() << std::endl;
            }
        }

        vacuum();
        _ClusterDataExists.store(_Buffer.size() > sizeof(AuthHeader));
        DBG_LOG("[CLUSTER-BE] fetchFromCluster rebuilt buf_size=" << _Buffer.size()
                  << " entities=" << keys.size() << "\n");
    }

    void ClusterBackend::pushToCluster() {
        // Legacy entry point for destructor
        // Sync push for destructor — push all dirty entities directly
        if (!_Dirty) return;
        newRevesion();
        uint64_t dgid = domainGroupId();
        std::vector<uint8_t> buf_copy = _Buffer;
        if (pushToClusterAsync(dgid, buf_copy))

        if (!g_Cluster || !g_Cluster->isRunning()) return;
        auto &client = g_Cluster->getClient();
        if (!client) return;

        auto entities = parseEntities();
        std::set<EntityKey> keys;

        for (auto &[ek, data] : entities) {
            keys.insert(ek);
            try {
                client->store(entityGroupId(ek), data.data(), data.size());
            } catch (const std::exception &e) {
                std::cerr << "ClusterBackend::pushToCluster: entity push failed: "
                          << e.what() << std::endl;
            }
        }

        pushManifestSync(keys);
        _Dirty = false;
    }

@@ -284,31 +447,44 @@ namespace authdb {
                  << " buf_size=" << _Buffer.size() << " dirty=" << _Dirty.load() << "\n");
        if (_Dirty) {
            newRevesion();
            uint64_t dgid = domainGroupId();
            // Queue for background push via the single worker thread

            if (g_Cluster && g_Cluster->isRunning()) {
                g_Cluster->queueDataPush(dgid, _Buffer);
            }
            _Dirty = false;
                auto entities = parseEntities();
                std::set<EntityKey> current_keys;
                bool manifest_changed = false;

                for (auto &[ek, data] : entities) {
                    current_keys.insert(ek);
                    uint64_t hash = fnv1a_hash(data);
                    auto it = entity_hashes_.find(ek);
                    if (it == entity_hashes_.end()) {
                        // New entity
                        manifest_changed = true;
                        g_Cluster->queueDataPush(entityGroupId(ek), std::move(data));
                        entity_hashes_[ek] = hash;
                    } else if (it->second != hash) {
                        // Changed entity
                        g_Cluster->queueDataPush(entityGroupId(ek), std::move(data));
                        it->second = hash;
                    }
                }

    bool ClusterBackend::pushToClusterAsync(uint64_t dgid, const std::vector<uint8_t> &buf) {
        if (!g_Cluster || !g_Cluster->isRunning())
            return false;
                // Detect deleted entities
                for (auto it = entity_hashes_.begin(); it != entity_hashes_.end(); ) {
                    if (current_keys.find(it->first) == current_keys.end()) {
                        g_Cluster->queueDataRemove(entityGroupId(it->first));
                        it = entity_hashes_.erase(it);
                        manifest_changed = true;
                    } else {
                        ++it;
                    }
                }

        auto &client = g_Cluster->getClient();
        if (client) {
            try {
                client->store(dgid, buf.data(), buf.size());
                return true;
            } catch (const netplus::NetException &e) {
                std::cerr << "Cluster replicate failed: " << e.what() << std::endl;
            } catch (const std::exception &e) {
                std::cerr << "Cluster replicate failed: " << e.what() << std::endl;
                if (manifest_changed)
                    pushManifest(current_keys);
            }
            _Dirty = false;
        }
        return false;
    }

    void ClusterBackend::setPos(size_t pos) {
@@ -348,18 +524,25 @@ namespace authdb {
        if (!g_Cluster || !g_Cluster->isRunning())
            return;

        uint64_t dgid = domainGroupId();

        // Remove from peer nodes
        auto &client = g_Cluster->getClient();
        if (client) {
            // Remove all entity groups
            for (auto &[ek, _] : entity_hashes_) {
                try {
                    client->remove(entityGroupId(ek));
                } catch (const std::exception &e) {
                    std::cerr << "ClusterBackend::purge: entity remove failed: " << e.what() << std::endl;
                }
            }
            // Remove manifest
            try {
                client->remove(dgid);
                client->remove(domainGroupId());
            } catch (const std::exception &e) {
                std::cerr << "ClusterBackend::purge: peer remove failed: " << e.what() << std::endl;
                std::cerr << "ClusterBackend::purge: manifest remove failed: " << e.what() << std::endl;
            }
        }

        entity_hashes_.clear();
        _Buffer.clear();
        _Dirty = false;
    }
+21 −1
Original line number Diff line number Diff line
@@ -32,6 +32,8 @@
#include <atomic>
#include <chrono>
#include <mutex>
#include <map>
#include <set>

#include "../backend.h"
#include "../authdb.h"
@@ -72,10 +74,27 @@ namespace authdb {
        void forceRefresh() { _LastPeerFetch = {}; }

    private:
        struct EntityKey {
            unsigned char ruid[16] = {0};
            int type = 0;
            bool operator<(const EntityKey &o) const {
                int cmp = std::memcmp(ruid, o.ruid, 16);
                if (cmp != 0) return cmp < 0;
                return type < o.type;
            }
            bool operator==(const EntityKey &o) const {
                return std::memcmp(ruid, o.ruid, 16) == 0 && type == o.type;
            }
        };

        uint64_t domainGroupId() const;
        uint64_t entityGroupId(const EntityKey &ek) const;
        void fetchFromCluster();
        void pushToCluster();
        bool pushToClusterAsync(uint64_t dgid, const std::vector<uint8_t> &buf);

        std::map<EntityKey, std::vector<uint8_t>> parseEntities() const;
        void pushManifest(const std::set<EntityKey> &keys);
        void pushManifestSync(const std::set<EntityKey> &keys);

        std::string              _Domain;
        std::vector<uint8_t>     _Buffer;
@@ -85,5 +104,6 @@ namespace authdb {
        std::chrono::steady_clock::time_point _LastPeerFetch{};
        uint64_t _lastRecoveryEpoch{0};
        std::mutex _FetchMutex;  // serialize per-domain cluster fetches
        std::map<EntityKey, uint64_t> entity_hashes_; // per-entity content hashes
    };
};
+27 −6
Original line number Diff line number Diff line
@@ -723,15 +723,31 @@ namespace authdb {

    void Cluster::queueDataPush(uint64_t dgid, std::vector<uint8_t> buf) {
        std::lock_guard<std::mutex> pguard(push_mutex_);
        // Replace any pending push for the same domain group (only latest matters)
        // Replace any pending push/remove for the same group (only latest matters)
        for (auto &dp : data_push_queue_) {
            if (dp.dgid == dgid) {
                dp.buf = std::move(buf);
                dp.is_remove = false;
                push_cv_.notify_one();
                return;
            }
        }
        data_push_queue_.push_back({dgid, std::move(buf)});
        data_push_queue_.push_back({dgid, std::move(buf), false});
        push_cv_.notify_one();
    }

    void Cluster::queueDataRemove(uint64_t dgid) {
        std::lock_guard<std::mutex> pguard(push_mutex_);
        // Replace any pending push for this group with a remove
        for (auto &dp : data_push_queue_) {
            if (dp.dgid == dgid) {
                dp.buf.clear();
                dp.is_remove = true;
                push_cv_.notify_one();
                return;
            }
        }
        data_push_queue_.push_back({dgid, {}, true});
        push_cv_.notify_one();
    }

@@ -761,16 +777,21 @@ namespace authdb {

            lk.unlock();

            // Push all data items sequentially (each is already a full domain buffer)
            // Push all data items sequentially (each is a per-entity payload or remove)
            for (auto &dp : data_batch) {
                try {
                    if (dp.is_remove)
                        pclient_->remove(dp.dgid);
                    else
                        pclient_->store(dp.dgid, dp.buf.data(), dp.buf.size());
                } catch (const std::exception &e) {
                    std::cerr << "[CLUSTER] push_worker: data push failed: " << e.what() << std::endl;
                    if (!dp.is_remove) {
                        std::lock_guard<std::mutex> pguard(push_mutex_);
                        data_push_queue_.push_back(std::move(dp));
                    }
                }
            }

            // Push all sessions sequentially (each needs its own group_id for lookup)
            for (auto &item : sess_batch) {
+4 −0
Original line number Diff line number Diff line
@@ -168,6 +168,9 @@ namespace authdb {
        // Queue a backend data push for background replication
        void queueDataPush(uint64_t dgid, std::vector<uint8_t> buf);

        // Queue a data remove for background processing
        void queueDataRemove(uint64_t dgid);

        // Remove a session from the cluster by SID
        void removeSessionBySid(const uuid::uuid &session_id);

@@ -257,6 +260,7 @@ namespace authdb {
        struct PendingDataPush {
            uint64_t dgid;
            std::vector<uint8_t> buf;
            bool is_remove{false};
        };
        std::deque<PendingDataPush> data_push_queue_;