diff --git a/p2p/base/tcp_port.cc b/p2p/base/tcp_port.cc index 2760c02819..5d4080201b 100644 --- a/p2p/base/tcp_port.cc +++ b/p2p/base/tcp_port.cc @@ -356,7 +356,11 @@ TCPConnection::TCPConnection(rtc::WeakPtr tcp_port, connection_pending_(false), pretending_to_be_writable_(false), reconnection_timeout_(cricket::CONNECTION_WRITE_CONNECT_TIMEOUT) { + RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_EQ(port()->GetProtocol(), PROTO_TCP); // Needs to be TCPPort. + + SignalDestroyed.connect(this, &TCPConnection::OnDestroyed); + if (outgoing_) { CreateOutgoingTcpSocket(); } else { @@ -496,6 +500,7 @@ void TCPConnection::OnConnect(rtc::AsyncPacketSocket* socket) { } void TCPConnection::OnClose(rtc::AsyncPacketSocket* socket, int error) { + RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK_EQ(socket, socket_.get()); RTC_LOG(LS_INFO) << ToString() << ": Connection closed with error " << error; @@ -532,12 +537,13 @@ void TCPConnection::OnClose(rtc::AsyncPacketSocket* socket, int error) { // initial connect() (i.e. `pretending_to_be_writable_` is false) . We have // to manually destroy here as this connection, as never connected, will not // be scheduled for ping to trigger destroy. - socket_->UnsubscribeClose(this); + DisconnectSocketSignals(socket_.get()); port()->DestroyConnectionAsync(this); } } void TCPConnection::MaybeReconnect() { + RTC_DCHECK_RUN_ON(network_thread()); // Only reconnect for an outgoing TCPConnection when OnClose was signaled and // no outstanding reconnect is pending. if (connected() || connection_pending_ || !outgoing_) { @@ -557,15 +563,25 @@ void TCPConnection::OnReadPacket(rtc::AsyncPacketSocket* socket, size_t size, const rtc::SocketAddress& remote_addr, const int64_t& packet_time_us) { + RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK_EQ(socket, socket_.get()); Connection::OnReadPacket(data, size, packet_time_us); } void TCPConnection::OnReadyToSend(rtc::AsyncPacketSocket* socket) { + RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK_EQ(socket, socket_.get()); Connection::OnReadyToSend(); } +void TCPConnection::OnDestroyed(Connection* c) { + RTC_DCHECK_RUN_ON(network_thread()); + RTC_DCHECK_EQ(c, this); + if (socket_) { + DisconnectSocketSignals(socket_.get()); + } +} + void TCPConnection::CreateOutgoingTcpSocket() { RTC_DCHECK(outgoing_); int opts = (remote_candidate().protocol() == SSLTCP_PROTOCOL_NAME) @@ -573,7 +589,7 @@ void TCPConnection::CreateOutgoingTcpSocket() { : 0; if (socket_) { - socket_->UnsubscribeClose(this); + DisconnectSocketSignals(socket_.get()); } rtc::PacketSocketTcpOptions tcp_opts; @@ -616,4 +632,13 @@ void TCPConnection::ConnectSocketSignals(rtc::AsyncPacketSocket* socket) { }); } +void TCPConnection::DisconnectSocketSignals(rtc::AsyncPacketSocket* socket) { + if (outgoing_) { + socket->SignalConnect.disconnect(this); + } + socket->SignalReadPacket.disconnect(this); + socket->SignalReadyToSend.disconnect(this); + socket->UnsubscribeClose(this); +} + } // namespace cricket diff --git a/p2p/base/tcp_port.h b/p2p/base/tcp_port.h index ff69e6e48b..a1bbaa9f35 100644 --- a/p2p/base/tcp_port.h +++ b/p2p/base/tcp_port.h @@ -153,13 +153,19 @@ class TCPConnection : public Connection, public sigslot::has_slots<> { StunMessage* response) override; private: + friend class TCPPort; // For `MaybeReconnect()`. + // Helper function to handle the case when Ping or Send fails with error // related to socket close. void MaybeReconnect(); - void CreateOutgoingTcpSocket(); + void CreateOutgoingTcpSocket() RTC_RUN_ON(network_thread()); - void ConnectSocketSignals(rtc::AsyncPacketSocket* socket); + void ConnectSocketSignals(rtc::AsyncPacketSocket* socket) + RTC_RUN_ON(network_thread()); + + void DisconnectSocketSignals(rtc::AsyncPacketSocket* socket) + RTC_RUN_ON(network_thread()); void OnConnect(rtc::AsyncPacketSocket* socket); void OnClose(rtc::AsyncPacketSocket* socket, int error); @@ -169,6 +175,7 @@ class TCPConnection : public Connection, public sigslot::has_slots<> { const rtc::SocketAddress& remote_addr, const int64_t& packet_time_us); void OnReadyToSend(rtc::AsyncPacketSocket* socket); + void OnDestroyed(Connection* c); TCPPort* tcp_port() { RTC_DCHECK_EQ(port()->GetProtocol(), PROTO_TCP); @@ -177,7 +184,7 @@ class TCPConnection : public Connection, public sigslot::has_slots<> { std::unique_ptr socket_; int error_; - bool outgoing_; + const bool outgoing_; // Guard against multiple outgoing tcp connection during a reconnect. bool connection_pending_; @@ -194,8 +201,6 @@ class TCPConnection : public Connection, public sigslot::has_slots<> { int reconnection_timeout_; webrtc::ScopedTaskSafety network_safety_; - - friend class TCPPort; }; } // namespace cricket