Commit 488dac79 authored by jan.koester's avatar jan.koester
Browse files

test

parent 6be9429b
Loading
Loading
Loading
Loading
+7 −0
Original line number Diff line number Diff line
@@ -72,3 +72,10 @@
   ...
   fun:_ZN7netplus11EventWorkerC1EimPNS_15EventWorkerArgsE*
}

{
   eventworker_thread_exit_holds_lock
   Helgrind:Misc
   fun:start_thread
   fun:clone
}
+40 −0
Original line number Diff line number Diff line
@@ -847,6 +847,46 @@ namespace netplus {
        return r > 0 && (out.events & EPOLLOUT);
    }

    int socketwait::waitReadMulti(std::vector<socket*> &sockets,
                                   std::vector<bool> &ready,
                                   int timeout_ms) {
        size_t n = sockets.size();
        ready.assign(n, false);
        if (n == 0) return 0;

        // Register all fds
        for (size_t i = 0; i < n; ++i) {
            if (!sockets[i]) continue;
            SOCKET fd = sockets[i]->fd();
            struct epoll_event ev{};
            ev.events  = EPOLLIN;
            ev.data.u64 = i;
            epoll_ctl(_handle, EPOLL_CTL_ADD, fd, &ev);
        }

        std::vector<struct epoll_event> events(n);
        int r = ::epoll_wait(_handle, events.data(),
                             static_cast<int>(n), timeout_ms);

        int count = 0;
        if (r > 0) {
            for (int j = 0; j < r; ++j) {
                size_t idx = events[j].data.u64;
                if (idx < n && (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLERR))) {
                    ready[idx] = true;
                    ++count;
                }
            }
        }

        // Unregister all fds
        for (size_t i = 0; i < n; ++i) {
            if (!sockets[i]) continue;
            epoll_ctl(_handle, EPOLL_CTL_DEL, sockets[i]->fd(), nullptr);
        }
        return count;
    }

    bool socketwait::isDisconnected(socket &sock) {
        SOCKET fd = sock.fd();
        struct epoll_event ev{};
+38 −0
Original line number Diff line number Diff line
@@ -698,6 +698,44 @@ namespace netplus {
        return r > 0 && FD_ISSET(fd, &wset);
    }

    int socketwait::waitReadMulti(std::vector<socket*> &sockets,
                                   std::vector<bool> &ready,
                                   int timeout_ms) {
        size_t n = sockets.size();
        ready.assign(n, false);
        if (n == 0) return 0;

        fd_set rset, eset;
        FD_ZERO(&rset); FD_ZERO(&eset);
        for (size_t i = 0; i < n; ++i) {
            if (!sockets[i]) continue;
            SOCKET fd = sockets[i]->fd();
            if (fd == INVALID_SOCKET) continue;
            FD_SET(fd, &rset);
            FD_SET(fd, &eset);
        }
        timeval tv_buf;
        timeval* tvp = nullptr;
        if (timeout_ms >= 0) {
            tv_buf.tv_sec  = timeout_ms / 1000;
            tv_buf.tv_usec = (timeout_ms % 1000) * 1000;
            tvp = &tv_buf;
        }
        int r = ::select(0, &rset, nullptr, &eset, tvp);
        if (r <= 0) return 0;
        int count = 0;
        for (size_t i = 0; i < n; ++i) {
            if (!sockets[i]) continue;
            SOCKET fd = sockets[i]->fd();
            if (fd == INVALID_SOCKET) continue;
            if (FD_ISSET(fd, &rset) || FD_ISSET(fd, &eset)) {
                ready[i] = true;
                ++count;
            }
        }
        return count;
    }

    bool socketwait::isDisconnected(socket &sock) {
        SOCKET fd = sock.fd();
        if (fd == INVALID_SOCKET) return true;
+40 −0
Original line number Diff line number Diff line
@@ -962,6 +962,46 @@ bool netplus::socketwait::waitWrite(socket &sock, int timeout_ms) {
    return r > 0;
}

int netplus::socketwait::waitReadMulti(std::vector<socket*> &sockets,
                                        std::vector<bool> &ready,
                                        int timeout_ms) {
    size_t n = sockets.size();
    ready.assign(n, false);
    if (n == 0) return 0;

    std::vector<struct kevent> changes(n);
    size_t nchanges = 0;
    for (size_t i = 0; i < n; ++i) {
        if (!sockets[i]) continue;
        EV_SET(&changes[nchanges], sockets[i]->fd(), EVFILT_READ,
               EV_ADD | EV_ONESHOT, 0, 0, reinterpret_cast<void*>(i));
        ++nchanges;
    }

    struct timespec ts_buf;
    struct timespec* tsp = nullptr;
    if (timeout_ms >= 0) {
        ts_buf.tv_sec  = timeout_ms / 1000;
        ts_buf.tv_nsec = (timeout_ms % 1000) * 1000000L;
        tsp = &ts_buf;
    }

    std::vector<struct kevent> events(n);
    int r = ::kevent(_handle, changes.data(), static_cast<int>(nchanges),
                     events.data(), static_cast<int>(n), tsp);
    int count = 0;
    if (r > 0) {
        for (int j = 0; j < r; ++j) {
            size_t idx = reinterpret_cast<size_t>(events[j].udata);
            if (idx < n) {
                ready[idx] = true;
                ++count;
            }
        }
    }
    return count;
}

bool netplus::socketwait::isDisconnected(socket &sock) {
    SOCKET fd = sock.fd();
    struct kevent change;
+66 −0
Original line number Diff line number Diff line
@@ -934,6 +934,44 @@ namespace netplus {
        return r > 0 && FD_ISSET(fd, &wset);
    }

    int socketwait::waitReadMulti(std::vector<socket*> &sockets,
                                   std::vector<bool> &ready,
                                   int timeout_ms) {
        size_t n = sockets.size();
        ready.assign(n, false);
        if (n == 0) return 0;

        fd_set rset, eset;
        FD_ZERO(&rset); FD_ZERO(&eset);
        for (size_t i = 0; i < n; ++i) {
            if (!sockets[i]) continue;
            SOCKET fd = sockets[i]->fd();
            if (fd == INVALID_SOCKET) continue;
            FD_SET(fd, &rset);
            FD_SET(fd, &eset);
        }
        timeval tv_buf;
        timeval* tvp = nullptr;
        if (timeout_ms >= 0) {
            tv_buf.tv_sec  = timeout_ms / 1000;
            tv_buf.tv_usec = (timeout_ms % 1000) * 1000;
            tvp = &tv_buf;
        }
        int r = ::select(0, &rset, nullptr, &eset, tvp);
        if (r <= 0) return 0;
        int count = 0;
        for (size_t i = 0; i < n; ++i) {
            if (!sockets[i]) continue;
            SOCKET fd = sockets[i]->fd();
            if (fd == INVALID_SOCKET) continue;
            if (FD_ISSET(fd, &rset) || FD_ISSET(fd, &eset)) {
                ready[i] = true;
                ++count;
            }
        }
        return count;
    }

    bool socketwait::isDisconnected(socket &sock) {
        SOCKET fd = sock.fd();
        if (fd == INVALID_SOCKET) return true;
@@ -988,6 +1026,34 @@ namespace netplus {
        return r > 0 && (pfd.revents & POLLOUT);
    }

    int socketwait::waitReadMulti(std::vector<socket*> &sockets,
                                   std::vector<bool> &ready,
                                   int timeout_ms) {
        size_t n = sockets.size();
        ready.assign(n, false);
        if (n == 0) return 0;

        std::vector<struct pollfd> pfds(n);
        for (size_t i = 0; i < n; ++i) {
            pfds[i].fd = sockets[i] ? sockets[i]->fd() : -1;
            pfds[i].events = POLLIN;
            pfds[i].revents = 0;
        }
        int r;
        do {
            r = ::poll(pfds.data(), static_cast<nfds_t>(n), timeout_ms);
        } while (r < 0 && errno == EINTR);
        if (r <= 0) return 0;
        int count = 0;
        for (size_t i = 0; i < n; ++i) {
            if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
                ready[i] = true;
                ++count;
            }
        }
        return count;
    }

    bool socketwait::isDisconnected(socket &sock) {
        struct pollfd pfd{};
        pfd.fd = sock.fd();
Loading