Loading src/event/epoll.cpp +50 −4 Original line number Original line Diff line number Diff line Loading @@ -38,6 +38,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include <unistd.h> #include <unistd.h> #include <signal.h> #include <signal.h> #include <sys/epoll.h> #include <sys/epoll.h> #include <sys/eventfd.h> #include <fcntl.h> #include <fcntl.h> #include <errno.h> #include <errno.h> #include <cstring> #include <cstring> Loading @@ -56,6 +57,10 @@ namespace netplus { static std::shared_mutex POLL_HANDLER_MUTEX; static std::shared_mutex POLL_HANDLER_MUTEX; // Wakeup-FD: written by the signal handler to interrupt epoll_wait on shutdown. // async-signal-safe: write(2) is listed as AS-safe. static int WAKEUP_FD = -1; // ------------------------------------------------------------ // ------------------------------------------------------------ // Utility: safe close (no deadlock, no double close) // Utility: safe close (no deadlock, no double close) // ------------------------------------------------------------ // ------------------------------------------------------------ Loading Loading @@ -134,7 +139,7 @@ namespace netplus { poll(socket* serversocket, eventapi* evcon, int pollfd, int timeout) poll(socket* serversocket, eventapi* evcon, int pollfd, int timeout) : pollapi(evcon, timeout), : pollapi(evcon, timeout), _pollFD(pollfd), _pollFD(pollfd), _Events(new epoll_event[serversocket->getMaxconnections()]), _Events(new epoll_event[serversocket->getMaxconnections() + 1]), _ServerSocket(serversocket) _ServerSocket(serversocket) { { { { Loading @@ -142,10 +147,19 @@ namespace netplus { SSOCKETS.emplace_back(_ServerSocket); SSOCKETS.emplace_back(_ServerSocket); } } for (int i = 0; i < _ServerSocket->getMaxconnections(); ++i) { for (int i = 0; i < _ServerSocket->getMaxconnections() + 1; ++i) { _Events[i].events = -1; _Events[i].events = -1; _Events[i].data.fd = _ServerSocket->fd(); _Events[i].data.fd = _ServerSocket->fd(); } } // Register the global wakeup eventfd so the signal handler can // interrupt epoll_wait on shutdown without waiting for the timeout. if (WAKEUP_FD >= 0) { epoll_event wev{}; wev.events = EPOLLIN; wev.data.fd = WAKEUP_FD; epoll_ctl(_pollFD, EPOLL_CTL_ADD, WAKEUP_FD, &wev); } } } const char* getpolltype() override { return "EPOLL"; } const char* getpolltype() override { return "EPOLL"; } Loading Loading @@ -607,6 +621,15 @@ namespace netplus { // the batch. With EPOLLONESHOT, skipped // the batch. With EPOLLONESHOT, skipped // events leave fds permanently disabled. // events leave fds permanently disabled. try { try { int fd = pollptr.getEventFd(i); // Wakeup-FD event — signal handler asked us to stop. if (fd == WAKEUP_FD) { uint64_t val; (void)read(WAKEUP_FD, &val, sizeof(val)); break; } int evtype = pollptr.pollState(i); int evtype = pollptr.pollState(i); if (evtype == pollapi::EVCON) { if (evtype == pollapi::EVCON) { Loading @@ -614,7 +637,6 @@ namespace netplus { continue; continue; } } int fd = pollptr.getEventFd(i); int flags = pollptr.getEventFlags(i); int flags = pollptr.getEventFlags(i); pollptr.IoEventHandler(fd, flags, tid, args); pollptr.IoEventHandler(fd, flags, tid, args); Loading Loading @@ -678,6 +700,11 @@ namespace netplus { if (signum != SIGINT) if (signum != SIGINT) return; return; event::Running = false; event::Running = false; // Wake up all epoll_wait calls so threads notice Running==false immediately. if (WAKEUP_FD >= 0) { uint64_t val = 1; (void)write(WAKEUP_FD, &val, sizeof(val)); } } } // Called from runEventloop() AFTER all worker threads have // Called from runEventloop() AFTER all worker threads have Loading Loading @@ -745,6 +772,13 @@ namespace netplus { signal(SIGINT, signalHandler); signal(SIGINT, signalHandler); signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN); // Create the global wakeup eventfd once per run. // EFD_SEMAPHORE is NOT used: a single write wakes all epoll instances // because each epoll_wait sees EPOLLIN and drains the counter independently. if (WAKEUP_FD < 0) { WAKEUP_FD = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); } EventWorkerArgs eargs; EventWorkerArgs eargs; MAINWORKERLOOP: MAINWORKERLOOP: Loading Loading @@ -807,8 +841,20 @@ namespace netplus { if (event::Restart) { if (event::Restart) { event::Restart = false; event::Restart = false; // Reset the wakeup eventfd for the next run. if (WAKEUP_FD >= 0) { close(WAKEUP_FD); WAKEUP_FD = -1; } event::Running = true; goto MAINWORKERLOOP; goto MAINWORKERLOOP; } } // Final shutdown: release the wakeup eventfd. if (WAKEUP_FD >= 0) { close(WAKEUP_FD); WAKEUP_FD = -1; } } } // ================================================================ // ================================================================ Loading Loading
src/event/epoll.cpp +50 −4 Original line number Original line Diff line number Diff line Loading @@ -38,6 +38,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include <unistd.h> #include <unistd.h> #include <signal.h> #include <signal.h> #include <sys/epoll.h> #include <sys/epoll.h> #include <sys/eventfd.h> #include <fcntl.h> #include <fcntl.h> #include <errno.h> #include <errno.h> #include <cstring> #include <cstring> Loading @@ -56,6 +57,10 @@ namespace netplus { static std::shared_mutex POLL_HANDLER_MUTEX; static std::shared_mutex POLL_HANDLER_MUTEX; // Wakeup-FD: written by the signal handler to interrupt epoll_wait on shutdown. // async-signal-safe: write(2) is listed as AS-safe. static int WAKEUP_FD = -1; // ------------------------------------------------------------ // ------------------------------------------------------------ // Utility: safe close (no deadlock, no double close) // Utility: safe close (no deadlock, no double close) // ------------------------------------------------------------ // ------------------------------------------------------------ Loading Loading @@ -134,7 +139,7 @@ namespace netplus { poll(socket* serversocket, eventapi* evcon, int pollfd, int timeout) poll(socket* serversocket, eventapi* evcon, int pollfd, int timeout) : pollapi(evcon, timeout), : pollapi(evcon, timeout), _pollFD(pollfd), _pollFD(pollfd), _Events(new epoll_event[serversocket->getMaxconnections()]), _Events(new epoll_event[serversocket->getMaxconnections() + 1]), _ServerSocket(serversocket) _ServerSocket(serversocket) { { { { Loading @@ -142,10 +147,19 @@ namespace netplus { SSOCKETS.emplace_back(_ServerSocket); SSOCKETS.emplace_back(_ServerSocket); } } for (int i = 0; i < _ServerSocket->getMaxconnections(); ++i) { for (int i = 0; i < _ServerSocket->getMaxconnections() + 1; ++i) { _Events[i].events = -1; _Events[i].events = -1; _Events[i].data.fd = _ServerSocket->fd(); _Events[i].data.fd = _ServerSocket->fd(); } } // Register the global wakeup eventfd so the signal handler can // interrupt epoll_wait on shutdown without waiting for the timeout. if (WAKEUP_FD >= 0) { epoll_event wev{}; wev.events = EPOLLIN; wev.data.fd = WAKEUP_FD; epoll_ctl(_pollFD, EPOLL_CTL_ADD, WAKEUP_FD, &wev); } } } const char* getpolltype() override { return "EPOLL"; } const char* getpolltype() override { return "EPOLL"; } Loading Loading @@ -607,6 +621,15 @@ namespace netplus { // the batch. With EPOLLONESHOT, skipped // the batch. With EPOLLONESHOT, skipped // events leave fds permanently disabled. // events leave fds permanently disabled. try { try { int fd = pollptr.getEventFd(i); // Wakeup-FD event — signal handler asked us to stop. if (fd == WAKEUP_FD) { uint64_t val; (void)read(WAKEUP_FD, &val, sizeof(val)); break; } int evtype = pollptr.pollState(i); int evtype = pollptr.pollState(i); if (evtype == pollapi::EVCON) { if (evtype == pollapi::EVCON) { Loading @@ -614,7 +637,6 @@ namespace netplus { continue; continue; } } int fd = pollptr.getEventFd(i); int flags = pollptr.getEventFlags(i); int flags = pollptr.getEventFlags(i); pollptr.IoEventHandler(fd, flags, tid, args); pollptr.IoEventHandler(fd, flags, tid, args); Loading Loading @@ -678,6 +700,11 @@ namespace netplus { if (signum != SIGINT) if (signum != SIGINT) return; return; event::Running = false; event::Running = false; // Wake up all epoll_wait calls so threads notice Running==false immediately. if (WAKEUP_FD >= 0) { uint64_t val = 1; (void)write(WAKEUP_FD, &val, sizeof(val)); } } } // Called from runEventloop() AFTER all worker threads have // Called from runEventloop() AFTER all worker threads have Loading Loading @@ -745,6 +772,13 @@ namespace netplus { signal(SIGINT, signalHandler); signal(SIGINT, signalHandler); signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN); // Create the global wakeup eventfd once per run. // EFD_SEMAPHORE is NOT used: a single write wakes all epoll instances // because each epoll_wait sees EPOLLIN and drains the counter independently. if (WAKEUP_FD < 0) { WAKEUP_FD = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); } EventWorkerArgs eargs; EventWorkerArgs eargs; MAINWORKERLOOP: MAINWORKERLOOP: Loading Loading @@ -807,8 +841,20 @@ namespace netplus { if (event::Restart) { if (event::Restart) { event::Restart = false; event::Restart = false; // Reset the wakeup eventfd for the next run. if (WAKEUP_FD >= 0) { close(WAKEUP_FD); WAKEUP_FD = -1; } event::Running = true; goto MAINWORKERLOOP; goto MAINWORKERLOOP; } } // Final shutdown: release the wakeup eventfd. if (WAKEUP_FD >= 0) { close(WAKEUP_FD); WAKEUP_FD = -1; } } } // ================================================================ // ================================================================ Loading