Implement WaitPoll for Fuchsia

Fuchsia's libc provides `select` and `poll` but not `epoll`.

This CL adds a `WaitPoll` method, which is modeled after `WaitSelect` but uses `poll`. The pre-existing `WaitPoll` method was renamed to `WaitPollOneDispatcher`.

TESTED="2p video call on Fuchsia. WaitPoll is faster compared to
WaitSelect, primarily because WaitSelect pessimistically calls
getsockopt(SO_ERROR) on each fd, while WaitPoll does so only on fds that
have entered an error state."

Original author: tombergan@google.com

Bug: None
Change-Id: I83cc824fca40d691fd93712c1c933ff21b3f877c
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/296826
Reviewed-by: Markus Handell <handellm@webrtc.org>
Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Tom Bergan <tombergan@google.com>
Reviewed-by: Taylor Brandstetter <deadbeef@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39564}
This commit is contained in:
Taylor Brandstetter 2023-03-14 16:45:49 -07:00 committed by WebRTC LUCI CQ
parent 4e1c9570ed
commit 5136600626
2 changed files with 161 additions and 48 deletions

View File

@ -25,6 +25,8 @@
#if defined(WEBRTC_USE_EPOLL)
// "poll" will be used to wait for the signal dispatcher.
#include <poll.h>
#elif defined(WEBRTC_USE_POLL)
#include <poll.h>
#endif
#include <sys/ioctl.h>
#include <sys/select.h>
@ -1266,17 +1268,22 @@ bool PhysicalSocketServer::Wait(webrtc::TimeDelta max_wait_duration,
RTC_DCHECK(!waiting_);
ScopedSetTrue s(&waiting_);
const int cmsWait = ToCmsWait(max_wait_duration);
#if defined(WEBRTC_USE_POLL)
return WaitPoll(cmsWait, process_io);
#else
#if defined(WEBRTC_USE_EPOLL)
// We don't keep a dedicated "epoll" descriptor containing only the non-IO
// (i.e. signaling) dispatcher, so "poll" will be used instead of the default
// "select" to support sockets larger than FD_SETSIZE.
if (!process_io) {
return WaitPoll(cmsWait, signal_wakeup_);
return WaitPollOneDispatcher(cmsWait, signal_wakeup_);
} else if (epoll_fd_ != INVALID_SOCKET) {
return WaitEpoll(cmsWait);
}
#endif
return WaitSelect(cmsWait, process_io);
#endif
}
// `error_event` is true if we are responding to an event where we know an
@ -1346,6 +1353,34 @@ static void ProcessEvents(Dispatcher* dispatcher,
}
}
#if defined(WEBRTC_USE_POLL) || defined(WEBRTC_USE_EPOLL)
static void ProcessPollEvents(Dispatcher* dispatcher, const pollfd& pfd) {
bool readable = (pfd.revents & (POLLIN | POLLPRI));
bool writable = (pfd.revents & POLLOUT);
bool error = (pfd.revents & (POLLRDHUP | POLLERR | POLLHUP));
ProcessEvents(dispatcher, readable, writable, error, error);
}
static pollfd DispatcherToPollfd(Dispatcher* dispatcher) {
pollfd fd{
.fd = dispatcher->GetDescriptor(),
.events = 0,
.revents = 0,
};
uint32_t ff = dispatcher->GetRequestedEvents();
if (ff & (DE_READ | DE_ACCEPT)) {
fd.events |= POLLIN;
}
if (ff & (DE_WRITE | DE_CONNECT)) {
fd.events |= POLLOUT;
}
return fd;
}
#endif // WEBRTC_USE_POLL || WEBRTC_USE_EPOLL
bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
// Calculate timing information
@ -1387,7 +1422,6 @@ bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
for (auto const& kv : dispatcher_by_key_) {
uint64_t key = kv.first;
Dispatcher* pdispatcher = kv.second;
// Query dispatchers for read and write wait state
if (!process_io && (pdispatcher != signal_wakeup_))
continue;
current_dispatcher_keys_.push_back(key);
@ -1428,9 +1462,9 @@ bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
} else {
// We have signaled descriptors
CritScope cr(&crit_);
// Iterate only on the dispatchers whose sockets were passed into
// WSAEventSelect; this avoids the ABA problem (a socket being
// destroyed and a new one created with the same file descriptor).
// Iterate only on the dispatchers whose file descriptors were passed into
// select; this avoids the ABA problem (a socket being destroyed and a new
// one created with the same file descriptor).
for (uint64_t key : current_dispatcher_keys_) {
if (!dispatcher_by_key_.count(key))
continue;
@ -1547,11 +1581,11 @@ void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher, uint64_t key) {
bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
int64_t tvWait = -1;
int64_t tvStop = -1;
int64_t msWait = -1;
int64_t msStop = -1;
if (cmsWait != kForeverMs) {
tvWait = cmsWait;
tvStop = TimeAfter(cmsWait);
msWait = cmsWait;
msStop = TimeAfter(cmsWait);
}
fWait_ = true;
@ -1561,7 +1595,7 @@ bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
// 0 means timeout
// > 0 means count of descriptors ready
int n = epoll_wait(epoll_fd_, epoll_events_.data(), epoll_events_.size(),
static_cast<int>(tvWait));
static_cast<int>(msWait));
if (n < 0) {
if (errno != EINTR) {
RTC_LOG_E(LS_ERROR, EN, errno) << "epoll";
@ -1595,8 +1629,8 @@ bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
}
if (cmsWait != kForeverMs) {
tvWait = TimeDiff(tvStop, TimeMillis());
if (tvWait <= 0) {
msWait = TimeDiff(msStop, TimeMillis());
if (msWait <= 0) {
// Return success on timeout.
return true;
}
@ -1606,37 +1640,27 @@ bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
return true;
}
bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
bool PhysicalSocketServer::WaitPollOneDispatcher(int cmsWait,
Dispatcher* dispatcher) {
RTC_DCHECK(dispatcher);
int64_t tvWait = -1;
int64_t tvStop = -1;
int64_t msWait = -1;
int64_t msStop = -1;
if (cmsWait != kForeverMs) {
tvWait = cmsWait;
tvStop = TimeAfter(cmsWait);
msWait = cmsWait;
msStop = TimeAfter(cmsWait);
}
fWait_ = true;
struct pollfd fds = {0};
int fd = dispatcher->GetDescriptor();
fds.fd = fd;
const int fd = dispatcher->GetDescriptor();
while (fWait_) {
uint32_t ff = dispatcher->GetRequestedEvents();
fds.events = 0;
if (ff & (DE_READ | DE_ACCEPT)) {
fds.events |= POLLIN;
}
if (ff & (DE_WRITE | DE_CONNECT)) {
fds.events |= POLLOUT;
}
fds.revents = 0;
auto fds = DispatcherToPollfd(dispatcher);
// Wait then call handlers as appropriate
// < 0 means error
// 0 means timeout
// > 0 means count of descriptors ready
int n = poll(&fds, 1, static_cast<int>(tvWait));
int n = poll(&fds, 1, static_cast<int>(msWait));
if (n < 0) {
if (errno != EINTR) {
RTC_LOG_E(LS_ERROR, EN, errno) << "poll";
@ -1653,17 +1677,12 @@ bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
// We have signaled descriptors (should only be the passed dispatcher).
RTC_DCHECK_EQ(n, 1);
RTC_DCHECK_EQ(fds.fd, fd);
bool readable = (fds.revents & (POLLIN | POLLPRI));
bool writable = (fds.revents & POLLOUT);
bool error = (fds.revents & (POLLRDHUP | POLLERR | POLLHUP));
ProcessEvents(dispatcher, readable, writable, error, error);
ProcessPollEvents(dispatcher, fds);
}
if (cmsWait != kForeverMs) {
tvWait = TimeDiff(tvStop, TimeMillis());
if (tvWait < 0) {
msWait = TimeDiff(msStop, TimeMillis());
if (msWait < 0) {
// Return success on timeout.
return true;
}
@ -1673,7 +1692,80 @@ bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
return true;
}
#endif // WEBRTC_USE_EPOLL
#elif defined(WEBRTC_USE_POLL)
bool PhysicalSocketServer::WaitPoll(int cmsWait, bool process_io) {
int64_t msWait = -1;
int64_t msStop = -1;
if (cmsWait != kForeverMs) {
msWait = cmsWait;
msStop = TimeAfter(cmsWait);
}
std::vector<pollfd> pollfds;
fWait_ = true;
while (fWait_) {
{
CritScope cr(&crit_);
current_dispatcher_keys_.clear();
pollfds.clear();
pollfds.reserve(dispatcher_by_key_.size());
for (auto const& kv : dispatcher_by_key_) {
uint64_t key = kv.first;
Dispatcher* pdispatcher = kv.second;
if (!process_io && (pdispatcher != signal_wakeup_))
continue;
current_dispatcher_keys_.push_back(key);
pollfds.push_back(DispatcherToPollfd(pdispatcher));
}
}
// Wait then call handlers as appropriate
// < 0 means error
// 0 means timeout
// > 0 means count of descriptors ready
int n = poll(pollfds.data(), pollfds.size(), static_cast<int>(msWait));
if (n < 0) {
if (errno != EINTR) {
RTC_LOG_E(LS_ERROR, EN, errno) << "poll";
return false;
}
// Else ignore the error and keep going. If this EINTR was for one of the
// signals managed by this PhysicalSocketServer, the
// PosixSignalDeliveryDispatcher will be in the signaled state in the next
// iteration.
} else if (n == 0) {
// If timeout, return success
return true;
} else {
// We have signaled descriptors
CritScope cr(&crit_);
// Iterate only on the dispatchers whose file descriptors were passed into
// poll; this avoids the ABA problem (a socket being destroyed and a new
// one created with the same file descriptor).
for (size_t i = 0; i < current_dispatcher_keys_.size(); ++i) {
uint64_t key = current_dispatcher_keys_[i];
if (!dispatcher_by_key_.count(key))
continue;
ProcessPollEvents(dispatcher_by_key_.at(key), pollfds[i]);
}
}
if (cmsWait != kForeverMs) {
msWait = TimeDiff(msStop, TimeMillis());
if (msWait < 0) {
// Return success on timeout.
return true;
}
}
}
return true;
}
#endif // WEBRTC_USE_EPOLL, WEBRTC_USE_POLL
#endif // WEBRTC_POSIX

View File

@ -12,10 +12,21 @@
#define RTC_BASE_PHYSICAL_SOCKET_SERVER_H_
#include "api/units/time_delta.h"
#if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX)
#if defined(WEBRTC_POSIX)
#if defined(WEBRTC_LINUX)
// On Linux, use epoll.
#include <sys/epoll.h>
#define WEBRTC_USE_EPOLL 1
#endif
#elif defined(WEBRTC_FUCHSIA)
// Fuchsia implements select and poll but not epoll, and testing shows that poll
// is faster than select.
#include <poll.h>
#define WEBRTC_USE_POLL 1
#else
// On other POSIX systems, use select by default.
#endif // WEBRTC_LINUX, WEBRTC_FUCHSIA
#endif // WEBRTC_POSIX
#include <array>
#include <memory>
@ -89,15 +100,16 @@ class RTC_EXPORT PhysicalSocketServer : public SocketServer {
static constexpr int kForeverMs = -1;
static int ToCmsWait(webrtc::TimeDelta max_wait_duration);
#if defined(WEBRTC_POSIX)
bool WaitSelect(int cmsWait, bool process_io);
#endif // WEBRTC_POSIX
#if defined(WEBRTC_USE_EPOLL)
void AddEpoll(Dispatcher* dispatcher, uint64_t key);
void RemoveEpoll(Dispatcher* dispatcher);
void UpdateEpoll(Dispatcher* dispatcher, uint64_t key);
bool WaitEpoll(int cmsWait);
bool WaitPoll(int cmsWait, Dispatcher* dispatcher);
bool WaitPollOneDispatcher(int cmsWait, Dispatcher* dispatcher);
// This array is accessed in isolation by a thread calling into Wait().
// It's useless to use a SequenceChecker to guard it because a socket
@ -105,7 +117,16 @@ class RTC_EXPORT PhysicalSocketServer : public SocketServer {
// to have to reset the sequence checker on Wait calls.
std::array<epoll_event, kNumEpollEvents> epoll_events_;
const int epoll_fd_ = INVALID_SOCKET;
#endif // WEBRTC_USE_EPOLL
#elif defined(WEBRTC_USE_POLL)
void AddPoll(Dispatcher* dispatcher, uint64_t key);
void RemovePoll(Dispatcher* dispatcher);
void UpdatePoll(Dispatcher* dispatcher, uint64_t key);
bool WaitPoll(int cmsWait, bool process_io);
#endif // WEBRTC_USE_EPOLL, WEBRTC_USE_POLL
#endif // WEBRTC_POSIX
// uint64_t keys are used to uniquely identify a dispatcher in order to avoid
// the ABA problem during the epoll loop (a dispatcher being destroyed and
// replaced by one with the same address).
@ -116,9 +137,9 @@ class RTC_EXPORT PhysicalSocketServer : public SocketServer {
std::unordered_map<Dispatcher*, uint64_t> key_by_dispatcher_
RTC_GUARDED_BY(crit_);
// A list of dispatcher keys that we're interested in for the current
// select() or WSAWaitForMultipleEvents() loop. Again, used to avoid the ABA
// problem (a socket being destroyed and a new one created with the same
// handle, erroneously receiving the events from the destroyed socket).
// select(), poll(), or WSAWaitForMultipleEvents() loop. Again, used to avoid
// the ABA problem (a socket being destroyed and a new one created with the
// same handle, erroneously receiving the events from the destroyed socket).
//
// Kept as a member variable just for efficiency.
std::vector<uint64_t> current_dispatcher_keys_;