From 22dd32d4e99af8ac015e1b012aff421ef585d78c Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Wed, 5 Aug 2020 14:47:03 -0700 Subject: [PATCH 1/2] wip kqueue, with selectinterrupt not implemented + we create a queue every time --- ixwebsocket/IXSocket.cpp | 104 ++++++++++++++++++++++++++++++++++++++- ixwebsocket/IXSocket.h | 11 +++++ 2 files changed, 114 insertions(+), 1 deletion(-) diff --git a/ixwebsocket/IXSocket.cpp b/ixwebsocket/IXSocket.cpp index 9ef78bb7..7ee9bc1c 100644 --- a/ixwebsocket/IXSocket.cpp +++ b/ixwebsocket/IXSocket.cpp @@ -35,12 +35,18 @@ namespace ix : _sockfd(fd) , _selectInterrupt(createSelectInterrupt()) { - ; +#if defined(__APPLE__) + _kqueuefd = kqueue(); +#endif } Socket::~Socket() { close(); + +#if defined(__APPLE__) + ::close(_kqueuefd); +#endif } PollResultType Socket::poll(bool readyToRead, @@ -48,6 +54,101 @@ namespace ix int sockfd, const SelectInterruptPtr& selectInterrupt) { +#if defined(__APPLE__) + int kqueuefd = kqueue(); + + struct kevent ke; + EV_SET(&ke, sockfd, (readyToRead) ? EVFILT_READ : EVFILT_WRITE, EV_ADD, 0, 0, NULL); + if (kevent(kqueuefd, &ke, 1, NULL, 0, NULL) == -1) return PollResultType::Error; + + int retval, numevents = 0; + + int nfds = 1; +#if 0 + if (selectInterrupt) + { + nfds = 2; + int interruptFd = selectInterrupt->getFd(); + + struct kevent ke; + EV_SET(&ke, interruptFd, EVFILT_READ, EV_ADD, 0, 0, NULL); + if (kevent(kqueuefd, &ke, 1, NULL, 0, NULL) == -1) return PollResultType::Error; + } +#endif + + struct kevent *events; + events = (struct kevent*) malloc(sizeof(struct kevent) * nfds); + + if (timeoutMs != 0) + { + struct timespec timeout; + timeout.tv_sec = timeoutMs / 1000; + timeout.tv_nsec = (timeoutMs % 1000) * 1000 * 1000; + retval = kevent(kqueuefd, NULL, 0, events, nfds, &timeout); + } + else + { + retval = kevent(kqueuefd, NULL, 0, events, nfds, NULL); + } + +#if 0 + if (retval > 0) { + int j; + + numevents = retval; + for(j = 0; j < numevents; j++) { + int mask = 0; + struct kevent *e = events+j; + + if (e->filter == EVFILT_READ) mask |= AE_READABLE; + if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE; + eventLoop->fired[j].fd = e->ident; + eventLoop->fired[j].mask = mask; + } + } +#else + PollResultType pollResult = PollResultType::ReadyForRead; + if (retval < 0) + { + pollResult = PollResultType::Error; + } + + if (retval > 0) { + struct kevent *e = events; + if (e->filter == EVFILT_READ) + { + pollResult = PollResultType::ReadyForRead; + } + else if (e->filter == EVFILT_WRITE) + { + pollResult = PollResultType::ReadyForWrite; + + int optval = -1; + socklen_t optlen = sizeof(optval); + + // getsockopt() puts the errno value for connect into optval so 0 + // means no-error. + if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1 || optval != 0) + { + pollResult = PollResultType::Error; + + // set errno to optval so that external callers can have an + // appropriate error description when calling strerror + errno = optval; + } + } + } + else + { + pollResult = PollResultType::Timeout; + } +#endif + + free(events); + ::close(kqueuefd); + + return pollResult; +#else // // We used to use ::select to poll but on Android 9 we get large fds out of // ::connect which crash in FD_SET as they are larger than FD_SETSIZE. Switching @@ -142,6 +243,7 @@ namespace ix } return pollResult; +#endif } PollResultType Socket::isReadyToRead(int timeoutMs) diff --git a/ixwebsocket/IXSocket.h b/ixwebsocket/IXSocket.h index 84b0b737..ff131c00 100644 --- a/ixwebsocket/IXSocket.h +++ b/ixwebsocket/IXSocket.h @@ -13,6 +13,13 @@ #include #include +// For kqueue +#if defined(__APPLE__) +#include +#include +#include +#endif + #ifdef _WIN32 #include typedef SSIZE_T ssize_t; @@ -114,5 +121,9 @@ namespace ix static constexpr size_t kChunkSize = 1 << 15; SelectInterruptPtr _selectInterrupt; + +#if defined(__APPLE__) + int _kqueuefd; +#endif }; } // namespace ix From 0834198e74dc0cb5c093b342bc8551e444a2658a Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Wed, 5 Aug 2020 15:00:01 -0700 Subject: [PATCH 2/2] do not create a kqueue everytime we call poll --- ixwebsocket/IXSocket.cpp | 12 +++++++----- ixwebsocket/IXSocket.h | 3 ++- ixwebsocket/IXSocketConnect.cpp | 5 ++++- ixwebsocket/IXSocketServer.cpp | 5 ++++- 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/ixwebsocket/IXSocket.cpp b/ixwebsocket/IXSocket.cpp index 7ee9bc1c..c55a6814 100644 --- a/ixwebsocket/IXSocket.cpp +++ b/ixwebsocket/IXSocket.cpp @@ -52,10 +52,11 @@ namespace ix PollResultType Socket::poll(bool readyToRead, int timeoutMs, int sockfd, - const SelectInterruptPtr& selectInterrupt) + const SelectInterruptPtr& selectInterrupt, + int kqueuefd) { #if defined(__APPLE__) - int kqueuefd = kqueue(); + // FIXME int kqueuefd = kqueue(); struct kevent ke; EV_SET(&ke, sockfd, (readyToRead) ? EVFILT_READ : EVFILT_WRITE, EV_ADD, 0, 0, NULL); @@ -145,7 +146,8 @@ namespace ix #endif free(events); - ::close(kqueuefd); + + // ::close(kqueuefd); //FMXE return pollResult; #else @@ -254,7 +256,7 @@ namespace ix } bool readyToRead = true; - return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt); + return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt, _kqueuefd); } PollResultType Socket::isReadyToWrite(int timeoutMs) @@ -265,7 +267,7 @@ namespace ix } bool readyToRead = false; - return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt); + return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt, _kqueuefd); } // Wake up from poll/select by writing to the pipe which is watched by select diff --git a/ixwebsocket/IXSocket.h b/ixwebsocket/IXSocket.h index ff131c00..7f7c1e8b 100644 --- a/ixwebsocket/IXSocket.h +++ b/ixwebsocket/IXSocket.h @@ -101,7 +101,8 @@ namespace ix static PollResultType poll(bool readyToRead, int timeoutMs, int sockfd, - const SelectInterruptPtr& selectInterrupt); + const SelectInterruptPtr& selectInterrupt, + int kqueuefd); // Used as special codes for pipe communication diff --git a/ixwebsocket/IXSocketConnect.cpp b/ixwebsocket/IXSocketConnect.cpp index fea01897..9732d427 100644 --- a/ixwebsocket/IXSocketConnect.cpp +++ b/ixwebsocket/IXSocketConnect.cpp @@ -66,7 +66,10 @@ namespace ix int timeoutMs = 10; bool readyToRead = false; auto selectInterrupt = std::make_unique(); - PollResultType pollResult = Socket::poll(readyToRead, timeoutMs, fd, selectInterrupt); + + int kqueuefd = kqueue(); + PollResultType pollResult = Socket::poll(readyToRead, timeoutMs, fd, selectInterrupt, kqueuefd); + ::close(kqueuefd); if (pollResult == PollResultType::Timeout) { diff --git a/ixwebsocket/IXSocketServer.cpp b/ixwebsocket/IXSocketServer.cpp index 6c33420b..8e7c1d87 100644 --- a/ixwebsocket/IXSocketServer.cpp +++ b/ixwebsocket/IXSocketServer.cpp @@ -259,8 +259,11 @@ namespace ix int timeoutMs = 10; bool readyToRead = true; auto selectInterrupt = std::make_unique(); + + int kqueuefd = kqueue(); PollResultType pollResult = - Socket::poll(readyToRead, timeoutMs, _serverFd, selectInterrupt); + Socket::poll(readyToRead, timeoutMs, _serverFd, selectInterrupt, kqueuefd); + ::close(kqueuefd); if (pollResult == PollResultType::Error) {