Loading src/event/kqueue.cpp +49 −3 Original line number Diff line number Diff line Loading @@ -59,6 +59,10 @@ namespace netplus { static std::shared_mutex POLL_HANDLER_MUTEX; // Wakeup pipe: write end written by the signal handler (async-signal-safe) // to interrupt kevent() on shutdown. Read end registered in every kqueue. static int WAKEUP_PIPE[2] = {-1, -1}; // ------------------------------------------------------------ // Utility: safe close (no deadlock, no double close) // ------------------------------------------------------------ Loading Loading @@ -218,7 +222,7 @@ namespace netplus { poll(socket* serversocket, eventapi* evcon, int kqfd, int timeout) : pollapi(evcon, timeout), _kqfd(kqfd), _Events(new struct kevent[serversocket->getMaxconnections()]), _Events(new struct kevent[serversocket->getMaxconnections() + 1]), _ServerSocket(serversocket), _EventCount(0), _batch(kqfd) Loading @@ -228,7 +232,7 @@ namespace netplus { SSOCKETS.emplace_back(_ServerSocket); } for (int i = 0; i < _ServerSocket->getMaxconnections(); ++i) { for (int i = 0; i < _ServerSocket->getMaxconnections() + 1; ++i) { _Events[i].ident = 0; _Events[i].filter = 0; _Events[i].flags = 0; Loading @@ -236,6 +240,14 @@ namespace netplus { _Events[i].data = 0; _Events[i].udata = nullptr; } // Register the global wakeup pipe so the signal handler can // interrupt kevent() on shutdown without waiting for the timeout. if (WAKEUP_PIPE[0] >= 0) { struct kevent wev; EV_SET(&wev, WAKEUP_PIPE[0], EVFILT_READ, EV_ADD, 0, 0, nullptr); kevent(_kqfd, &wev, 1, nullptr, 0, nullptr); } } const char* getpolltype() override { return "KQUEUE"; } Loading Loading @@ -712,6 +724,15 @@ namespace netplus { } for (int i = 0; i < wait; ++i) { int fd = pollptr.getEventFd(i); // Wakeup pipe event — signal handler asked us to stop. if (fd == WAKEUP_PIPE[0]) { char c; (void)read(WAKEUP_PIPE[0], &c, 1); break; // while(Running) will then exit } int evtype = pollptr.pollState(i); if (evtype == pollapi::EVCON) { Loading @@ -723,7 +744,6 @@ namespace netplus { continue; pollptr.ConnectEventHandler(tid, args); } else { int fd = pollptr.getEventFd(i); int flags = pollptr.getEventFlags(i); pollptr.IoEventHandler(fd, flags, tid, args); } Loading Loading @@ -786,6 +806,11 @@ namespace netplus { if (signum != SIGINT) return; event::Running = false; // Wake up all kevent() calls so threads notice Running==false immediately. if (WAKEUP_PIPE[1] >= 0) { char c = 0; (void)write(WAKEUP_PIPE[1], &c, 1); } } // Called from runEventloop() AFTER all worker threads have Loading Loading @@ -849,6 +874,13 @@ void netplus::event::runEventloop(ULONG_PTR args) { signal(SIGINT, netplus::signalHandler); signal(SIGPIPE, SIG_IGN); // Create the global wakeup pipe once per run. if (netplus::WAKEUP_PIPE[0] < 0) { pipe(netplus::WAKEUP_PIPE); fcntl(netplus::WAKEUP_PIPE[0], F_SETFL, O_NONBLOCK); fcntl(netplus::WAKEUP_PIPE[1], F_SETFL, O_NONBLOCK); } netplus::EventWorkerArgs eargs; MAINWORKERLOOP: Loading Loading @@ -910,8 +942,22 @@ MAINWORKERLOOP: if (netplus::event::Restart) { netplus::event::Restart = false; // Reset the wakeup pipe for the next run. if (netplus::WAKEUP_PIPE[0] >= 0) { close(netplus::WAKEUP_PIPE[0]); close(netplus::WAKEUP_PIPE[1]); netplus::WAKEUP_PIPE[0] = netplus::WAKEUP_PIPE[1] = -1; } netplus::event::Running = true; goto MAINWORKERLOOP; } // Final shutdown: release the wakeup pipe. if (netplus::WAKEUP_PIPE[0] >= 0) { close(netplus::WAKEUP_PIPE[0]); close(netplus::WAKEUP_PIPE[1]); netplus::WAKEUP_PIPE[0] = netplus::WAKEUP_PIPE[1] = -1; } } // ================================================================ Loading Loading
src/event/kqueue.cpp +49 −3 Original line number Diff line number Diff line Loading @@ -59,6 +59,10 @@ namespace netplus { static std::shared_mutex POLL_HANDLER_MUTEX; // Wakeup pipe: write end written by the signal handler (async-signal-safe) // to interrupt kevent() on shutdown. Read end registered in every kqueue. static int WAKEUP_PIPE[2] = {-1, -1}; // ------------------------------------------------------------ // Utility: safe close (no deadlock, no double close) // ------------------------------------------------------------ Loading Loading @@ -218,7 +222,7 @@ namespace netplus { poll(socket* serversocket, eventapi* evcon, int kqfd, int timeout) : pollapi(evcon, timeout), _kqfd(kqfd), _Events(new struct kevent[serversocket->getMaxconnections()]), _Events(new struct kevent[serversocket->getMaxconnections() + 1]), _ServerSocket(serversocket), _EventCount(0), _batch(kqfd) Loading @@ -228,7 +232,7 @@ namespace netplus { SSOCKETS.emplace_back(_ServerSocket); } for (int i = 0; i < _ServerSocket->getMaxconnections(); ++i) { for (int i = 0; i < _ServerSocket->getMaxconnections() + 1; ++i) { _Events[i].ident = 0; _Events[i].filter = 0; _Events[i].flags = 0; Loading @@ -236,6 +240,14 @@ namespace netplus { _Events[i].data = 0; _Events[i].udata = nullptr; } // Register the global wakeup pipe so the signal handler can // interrupt kevent() on shutdown without waiting for the timeout. if (WAKEUP_PIPE[0] >= 0) { struct kevent wev; EV_SET(&wev, WAKEUP_PIPE[0], EVFILT_READ, EV_ADD, 0, 0, nullptr); kevent(_kqfd, &wev, 1, nullptr, 0, nullptr); } } const char* getpolltype() override { return "KQUEUE"; } Loading Loading @@ -712,6 +724,15 @@ namespace netplus { } for (int i = 0; i < wait; ++i) { int fd = pollptr.getEventFd(i); // Wakeup pipe event — signal handler asked us to stop. if (fd == WAKEUP_PIPE[0]) { char c; (void)read(WAKEUP_PIPE[0], &c, 1); break; // while(Running) will then exit } int evtype = pollptr.pollState(i); if (evtype == pollapi::EVCON) { Loading @@ -723,7 +744,6 @@ namespace netplus { continue; pollptr.ConnectEventHandler(tid, args); } else { int fd = pollptr.getEventFd(i); int flags = pollptr.getEventFlags(i); pollptr.IoEventHandler(fd, flags, tid, args); } Loading Loading @@ -786,6 +806,11 @@ namespace netplus { if (signum != SIGINT) return; event::Running = false; // Wake up all kevent() calls so threads notice Running==false immediately. if (WAKEUP_PIPE[1] >= 0) { char c = 0; (void)write(WAKEUP_PIPE[1], &c, 1); } } // Called from runEventloop() AFTER all worker threads have Loading Loading @@ -849,6 +874,13 @@ void netplus::event::runEventloop(ULONG_PTR args) { signal(SIGINT, netplus::signalHandler); signal(SIGPIPE, SIG_IGN); // Create the global wakeup pipe once per run. if (netplus::WAKEUP_PIPE[0] < 0) { pipe(netplus::WAKEUP_PIPE); fcntl(netplus::WAKEUP_PIPE[0], F_SETFL, O_NONBLOCK); fcntl(netplus::WAKEUP_PIPE[1], F_SETFL, O_NONBLOCK); } netplus::EventWorkerArgs eargs; MAINWORKERLOOP: Loading Loading @@ -910,8 +942,22 @@ MAINWORKERLOOP: if (netplus::event::Restart) { netplus::event::Restart = false; // Reset the wakeup pipe for the next run. if (netplus::WAKEUP_PIPE[0] >= 0) { close(netplus::WAKEUP_PIPE[0]); close(netplus::WAKEUP_PIPE[1]); netplus::WAKEUP_PIPE[0] = netplus::WAKEUP_PIPE[1] = -1; } netplus::event::Running = true; goto MAINWORKERLOOP; } // Final shutdown: release the wakeup pipe. if (netplus::WAKEUP_PIPE[0] >= 0) { close(netplus::WAKEUP_PIPE[0]); close(netplus::WAKEUP_PIPE[1]); netplus::WAKEUP_PIPE[0] = netplus::WAKEUP_PIPE[1] = -1; } } // ================================================================ Loading