Commit 315d40f3 authored by jan.koester's avatar jan.koester
Browse files

test

parent fd5a7f8a
Loading
Loading
Loading
Loading
Loading
+67 −43
Original line number Diff line number Diff line
@@ -32,6 +32,7 @@
#include <chrono>
#include <map>
#include <tuple>
#include <shared_mutex>

#include <crypto/sha.h>
#include <netplus/exception.h>
@@ -47,6 +48,24 @@

namespace authdb {

    // Static members
    std::shared_mutex ClusterBackend::s_cache_mutex_;
    std::map<std::string, std::shared_ptr<DomainCache>> ClusterBackend::s_domain_cache_;

    std::shared_ptr<DomainCache> ClusterBackend::getDomainCache() {
        {
            std::shared_lock<std::shared_mutex> rl(s_cache_mutex_);
            auto it = s_domain_cache_.find(_Domain);
            if (it != s_domain_cache_.end())
                return it->second;
        }
        std::unique_lock<std::shared_mutex> wl(s_cache_mutex_);
        auto &ptr = s_domain_cache_[_Domain];
        if (!ptr)
            ptr = std::make_shared<DomainCache>();
        return ptr;
    }

    uint64_t ClusterBackend::domainGroupId() const {
        std::vector<uint8_t> input(_Domain.begin(), _Domain.end());
        auto hash = netplus::sha256_hash(input);
@@ -66,10 +85,9 @@ namespace authdb {
    }

    // Validate that all records in buffer have consistent offsets.
    // Returns false if any record's datasize would exceed the buffer.
    static bool bufferValid(const std::vector<uint8_t> &buf) {
        if (buf.size() < sizeof(AuthHeader))
            return buf.empty(); // empty is ok, too-small header is not
            return buf.empty();

        size_t rd = sizeof(AuthHeader);
        while (rd + sizeof(AuthData::Record) <= buf.size()) {
@@ -83,6 +101,11 @@ namespace authdb {
        return true;
    }

    void ClusterBackend::forceRefresh() {
        auto cache = getDomainCache();
        cache->last_fetch = {};
    }

    static uint64_t fnv1a_hash(const std::vector<uint8_t> &data) {
        uint64_t hash = 14695981039346656037ULL;
        for (auto b : data) {
@@ -207,22 +230,42 @@ namespace authdb {
        if (!g_Cluster || !g_Cluster->isRunning())
            return;

        uint64_t dgid = domainGroupId();
        DBG_LOG("[CLUSTER-BE] fetchFromCluster domain=" << _Domain
                  << " buf_size=" << _Buffer.size() << " dirty=" << _Dirty.load() << "\n");

        auto cache = getDomainCache();
        auto now = std::chrono::steady_clock::now();
        bool do_peer_fetch = (now - _LastPeerFetch) >= std::chrono::seconds(30);

        // Fast path: check if cached data is fresh (no lock needed for read)
        bool do_peer_fetch = (now - cache->last_fetch) >= std::chrono::seconds(30);

        uint64_t cur_epoch = g_Cluster->getRecoveryEpoch();
        if (cur_epoch != _lastRecoveryEpoch) {
        if (cur_epoch != cache->recovery_epoch)
            do_peer_fetch = true;
            _lastRecoveryEpoch = cur_epoch;
            std::cerr << "[CLUSTER-BE] recovery detected, forcing refresh for domain=" << _Domain << std::endl;

        if (!do_peer_fetch) {
            // Use cached data if this instance has no data yet
            if (_Buffer.size() <= sizeof(AuthHeader) && !cache->buffer.empty()) {
                _Buffer = cache->buffer;
                _ClusterDataExists.store(_Buffer.size() > sizeof(AuthHeader));
                DBG_LOG("[CLUSTER-BE] fetchFromCluster domain=" << _Domain
                          << " using cached buf_size=" << _Buffer.size() << "\n");
            }
            return;
        }

        if (!do_peer_fetch)
        // Serialize fetches per domain — if another thread is already fetching,
        // wait for its result instead of hammering the cluster.
        std::unique_lock<std::mutex> flk(cache->fetch_mutex, std::try_to_lock);
        if (!flk.owns_lock()) {
            std::lock_guard<std::mutex> wait_lk(cache->fetch_mutex);
            if (!cache->buffer.empty()) {
                _Buffer = cache->buffer;
                _ClusterDataExists.store(_Buffer.size() > sizeof(AuthHeader));
            }
            return;
        }

        uint64_t dgid = domainGroupId();
        DBG_LOG("[CLUSTER-BE] fetchFromCluster domain=" << _Domain
                  << " buf_size=" << _Buffer.size() << " dirty=" << _Dirty.load() << "\n");

        auto &client = g_Cluster->getClient();
        if (!client) return;
@@ -231,7 +274,8 @@ namespace authdb {
        std::vector<uint8_t> manifest;
        try {
            manifest = client->retrieve(dgid);
            _LastPeerFetch = now;
            cache->last_fetch = now;
            cache->recovery_epoch = cur_epoch;
        } catch (const std::exception &e) {
            std::cerr << "ClusterBackend::fetchFromCluster: manifest retrieve failed: "
                      << e.what() << std::endl;
@@ -258,7 +302,6 @@ namespace authdb {
        moff += sizeof(uint32_t);

        if (magic != MANIFEST_MAGIC) {
            // Old format or corrupted — needs re-import
            _ClusterDataExists.store(true);
            std::cerr << "[CLUSTER-BE] domain=" << _Domain
                      << " manifest has wrong magic (legacy format?), needs re-import" << std::endl;
@@ -269,7 +312,7 @@ namespace authdb {
        std::memcpy(&entity_count, manifest.data() + moff, sizeof(uint32_t));
        moff += sizeof(uint32_t);

        // 3. Parse entity keys
        // 3. Parse entity keys from manifest
        std::vector<EntityKey> keys;
        keys.reserve(entity_count);
        for (uint32_t i = 0; i < entity_count; i++) {
@@ -303,6 +346,10 @@ namespace authdb {
        }

        _ClusterDataExists.store(_Buffer.size() > sizeof(AuthHeader));

        // Update shared cache
        cache->buffer = _Buffer;

        DBG_LOG("[CLUSTER-BE] fetchFromCluster rebuilt buf_size=" << _Buffer.size()
                  << " entities=" << keys.size() << "\n");
    }
@@ -336,13 +383,7 @@ namespace authdb {
    ClusterBackend::ClusterBackend(const char *domain, size_t version, const char *filepath)
        : _Domain(domain), _Pos(0), _Dirty(false)
    {
        // Initialize recovery epoch to current cluster value so newly created
        // instances don't trigger a redundant forced refresh.
        if (g_Cluster)
            _lastRecoveryEpoch = g_Cluster->getRecoveryEpoch();

        // Check if the local block store already has shards for this domain.
        // This tells the wizard guard that data exists even before peers are up.
        if (g_Cluster && g_Cluster->isRunning()) {
            uint64_t dgid = domainGroupId();
            auto &store = g_Cluster->getStore();
@@ -404,29 +445,8 @@ namespace authdb {
    }

    void ClusterBackend::prefetch() {
        // Fast path: if buffer has data and was recently fetched, skip entirely.
        // This avoids _FetchMutex contention when many threads call prefetch().
        if (!_Buffer.empty()) {
            auto now = std::chrono::steady_clock::now();
            bool due = (now - _LastPeerFetch) >= std::chrono::seconds(30);
            uint64_t cur_epoch = g_Cluster ? g_Cluster->getRecoveryEpoch() : _lastRecoveryEpoch;
            if (!due && cur_epoch == _lastRecoveryEpoch)
                return;  // data is fresh, no fetch needed
        }

        // Serialize fetches per-domain: if another thread is already fetching,
        // wait for its result instead of hammering the cluster in parallel.
        std::unique_lock<std::mutex> flk(_FetchMutex, std::try_to_lock);
        if (!flk.owns_lock()) {
            // Another thread is already fetching — wait for it to finish.
            DBG_LOG("[CLUSTER-BE] prefetch() domain=" << _Domain
                      << " — waiting for concurrent fetch\n");
            std::lock_guard<std::mutex> wait_lk(_FetchMutex);
            DBG_LOG("[CLUSTER-BE] prefetch() domain=" << _Domain
                      << " — concurrent fetch done, buf_size=" << _Buffer.size() << "\n");
            return;
        }

        // fetchFromCluster() handles the 30s throttle and domain-level
        // serialization via the shared DomainCache.
        bool was_dirty = _Dirty.load();
        DBG_LOG("[CLUSTER-BE] prefetch() domain=" << _Domain
                  << " buf_size=" << _Buffer.size() << " dirty=" << was_dirty << "\n");
@@ -482,6 +502,10 @@ namespace authdb {

                if (manifest_changed)
                    pushManifest(current_keys);

                // Update shared cache so other instances see the latest data
                auto cache = getDomainCache();
                cache->buffer = _Buffer;
            }
            _Dirty = false;
        }
+17 −4
Original line number Diff line number Diff line
@@ -34,6 +34,8 @@
#include <mutex>
#include <map>
#include <set>
#include <shared_mutex>
#include <memory>

#include "../backend.h"
#include "../authdb.h"
@@ -45,6 +47,16 @@
#endif

namespace authdb {

    // Shared per-domain cache: avoids redundant cluster fetches when multiple
    // ClusterBackend instances exist for the same domain (e.g. AD plugin).
    struct DomainCache {
        std::vector<uint8_t> buffer;
        std::chrono::steady_clock::time_point last_fetch{};
        uint64_t recovery_epoch{0};
        std::mutex fetch_mutex;
    };

    class VISIBILITY ClusterBackend : public AuthBackendApi {
    public:
        ClusterBackend(const char *domain, size_t version, const char *filepath = nullptr);
@@ -71,7 +83,7 @@ namespace authdb {
        bool clusterDataExists() const override { return _ClusterDataExists.load(); }

        // Force next fetchFromCluster to bypass the 30s throttle
        void forceRefresh() { _LastPeerFetch = {}; }
        void forceRefresh();

    private:
        struct EntityKey {
@@ -96,14 +108,15 @@ namespace authdb {
        void pushManifest(const std::set<EntityKey> &keys);
        void pushManifestSync(const std::set<EntityKey> &keys);

        static std::shared_mutex s_cache_mutex_;
        static std::map<std::string, std::shared_ptr<DomainCache>> s_domain_cache_;
        std::shared_ptr<DomainCache> getDomainCache();

        std::string              _Domain;
        std::vector<uint8_t>     _Buffer;
        size_t                   _Pos;
        std::atomic<bool>        _Dirty;
        std::atomic<bool>        _ClusterDataExists{false};
        std::chrono::steady_clock::time_point _LastPeerFetch{};
        uint64_t _lastRecoveryEpoch{0};
        std::mutex _FetchMutex;  // serialize per-domain cluster fetches
        std::map<EntityKey, uint64_t> entity_hashes_; // per-entity content hashes
    };
};