Commit 001d63a6 authored by jan.koester's avatar jan.koester
Browse files

test

parent 227f0fc9
Loading
Loading
Loading
Loading
+20 −38
Original line number Diff line number Diff line
@@ -297,17 +297,18 @@ uint64_t client::store(const std::vector<uint8_t>& data) {

void client::store_stripe(uint64_t group_id, uint32_t stripe_index,
                           const uint8_t* stripe_data, size_t stripe_len) {
    std::cerr << "[PARITY] store_stripe: ENTER gid=" << group_id
              << " stripe=" << stripe_index << " len=" << stripe_len << std::endl;
    // Pad to STRIPE_SIZE
    std::vector<uint8_t> padded(STRIPE_SIZE, 0);
    std::memcpy(padded.data(), stripe_data, std::min(stripe_len, STRIPE_SIZE));

    std::cerr << "[PARITY] store_stripe: splitting k=" << k_ << std::endl;
    auto sub_blocks = parity::split(padded.data(), STRIPE_SIZE, k_);
    std::cerr << "[PARITY] store_stripe: encoding parity m=" << m_ << std::endl;
    // Use actual data size (rounded up to multiple of k) instead of full STRIPE_SIZE
    // for small payloads — avoids sending 4MB of mostly zeros for tiny data.
    size_t effective_size = std::min(stripe_len, STRIPE_SIZE);
    // Round up to multiple of k_ so split() produces equal-size blocks
    size_t block_align = (effective_size + k_ - 1) / k_ * k_;
    if (block_align < k_) block_align = k_; // minimum 1 byte per block

    std::vector<uint8_t> padded(block_align, 0);
    std::memcpy(padded.data(), stripe_data, effective_size);

    auto sub_blocks = parity::split(padded.data(), block_align, k_);
    auto parity_blocks = parity_engine_.encode(sub_blocks);
    std::cerr << "[PARITY] store_stripe: parity done, n=" << (k_ + m_) << std::endl;

    size_t n = k_ + m_;

@@ -334,10 +335,7 @@ void client::store_stripe(uint64_t group_id, uint32_t stripe_index,

        if (static_cast<int>(i) == local_node_index_ && local_store_) {
            pending[i].local = true;
            std::cerr << "[PARITY] send_to_node[" << i << "]: local store gid=" << group_id
                      << " bidx=" << block_index << " blen=" << blen << std::endl;
            pending[i].ok = local_store_->store(group_id, block_index, bdata, blen);
            std::cerr << "[PARITY] send_to_node[" << i << "]: local store result=" << pending[i].ok << std::endl;
            if (!pending[i].ok) pending[i].fail_reason = "local store failed";
            pending[i].sent = true;
            return;
@@ -346,14 +344,11 @@ void client::store_stripe(uint64_t group_id, uint32_t stripe_index,
        netplus::socketwait local_wait;  // thread-local wait instance

        try {
            std::cerr << "[PARITY] send_to_node[" << i << "]: connecting..." << std::endl;
            ensure_connected(i);
            if (!connections_[i].connected || !connections_[i].socket) {
                std::cerr << "[PARITY] send_to_node[" << i << "]: connect FAILED" << std::endl;
                pending[i].fail_reason = "connect failed (handshake timeout)";
                return;
            }
            std::cerr << "[PARITY] send_to_node[" << i << "]: connected, sending..." << std::endl;
            auto& conn = *connections_[i].socket;
            auto payload = protocol::make_store_block(group_id, block_index, bdata, blen);
            auto msg = protocol::encode_message(msg_type::STORE_BLOCK, payload);
@@ -367,7 +362,6 @@ void client::store_stripe(uint64_t group_id, uint32_t stripe_index,
                    // Pump immediately to flush queued UDP datagrams
                    conn.pumpNetwork(MSG_DONTWAIT);
                    pending[i].sent = true;
                    std::cerr << "[PARITY] send_to_node[" << i << "]: sent OK" << std::endl;
                    break;
                }
                conn.pumpNetwork(MSG_DONTWAIT);
@@ -397,27 +391,19 @@ void client::store_stripe(uint64_t group_id, uint32_t stripe_index,

    // Phase 1: store local blocks + send to remote nodes in parallel
    {
        std::cerr << "[PARITY] store_stripe: local_node_index_=" << local_node_index_
                  << " local_store_=" << (local_store_ ? "yes" : "no") << std::endl;
        // Local node first (no thread needed)
        for (size_t i = 0; i < n; i++) {
            if (static_cast<int>(i) == local_node_index_ && local_store_) {
                std::cerr << "[PARITY] store_stripe: calling local send_to_node(" << i << ")" << std::endl;
                send_to_node(i);
                std::cerr << "[PARITY] store_stripe: local send_to_node(" << i << ") done" << std::endl;
            }
        }
        std::cerr << "[PARITY] store_stripe: Phase 1 launching " << n << " async tasks" << std::endl;
        // Remote nodes in parallel
        std::vector<std::future<void>> futures;
        for (size_t i = 0; i < n; i++) {
            if (static_cast<int>(i) == local_node_index_ && local_store_) continue;
            futures.push_back(std::async(std::launch::async, send_to_node, i));
        }
        for (size_t fi = 0; fi < futures.size(); ++fi) {
            futures[fi].get();
            std::cerr << "[PARITY] store_stripe: async task " << fi << " done" << std::endl;
        }
        for (auto& f : futures) f.get();
    }

    // Phase 1b: retry failed remote nodes once with a fresh connection
@@ -439,8 +425,6 @@ void client::store_stripe(uint64_t group_id, uint32_t stripe_index,
    for (auto& p : pending) {
        if (p.sent && !p.local) ++remaining;
    }
    std::cerr << "[PARITY] store_stripe: Phase 2 collecting " << remaining << " responses" << std::endl;

    // Pump all connections once — data may already be buffered from Phase 1
    for (auto& p : pending) {
        if (!p.sent || p.local || p.ok) continue;
@@ -795,14 +779,12 @@ bool client::retrieve_stripe(uint64_t group_id, uint32_t stripe_index,
        data_blocks = parity_engine_.decode(fetched, fetch_indices, block_size);
    }

    out = parity::join(data_blocks, STRIPE_SIZE);
    out = parity::join(data_blocks, 0);
    return true;
}

void client::store(uint64_t group_id, const uint8_t* data, size_t data_len) {
    std::cerr << "[PARITY] store: acquiring mutex gid=" << group_id << std::endl;
    std::lock_guard<std::mutex> guard(mutex_);
    std::cerr << "[PARITY] store: mutex acquired gid=" << group_id << std::endl;
    if (nodes_.size() < k_ + m_) {
        throw std::runtime_error("Not enough nodes for parity store ("
                                 + std::to_string(nodes_.size()) + " < "
@@ -827,25 +809,26 @@ void client::store(uint64_t group_id, const uint8_t* data, size_t data_len) {
            size_t offset = 0;

            while (offset < framed_len) {
                std::vector<uint8_t> stripe(STRIPE_SIZE, 0);
                size_t chunk = std::min<size_t>(STRIPE_SIZE, framed_len - offset);
                std::vector<uint8_t> stripe(chunk, 0);
                size_t filled = 0;

                // Fill from header bytes if still in range
                if (offset < 8) {
                    size_t hdr_copy = std::min<size_t>(8 - offset, STRIPE_SIZE);
                    size_t hdr_copy = std::min<size_t>(8 - offset, chunk);
                    std::memcpy(stripe.data(), header.data() + offset, hdr_copy);
                    filled = hdr_copy;
                }
                // Fill from payload
                if (filled < STRIPE_SIZE && offset + filled >= 8) {
                if (filled < chunk && offset + filled >= 8) {
                    size_t data_off = offset + filled - 8;
                    size_t data_copy = std::min<size_t>(STRIPE_SIZE - filled, data_len - data_off);
                    size_t data_copy = std::min<size_t>(chunk - filled, data_len - data_off);
                    if (data_copy > 0)
                        std::memcpy(stripe.data() + filled, data + data_off, data_copy);
                    filled += data_copy;
                }

                store_stripe(group_id, stripe_idx, stripe.data(), STRIPE_SIZE);
                store_stripe(group_id, stripe_idx, stripe.data(), filled);
                if (stripe_idx % 100 == 0 || stripe_idx + 1 == total_stripes) {
                    DBG_LOG("[PARITY]   stripe " << (stripe_idx+1) << "/" << total_stripes << "\n");
                }
@@ -905,9 +888,8 @@ public:
    void finalize() override {
        if (!ok_ || finalized_) return;
        finalized_ = true;
        // Flush last partial stripe (padded with zeros)
        // Flush last partial stripe (no padding needed — store_stripe handles it)
        if (!stripe_buf_.empty()) {
            stripe_buf_.resize(STRIPE_SIZE, 0);
            flush_stripe();
        }
    }