Loading .gitignore +3 −0 Original line number Diff line number Diff line Loading @@ -21,3 +21,6 @@ test_tls_client.py server_error.txt server_error.txt server_output.txt server_error.txt server_error.txt server_output.txt server_error.txt +2123 −106 File changed.Preview size limit exceeded, changes collapsed. Show changes server_output.txt +40 −2 Original line number Diff line number Diff line Certificate validity: OK. / [SSL] loadServerPrivateKeyDer: looksPkcs8=0 children=9 [SSL] Loaded RSA private key [IOCP] runEventloop: Starting 4 threads. [SSL] ===== handshake_after_accept ENTER state=26 [SSL] handshake_after_accept loop: state=0 [SSL] Processing READ_CLIENT_HELLO state [SSL] READ_CLIENT_HELLO: got 428 bytes of handshake message [SSL] handshake_after_accept loop: state=1 [SSL] Entering TLS13_SEND_SERVER_HELLO state [SSL] Cleared secrets, checking keyshares... [SSL] use_x25519=1 use_p256=1 [SSL] About to derive handshake keys, group=29 x25519_shared.size=32 ecdhe_shared.size=0 [TLS] ECDHE shared secret (first 16 bytes): 98d8b65bcef6b5169b8dfa8b1b655fba [TLS] Derived keys - c_key: 52fffa0af1c49f8d... s_key: 78fb42529a1f221b [TLS] Derived IVs - c_iv: f9a93de230a75b091abb6d67 s_iv: 91b6107e2e019b6d0130c70b [SSL] Handshake keys derived successfully [SSL] ===== handshake_after_accept ENTER state=2 [SSL] handshake_after_accept loop: state=2 [SSL] TLS13_SEND_ENCRYPTED_FLIGHT: encflight_queued=0 [TLS] CertificateVerify: toSign buffer size=130 [TLS] CertificateVerify: transcript hash (first 16 bytes): 9f 02 fc e0 6f 82 f4 15 87 07 1c 4f 49 73 53 60 [TLS] RSA-PSS-SHA256 signature generated: size=256 [TLS] RSA-PSS signature (first 16 bytes): 24 61 f6 ca f7 2a 72 d5 45 a1 b4 b5 36 52 5c 53 [TLS] RSA-PSS signature (last 16 bytes): 71 64 dc 2e a2 26 4a 9f 49 0f e5 57 ce 58 f1 71 [TLS] CertificateVerify message complete: total size=260 (alg_id=2 + len=2 + sig=256) [TLS] CertificateVerify (first 20 bytes): 08 04 01 00 24 61 f6 ca f7 2a 72 d5 45 a1 b4 b5 36 52 5c 53 [SSL] TLS13_SEND_ENCRYPTED_FLIGHT: after flush_out, hasPendingWrite=0 [SSL] TLS13_SEND_ENCRYPTED_FLIGHT: transitioning to TLS13_WAIT_CLIENT_FINISHED [SSL] ===== handshake_after_accept ENTER state=3 [SSL] handshake_after_accept loop: state=3 [SSL] _tls13_read_record_handshake: got record, size=6 [SSL] _tls13_read_record_handshake: outer_type=0x14 ver=0x303 rlen=1 [SSL] _tls13_read_record_handshake: skipping CCS record [SSL] _tls13_read_record_handshake: got record, size=58 [SSL] _tls13_read_record_handshake: outer_type=0x17 ver=0x303 rlen=53 [TLS] record decrypted: size=37 stripped=37 type=0x16 [SSL] ===== handshake_after_accept ENTER state=9 [SSL] handshake_after_accept loop: state=9 [TLS] _tls13_recv_record: recv seq=0 data_len=112 is_client=0 handshake_keys=0 [TLS] IV (first 12 bytes): f6e0eb4aaa619573314ca59c src/event/iocp.cpp +147 −145 Original line number Diff line number Diff line Loading @@ -51,7 +51,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define BLOCKSIZE 16384 #endif // Set to 1 to enable debug logging - REBUILD CHECK 2 // Set to 1 to enable debug logging - REBUILD CHECK 4 #define IOCP_DEBUG 1 #if IOCP_DEBUG Loading Loading @@ -301,67 +301,52 @@ namespace netplus { // ------------------------------------------------------------------------- static bool try_post_send(event* ev, EventState* /*st*/, con& c, int tid) { #if IOCP_DEBUG std::cerr << "[IOCP] try_post_send entry: WritePending=" << c.WritePending.load() << " SendData.size()=" << c.SendData.size() << " SendOff=" << c.SendOff << std::endl; std::cerr << "[IOCP] try_post_send entry: WritePending=" << c.WritePending.load() << " SendData.size()=" << c.SendData.size() << std::endl; #endif // Don't start new writes if connection is closing if (c.Closing.load()) return false; if (c.WritePending.load()) return false; const size_t avail = c.SendData.size(); if (avail <= c.SendOff) return false; if (c.SendData.empty()) return false; bool isSSL = (c.csock->getSocketType() == sockettype::SSL); if (isSSL) { // For SSL: First gather ALL data by calling ResponseEvent in a loop, // then send it chunk by chunk with async writes. // Send one chunk at a time, completion handler continues c.WritePending.store(true); try { // Only gather more data if this is the first call (SendOff == 0) if (c.SendOff == 0) { // Loop: call ResponseEvent to gather more data until no more is added while (true) { size_t prevSize = c.SendData.size(); ev->ResponseEvent(c, tid, (ULONG_PTR)prevSize); if (c.SendData.size() == prevSize) { break; } // Send one chunk from SendData buffer size_t sendlen = std::min<size_t>(c.SendData.size(), 16384); const char* ptr = c.SendData.data(); #if IOCP_DEBUG std::cerr << "[IOCP] SSL ResponseEvent added " << (c.SendData.size() - prevSize) << " bytes, total=" << c.SendData.size() << std::endl; std::cerr << "[IOCP] try_post_send: sending " << sendlen << " bytes (total=" << c.SendData.size() << ")" << std::endl; #endif buffer sbuf(ptr, sendlen); // sendData handles SSL encryption and calls flush_out internally // For plain TCP, sendData also works (but we use flush_out for async) size_t consumed = c.csock->sendData(sbuf, 0); #if IOCP_DEBUG std::cerr << "[IOCP] sendData consumed " << consumed << " bytes, hasPendingWrite=" << c.csock->hasPendingWrite() << std::endl; #endif // Remove sent data from front of buffer if (consumed > 0) { c.SendData.erase(c.SendData.begin(), c.SendData.begin() + consumed); } // Check if async write is pending (SSL uses IOCP) if (c.csock->hasPendingWrite()) { return true; // Write in progress, completion will continue } size_t remaining = c.SendData.size() - c.SendOff; if (remaining == 0) { // For synchronous completion (plain TCP sendData), continue sending if more data c.WritePending.store(false); c.SendData.clear(); c.SendOff = 0; return false; } const char* ptr = c.SendData.data() + c.SendOff; size_t oldOff = c.SendOff; #if IOCP_DEBUG std::cerr << "[IOCP] SSL try_post_send: sending " << remaining << " bytes from offset " << c.SendOff << std::endl; #endif buffer sbuf(ptr, remaining); // Update SendOff BEFORE sendData to prevent race with completion handler // sendData will consume up to 16KB per TLS record size_t toSend = remaining < 16384 ? remaining : 16384; c.SendOff += toSend; size_t sent = c.csock->sendData(sbuf, 0); #if IOCP_DEBUG std::cerr << "[IOCP] SSL sendData returned " << sent << " bytes, SendOff now=" << c.SendOff << std::endl; #endif // If sendData consumed less than expected, adjust SendOff if (sent < toSend) { c.SendOff = oldOff + sent; // If more data remains and no async pending, recurse if (!c.SendData.empty()) { return try_post_send(ev, nullptr, c, tid); } // Don't clear SendData yet - completion handler will call try_post_send again return false; // Async write pending return false; } catch (NetException& e) { c.WritePending.store(false); if (e.getErrorType() != NetException::Note) { Loading @@ -370,32 +355,6 @@ namespace netplus { // Note = would block, will retry later return false; } } else { // For plain TCP: use async WSASend (via flush_out) const size_t remaining = c.SendData.size() - c.SendOff; const char* ptr = c.SendData.data() + c.SendOff; c.WritePending.store(true); try { // Ensure _SendBuffer is large enough if (!c.csock->_SendBuffer) { c.csock->_SendBuffer = std::make_unique<buffer>(remaining); } if (c.csock->_SendBuffer->size < remaining) { c.csock->_SendBuffer = std::make_unique<buffer>(remaining); } std::memcpy(c.csock->_SendBuffer->data.buf, ptr, remaining); c.csock->_SendBuffer->data.len = (ULONG)remaining; c.csock->flush_out(); return false; // Async - completion will come later } catch (...) { c.WritePending.store(false); throw; } } } // ------------------------------------------------------------------------- Loading Loading @@ -536,14 +495,16 @@ namespace netplus { std::cerr << "[IOCP] Found connection, operation=" << (buf->operation == OP_READ ? "READ" : "WRITE") << " bytes=" << bytes << " InternalHigh=" << buf->overlapped.InternalHigh << std::endl; #endif // Always use the bytes value from IOCP completion - InternalHigh may be stale // For READ completions: push data directly into the socket's buffer // This avoids race conditions with InternalHigh being overwritten if (buf->operation == OP_READ && bytes > 0) { owner->csock->pushReceivedData( reinterpret_cast<const uint8_t*>(buf->data.buf), bytes ); #if IOCP_DEBUG if (buf->overlapped.InternalHigh != bytes) { std::cerr << "[IOCP] Fixup: setting InternalHigh from " << buf->overlapped.InternalHigh << " to " << bytes << std::endl; } std::cerr << "[IOCP] Pushed " << bytes << " bytes to socket buffer" << std::endl; #endif buf->overlapped.InternalHigh = bytes; } if (!ok) { Loading Loading @@ -587,9 +548,22 @@ namespace netplus { continue; } // For SSL, close() may post an async close_notify write // Set WritePending temporarily to check after close try { owner->csock->close(); } catch (...) {} // If close triggered an async write, defer removal to write completion if (owner->WritePending.load() || owner->csock->hasPendingWrite()) { #if IOCP_DEBUG std::cerr << "[IOCP] READ=0: close triggered pending write, deferring removal" << std::endl; #endif continue; } ev->DisconnectEvent(*owner, tid, 0); remove_con(st, cs); try { owner->csock->close(); } catch (...) {} continue; } Loading Loading @@ -727,9 +701,9 @@ handshake_continue: #if IOCP_DEBUG std::cerr << "[IOCP] Calling try_post_send, SendData.size()=" << owner->SendData.size() << std::endl; #endif try_post_send(ev, st, *owner, tid); bool writePosted = try_post_send(ev, st, *owner, tid); #if IOCP_DEBUG std::cerr << "[IOCP] try_post_send completed" << std::endl; std::cerr << "[IOCP] try_post_send returned " << writePosted << ", WritePending=" << owner->WritePending.load() << std::endl; #endif } catch (std::exception& ex) { #if IOCP_DEBUG Loading Loading @@ -767,14 +741,8 @@ handshake_continue: continue; } // For SSL, the send encrypts all pending data at once // So we clear SendData entirely on completion bool isSSL = (owner->csock->getSocketType() == sockettype::SSL); // CRITICAL: Clear the pending IOCP write flag owner->csock->setPendingWrite(false); #if IOCP_DEBUG std::cerr << "[IOCP] Write completed, bytes=" << bytes << " cleared _pendingIocpWrite, isSSL=" << isSSL << std::endl; std::cerr << "[IOCP] Write completed, bytes=" << bytes << std::endl; #endif // Check if connection is closing - cleanup now that write is done Loading @@ -788,18 +756,7 @@ handshake_continue: try { owner->csock->close(); } catch (...) {} continue; } if (!isSSL) { // Plain TCP: track bytes sent owner->SendOff += (size_t)bytes; // finished sending? clear buffer if (owner->SendOff >= owner->SendData.size()) { owner->SendData.clear(); owner->SendOff = 0; } } // For SSL: Don't clear SendData/SendOff here - handled in try_post_send // SendData is managed by try_post_send (erase pattern) // If handshake not done, need to continue it if (!owner->csock->getHandshakeDone()) { Loading @@ -807,16 +764,15 @@ handshake_continue: std::cerr << "[IOCP] Write completed during handshake, continuing handshake state machine..." << std::endl; #endif // Clear the pending write flag BEFORE calling handshake_after_accept owner->csock->setPendingWrite(false); owner->WritePending.store(false); owner->csock->setPendingWrite(false); std::lock_guard<std::mutex> hsLock(owner->event_mutex); try { // Continue the handshake state machine owner->csock->handshake_after_accept(); #if IOCP_DEBUG std::cerr << "[IOCP] handshake_after_accept completed, handshakeDone=" << owner->csock->getHandshakeDone() << " hasPendingWrite=" << owner->csock->hasPendingWrite() << std::endl; std::cerr << "[IOCP] handshake_after_accept completed, handshakeDone=" << owner->csock->getHandshakeDone() << std::endl; #endif // If handshake is done, post recv for application data Loading @@ -825,10 +781,8 @@ handshake_continue: post_recv(st, *owner); } catch (...) {} } // If there's a new pending write, the state machine has already called flush_out() // and we'll get another WRITE completion // If handshake needs more data from client, post recv else if (!owner->csock->hasPendingWrite()) { // Handshake needs more data from client, post recv else { try { post_recv(st, *owner); } catch (...) {} Loading Loading @@ -859,12 +813,13 @@ handshake_continue: continue; } // Use mutex to serialize ResponseEvent/try_post_send calls { std::lock_guard<std::mutex> sendLock(owner->event_mutex); // NO MUTEX HERE - avoid deadlock with flush_out // WritePending atomic protects against concurrent try_post_send calls owner->WritePending.store(false); owner->csock->setPendingWrite(false); // Check if connection is closing (peer closed or error) if (owner->Closing.load()) { #if IOCP_DEBUG Loading @@ -876,29 +831,51 @@ handshake_continue: continue; } // For SSL: data was already gathered before sending, don't call ResponseEvent again // For plain: call ResponseEvent to allow application to add more data bool isSSL = (owner->csock->getSocketType() == sockettype::SSL); if (!isSSL) { // Call ResponseEvent FIRST (if SendData empty), then send data try { // Call ResponseEvent when SendData empty - app adds more data if (owner->SendData.empty() && !owner->csock->hasPendingWrite()) { #if IOCP_DEBUG std::cerr << "[IOCP] SendData empty, calling ResponseEvent" << std::endl; #endif ev->ResponseEvent(*owner, tid, (ULONG_PTR)bytes); #if IOCP_DEBUG std::cerr << "[IOCP] ResponseEvent returned, SendData.size()=" << owner->SendData.size() << std::endl; #endif } // Now send whatever data is in SendData if (!owner->SendData.empty()) { #if IOCP_DEBUG if (isSSL) { std::cerr << "[IOCP] SSL write completion: " << bytes << " bytes sent, SendOff=" << owner->SendOff << " SendData.size()=" << owner->SendData.size() << std::endl; } else { std::cerr << "[IOCP] Write completion: " << bytes << " bytes sent, called ResponseEvent, SendData.size()=" << owner->SendData.size() << std::endl; } std::cerr << "[IOCP] Sending data, SendData.size()=" << owner->SendData.size() << std::endl; #endif // Don't clear SendData here - let try_post_send handle it to avoid race condition bool posted = try_post_send(ev, st, *owner, tid); #if IOCP_DEBUG std::cerr << "[IOCP] try_post_send returned " << posted << ", hasPendingWrite=" << owner->csock->hasPendingWrite() << std::endl; #endif if (posted) { // Write posted, completion will call us again continue; } } // continue sending if more data is pending // If no more data to send and not closing, post recv for next request (keep-alive) if (owner->SendData.empty() && !owner->csock->hasPendingWrite() && !owner->Closing.load()) { #if IOCP_DEBUG std::cerr << "[IOCP] All done, posting recv for keep-alive" << std::endl; #endif try { try_post_send(ev, st, *owner, tid); post_recv(st, *owner); } catch (...) {} } } catch (std::exception& ex) { #if IOCP_DEBUG std::cerr << "[IOCP] Write completion exception: " << ex.what() << std::endl; #endif try_cleanup_con(ev, st, owner, cs, tid); } catch (...) { try_cleanup_con(ev, st, owner, cs, tid); } } // send buffer is internal - no delete // if (buf) delete buf; Loading Loading @@ -941,10 +918,35 @@ handshake_continue: EventState* st = ST(this); if (!st) return; // wake all workers // Close server socket first to stop AcceptEx from blocking if (_ServerSocket) { closesocket((SOCKET)_ServerSocket->fd()); } // Cancel all pending I/O on registered connections { std::lock_guard<std::mutex> lk(st->conMutex); for (auto& kv : st->sockToCon) { CancelIoEx((HANDLE)kv.first, nullptr); } } // Cancel pending accepts { std::lock_guard<std::mutex> lk(st->accMutex); for (auto& kv : st->acceptByOv) { if (kv.second && kv.second->csock) { CancelIoEx((HANDLE)kv.second->csock->fd(), nullptr); } } } // wake all workers multiple times to ensure they exit for (int round = 0; round < 3; ++round) { for (size_t i = 0; i < st->workers.size(); ++i) { PostQueuedCompletionStatus(st->iocp, 0, 0, nullptr); } } for (auto& t : st->workers) { if (t.joinable()) t.join(); Loading src/socket.h +15 −11 Original line number Diff line number Diff line Loading @@ -162,6 +162,8 @@ namespace netplus { virtual void handshake_after_connect(){}; #ifdef Windows virtual bool hasPendingWrite() const { return _pendingIocpWrite.load(); } virtual void setPendingWrite(bool pending) { _pendingIocpWrite.store(pending); } virtual void accept(LPFN_ACCEPTEX lpfnAcceptEx, std::unique_ptr<socket>& csock) = 0; virtual void prime_read() = 0; std::unique_ptr<buffer> _ReadBuffer; Loading @@ -178,12 +180,13 @@ namespace netplus { virtual size_t sendData(buffer& data, int flags = 0) = 0; virtual size_t recvData(buffer& data, int flags = 0) = 0; virtual bool hasPendingWrite() const { return false; } virtual void setPendingWrite(bool pending) {} // Only used for SSL/IOCP virtual bool getHandshakeDone() { return true; } virtual bool hasBufferedData() const { return false; } virtual int getSocketType() const { return _Type; } // Push received data directly into buffer (for IOCP) virtual void pushReceivedData(const uint8_t* data, size_t len) { (void)data; (void)len; } virtual void connect(const std::string& addr, int port, bool nonblock = false) = 0; virtual void getAddress(std::string& addr) = 0; Loading @@ -207,14 +210,10 @@ namespace netplus { int _Type; ULONG_PTR _Extension; protected: #ifdef Windows WSAOVERLAPPED _Overlapped; WSABUF _wsaBuf; bool _Wait; int _Timeout; std::atomic<bool> _pendingIocpWrite{false}; #endif protected: // ✅ legacy helper still available void copyAddrInfo(ULONG_PTR* dest, ULONG_PTR src, size_t srclen); Loading Loading @@ -346,10 +345,10 @@ namespace netplus { // Check _send_queue (queued but not flushed) // Check _send_off (current record being transmitted via WSASend) // Check _pendingIocpWrite (async WSASend posted but not completed) return (!_send_queue.empty()) || (_send_off > 0) || _pendingIocpWrite; return (!_send_queue.empty()) || (_send_off > 0) || _pendingIocpWrite.load(); } void setPendingWrite(bool pending) override { _pendingIocpWrite = pending; } void setPendingWrite(bool pending) override { _pendingIocpWrite.store(pending); } bool getHandshakeDone() override { return _handshakeDone; } Loading @@ -369,6 +368,11 @@ namespace netplus { // Check if there's buffered data waiting to be processed bool hasBufferedData() const { return !_rx_tcp_buf.empty(); } // Push received data directly into the buffer (called by IOCP) void pushReceivedData(const uint8_t* data, size_t len) override { _rx_tcp_buf.insert(_rx_tcp_buf.end(), data, data + len); } private: // --- crypto helpers --- std::vector<uint8_t> _sha1_hash(const std::vector<uint8_t>& input); Loading Loading @@ -547,7 +551,7 @@ namespace netplus { std::deque<std::vector<uint8_t>> _send_queue; std::vector<uint8_t> _send_record; size_t _send_off = 0; bool _pendingIocpWrite = false; // IOCP WSASend posted but not yet completed std::atomic<bool> _pendingIocpWrite{false}; // IOCP WSASend posted but not yet completed std::vector<uint8_t> _rx_record_buf; std::vector<uint8_t> _rx_handshake_buf; Loading Loading
.gitignore +3 −0 Original line number Diff line number Diff line Loading @@ -21,3 +21,6 @@ test_tls_client.py server_error.txt server_error.txt server_output.txt server_error.txt server_error.txt server_output.txt
server_error.txt +2123 −106 File changed.Preview size limit exceeded, changes collapsed. Show changes
server_output.txt +40 −2 Original line number Diff line number Diff line Certificate validity: OK. / [SSL] loadServerPrivateKeyDer: looksPkcs8=0 children=9 [SSL] Loaded RSA private key [IOCP] runEventloop: Starting 4 threads. [SSL] ===== handshake_after_accept ENTER state=26 [SSL] handshake_after_accept loop: state=0 [SSL] Processing READ_CLIENT_HELLO state [SSL] READ_CLIENT_HELLO: got 428 bytes of handshake message [SSL] handshake_after_accept loop: state=1 [SSL] Entering TLS13_SEND_SERVER_HELLO state [SSL] Cleared secrets, checking keyshares... [SSL] use_x25519=1 use_p256=1 [SSL] About to derive handshake keys, group=29 x25519_shared.size=32 ecdhe_shared.size=0 [TLS] ECDHE shared secret (first 16 bytes): 98d8b65bcef6b5169b8dfa8b1b655fba [TLS] Derived keys - c_key: 52fffa0af1c49f8d... s_key: 78fb42529a1f221b [TLS] Derived IVs - c_iv: f9a93de230a75b091abb6d67 s_iv: 91b6107e2e019b6d0130c70b [SSL] Handshake keys derived successfully [SSL] ===== handshake_after_accept ENTER state=2 [SSL] handshake_after_accept loop: state=2 [SSL] TLS13_SEND_ENCRYPTED_FLIGHT: encflight_queued=0 [TLS] CertificateVerify: toSign buffer size=130 [TLS] CertificateVerify: transcript hash (first 16 bytes): 9f 02 fc e0 6f 82 f4 15 87 07 1c 4f 49 73 53 60 [TLS] RSA-PSS-SHA256 signature generated: size=256 [TLS] RSA-PSS signature (first 16 bytes): 24 61 f6 ca f7 2a 72 d5 45 a1 b4 b5 36 52 5c 53 [TLS] RSA-PSS signature (last 16 bytes): 71 64 dc 2e a2 26 4a 9f 49 0f e5 57 ce 58 f1 71 [TLS] CertificateVerify message complete: total size=260 (alg_id=2 + len=2 + sig=256) [TLS] CertificateVerify (first 20 bytes): 08 04 01 00 24 61 f6 ca f7 2a 72 d5 45 a1 b4 b5 36 52 5c 53 [SSL] TLS13_SEND_ENCRYPTED_FLIGHT: after flush_out, hasPendingWrite=0 [SSL] TLS13_SEND_ENCRYPTED_FLIGHT: transitioning to TLS13_WAIT_CLIENT_FINISHED [SSL] ===== handshake_after_accept ENTER state=3 [SSL] handshake_after_accept loop: state=3 [SSL] _tls13_read_record_handshake: got record, size=6 [SSL] _tls13_read_record_handshake: outer_type=0x14 ver=0x303 rlen=1 [SSL] _tls13_read_record_handshake: skipping CCS record [SSL] _tls13_read_record_handshake: got record, size=58 [SSL] _tls13_read_record_handshake: outer_type=0x17 ver=0x303 rlen=53 [TLS] record decrypted: size=37 stripped=37 type=0x16 [SSL] ===== handshake_after_accept ENTER state=9 [SSL] handshake_after_accept loop: state=9 [TLS] _tls13_recv_record: recv seq=0 data_len=112 is_client=0 handshake_keys=0 [TLS] IV (first 12 bytes): f6e0eb4aaa619573314ca59c
src/event/iocp.cpp +147 −145 Original line number Diff line number Diff line Loading @@ -51,7 +51,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define BLOCKSIZE 16384 #endif // Set to 1 to enable debug logging - REBUILD CHECK 2 // Set to 1 to enable debug logging - REBUILD CHECK 4 #define IOCP_DEBUG 1 #if IOCP_DEBUG Loading Loading @@ -301,67 +301,52 @@ namespace netplus { // ------------------------------------------------------------------------- static bool try_post_send(event* ev, EventState* /*st*/, con& c, int tid) { #if IOCP_DEBUG std::cerr << "[IOCP] try_post_send entry: WritePending=" << c.WritePending.load() << " SendData.size()=" << c.SendData.size() << " SendOff=" << c.SendOff << std::endl; std::cerr << "[IOCP] try_post_send entry: WritePending=" << c.WritePending.load() << " SendData.size()=" << c.SendData.size() << std::endl; #endif // Don't start new writes if connection is closing if (c.Closing.load()) return false; if (c.WritePending.load()) return false; const size_t avail = c.SendData.size(); if (avail <= c.SendOff) return false; if (c.SendData.empty()) return false; bool isSSL = (c.csock->getSocketType() == sockettype::SSL); if (isSSL) { // For SSL: First gather ALL data by calling ResponseEvent in a loop, // then send it chunk by chunk with async writes. // Send one chunk at a time, completion handler continues c.WritePending.store(true); try { // Only gather more data if this is the first call (SendOff == 0) if (c.SendOff == 0) { // Loop: call ResponseEvent to gather more data until no more is added while (true) { size_t prevSize = c.SendData.size(); ev->ResponseEvent(c, tid, (ULONG_PTR)prevSize); if (c.SendData.size() == prevSize) { break; } // Send one chunk from SendData buffer size_t sendlen = std::min<size_t>(c.SendData.size(), 16384); const char* ptr = c.SendData.data(); #if IOCP_DEBUG std::cerr << "[IOCP] SSL ResponseEvent added " << (c.SendData.size() - prevSize) << " bytes, total=" << c.SendData.size() << std::endl; std::cerr << "[IOCP] try_post_send: sending " << sendlen << " bytes (total=" << c.SendData.size() << ")" << std::endl; #endif buffer sbuf(ptr, sendlen); // sendData handles SSL encryption and calls flush_out internally // For plain TCP, sendData also works (but we use flush_out for async) size_t consumed = c.csock->sendData(sbuf, 0); #if IOCP_DEBUG std::cerr << "[IOCP] sendData consumed " << consumed << " bytes, hasPendingWrite=" << c.csock->hasPendingWrite() << std::endl; #endif // Remove sent data from front of buffer if (consumed > 0) { c.SendData.erase(c.SendData.begin(), c.SendData.begin() + consumed); } // Check if async write is pending (SSL uses IOCP) if (c.csock->hasPendingWrite()) { return true; // Write in progress, completion will continue } size_t remaining = c.SendData.size() - c.SendOff; if (remaining == 0) { // For synchronous completion (plain TCP sendData), continue sending if more data c.WritePending.store(false); c.SendData.clear(); c.SendOff = 0; return false; } const char* ptr = c.SendData.data() + c.SendOff; size_t oldOff = c.SendOff; #if IOCP_DEBUG std::cerr << "[IOCP] SSL try_post_send: sending " << remaining << " bytes from offset " << c.SendOff << std::endl; #endif buffer sbuf(ptr, remaining); // Update SendOff BEFORE sendData to prevent race with completion handler // sendData will consume up to 16KB per TLS record size_t toSend = remaining < 16384 ? remaining : 16384; c.SendOff += toSend; size_t sent = c.csock->sendData(sbuf, 0); #if IOCP_DEBUG std::cerr << "[IOCP] SSL sendData returned " << sent << " bytes, SendOff now=" << c.SendOff << std::endl; #endif // If sendData consumed less than expected, adjust SendOff if (sent < toSend) { c.SendOff = oldOff + sent; // If more data remains and no async pending, recurse if (!c.SendData.empty()) { return try_post_send(ev, nullptr, c, tid); } // Don't clear SendData yet - completion handler will call try_post_send again return false; // Async write pending return false; } catch (NetException& e) { c.WritePending.store(false); if (e.getErrorType() != NetException::Note) { Loading @@ -370,32 +355,6 @@ namespace netplus { // Note = would block, will retry later return false; } } else { // For plain TCP: use async WSASend (via flush_out) const size_t remaining = c.SendData.size() - c.SendOff; const char* ptr = c.SendData.data() + c.SendOff; c.WritePending.store(true); try { // Ensure _SendBuffer is large enough if (!c.csock->_SendBuffer) { c.csock->_SendBuffer = std::make_unique<buffer>(remaining); } if (c.csock->_SendBuffer->size < remaining) { c.csock->_SendBuffer = std::make_unique<buffer>(remaining); } std::memcpy(c.csock->_SendBuffer->data.buf, ptr, remaining); c.csock->_SendBuffer->data.len = (ULONG)remaining; c.csock->flush_out(); return false; // Async - completion will come later } catch (...) { c.WritePending.store(false); throw; } } } // ------------------------------------------------------------------------- Loading Loading @@ -536,14 +495,16 @@ namespace netplus { std::cerr << "[IOCP] Found connection, operation=" << (buf->operation == OP_READ ? "READ" : "WRITE") << " bytes=" << bytes << " InternalHigh=" << buf->overlapped.InternalHigh << std::endl; #endif // Always use the bytes value from IOCP completion - InternalHigh may be stale // For READ completions: push data directly into the socket's buffer // This avoids race conditions with InternalHigh being overwritten if (buf->operation == OP_READ && bytes > 0) { owner->csock->pushReceivedData( reinterpret_cast<const uint8_t*>(buf->data.buf), bytes ); #if IOCP_DEBUG if (buf->overlapped.InternalHigh != bytes) { std::cerr << "[IOCP] Fixup: setting InternalHigh from " << buf->overlapped.InternalHigh << " to " << bytes << std::endl; } std::cerr << "[IOCP] Pushed " << bytes << " bytes to socket buffer" << std::endl; #endif buf->overlapped.InternalHigh = bytes; } if (!ok) { Loading Loading @@ -587,9 +548,22 @@ namespace netplus { continue; } // For SSL, close() may post an async close_notify write // Set WritePending temporarily to check after close try { owner->csock->close(); } catch (...) {} // If close triggered an async write, defer removal to write completion if (owner->WritePending.load() || owner->csock->hasPendingWrite()) { #if IOCP_DEBUG std::cerr << "[IOCP] READ=0: close triggered pending write, deferring removal" << std::endl; #endif continue; } ev->DisconnectEvent(*owner, tid, 0); remove_con(st, cs); try { owner->csock->close(); } catch (...) {} continue; } Loading Loading @@ -727,9 +701,9 @@ handshake_continue: #if IOCP_DEBUG std::cerr << "[IOCP] Calling try_post_send, SendData.size()=" << owner->SendData.size() << std::endl; #endif try_post_send(ev, st, *owner, tid); bool writePosted = try_post_send(ev, st, *owner, tid); #if IOCP_DEBUG std::cerr << "[IOCP] try_post_send completed" << std::endl; std::cerr << "[IOCP] try_post_send returned " << writePosted << ", WritePending=" << owner->WritePending.load() << std::endl; #endif } catch (std::exception& ex) { #if IOCP_DEBUG Loading Loading @@ -767,14 +741,8 @@ handshake_continue: continue; } // For SSL, the send encrypts all pending data at once // So we clear SendData entirely on completion bool isSSL = (owner->csock->getSocketType() == sockettype::SSL); // CRITICAL: Clear the pending IOCP write flag owner->csock->setPendingWrite(false); #if IOCP_DEBUG std::cerr << "[IOCP] Write completed, bytes=" << bytes << " cleared _pendingIocpWrite, isSSL=" << isSSL << std::endl; std::cerr << "[IOCP] Write completed, bytes=" << bytes << std::endl; #endif // Check if connection is closing - cleanup now that write is done Loading @@ -788,18 +756,7 @@ handshake_continue: try { owner->csock->close(); } catch (...) {} continue; } if (!isSSL) { // Plain TCP: track bytes sent owner->SendOff += (size_t)bytes; // finished sending? clear buffer if (owner->SendOff >= owner->SendData.size()) { owner->SendData.clear(); owner->SendOff = 0; } } // For SSL: Don't clear SendData/SendOff here - handled in try_post_send // SendData is managed by try_post_send (erase pattern) // If handshake not done, need to continue it if (!owner->csock->getHandshakeDone()) { Loading @@ -807,16 +764,15 @@ handshake_continue: std::cerr << "[IOCP] Write completed during handshake, continuing handshake state machine..." << std::endl; #endif // Clear the pending write flag BEFORE calling handshake_after_accept owner->csock->setPendingWrite(false); owner->WritePending.store(false); owner->csock->setPendingWrite(false); std::lock_guard<std::mutex> hsLock(owner->event_mutex); try { // Continue the handshake state machine owner->csock->handshake_after_accept(); #if IOCP_DEBUG std::cerr << "[IOCP] handshake_after_accept completed, handshakeDone=" << owner->csock->getHandshakeDone() << " hasPendingWrite=" << owner->csock->hasPendingWrite() << std::endl; std::cerr << "[IOCP] handshake_after_accept completed, handshakeDone=" << owner->csock->getHandshakeDone() << std::endl; #endif // If handshake is done, post recv for application data Loading @@ -825,10 +781,8 @@ handshake_continue: post_recv(st, *owner); } catch (...) {} } // If there's a new pending write, the state machine has already called flush_out() // and we'll get another WRITE completion // If handshake needs more data from client, post recv else if (!owner->csock->hasPendingWrite()) { // Handshake needs more data from client, post recv else { try { post_recv(st, *owner); } catch (...) {} Loading Loading @@ -859,12 +813,13 @@ handshake_continue: continue; } // Use mutex to serialize ResponseEvent/try_post_send calls { std::lock_guard<std::mutex> sendLock(owner->event_mutex); // NO MUTEX HERE - avoid deadlock with flush_out // WritePending atomic protects against concurrent try_post_send calls owner->WritePending.store(false); owner->csock->setPendingWrite(false); // Check if connection is closing (peer closed or error) if (owner->Closing.load()) { #if IOCP_DEBUG Loading @@ -876,29 +831,51 @@ handshake_continue: continue; } // For SSL: data was already gathered before sending, don't call ResponseEvent again // For plain: call ResponseEvent to allow application to add more data bool isSSL = (owner->csock->getSocketType() == sockettype::SSL); if (!isSSL) { // Call ResponseEvent FIRST (if SendData empty), then send data try { // Call ResponseEvent when SendData empty - app adds more data if (owner->SendData.empty() && !owner->csock->hasPendingWrite()) { #if IOCP_DEBUG std::cerr << "[IOCP] SendData empty, calling ResponseEvent" << std::endl; #endif ev->ResponseEvent(*owner, tid, (ULONG_PTR)bytes); #if IOCP_DEBUG std::cerr << "[IOCP] ResponseEvent returned, SendData.size()=" << owner->SendData.size() << std::endl; #endif } // Now send whatever data is in SendData if (!owner->SendData.empty()) { #if IOCP_DEBUG if (isSSL) { std::cerr << "[IOCP] SSL write completion: " << bytes << " bytes sent, SendOff=" << owner->SendOff << " SendData.size()=" << owner->SendData.size() << std::endl; } else { std::cerr << "[IOCP] Write completion: " << bytes << " bytes sent, called ResponseEvent, SendData.size()=" << owner->SendData.size() << std::endl; } std::cerr << "[IOCP] Sending data, SendData.size()=" << owner->SendData.size() << std::endl; #endif // Don't clear SendData here - let try_post_send handle it to avoid race condition bool posted = try_post_send(ev, st, *owner, tid); #if IOCP_DEBUG std::cerr << "[IOCP] try_post_send returned " << posted << ", hasPendingWrite=" << owner->csock->hasPendingWrite() << std::endl; #endif if (posted) { // Write posted, completion will call us again continue; } } // continue sending if more data is pending // If no more data to send and not closing, post recv for next request (keep-alive) if (owner->SendData.empty() && !owner->csock->hasPendingWrite() && !owner->Closing.load()) { #if IOCP_DEBUG std::cerr << "[IOCP] All done, posting recv for keep-alive" << std::endl; #endif try { try_post_send(ev, st, *owner, tid); post_recv(st, *owner); } catch (...) {} } } catch (std::exception& ex) { #if IOCP_DEBUG std::cerr << "[IOCP] Write completion exception: " << ex.what() << std::endl; #endif try_cleanup_con(ev, st, owner, cs, tid); } catch (...) { try_cleanup_con(ev, st, owner, cs, tid); } } // send buffer is internal - no delete // if (buf) delete buf; Loading Loading @@ -941,10 +918,35 @@ handshake_continue: EventState* st = ST(this); if (!st) return; // wake all workers // Close server socket first to stop AcceptEx from blocking if (_ServerSocket) { closesocket((SOCKET)_ServerSocket->fd()); } // Cancel all pending I/O on registered connections { std::lock_guard<std::mutex> lk(st->conMutex); for (auto& kv : st->sockToCon) { CancelIoEx((HANDLE)kv.first, nullptr); } } // Cancel pending accepts { std::lock_guard<std::mutex> lk(st->accMutex); for (auto& kv : st->acceptByOv) { if (kv.second && kv.second->csock) { CancelIoEx((HANDLE)kv.second->csock->fd(), nullptr); } } } // wake all workers multiple times to ensure they exit for (int round = 0; round < 3; ++round) { for (size_t i = 0; i < st->workers.size(); ++i) { PostQueuedCompletionStatus(st->iocp, 0, 0, nullptr); } } for (auto& t : st->workers) { if (t.joinable()) t.join(); Loading
src/socket.h +15 −11 Original line number Diff line number Diff line Loading @@ -162,6 +162,8 @@ namespace netplus { virtual void handshake_after_connect(){}; #ifdef Windows virtual bool hasPendingWrite() const { return _pendingIocpWrite.load(); } virtual void setPendingWrite(bool pending) { _pendingIocpWrite.store(pending); } virtual void accept(LPFN_ACCEPTEX lpfnAcceptEx, std::unique_ptr<socket>& csock) = 0; virtual void prime_read() = 0; std::unique_ptr<buffer> _ReadBuffer; Loading @@ -178,12 +180,13 @@ namespace netplus { virtual size_t sendData(buffer& data, int flags = 0) = 0; virtual size_t recvData(buffer& data, int flags = 0) = 0; virtual bool hasPendingWrite() const { return false; } virtual void setPendingWrite(bool pending) {} // Only used for SSL/IOCP virtual bool getHandshakeDone() { return true; } virtual bool hasBufferedData() const { return false; } virtual int getSocketType() const { return _Type; } // Push received data directly into buffer (for IOCP) virtual void pushReceivedData(const uint8_t* data, size_t len) { (void)data; (void)len; } virtual void connect(const std::string& addr, int port, bool nonblock = false) = 0; virtual void getAddress(std::string& addr) = 0; Loading @@ -207,14 +210,10 @@ namespace netplus { int _Type; ULONG_PTR _Extension; protected: #ifdef Windows WSAOVERLAPPED _Overlapped; WSABUF _wsaBuf; bool _Wait; int _Timeout; std::atomic<bool> _pendingIocpWrite{false}; #endif protected: // ✅ legacy helper still available void copyAddrInfo(ULONG_PTR* dest, ULONG_PTR src, size_t srclen); Loading Loading @@ -346,10 +345,10 @@ namespace netplus { // Check _send_queue (queued but not flushed) // Check _send_off (current record being transmitted via WSASend) // Check _pendingIocpWrite (async WSASend posted but not completed) return (!_send_queue.empty()) || (_send_off > 0) || _pendingIocpWrite; return (!_send_queue.empty()) || (_send_off > 0) || _pendingIocpWrite.load(); } void setPendingWrite(bool pending) override { _pendingIocpWrite = pending; } void setPendingWrite(bool pending) override { _pendingIocpWrite.store(pending); } bool getHandshakeDone() override { return _handshakeDone; } Loading @@ -369,6 +368,11 @@ namespace netplus { // Check if there's buffered data waiting to be processed bool hasBufferedData() const { return !_rx_tcp_buf.empty(); } // Push received data directly into the buffer (called by IOCP) void pushReceivedData(const uint8_t* data, size_t len) override { _rx_tcp_buf.insert(_rx_tcp_buf.end(), data, data + len); } private: // --- crypto helpers --- std::vector<uint8_t> _sha1_hash(const std::vector<uint8_t>& input); Loading Loading @@ -547,7 +551,7 @@ namespace netplus { std::deque<std::vector<uint8_t>> _send_queue; std::vector<uint8_t> _send_record; size_t _send_off = 0; bool _pendingIocpWrite = false; // IOCP WSASend posted but not yet completed std::atomic<bool> _pendingIocpWrite{false}; // IOCP WSASend posted but not yet completed std::vector<uint8_t> _rx_record_buf; std::vector<uint8_t> _rx_handshake_buf; Loading