Commit 7fa24800 authored by jan.koester's avatar jan.koester
Browse files

test

parent 179f559d
Loading
Loading
Loading
Loading
+28 −0
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@
#include <functional>
#include <fstream>
#include <mutex>
#include <unordered_map>

namespace paritypp {

@@ -122,6 +123,33 @@ public:
    // total_size: total payload size (needed for length framing header).
    std::unique_ptr<store_session> begin_store(uint64_t group_id, size_t total_size);

    // --- Batch API ---
    // Store multiple items packed into a single parity group.
    // Each item has a uint64_t key and a data payload.
    // Format: [4B 'BTCH'][4B count][for each: 8B key, 4B len, data...]
    struct batch_item {
        uint64_t key;
        std::vector<uint8_t> data;
    };

    // Store a batch of items as one parity group under batch_gid.
    void store_batch(uint64_t batch_gid, const std::vector<batch_item>& items);

    // Retrieve all items from a batch group.
    std::vector<batch_item> retrieve_batch(uint64_t batch_gid);

    // Retrieve a single item by key from a batch group.
    // Returns true if found, false otherwise.
    bool retrieve_batch_item(uint64_t batch_gid, uint64_t item_key,
                             std::vector<uint8_t>& out);

    // Encode items into batch payload (static utility).
    static std::vector<uint8_t> encode_batch(const std::vector<batch_item>& items);

    // Decode batch payload into items (static utility).
    static bool decode_batch(const std::vector<uint8_t>& payload,
                             std::vector<batch_item>& items);

    friend class client_store_session;

private:
+12 −1
Original line number Diff line number Diff line
@@ -88,15 +88,21 @@ public:
    // Returns true on success.
    bool vacuum() override;

    // Flush buffered writes to disk immediately.
    void flush();

private:
    static constexpr uint8_t RECORD_LIVE    = 0x00;
    static constexpr uint8_t RECORD_DELETED = 0xFF;
    // Record header: [1B flags][8B group_id][4B block_index][4B data_len]
    static constexpr size_t RECORD_HEADER_SIZE = 17;
    // Write buffer threshold: flush when buffer exceeds this size
    static constexpr size_t WRITE_BUF_THRESHOLD = 256 * 1024; // 256 KB

    struct block_entry {
        off_t  offset;    // file offset of the record header
        uint32_t data_len;
        bool in_write_buf; // true if data is still in write_buf_ (not yet flushed)
    };

    std::string data_path_;
@@ -107,11 +113,16 @@ private:
        std::unordered_map<uint32_t, block_entry>> index_;
    uint32_t total_blocks_ = 0;
    off_t dead_bytes_ = 0;     // accumulated tombstoned bytes (header + data)
    off_t file_size_  = 0;    // current file size
    off_t file_size_  = 0;    // current file size (including unflushed)

    // Write coalescing buffer
    std::vector<uint8_t> write_buf_;
    off_t write_buf_base_ = 0; // file offset where write_buf_ starts

    bool scan_and_build_index();
    void maybe_auto_vacuum();
    bool vacuum_impl(); // internal vacuum, caller must hold mutex_
    void flush_impl();  // flush write buffer, caller must hold mutex_
};

// QUIC-based parity storage server
+91 −0
Original line number Diff line number Diff line
@@ -1291,4 +1291,95 @@ void client::mark_node_dead(size_t node_index) {
    }
}

// --- Batch API implementation ---

static constexpr uint8_t BATCH_MAGIC[4] = {'B', 'T', 'C', 'H'};

std::vector<uint8_t> client::encode_batch(const std::vector<batch_item>& items) {
    // Calculate total size: 4B magic + 4B count + per-item (8B key + 4B len + data)
    size_t total = 8;
    for (const auto& item : items)
        total += 12 + item.data.size();

    std::vector<uint8_t> payload(total);
    size_t pos = 0;

    // Magic
    std::memcpy(payload.data() + pos, BATCH_MAGIC, 4);
    pos += 4;

    // Item count
    uint32_t count = static_cast<uint32_t>(items.size());
    protocol::encode_u32(payload.data() + pos, count);
    pos += 4;

    // Items
    for (const auto& item : items) {
        protocol::encode_u64(payload.data() + pos, item.key);
        pos += 8;
        uint32_t dlen = static_cast<uint32_t>(item.data.size());
        protocol::encode_u32(payload.data() + pos, dlen);
        pos += 4;
        if (dlen > 0) {
            std::memcpy(payload.data() + pos, item.data.data(), dlen);
            pos += dlen;
        }
    }
    return payload;
}

bool client::decode_batch(const std::vector<uint8_t>& payload,
                           std::vector<batch_item>& items) {
    items.clear();
    if (payload.size() < 8) return false;

    // Check magic
    if (std::memcmp(payload.data(), BATCH_MAGIC, 4) != 0) return false;

    uint32_t count = protocol::decode_u32(payload.data() + 4);
    size_t pos = 8;

    items.reserve(count);
    for (uint32_t i = 0; i < count; ++i) {
        if (pos + 12 > payload.size()) return false;
        batch_item item;
        item.key = protocol::decode_u64(payload.data() + pos);
        pos += 8;
        uint32_t dlen = protocol::decode_u32(payload.data() + pos);
        pos += 4;
        if (pos + dlen > payload.size()) return false;
        item.data.assign(payload.data() + pos, payload.data() + pos + dlen);
        pos += dlen;
        items.push_back(std::move(item));
    }
    return true;
}

void client::store_batch(uint64_t batch_gid, const std::vector<batch_item>& items) {
    auto payload = encode_batch(items);
    store(batch_gid, payload.data(), payload.size());
}

std::vector<client::batch_item> client::retrieve_batch(uint64_t batch_gid) {
    auto payload = retrieve(batch_gid);
    std::vector<batch_item> items;
    if (!decode_batch(payload, items))
        throw std::runtime_error("invalid batch payload");
    return items;
}

bool client::retrieve_batch_item(uint64_t batch_gid, uint64_t item_key,
                                  std::vector<uint8_t>& out) {
    auto payload = retrieve(batch_gid);
    std::vector<batch_item> items;
    if (!decode_batch(payload, items)) return false;
    for (const auto& item : items) {
        if (item.key == item_key) {
            out = item.data;
            return true;
        }
    }
    return false;
}

} // namespace paritypp