From 51366006261b19dc75b56f7d2a0d2ba5580d5434 Mon Sep 17 00:00:00 2001 From: Taylor Brandstetter Date: Tue, 14 Mar 2023 16:45:49 -0700 Subject: [PATCH] Implement WaitPoll for Fuchsia Fuchsia's libc provides `select` and `poll` but not `epoll`. This CL adds a `WaitPoll` method, which is modeled after `WaitSelect` but uses `poll`. The pre-existing `WaitPoll` method was renamed to `WaitPollOneDispatcher`. TESTED="2p video call on Fuchsia. WaitPoll is faster compared to WaitSelect, primarily because WaitSelect pessimistically calls getsockopt(SO_ERROR) on each fd, while WaitPoll does so only on fds that have entered an error state." Original author: tombergan@google.com Bug: None Change-Id: I83cc824fca40d691fd93712c1c933ff21b3f877c Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/296826 Reviewed-by: Markus Handell Commit-Queue: Mirko Bonadei Reviewed-by: Tom Bergan Reviewed-by: Taylor Brandstetter Cr-Commit-Position: refs/heads/main@{#39564} --- rtc_base/physical_socket_server.cc | 172 ++++++++++++++++++++++------- rtc_base/physical_socket_server.h | 37 +++++-- 2 files changed, 161 insertions(+), 48 deletions(-) diff --git a/rtc_base/physical_socket_server.cc b/rtc_base/physical_socket_server.cc index b7d69140e0..be7d9680e4 100644 --- a/rtc_base/physical_socket_server.cc +++ b/rtc_base/physical_socket_server.cc @@ -25,6 +25,8 @@ #if defined(WEBRTC_USE_EPOLL) // "poll" will be used to wait for the signal dispatcher. #include +#elif defined(WEBRTC_USE_POLL) +#include #endif #include #include @@ -1266,17 +1268,22 @@ bool PhysicalSocketServer::Wait(webrtc::TimeDelta max_wait_duration, RTC_DCHECK(!waiting_); ScopedSetTrue s(&waiting_); const int cmsWait = ToCmsWait(max_wait_duration); + +#if defined(WEBRTC_USE_POLL) + return WaitPoll(cmsWait, process_io); +#else #if defined(WEBRTC_USE_EPOLL) // We don't keep a dedicated "epoll" descriptor containing only the non-IO // (i.e. signaling) dispatcher, so "poll" will be used instead of the default // "select" to support sockets larger than FD_SETSIZE. if (!process_io) { - return WaitPoll(cmsWait, signal_wakeup_); + return WaitPollOneDispatcher(cmsWait, signal_wakeup_); } else if (epoll_fd_ != INVALID_SOCKET) { return WaitEpoll(cmsWait); } #endif return WaitSelect(cmsWait, process_io); +#endif } // `error_event` is true if we are responding to an event where we know an @@ -1346,6 +1353,34 @@ static void ProcessEvents(Dispatcher* dispatcher, } } +#if defined(WEBRTC_USE_POLL) || defined(WEBRTC_USE_EPOLL) +static void ProcessPollEvents(Dispatcher* dispatcher, const pollfd& pfd) { + bool readable = (pfd.revents & (POLLIN | POLLPRI)); + bool writable = (pfd.revents & POLLOUT); + bool error = (pfd.revents & (POLLRDHUP | POLLERR | POLLHUP)); + + ProcessEvents(dispatcher, readable, writable, error, error); +} + +static pollfd DispatcherToPollfd(Dispatcher* dispatcher) { + pollfd fd{ + .fd = dispatcher->GetDescriptor(), + .events = 0, + .revents = 0, + }; + + uint32_t ff = dispatcher->GetRequestedEvents(); + if (ff & (DE_READ | DE_ACCEPT)) { + fd.events |= POLLIN; + } + if (ff & (DE_WRITE | DE_CONNECT)) { + fd.events |= POLLOUT; + } + + return fd; +} +#endif // WEBRTC_USE_POLL || WEBRTC_USE_EPOLL + bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) { // Calculate timing information @@ -1387,7 +1422,6 @@ bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) { for (auto const& kv : dispatcher_by_key_) { uint64_t key = kv.first; Dispatcher* pdispatcher = kv.second; - // Query dispatchers for read and write wait state if (!process_io && (pdispatcher != signal_wakeup_)) continue; current_dispatcher_keys_.push_back(key); @@ -1428,9 +1462,9 @@ bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) { } else { // We have signaled descriptors CritScope cr(&crit_); - // Iterate only on the dispatchers whose sockets were passed into - // WSAEventSelect; this avoids the ABA problem (a socket being - // destroyed and a new one created with the same file descriptor). + // Iterate only on the dispatchers whose file descriptors were passed into + // select; this avoids the ABA problem (a socket being destroyed and a new + // one created with the same file descriptor). for (uint64_t key : current_dispatcher_keys_) { if (!dispatcher_by_key_.count(key)) continue; @@ -1547,11 +1581,11 @@ void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher, uint64_t key) { bool PhysicalSocketServer::WaitEpoll(int cmsWait) { RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); - int64_t tvWait = -1; - int64_t tvStop = -1; + int64_t msWait = -1; + int64_t msStop = -1; if (cmsWait != kForeverMs) { - tvWait = cmsWait; - tvStop = TimeAfter(cmsWait); + msWait = cmsWait; + msStop = TimeAfter(cmsWait); } fWait_ = true; @@ -1561,7 +1595,7 @@ bool PhysicalSocketServer::WaitEpoll(int cmsWait) { // 0 means timeout // > 0 means count of descriptors ready int n = epoll_wait(epoll_fd_, epoll_events_.data(), epoll_events_.size(), - static_cast(tvWait)); + static_cast(msWait)); if (n < 0) { if (errno != EINTR) { RTC_LOG_E(LS_ERROR, EN, errno) << "epoll"; @@ -1595,8 +1629,8 @@ bool PhysicalSocketServer::WaitEpoll(int cmsWait) { } if (cmsWait != kForeverMs) { - tvWait = TimeDiff(tvStop, TimeMillis()); - if (tvWait <= 0) { + msWait = TimeDiff(msStop, TimeMillis()); + if (msWait <= 0) { // Return success on timeout. return true; } @@ -1606,37 +1640,27 @@ bool PhysicalSocketServer::WaitEpoll(int cmsWait) { return true; } -bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) { +bool PhysicalSocketServer::WaitPollOneDispatcher(int cmsWait, + Dispatcher* dispatcher) { RTC_DCHECK(dispatcher); - int64_t tvWait = -1; - int64_t tvStop = -1; + int64_t msWait = -1; + int64_t msStop = -1; if (cmsWait != kForeverMs) { - tvWait = cmsWait; - tvStop = TimeAfter(cmsWait); + msWait = cmsWait; + msStop = TimeAfter(cmsWait); } fWait_ = true; - - struct pollfd fds = {0}; - int fd = dispatcher->GetDescriptor(); - fds.fd = fd; + const int fd = dispatcher->GetDescriptor(); while (fWait_) { - uint32_t ff = dispatcher->GetRequestedEvents(); - fds.events = 0; - if (ff & (DE_READ | DE_ACCEPT)) { - fds.events |= POLLIN; - } - if (ff & (DE_WRITE | DE_CONNECT)) { - fds.events |= POLLOUT; - } - fds.revents = 0; + auto fds = DispatcherToPollfd(dispatcher); // Wait then call handlers as appropriate // < 0 means error // 0 means timeout // > 0 means count of descriptors ready - int n = poll(&fds, 1, static_cast(tvWait)); + int n = poll(&fds, 1, static_cast(msWait)); if (n < 0) { if (errno != EINTR) { RTC_LOG_E(LS_ERROR, EN, errno) << "poll"; @@ -1653,17 +1677,12 @@ bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) { // We have signaled descriptors (should only be the passed dispatcher). RTC_DCHECK_EQ(n, 1); RTC_DCHECK_EQ(fds.fd, fd); - - bool readable = (fds.revents & (POLLIN | POLLPRI)); - bool writable = (fds.revents & POLLOUT); - bool error = (fds.revents & (POLLRDHUP | POLLERR | POLLHUP)); - - ProcessEvents(dispatcher, readable, writable, error, error); + ProcessPollEvents(dispatcher, fds); } if (cmsWait != kForeverMs) { - tvWait = TimeDiff(tvStop, TimeMillis()); - if (tvWait < 0) { + msWait = TimeDiff(msStop, TimeMillis()); + if (msWait < 0) { // Return success on timeout. return true; } @@ -1673,7 +1692,80 @@ bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) { return true; } -#endif // WEBRTC_USE_EPOLL +#elif defined(WEBRTC_USE_POLL) + +bool PhysicalSocketServer::WaitPoll(int cmsWait, bool process_io) { + int64_t msWait = -1; + int64_t msStop = -1; + if (cmsWait != kForeverMs) { + msWait = cmsWait; + msStop = TimeAfter(cmsWait); + } + + std::vector pollfds; + fWait_ = true; + + while (fWait_) { + { + CritScope cr(&crit_); + current_dispatcher_keys_.clear(); + pollfds.clear(); + pollfds.reserve(dispatcher_by_key_.size()); + + for (auto const& kv : dispatcher_by_key_) { + uint64_t key = kv.first; + Dispatcher* pdispatcher = kv.second; + if (!process_io && (pdispatcher != signal_wakeup_)) + continue; + current_dispatcher_keys_.push_back(key); + pollfds.push_back(DispatcherToPollfd(pdispatcher)); + } + } + + // Wait then call handlers as appropriate + // < 0 means error + // 0 means timeout + // > 0 means count of descriptors ready + int n = poll(pollfds.data(), pollfds.size(), static_cast(msWait)); + if (n < 0) { + if (errno != EINTR) { + RTC_LOG_E(LS_ERROR, EN, errno) << "poll"; + return false; + } + // Else ignore the error and keep going. If this EINTR was for one of the + // signals managed by this PhysicalSocketServer, the + // PosixSignalDeliveryDispatcher will be in the signaled state in the next + // iteration. + } else if (n == 0) { + // If timeout, return success + return true; + } else { + // We have signaled descriptors + CritScope cr(&crit_); + // Iterate only on the dispatchers whose file descriptors were passed into + // poll; this avoids the ABA problem (a socket being destroyed and a new + // one created with the same file descriptor). + for (size_t i = 0; i < current_dispatcher_keys_.size(); ++i) { + uint64_t key = current_dispatcher_keys_[i]; + if (!dispatcher_by_key_.count(key)) + continue; + ProcessPollEvents(dispatcher_by_key_.at(key), pollfds[i]); + } + } + + if (cmsWait != kForeverMs) { + msWait = TimeDiff(msStop, TimeMillis()); + if (msWait < 0) { + // Return success on timeout. + return true; + } + } + } + + return true; +} + +#endif // WEBRTC_USE_EPOLL, WEBRTC_USE_POLL #endif // WEBRTC_POSIX diff --git a/rtc_base/physical_socket_server.h b/rtc_base/physical_socket_server.h index 7b11780d1e..650db80931 100644 --- a/rtc_base/physical_socket_server.h +++ b/rtc_base/physical_socket_server.h @@ -12,10 +12,21 @@ #define RTC_BASE_PHYSICAL_SOCKET_SERVER_H_ #include "api/units/time_delta.h" -#if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX) + +#if defined(WEBRTC_POSIX) +#if defined(WEBRTC_LINUX) +// On Linux, use epoll. #include #define WEBRTC_USE_EPOLL 1 -#endif +#elif defined(WEBRTC_FUCHSIA) +// Fuchsia implements select and poll but not epoll, and testing shows that poll +// is faster than select. +#include +#define WEBRTC_USE_POLL 1 +#else +// On other POSIX systems, use select by default. +#endif // WEBRTC_LINUX, WEBRTC_FUCHSIA +#endif // WEBRTC_POSIX #include #include @@ -89,15 +100,16 @@ class RTC_EXPORT PhysicalSocketServer : public SocketServer { static constexpr int kForeverMs = -1; static int ToCmsWait(webrtc::TimeDelta max_wait_duration); + #if defined(WEBRTC_POSIX) bool WaitSelect(int cmsWait, bool process_io); -#endif // WEBRTC_POSIX + #if defined(WEBRTC_USE_EPOLL) void AddEpoll(Dispatcher* dispatcher, uint64_t key); void RemoveEpoll(Dispatcher* dispatcher); void UpdateEpoll(Dispatcher* dispatcher, uint64_t key); bool WaitEpoll(int cmsWait); - bool WaitPoll(int cmsWait, Dispatcher* dispatcher); + bool WaitPollOneDispatcher(int cmsWait, Dispatcher* dispatcher); // This array is accessed in isolation by a thread calling into Wait(). // It's useless to use a SequenceChecker to guard it because a socket @@ -105,7 +117,16 @@ class RTC_EXPORT PhysicalSocketServer : public SocketServer { // to have to reset the sequence checker on Wait calls. std::array epoll_events_; const int epoll_fd_ = INVALID_SOCKET; -#endif // WEBRTC_USE_EPOLL + +#elif defined(WEBRTC_USE_POLL) + void AddPoll(Dispatcher* dispatcher, uint64_t key); + void RemovePoll(Dispatcher* dispatcher); + void UpdatePoll(Dispatcher* dispatcher, uint64_t key); + bool WaitPoll(int cmsWait, bool process_io); + +#endif // WEBRTC_USE_EPOLL, WEBRTC_USE_POLL +#endif // WEBRTC_POSIX + // uint64_t keys are used to uniquely identify a dispatcher in order to avoid // the ABA problem during the epoll loop (a dispatcher being destroyed and // replaced by one with the same address). @@ -116,9 +137,9 @@ class RTC_EXPORT PhysicalSocketServer : public SocketServer { std::unordered_map key_by_dispatcher_ RTC_GUARDED_BY(crit_); // A list of dispatcher keys that we're interested in for the current - // select() or WSAWaitForMultipleEvents() loop. Again, used to avoid the ABA - // problem (a socket being destroyed and a new one created with the same - // handle, erroneously receiving the events from the destroyed socket). + // select(), poll(), or WSAWaitForMultipleEvents() loop. Again, used to avoid + // the ABA problem (a socket being destroyed and a new one created with the + // same handle, erroneously receiving the events from the destroyed socket). // // Kept as a member variable just for efficiency. std::vector current_dispatcher_keys_;