diff --git a/webrtc/base/physicalsocketserver.cc b/webrtc/base/physicalsocketserver.cc index 4ab33fb673..655d397913 100644 --- a/webrtc/base/physicalsocketserver.cc +++ b/webrtc/base/physicalsocketserver.cc @@ -21,6 +21,10 @@ #include #include #include +#if defined(WEBRTC_USE_EPOLL) +// "poll" will be used to wait for the signal dispatcher. +#include +#endif #include #include #include @@ -80,6 +84,16 @@ int64_t GetSocketRecvTimestamp(int socket) { typedef char* SockOptArg; #endif +#if defined(WEBRTC_USE_EPOLL) +// POLLRDHUP / EPOLLRDHUP are only defined starting with Linux 2.6.17. +#if !defined(POLLRDHUP) +#define POLLRDHUP 0x2000 +#endif +#if !defined(EPOLLRDHUP) +#define EPOLLRDHUP 0x2000 +#endif +#endif + namespace rtc { std::unique_ptr SocketServer::CreateDefault() { @@ -774,6 +788,14 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) { #elif defined(WEBRTC_POSIX) void SocketDispatcher::OnEvent(uint32_t ff, int err) { +#if defined(WEBRTC_USE_EPOLL) + // Remember currently enabled events so we can combine multiple changes + // into one update call later. + // The signal handlers might re-enable events disabled here, so we can't + // keep a list of events to disable at the end of the method. This list + // would not be updated with the events enabled by the signal handlers. + StartBatchedEventUpdates(); +#endif // Make sure we deliver connect/accept first. Otherwise, consumers may see // something like a READ followed by a CONNECT, which would be odd. if ((ff & DE_CONNECT) != 0) { @@ -797,10 +819,65 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) { SetEnabledEvents(0); SignalCloseEvent(this, err); } +#if defined(WEBRTC_USE_EPOLL) + FinishBatchedEventUpdates(); +#endif } #endif // WEBRTC_POSIX +#if defined(WEBRTC_USE_EPOLL) + +static int GetEpollEvents(uint32_t ff) { + int events = 0; + if (ff & (DE_READ | DE_ACCEPT)) { + events |= EPOLLIN; + } + if (ff & (DE_WRITE | DE_CONNECT)) { + events |= EPOLLOUT; + } + return events; +} + +void SocketDispatcher::StartBatchedEventUpdates() { + RTC_DCHECK_EQ(saved_enabled_events_, -1); + saved_enabled_events_ = enabled_events(); +} + +void SocketDispatcher::FinishBatchedEventUpdates() { + RTC_DCHECK_NE(saved_enabled_events_, -1); + uint8_t old_events = static_cast(saved_enabled_events_); + saved_enabled_events_ = -1; + MaybeUpdateDispatcher(old_events); +} + +void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) { + if (GetEpollEvents(enabled_events()) != GetEpollEvents(old_events) && + saved_enabled_events_ == -1) { + ss_->Update(this); + } +} + +void SocketDispatcher::SetEnabledEvents(uint8_t events) { + uint8_t old_events = enabled_events(); + PhysicalSocket::SetEnabledEvents(events); + MaybeUpdateDispatcher(old_events); +} + +void SocketDispatcher::EnableEvents(uint8_t events) { + uint8_t old_events = enabled_events(); + PhysicalSocket::EnableEvents(events); + MaybeUpdateDispatcher(old_events); +} + +void SocketDispatcher::DisableEvents(uint8_t events) { + uint8_t old_events = enabled_events(); + PhysicalSocket::DisableEvents(events); + MaybeUpdateDispatcher(old_events); +} + +#endif // WEBRTC_USE_EPOLL + int SocketDispatcher::Close() { if (s_ == INVALID_SOCKET) return 0; @@ -1129,6 +1206,17 @@ class Signaler : public EventDispatcher { PhysicalSocketServer::PhysicalSocketServer() : fWait_(false) { +#if defined(WEBRTC_USE_EPOLL) + // Since Linux 2.6.8, the size argument is ignored, but must be greater than + // zero. Before that the size served as hint to the kernel for the amount of + // space to initially allocate in internal data structures. + epoll_fd_ = epoll_create(FD_SETSIZE); + if (epoll_fd_ == -1) { + // Not an error, will fall back to "select" below. + LOG_E(LS_WARNING, EN, errno) << "epoll_create"; + epoll_fd_ = INVALID_SOCKET; + } +#endif signal_wakeup_ = new Signaler(this, &fWait_); #if defined(WEBRTC_WIN) socket_ev_ = WSACreateEvent(); @@ -1143,6 +1231,11 @@ PhysicalSocketServer::~PhysicalSocketServer() { signal_dispatcher_.reset(); #endif delete signal_wakeup_; +#if defined(WEBRTC_USE_EPOLL) + if (epoll_fd_ != INVALID_SOCKET) { + close(epoll_fd_); + } +#endif RTC_DCHECK(dispatchers_.empty()); } @@ -1190,40 +1283,148 @@ AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) { void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { CritScope cs(&crit_); - // Prevent duplicates. This can cause dead dispatchers to stick around. - DispatcherList::iterator pos = std::find(dispatchers_.begin(), - dispatchers_.end(), - pdispatcher); - if (pos != dispatchers_.end()) - return; - dispatchers_.push_back(pdispatcher); + if (processing_dispatchers_) { + // A dispatcher is being added while a "Wait" call is processing the + // list of socket events. + // Defer adding to "dispatchers_" set until processing is done to avoid + // invalidating the iterator in "Wait". + pending_remove_dispatchers_.erase(pdispatcher); + pending_add_dispatchers_.insert(pdispatcher); + } else { + dispatchers_.insert(pdispatcher); + } +#if defined(WEBRTC_USE_EPOLL) + if (epoll_fd_ != INVALID_SOCKET) { + AddEpoll(pdispatcher); + } +#endif // WEBRTC_USE_EPOLL } void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { CritScope cs(&crit_); - DispatcherList::iterator pos = std::find(dispatchers_.begin(), - dispatchers_.end(), - pdispatcher); - // We silently ignore duplicate calls to Add, so we should silently ignore - // the (expected) symmetric calls to Remove. Note that this may still hide - // a real issue, so we at least log a warning about it. - if (pos == dispatchers_.end()) { + if (processing_dispatchers_) { + // A dispatcher is being removed while a "Wait" call is processing the + // list of socket events. + // Defer removal from "dispatchers_" set until processing is done to avoid + // invalidating the iterator in "Wait". + if (!pending_add_dispatchers_.erase(pdispatcher) && + dispatchers_.find(pdispatcher) == dispatchers_.end()) { + LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " + << "dispatcher, potentially from a duplicate call to " + << "Add."; + return; + } + + pending_remove_dispatchers_.insert(pdispatcher); + } else if (!dispatchers_.erase(pdispatcher)) { LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " << "dispatcher, potentially from a duplicate call to Add."; return; } - size_t index = pos - dispatchers_.begin(); - dispatchers_.erase(pos); - for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end(); - ++it) { - if (index < **it) { - --**it; +#if defined(WEBRTC_USE_EPOLL) + if (epoll_fd_ != INVALID_SOCKET) { + RemoveEpoll(pdispatcher); + } +#endif // WEBRTC_USE_EPOLL +} + +void PhysicalSocketServer::Update(Dispatcher* pdispatcher) { +#if defined(WEBRTC_USE_EPOLL) + if (epoll_fd_ == INVALID_SOCKET) { + return; + } + + CritScope cs(&crit_); + if (dispatchers_.find(pdispatcher) == dispatchers_.end()) { + return; + } + + UpdateEpoll(pdispatcher); +#endif +} + +void PhysicalSocketServer::AddRemovePendingDispatchers() { + if (!pending_add_dispatchers_.empty()) { + for (Dispatcher* pdispatcher : pending_add_dispatchers_) { + dispatchers_.insert(pdispatcher); } + pending_add_dispatchers_.clear(); + } + + if (!pending_remove_dispatchers_.empty()) { + for (Dispatcher* pdispatcher : pending_remove_dispatchers_) { + dispatchers_.erase(pdispatcher); + } + pending_remove_dispatchers_.clear(); } } #if defined(WEBRTC_POSIX) + bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { +#if defined(WEBRTC_USE_EPOLL) + // We don't keep a dedicated "epoll" descriptor containing only the non-IO + // (i.e. signaling) dispatcher, so "poll" will be used instead of the default + // "select" to support sockets larger than FD_SETSIZE. + if (!process_io) { + return WaitPoll(cmsWait, signal_wakeup_); + } else if (epoll_fd_ != INVALID_SOCKET) { + return WaitEpoll(cmsWait); + } +#endif + return WaitSelect(cmsWait, process_io); +} + +static void ProcessEvents(Dispatcher* dispatcher, + bool readable, + bool writable, + bool check_error) { + int errcode = 0; + // TODO(pthatcher): Should we set errcode if getsockopt fails? + if (check_error) { + socklen_t len = sizeof(errcode); + ::getsockopt(dispatcher->GetDescriptor(), SOL_SOCKET, SO_ERROR, &errcode, + &len); + } + + uint32_t ff = 0; + + // Check readable descriptors. If we're waiting on an accept, signal + // that. Otherwise we're waiting for data, check to see if we're + // readable or really closed. + // TODO(pthatcher): Only peek at TCP descriptors. + if (readable) { + if (dispatcher->GetRequestedEvents() & DE_ACCEPT) { + ff |= DE_ACCEPT; + } else if (errcode || dispatcher->IsDescriptorClosed()) { + ff |= DE_CLOSE; + } else { + ff |= DE_READ; + } + } + + // Check writable descriptors. If we're waiting on a connect, detect + // success versus failure by the reaped error code. + if (writable) { + if (dispatcher->GetRequestedEvents() & DE_CONNECT) { + if (!errcode) { + ff |= DE_CONNECT; + } else { + ff |= DE_CLOSE; + } + } else { + ff |= DE_WRITE; + } + } + + // Tell the descriptor about the event. + if (ff != 0) { + dispatcher->OnPreEvent(ff); + dispatcher->OnEvent(ff, errcode); + } +} + +bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) { // Calculate timing information struct timeval* ptvWait = nullptr; @@ -1266,13 +1467,17 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { int fdmax = -1; { CritScope cr(&crit_); - for (size_t i = 0; i < dispatchers_.size(); ++i) { + // TODO(jbauch): Support re-entrant waiting. + RTC_DCHECK(!processing_dispatchers_); + for (Dispatcher* pdispatcher : dispatchers_) { // Query dispatchers for read and write wait state - Dispatcher *pdispatcher = dispatchers_[i]; RTC_DCHECK(pdispatcher); if (!process_io && (pdispatcher != signal_wakeup_)) continue; int fd = pdispatcher->GetDescriptor(); + // "select"ing a file descriptor that is equal to or larger than + // FD_SETSIZE will result in undefined behavior. + RTC_DCHECK_LT(fd, FD_SETSIZE); if (fd > fdmax) fdmax = fd; @@ -1306,55 +1511,28 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { } else { // We have signaled descriptors CritScope cr(&crit_); - for (size_t i = 0; i < dispatchers_.size(); ++i) { - Dispatcher *pdispatcher = dispatchers_[i]; + processing_dispatchers_ = true; + for (Dispatcher* pdispatcher : dispatchers_) { int fd = pdispatcher->GetDescriptor(); - uint32_t ff = 0; - int errcode = 0; - // Reap any error code, which can be signaled through reads or writes. - // TODO(pthatcher): Should we set errcode if getsockopt fails? - if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) { - socklen_t len = sizeof(errcode); - ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len); - } - - // Check readable descriptors. If we're waiting on an accept, signal - // that. Otherwise we're waiting for data, check to see if we're - // readable or really closed. - // TODO(pthatcher): Only peek at TCP descriptors. - if (FD_ISSET(fd, &fdsRead)) { + bool readable = FD_ISSET(fd, &fdsRead); + if (readable) { FD_CLR(fd, &fdsRead); - if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) { - ff |= DE_ACCEPT; - } else if (errcode || pdispatcher->IsDescriptorClosed()) { - ff |= DE_CLOSE; - } else { - ff |= DE_READ; - } } - // Check writable descriptors. If we're waiting on a connect, detect - // success versus failure by the reaped error code. - if (FD_ISSET(fd, &fdsWrite)) { + bool writable = FD_ISSET(fd, &fdsWrite); + if (writable) { FD_CLR(fd, &fdsWrite); - if (pdispatcher->GetRequestedEvents() & DE_CONNECT) { - if (!errcode) { - ff |= DE_CONNECT; - } else { - ff |= DE_CLOSE; - } - } else { - ff |= DE_WRITE; - } } - // Tell the descriptor about the event. - if (ff != 0) { - pdispatcher->OnPreEvent(ff); - pdispatcher->OnEvent(ff, errcode); - } + // The error code can be signaled through reads or writes. + ProcessEvents(pdispatcher, readable, writable, readable || writable); } + + processing_dispatchers_ = false; + // Process deferred dispatchers that have been added/removed while the + // events were handled above. + AddRemovePendingDispatchers(); } // Recalc the time remaining to wait. Doing it here means it doesn't get @@ -1381,6 +1559,214 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { return true; } +#if defined(WEBRTC_USE_EPOLL) + +// Initial number of events to process with one call to "epoll_wait". +static const size_t kInitialEpollEvents = 128; + +// Maximum number of events to process with one call to "epoll_wait". +static const size_t kMaxEpollEvents = 8192; + +void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) { + RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); + int fd = pdispatcher->GetDescriptor(); + RTC_DCHECK(fd != INVALID_SOCKET); + if (fd == INVALID_SOCKET) { + return; + } + + struct epoll_event event = {0}; + event.events = GetEpollEvents(pdispatcher->GetRequestedEvents()); + event.data.ptr = pdispatcher; + int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event); + RTC_DCHECK_EQ(err, 0); + if (err == -1) { + LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD"; + } +} + +void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) { + RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); + int fd = pdispatcher->GetDescriptor(); + RTC_DCHECK(fd != INVALID_SOCKET); + if (fd == INVALID_SOCKET) { + return; + } + + struct epoll_event event = {0}; + int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event); + RTC_DCHECK(err == 0 || errno == ENOENT); + if (err == -1) { + if (errno == ENOENT) { + // Socket has already been closed. + LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL"; + } else { + LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL"; + } + } +} + +void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) { + RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); + int fd = pdispatcher->GetDescriptor(); + RTC_DCHECK(fd != INVALID_SOCKET); + if (fd == INVALID_SOCKET) { + return; + } + + struct epoll_event event = {0}; + event.events = GetEpollEvents(pdispatcher->GetRequestedEvents()); + event.data.ptr = pdispatcher; + int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event); + RTC_DCHECK_EQ(err, 0); + if (err == -1) { + LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD"; + } +} + +bool PhysicalSocketServer::WaitEpoll(int cmsWait) { + RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); + int64_t tvWait = -1; + int64_t tvStop = -1; + if (cmsWait != kForever) { + tvWait = cmsWait; + tvStop = TimeAfter(cmsWait); + } + + if (epoll_events_.empty()) { + // The initial space to receive events is created only if epoll is used. + epoll_events_.resize(kInitialEpollEvents); + } + + fWait_ = true; + + while (fWait_) { + // Wait then call handlers as appropriate + // < 0 means error + // 0 means timeout + // > 0 means count of descriptors ready + int n = epoll_wait(epoll_fd_, &epoll_events_[0], + static_cast(epoll_events_.size()), + static_cast(tvWait)); + if (n < 0) { + if (errno != EINTR) { + LOG_E(LS_ERROR, EN, errno) << "epoll"; + return false; + } + // Else ignore the error and keep going. If this EINTR was for one of the + // signals managed by this PhysicalSocketServer, the + // PosixSignalDeliveryDispatcher will be in the signaled state in the next + // iteration. + } else if (n == 0) { + // If timeout, return success + return true; + } else { + // We have signaled descriptors + CritScope cr(&crit_); + for (int i = 0; i < n; ++i) { + const epoll_event& event = epoll_events_[i]; + Dispatcher* pdispatcher = static_cast(event.data.ptr); + if (dispatchers_.find(pdispatcher) == dispatchers_.end()) { + // The dispatcher for this socket no longer exists. + continue; + } + + bool readable = (event.events & (EPOLLIN | EPOLLPRI)); + bool writable = (event.events & EPOLLOUT); + bool check_error = (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)); + + ProcessEvents(pdispatcher, readable, writable, check_error); + } + } + + if (static_cast(n) == epoll_events_.size() && + epoll_events_.size() < kMaxEpollEvents) { + // We used the complete space to receive events, increase size for future + // iterations. + epoll_events_.resize(std::max(epoll_events_.size() * 2, kMaxEpollEvents)); + } + + if (cmsWait != kForever) { + tvWait = TimeDiff(tvStop, TimeMillis()); + if (tvWait < 0) { + // Return success on timeout. + return true; + } + } + } + + return true; +} + +bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) { + RTC_DCHECK(dispatcher); + int64_t tvWait = -1; + int64_t tvStop = -1; + if (cmsWait != kForever) { + tvWait = cmsWait; + tvStop = TimeAfter(cmsWait); + } + + fWait_ = true; + + struct pollfd fds = {0}; + int fd = dispatcher->GetDescriptor(); + fds.fd = fd; + + while (fWait_) { + uint32_t ff = dispatcher->GetRequestedEvents(); + fds.events = 0; + if (ff & (DE_READ | DE_ACCEPT)) { + fds.events |= POLLIN; + } + if (ff & (DE_WRITE | DE_CONNECT)) { + fds.events |= POLLOUT; + } + fds.revents = 0; + + // Wait then call handlers as appropriate + // < 0 means error + // 0 means timeout + // > 0 means count of descriptors ready + int n = poll(&fds, 1, static_cast(tvWait)); + if (n < 0) { + if (errno != EINTR) { + LOG_E(LS_ERROR, EN, errno) << "poll"; + return false; + } + // Else ignore the error and keep going. If this EINTR was for one of the + // signals managed by this PhysicalSocketServer, the + // PosixSignalDeliveryDispatcher will be in the signaled state in the next + // iteration. + } else if (n == 0) { + // If timeout, return success + return true; + } else { + // We have signaled descriptors (should only be the passed dispatcher). + RTC_DCHECK_EQ(n, 1); + RTC_DCHECK_EQ(fds.fd, fd); + + bool readable = (fds.revents & (POLLIN | POLLPRI)); + bool writable = (fds.revents & POLLOUT); + bool check_error = (fds.revents & (POLLRDHUP | POLLERR | POLLHUP)); + + ProcessEvents(dispatcher, readable, writable, check_error); + } + + if (cmsWait != kForever) { + tvWait = TimeDiff(tvStop, TimeMillis()); + if (tvWait < 0) { + // Return success on timeout. + return true; + } + } + } + + return true; +} + +#endif // WEBRTC_USE_EPOLL + static void GlobalSignalHandler(int signum) { PosixSignalHandler::Instance()->OnPosixSignalReceived(signum); } @@ -1454,12 +1840,13 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { { CritScope cr(&crit_); - size_t i = 0; - iterators_.push_back(&i); - // Don't track dispatchers_.size(), because we want to pick up any new - // dispatchers that were added while processing the loop. - while (i < dispatchers_.size()) { - Dispatcher* disp = dispatchers_[i++]; + // TODO(jbauch): Support re-entrant waiting. + RTC_DCHECK(!processing_dispatchers_); + + // Calling "CheckSignalClose" might remove a closed dispatcher from the + // set. This must be deferred to prevent invalidating the iterator. + processing_dispatchers_ = true; + for (Dispatcher* disp : dispatchers_) { if (!process_io && (disp != signal_wakeup_)) continue; SOCKET s = disp->GetSocket(); @@ -1474,8 +1861,11 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { event_owners.push_back(disp); } } - RTC_DCHECK(iterators_.back() == &i); - iterators_.pop_back(); + + processing_dispatchers_ = false; + // Process deferred dispatchers that have been added/removed while the + // events were handled above. + AddRemovePendingDispatchers(); } // Which is shorter, the delay wait or the asked wait? @@ -1509,14 +1899,15 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { int index = dw - WSA_WAIT_EVENT_0; if (index > 0) { --index; // The first event is the socket event - event_owners[index]->OnPreEvent(0); - event_owners[index]->OnEvent(0, 0); + Dispatcher* disp = event_owners[index]; + // The dispatcher could have been removed while waiting for events. + if (dispatchers_.find(disp) != dispatchers_.end()) { + disp->OnPreEvent(0); + disp->OnEvent(0, 0); + } } else if (process_io) { - size_t i = 0, end = dispatchers_.size(); - iterators_.push_back(&i); - iterators_.push_back(&end); // Don't iterate over new dispatchers. - while (i < end) { - Dispatcher* disp = dispatchers_[i++]; + processing_dispatchers_ = true; + for (Dispatcher* disp : dispatchers_) { SOCKET s = disp->GetSocket(); if (s == INVALID_SOCKET) continue; @@ -1577,10 +1968,11 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { } } } - RTC_DCHECK(iterators_.back() == &end); - iterators_.pop_back(); - RTC_DCHECK(iterators_.back() == &i); - iterators_.pop_back(); + + processing_dispatchers_ = false; + // Process deferred dispatchers that have been added/removed while the + // events were handled above. + AddRemovePendingDispatchers(); } // Reset the network event until new activity occurs diff --git a/webrtc/base/physicalsocketserver.h b/webrtc/base/physicalsocketserver.h index 5f843da20c..dec37c256c 100644 --- a/webrtc/base/physicalsocketserver.h +++ b/webrtc/base/physicalsocketserver.h @@ -11,7 +11,13 @@ #ifndef WEBRTC_BASE_PHYSICALSOCKETSERVER_H__ #define WEBRTC_BASE_PHYSICALSOCKETSERVER_H__ +#if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX) +#include +#define WEBRTC_USE_EPOLL 1 +#endif + #include +#include #include #include "webrtc/base/nethelpers.h" @@ -76,6 +82,7 @@ class PhysicalSocketServer : public SocketServer { void Add(Dispatcher* dispatcher); void Remove(Dispatcher* dispatcher); + void Update(Dispatcher* dispatcher); #if defined(WEBRTC_POSIX) // Sets the function to be executed in response to the specified POSIX signal. @@ -95,16 +102,30 @@ class PhysicalSocketServer : public SocketServer { #endif private: - typedef std::vector DispatcherList; - typedef std::vector IteratorList; + typedef std::set DispatcherSet; + + void AddRemovePendingDispatchers(); #if defined(WEBRTC_POSIX) + bool WaitSelect(int cms, bool process_io); static bool InstallSignal(int signum, void (*handler)(int)); std::unique_ptr signal_dispatcher_; -#endif - DispatcherList dispatchers_; - IteratorList iterators_; +#endif // WEBRTC_POSIX +#if defined(WEBRTC_USE_EPOLL) + void AddEpoll(Dispatcher* dispatcher); + void RemoveEpoll(Dispatcher* dispatcher); + void UpdateEpoll(Dispatcher* dispatcher); + bool WaitEpoll(int cms); + bool WaitPoll(int cms, Dispatcher* dispatcher); + + int epoll_fd_ = INVALID_SOCKET; + std::vector epoll_events_; +#endif // WEBRTC_USE_EPOLL + DispatcherSet dispatchers_; + DispatcherSet pending_add_dispatchers_; + DispatcherSet pending_remove_dispatchers_; + bool processing_dispatchers_ = false; Signaler* signal_wakeup_; CriticalSection crit_; bool fWait_; @@ -172,9 +193,9 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { void MaybeRemapSendError(); uint8_t enabled_events() const { return enabled_events_; } - void SetEnabledEvents(uint8_t events); - void EnableEvents(uint8_t events); - void DisableEvents(uint8_t events); + virtual void SetEnabledEvents(uint8_t events); + virtual void EnableEvents(uint8_t events); + virtual void DisableEvents(uint8_t events); static int TranslateOption(Option opt, int* slevel, int* sopt); @@ -220,13 +241,28 @@ class SocketDispatcher : public Dispatcher, public PhysicalSocket { int Close() override; -#if defined(WEBRTC_WIN) +#if defined(WEBRTC_USE_EPOLL) + protected: + void StartBatchedEventUpdates(); + void FinishBatchedEventUpdates(); + + void SetEnabledEvents(uint8_t events) override; + void EnableEvents(uint8_t events) override; + void DisableEvents(uint8_t events) override; +#endif + private: +#if defined(WEBRTC_WIN) static int next_id_; int id_; bool signal_close_; int signal_err_; #endif // WEBRTC_WIN +#if defined(WEBRTC_USE_EPOLL) + void MaybeUpdateDispatcher(uint8_t old_events); + + int saved_enabled_events_ = -1; +#endif }; } // namespace rtc