From 577f5dc60b2f129c947fce7a38597781282bf3a5 Mon Sep 17 00:00:00 2001 From: jbauch Date: Wed, 17 May 2017 16:32:26 -0700 Subject: [PATCH] Add methods to change enabled events in PhysicalSocket. This is in preparation for "epoll" integration where additional code needs to run when the enabled events change. BUG=webrtc:7585 Review-Url: https://codereview.webrtc.org/2893723002 Cr-Commit-Position: refs/heads/master@{#18189} --- webrtc/base/physicalsocketserver.cc | 62 ++++++++++++++++++----------- webrtc/base/physicalsocketserver.h | 9 ++++- 2 files changed, 46 insertions(+), 25 deletions(-) diff --git a/webrtc/base/physicalsocketserver.cc b/webrtc/base/physicalsocketserver.cc index 8e57d3bb5f..a412703015 100644 --- a/webrtc/base/physicalsocketserver.cc +++ b/webrtc/base/physicalsocketserver.cc @@ -121,7 +121,7 @@ static const int ICMP_PING_TIMEOUT_MILLIS = 10000u; #endif PhysicalSocket::PhysicalSocket(PhysicalSocketServer* ss, SOCKET s) - : ss_(ss), s_(s), enabled_events_(0), error_(0), + : ss_(ss), s_(s), error_(0), state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED), resolver_(nullptr) { #if defined(WEBRTC_WIN) @@ -133,7 +133,7 @@ PhysicalSocket::PhysicalSocket(PhysicalSocketServer* ss, SOCKET s) EnsureWinsockInit(); #endif if (s_ != INVALID_SOCKET) { - enabled_events_ = DE_READ | DE_WRITE; + SetEnabledEvents(DE_READ | DE_WRITE); int type = SOCK_STREAM; socklen_t len = sizeof(type); @@ -153,8 +153,9 @@ bool PhysicalSocket::Create(int family, int type) { s_ = ::socket(family, type, 0); udp_ = (SOCK_DGRAM == type); UpdateLastError(); - if (udp_) - enabled_events_ = DE_READ | DE_WRITE; + if (udp_) { + SetEnabledEvents(DE_READ | DE_WRITE); + } return s_ != INVALID_SOCKET; } @@ -266,16 +267,17 @@ int PhysicalSocket::DoConnect(const SocketAddress& connect_addr) { sockaddr* addr = reinterpret_cast(&addr_storage); int err = ::connect(s_, addr, static_cast(len)); UpdateLastError(); + uint8_t events = DE_READ | DE_WRITE; if (err == 0) { state_ = CS_CONNECTED; } else if (IsBlockingError(GetError())) { state_ = CS_CONNECTING; - enabled_events_ |= DE_CONNECT; + events |= DE_CONNECT; } else { return SOCKET_ERROR; } - enabled_events_ |= DE_READ | DE_WRITE; + EnableEvents(events); return 0; } @@ -341,7 +343,7 @@ int PhysicalSocket::Send(const void* pv, size_t cb) { RTC_DCHECK(sent <= static_cast(cb)); if ((sent > 0 && sent < static_cast(cb)) || (sent < 0 && IsBlockingError(GetError()))) { - enabled_events_ |= DE_WRITE; + EnableEvents(DE_WRITE); } return sent; } @@ -366,7 +368,7 @@ int PhysicalSocket::SendTo(const void* buffer, RTC_DCHECK(sent <= static_cast(length)); if ((sent > 0 && sent < static_cast(length)) || (sent < 0 && IsBlockingError(GetError()))) { - enabled_events_ |= DE_WRITE; + EnableEvents(DE_WRITE); } return sent; } @@ -381,7 +383,7 @@ int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) { LOG(LS_WARNING) << "EOF from socket; deferring close event"; // Must turn this back on so that the select() loop will notice the close // event. - enabled_events_ |= DE_READ; + EnableEvents(DE_READ); SetError(EWOULDBLOCK); return SOCKET_ERROR; } @@ -392,7 +394,7 @@ int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) { int error = GetError(); bool success = (received >= 0) || IsBlockingError(error); if (udp_ || success) { - enabled_events_ |= DE_READ; + EnableEvents(DE_READ); } if (!success) { LOG_F(LS_VERBOSE) << "Error = " << error; @@ -418,7 +420,7 @@ int PhysicalSocket::RecvFrom(void* buffer, int error = GetError(); bool success = (received >= 0) || IsBlockingError(error); if (udp_ || success) { - enabled_events_ |= DE_READ; + EnableEvents(DE_READ); } if (!success) { LOG_F(LS_VERBOSE) << "Error = " << error; @@ -431,7 +433,7 @@ int PhysicalSocket::Listen(int backlog) { UpdateLastError(); if (err == 0) { state_ = CS_CONNECTING; - enabled_events_ |= DE_ACCEPT; + EnableEvents(DE_ACCEPT); #if !defined(NDEBUG) dbg_addr_ = "Listening @ "; dbg_addr_.append(GetLocalAddress().ToString()); @@ -443,7 +445,7 @@ int PhysicalSocket::Listen(int backlog) { AsyncSocket* PhysicalSocket::Accept(SocketAddress* out_addr) { // Always re-subscribe DE_ACCEPT to make sure new incoming connections will // trigger an event even if DoAccept returns an error here. - enabled_events_ |= DE_ACCEPT; + EnableEvents(DE_ACCEPT); sockaddr_storage addr_storage; socklen_t addr_len = sizeof(addr_storage); sockaddr* addr = reinterpret_cast(&addr_storage); @@ -463,7 +465,7 @@ int PhysicalSocket::Close() { UpdateLastError(); s_ = INVALID_SOCKET; state_ = CS_CLOSED; - enabled_events_ = 0; + SetEnabledEvents(0); if (resolver_) { resolver_->Destroy(false); resolver_ = nullptr; @@ -525,6 +527,18 @@ void PhysicalSocket::MaybeRemapSendError() { #endif } +void PhysicalSocket::SetEnabledEvents(uint8_t events) { + enabled_events_ = events; +} + +void PhysicalSocket::EnableEvents(uint8_t events) { + enabled_events_ |= events; +} + +void PhysicalSocket::DisableEvents(uint8_t events) { + enabled_events_ &= ~events; +} + int PhysicalSocket::TranslateOption(Option opt, int* slevel, int* sopt) { switch (opt) { case OPT_DONTFRAGMENT: @@ -699,7 +713,7 @@ bool SocketDispatcher::IsDescriptorClosed() { #endif // WEBRTC_POSIX uint32_t SocketDispatcher::GetRequestedEvents() { - return enabled_events_; + return enabled_events(); } void SocketDispatcher::OnPreEvent(uint32_t ff) { @@ -723,7 +737,7 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) { if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) { if (ff != DE_CONNECT) LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff; - enabled_events_ &= ~DE_CONNECT; + DisableEvents(DE_CONNECT); #if !defined(NDEBUG) dbg_addr_ = "Connected @ "; dbg_addr_.append(GetRemoteAddress().ToString()); @@ -731,15 +745,15 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) { SignalConnectEvent(this); } if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) { - enabled_events_ &= ~DE_ACCEPT; + DisableEvents(DE_ACCEPT); SignalReadEvent(this); } if ((ff & DE_READ) != 0) { - enabled_events_ &= ~DE_READ; + DisableEvents(DE_READ); SignalReadEvent(this); } if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) { - enabled_events_ &= ~DE_WRITE; + DisableEvents(DE_WRITE); SignalWriteEvent(this); } if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) { @@ -754,24 +768,24 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) { // Make sure we deliver connect/accept first. Otherwise, consumers may see // something like a READ followed by a CONNECT, which would be odd. if ((ff & DE_CONNECT) != 0) { - enabled_events_ &= ~DE_CONNECT; + DisableEvents(DE_CONNECT); SignalConnectEvent(this); } if ((ff & DE_ACCEPT) != 0) { - enabled_events_ &= ~DE_ACCEPT; + DisableEvents(DE_ACCEPT); SignalReadEvent(this); } if ((ff & DE_READ) != 0) { - enabled_events_ &= ~DE_READ; + DisableEvents(DE_READ); SignalReadEvent(this); } if ((ff & DE_WRITE) != 0) { - enabled_events_ &= ~DE_WRITE; + DisableEvents(DE_WRITE); SignalWriteEvent(this); } if ((ff & DE_CLOSE) != 0) { // The socket is now dead to us, so stop checking it. - enabled_events_ = 0; + SetEnabledEvents(0); SignalCloseEvent(this, err); } } diff --git a/webrtc/base/physicalsocketserver.h b/webrtc/base/physicalsocketserver.h index f2994b2a67..5f843da20c 100644 --- a/webrtc/base/physicalsocketserver.h +++ b/webrtc/base/physicalsocketserver.h @@ -171,11 +171,15 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { void UpdateLastError(); void MaybeRemapSendError(); + uint8_t enabled_events() const { return enabled_events_; } + void SetEnabledEvents(uint8_t events); + void EnableEvents(uint8_t events); + void DisableEvents(uint8_t events); + static int TranslateOption(Option opt, int* slevel, int* sopt); PhysicalSocketServer* ss_; SOCKET s_; - uint8_t enabled_events_; bool udp_; CriticalSection crit_; int error_ GUARDED_BY(crit_); @@ -185,6 +189,9 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { #if !defined(NDEBUG) std::string dbg_addr_; #endif + + private: + uint8_t enabled_events_ = 0; }; class SocketDispatcher : public Dispatcher, public PhysicalSocket {