From 257f81b98e437c22964b9be40d967b68c5ad83c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Niels=20M=C3=B6ller?= Date: Thu, 17 Jun 2021 16:58:59 +0200 Subject: [PATCH] Update VirtualSocketServer locking to match documentation. Add GUARDED_BY annotation on members claimed to be protected by the lock, and add missing lock operations. Also mark a few members const. Bug: webrtc:11567, webrtc:2079 Change-Id: I8f12ca7627df0c24e07fa2ae24a387c6a0ed76cf Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/208224 Reviewed-by: Harald Alvestrand Commit-Queue: Niels Moller Cr-Commit-Position: refs/heads/master@{#34340} --- rtc_base/virtual_socket_server.cc | 85 +++++++++++++++++-------------- rtc_base/virtual_socket_server.h | 12 ++--- 2 files changed, 54 insertions(+), 43 deletions(-) diff --git a/rtc_base/virtual_socket_server.cc b/rtc_base/virtual_socket_server.cc index 8140fcb6aa..0746982557 100644 --- a/rtc_base/virtual_socket_server.cc +++ b/rtc_base/virtual_socket_server.cc @@ -164,6 +164,8 @@ int VirtualSocket::Close() { } if (SOCK_STREAM == type_) { + CritScope cs(&crit_); + // Cancel pending sockets if (listen_queue_) { while (!listen_queue_->empty()) { @@ -231,6 +233,8 @@ int VirtualSocket::RecvFrom(void* pv, if (timestamp) { *timestamp = -1; } + + CritScope cs(&crit_); // If we don't have a packet, then either error or wait for one to arrive. if (recv_buffer_.empty()) { if (async_) { @@ -273,6 +277,7 @@ int VirtualSocket::RecvFrom(void* pv, } int VirtualSocket::Listen(int backlog) { + CritScope cs(&crit_); RTC_DCHECK(SOCK_STREAM == type_); RTC_DCHECK(CS_CLOSED == state_); if (local_addr_.IsNil()) { @@ -286,6 +291,7 @@ int VirtualSocket::Listen(int backlog) { } VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) { + CritScope cs(&crit_); if (nullptr == listen_queue_) { error_ = EINVAL; return nullptr; @@ -341,47 +347,52 @@ int VirtualSocket::SetOption(Option opt, int value) { } void VirtualSocket::OnMessage(Message* pmsg) { - if (pmsg->message_id == MSG_ID_PACKET) { - RTC_DCHECK(nullptr != pmsg->pdata); - Packet* packet = static_cast(pmsg->pdata); + bool signal_read_event = false; + bool signal_close_event = false; + int error_to_signal = 0; + { + CritScope cs(&crit_); + if (pmsg->message_id == MSG_ID_PACKET) { + RTC_DCHECK(nullptr != pmsg->pdata); + Packet* packet = static_cast(pmsg->pdata); - recv_buffer_.push_back(packet); - - if (async_) { - SignalReadEvent(this); - } - } else if (pmsg->message_id == MSG_ID_CONNECT) { - RTC_DCHECK(nullptr != pmsg->pdata); - MessageAddress* data = static_cast(pmsg->pdata); - if (listen_queue_ != nullptr) { - listen_queue_->push_back(data->addr); - if (async_) { - SignalReadEvent(this); + recv_buffer_.push_back(packet); + signal_read_event = async_; + } else if (pmsg->message_id == MSG_ID_CONNECT) { + RTC_DCHECK(nullptr != pmsg->pdata); + MessageAddress* data = static_cast(pmsg->pdata); + if (listen_queue_ != nullptr) { + listen_queue_->push_back(data->addr); + signal_read_event = async_; + } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) { + CompleteConnect(data->addr, true); + } else { + RTC_LOG(LS_VERBOSE) + << "Socket at " << local_addr_.ToString() << " is not listening"; + server_->Disconnect(data->addr); } - } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) { - CompleteConnect(data->addr, true); + delete data; + } else if (pmsg->message_id == MSG_ID_DISCONNECT) { + RTC_DCHECK(SOCK_STREAM == type_); + if (CS_CLOSED != state_) { + error_to_signal = (CS_CONNECTING == state_) ? ECONNREFUSED : 0; + state_ = CS_CLOSED; + remote_addr_.Clear(); + signal_close_event = async_; + } + } else if (pmsg->message_id == MSG_ID_SIGNALREADEVENT) { + signal_read_event = !recv_buffer_.empty(); } else { - RTC_LOG(LS_VERBOSE) << "Socket at " << local_addr_.ToString() - << " is not listening"; - server_->Disconnect(data->addr); + RTC_NOTREACHED(); } - delete data; - } else if (pmsg->message_id == MSG_ID_DISCONNECT) { - RTC_DCHECK(SOCK_STREAM == type_); - if (CS_CLOSED != state_) { - int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0; - state_ = CS_CLOSED; - remote_addr_.Clear(); - if (async_) { - SignalCloseEvent(this, error); - } - } - } else if (pmsg->message_id == MSG_ID_SIGNALREADEVENT) { - if (!recv_buffer_.empty()) { - SignalReadEvent(this); - } - } else { - RTC_NOTREACHED(); + } + // Signal events without holding `crit_`, to avoid lock order inversion with + // sigslot locks. + if (signal_read_event) { + SignalReadEvent(this); + } + if (signal_close_event) { + SignalCloseEvent(this, error_to_signal); } } diff --git a/rtc_base/virtual_socket_server.h b/rtc_base/virtual_socket_server.h index faf31f007a..a17a6d6053 100644 --- a/rtc_base/virtual_socket_server.h +++ b/rtc_base/virtual_socket_server.h @@ -400,16 +400,16 @@ class VirtualSocket : public AsyncSocket, void OnSocketServerReadyToSend(); - VirtualSocketServer* server_; - int type_; - bool async_; + VirtualSocketServer* const server_; + const int type_; + const bool async_; ConnState state_; int error_; SocketAddress local_addr_; SocketAddress remote_addr_; // Pending sockets which can be Accepted - ListenQueue* listen_queue_; + ListenQueue* listen_queue_ RTC_GUARDED_BY(crit_) RTC_PT_GUARDED_BY(crit_); // Data which tcp has buffered for sending SendBuffer send_buffer_; @@ -417,7 +417,7 @@ class VirtualSocket : public AsyncSocket, // Set back to true when the socket can send again. bool ready_to_send_ = true; - // Critical section to protect the recv_buffer and queue_ + // Critical section to protect the recv_buffer and listen_queue_ RecursiveCriticalSection crit_; // Network model that enforces bandwidth and capacity constraints @@ -428,7 +428,7 @@ class VirtualSocket : public AsyncSocket, int64_t last_delivery_time_ = 0; // Data which has been received from the network - RecvBuffer recv_buffer_; + RecvBuffer recv_buffer_ RTC_GUARDED_BY(crit_); // The amount of data which is in flight or in recv_buffer_ size_t recv_buffer_size_;