Commit 88cfc5d4 authored by jan.koester
Browse files

test

parent 7fa24800
Loading
Loading
Loading
Loading
+71 −13
Original line number Diff line number Diff line
@@ -142,18 +142,24 @@ file_block_store::file_block_store(const std::string& base_dir)
}

/// Flushes any buffered records to the data file and then closes it.
///
/// The descriptor is closed exactly once, and only after the flush, so
/// flush_impl() never writes to an already-closed fd.
file_block_store::~file_block_store() {
    if (fd_ < 0) return;  // file was never opened; nothing to flush or close

    // flush_impl() must be called with mutex_ held.
    std::lock_guard<std::mutex> lock(mutex_);
    flush_impl();
    pp_close(fd_);
}

bool file_block_store::scan_and_build_index() {
    index_.clear();
    total_blocks_ = 0;
    dead_bytes_ = 0;
    write_buf_.clear();
    off_t pos = 0;
    off_t end = pp_lseek(fd_, 0, SEEK_END);
    if (end < 0) return false;
    pp_lseek(fd_, 0, SEEK_SET);
    file_size_ = end;
    write_buf_base_ = end;

    uint8_t hdr[RECORD_HEADER_SIZE];
    while (pos + static_cast<off_t>(RECORD_HEADER_SIZE) <= end) {
@@ -175,6 +181,7 @@ bool file_block_store::scan_and_build_index() {
            auto& entry = index_[group_id][block_idx];
            entry.offset   = pos;
            entry.data_len = data_len;
            entry.in_write_buf = false;
            ++total_blocks_;
        } else {
            dead_bytes_ += static_cast<off_t>(RECORD_HEADER_SIZE + data_len);
@@ -185,6 +192,27 @@ bool file_block_store::scan_and_build_index() {
    return true;
}

/// Writes the pending write buffer to the data file at write_buf_base_.
///
/// Must be called with mutex_ held.
///
/// On success every index entry is marked as no longer buffered, the buffer
/// is emptied and write_buf_base_ advances past the flushed bytes.
///
/// On failure (write error or a write that makes no progress) nothing is
/// changed: the buffer, write_buf_base_ and the in_write_buf flags stay as
/// they were, so buffered records remain readable from memory and the next
/// flush retries the same bytes at the same file offsets.
void file_block_store::flush_impl() {
    if (write_buf_.empty()) return;

    // A single pwrite may write fewer bytes than requested; continue from
    // where it stopped until everything is written or no progress is made.
    size_t written = 0;
    while (written < write_buf_.size()) {
        ssize_t w = pp_pwrite(fd_, write_buf_.data() + written,
                              write_buf_.size() - written,
                              write_buf_base_ + static_cast<off_t>(written));
        if (w <= 0) break;
        written += static_cast<size_t>(w);
    }

    // Incomplete flush: keep the buffer so no record is dropped and the
    // index offsets still resolve into write_buf_.
    if (written != write_buf_.size()) return;

    // Everything is on disk; mark all entries as no longer in write_buf.
    for (auto& [gid, blocks] : index_) {
        for (auto& [bidx, entry] : blocks) {
            entry.in_write_buf = false;
        }
    }

    write_buf_base_ += static_cast<off_t>(write_buf_.size());
    write_buf_.clear();
}

void file_block_store::flush() {
    std::lock_guard<std::mutex> lock(mutex_);
    flush_impl();
}

bool file_block_store::store(uint64_t group_id, uint32_t block_index,
                              const uint8_t* data, size_t len) {
    std::lock_guard<std::mutex> lock(mutex_);
@@ -194,16 +222,23 @@ bool file_block_store::store(uint64_t group_id, uint32_t block_index,
    if (git != index_.end()) {
        auto bit = git->second.find(block_index);
        if (bit != git->second.end()) {
            if (bit->second.in_write_buf) {
                // Old record is still in write buffer — mark deleted in buffer
                off_t buf_off = bit->second.offset - write_buf_base_;
                if (buf_off >= 0 && static_cast<size_t>(buf_off) < write_buf_.size())
                    write_buf_[static_cast<size_t>(buf_off)] = RECORD_DELETED;
            } else {
                uint8_t del = RECORD_DELETED;
                (void)pp_pwrite(fd_, &del, 1, bit->second.offset);
            }
            dead_bytes_ += static_cast<off_t>(RECORD_HEADER_SIZE + bit->second.data_len);
            git->second.erase(bit);
            --total_blocks_;
        }
    }

    off_t pos = pp_lseek(fd_, 0, SEEK_END);
    if (pos < 0) return false;
    // Append to write buffer
    off_t pos = write_buf_base_ + static_cast<off_t>(write_buf_.size());

    uint8_t hdr[RECORD_HEADER_SIZE];
    hdr[0] = RECORD_LIVE;
@@ -212,19 +247,22 @@ bool file_block_store::store(uint64_t group_id, uint32_t block_index,
    std::memcpy(&hdr[9],  &block_index, 4);
    std::memcpy(&hdr[13], &dlen,        4);

    ssize_t w = pp_pwrite(fd_, hdr, RECORD_HEADER_SIZE, pos);
    if (w != static_cast<ssize_t>(RECORD_HEADER_SIZE)) return false;
    if (len > 0) {
        w = pp_pwrite(fd_, data, len, pos + RECORD_HEADER_SIZE);
        if (w != static_cast<ssize_t>(len)) return false;
    }
    write_buf_.insert(write_buf_.end(), hdr, hdr + RECORD_HEADER_SIZE);
    if (len > 0)
        write_buf_.insert(write_buf_.end(), data, data + len);

    block_entry e;
    e.offset   = pos;
    e.data_len = dlen;
    e.in_write_buf = true;
    index_[group_id][block_index] = e;
    ++total_blocks_;
    file_size_ = pos + RECORD_HEADER_SIZE + dlen;

    // Flush if buffer exceeds threshold
    if (write_buf_.size() >= WRITE_BUF_THRESHOLD)
        flush_impl();

    maybe_auto_vacuum();
    return true;
}
@@ -240,6 +278,15 @@ bool file_block_store::fetch(uint64_t group_id, uint32_t block_index,
    const auto& e = bit->second;
    out.resize(e.data_len);
    if (e.data_len > 0) {
        if (e.in_write_buf) {
            // Read from write buffer
            off_t buf_off = e.offset - write_buf_base_ + RECORD_HEADER_SIZE;
            if (buf_off >= 0 && static_cast<size_t>(buf_off + e.data_len) <= write_buf_.size()) {
                std::memcpy(out.data(), write_buf_.data() + buf_off, e.data_len);
                return true;
            }
            return false;
        }
        ssize_t n = pp_pread(fd_, out.data(), e.data_len,
                            e.offset + RECORD_HEADER_SIZE);
        if (n != static_cast<ssize_t>(e.data_len)) return false;
@@ -254,8 +301,14 @@ bool file_block_store::remove_group(uint64_t group_id) {

    // Tombstone all records in this group
    for (auto& [idx, entry] : git->second) {
        if (entry.in_write_buf) {
            off_t buf_off = entry.offset - write_buf_base_;
            if (buf_off >= 0 && static_cast<size_t>(buf_off) < write_buf_.size())
                write_buf_[static_cast<size_t>(buf_off)] = RECORD_DELETED;
        } else {
            uint8_t del = RECORD_DELETED;
            (void)pp_pwrite(fd_, &del, 1, entry.offset);
        }
        dead_bytes_ += static_cast<off_t>(RECORD_HEADER_SIZE + entry.data_len);
        --total_blocks_;
    }
@@ -302,6 +355,9 @@ bool file_block_store::vacuum() {

bool file_block_store::vacuum_impl() {
    // Must be called with mutex_ held
    // Flush any buffered writes first so all data is on disk
    flush_impl();

    std::string tmp_path = data_path_ + ".tmp";
    int tmp_fd = pp_open(tmp_path.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0644);
    if (tmp_fd < 0) return false;
@@ -334,6 +390,7 @@ bool file_block_store::vacuum_impl() {
            block_entry ne;
            ne.offset   = new_pos;
            ne.data_len = entry.data_len;
            ne.in_write_buf = false;
            new_index[gid][idx] = ne;
            ++new_total;

@@ -354,6 +411,7 @@ bool file_block_store::vacuum_impl() {
    total_blocks_ = new_total;
    dead_bytes_ = 0;
    file_size_ = new_pos;
    write_buf_base_ = new_pos;
    return true;
}