From 0cb1cfa69ebb3a75de39f6d75d5448314c558575 Mon Sep 17 00:00:00 2001 From: Mirko Bonadei Date: Fri, 25 Feb 2022 10:45:32 +0000 Subject: [PATCH] Reland "Removing MessageHandler dependency from Connection." This reverts commit 05ea12e5136493a8977e0bb4a81a6ff8d06ec92f. Reason for revert: Speculative revert. Original change's description: > Revert "Removing MessageHandler dependency from Connection." > > This reverts commit 3202e29f72b4f511fcf6e92ef9b0dcbfee6089ff. > > Reason for revert: Introduced a crash in the task posted by Destroy() > > Original change's description: > > Removing MessageHandler dependency from Connection. > > > > Bug: webrtc:11988 > > Change-Id: Ic35bb5baeafbda7210012dceb0d6d5f5b3eb95c9 > > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/249941 > > Reviewed-by: Niels Moller > > Commit-Queue: Tomas Gunnarsson > > Cr-Commit-Position: refs/heads/main@{#35890} > > No-Try: True > Bug: webrtc:11988 > Change-Id: Ie70ee145fde75b8cf76b02784176970e7a78e001 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/252541 > Auto-Submit: Taylor Brandstetter > Reviewed-by: Mirko Bonadei > Owners-Override: Mirko Bonadei > Reviewed-by: Tomas Gunnarsson > Commit-Queue: Tomas Gunnarsson > Cr-Commit-Position: refs/heads/main@{#36078} No-Try: True Bug: webrtc:11988 Change-Id: Idfd42d016e81d4352839c33dcb4ea3b0dafea08b Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/252584 Owners-Override: Mirko Bonadei Bot-Commit: rubber-stamper@appspot.gserviceaccount.com Commit-Queue: Mirko Bonadei Cr-Commit-Position: refs/heads/main@{#36081} --- p2p/base/connection.cc | 36 ++++++++-------- p2p/base/connection.h | 8 +--- p2p/base/p2p_transport_channel_unittest.cc | 3 ++ p2p/base/tcp_port.cc | 42 ++++++++----------- p2p/base/tcp_port.h | 10 ++--- pc/scenario_tests/goog_cc_test.cc | 3 ++ .../tests/remote_estimate_test.cc | 2 + .../tests/unsignaled_stream_test.cc | 2 + 8 files changed, 51 insertions(+), 55 deletions(-) diff --git a/p2p/base/connection.cc b/p2p/base/connection.cc index 8575d65945..a04f318acb 100644 --- a/p2p/base/connection.cc +++ b/p2p/base/connection.cc @@ -18,6 +18,7 @@ #include #include "absl/algorithm/container.h" +#include "absl/memory/memory.h" #include "absl/strings/match.h" #include "p2p/base/port_allocator.h" #include "rtc_base/checks.h" @@ -833,15 +834,25 @@ void Connection::Prune() { void Connection::Destroy() { RTC_DCHECK_RUN_ON(network_thread_); - // TODO(deadbeef, nisse): This may leak if an application closes a - // PeerConnection and then quickly destroys the PeerConnectionFactory (along - // with the networking thread on which this message is posted). Also affects - // tests, with a workaround in - // AutoSocketServerThread::~AutoSocketServerThread. - RTC_LOG(LS_VERBOSE) << ToString() << ": Connection destroyed"; - // TODO(bugs.webrtc.org/11988): Use PostTask. - port_->thread()->Post(RTC_FROM_HERE, this, MSG_DELETE); + RTC_DLOG(LS_VERBOSE) << ToString() << ": Connection destroyed"; + + // Fire the 'destroyed' event before deleting the object. This is done + // intentionally to avoid a situation whereby the signal might have dangling + // pointers to objects that have been deleted by the time the async task + // that deletes the connection object runs. + SignalDestroyed(this); + SignalDestroyed.disconnect_all(); + LogCandidatePairConfig(webrtc::IceCandidatePairConfigType::kDestroyed); + + // Unwind the stack before deleting the object in case upstream callers + // need to refer to the Connection's state as part of teardown. + // NOTE: We move ownership of 'this' into the capture section of the lambda + // so that the object will always be deleted, including if PostTask fails. + // In such a case (only tests), deletion would happen inside of the call + // to `Destroy()`. + network_thread_->PostTask( + webrtc::ToQueuedTask([me = absl::WrapUnique(this)]() {})); } void Connection::FailAndDestroy() { @@ -1422,15 +1433,6 @@ void Connection::MaybeUpdatePeerReflexiveCandidate( } } -void Connection::OnMessage(rtc::Message* pmsg) { - RTC_DCHECK_RUN_ON(network_thread_); - RTC_DCHECK(pmsg->message_id == MSG_DELETE); - RTC_LOG(LS_INFO) << "Connection deleted with number of pings sent: " - << num_pings_sent_; - SignalDestroyed(this); - delete this; -} - int64_t Connection::last_received() const { RTC_DCHECK_RUN_ON(network_thread_); return std::max(last_data_received_, diff --git a/p2p/base/connection.h b/p2p/base/connection.h index c102194498..8254706318 100644 --- a/p2p/base/connection.h +++ b/p2p/base/connection.h @@ -70,9 +70,7 @@ class ConnectionRequest : public StunRequest { // Represents a communication link between a port on the local client and a // port on the remote client. -class Connection : public CandidatePairInterface, - public rtc::MessageHandlerAutoCleanup, - public sigslot::has_slots<> { +class Connection : public CandidatePairInterface, public sigslot::has_slots<> { public: struct SentPing { SentPing(const std::string id, int64_t sent_time, uint32_t nomination) @@ -320,8 +318,6 @@ class Connection : public CandidatePairInterface, void set_remote_nomination(uint32_t remote_nomination); protected: - enum { MSG_DELETE = 0, MSG_FIRST_AVAILABLE }; - // Constructs a new connection to the given remote port. Connection(Port* port, size_t index, const Candidate& candidate); @@ -351,8 +347,6 @@ class Connection : public CandidatePairInterface, void set_state(IceCandidatePairState state); void set_connected(bool value); - void OnMessage(rtc::Message* pmsg) override; - // The local port where this connection sends and receives packets. Port* port() { return port_; } const Port* port() const { return port_; } diff --git a/p2p/base/p2p_transport_channel_unittest.cc b/p2p/base/p2p_transport_channel_unittest.cc index 7aed20a669..0a24f8c030 100644 --- a/p2p/base/p2p_transport_channel_unittest.cc +++ b/p2p/base/p2p_transport_channel_unittest.cc @@ -483,6 +483,9 @@ class P2PTransportChannelTestBase : public ::testing::Test, ep2_.cd1_.ch_.reset(); ep1_.cd2_.ch_.reset(); ep2_.cd2_.ch_.reset(); + // Process pending tasks that need to run for cleanup purposes such as + // pending deletion of Connection objects (see Connection::Destroy). + rtc::Thread::Current()->ProcessMessages(0); } P2PTransportChannel* ep1_ch1() { return ep1_.cd1_.ch_.get(); } P2PTransportChannel* ep1_ch2() { return ep1_.cd2_.ch_.get(); } diff --git a/p2p/base/tcp_port.cc b/p2p/base/tcp_port.cc index 9d542074a4..445b0d03a5 100644 --- a/p2p/base/tcp_port.cc +++ b/p2p/base/tcp_port.cc @@ -79,6 +79,7 @@ #include "rtc_base/logging.h" #include "rtc_base/net_helper.h" #include "rtc_base/rate_tracker.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/third_party/sigslot/sigslot.h" namespace cricket { @@ -366,7 +367,9 @@ TCPConnection::TCPConnection(TCPPort* port, } } -TCPConnection::~TCPConnection() {} +TCPConnection::~TCPConnection() { + RTC_DCHECK_RUN_ON(network_thread_); +} int TCPConnection::Send(const void* data, size_t size, @@ -493,11 +496,20 @@ void TCPConnection::OnClose(rtc::AsyncPacketSocket* socket, int error) { // events. pretending_to_be_writable_ = true; + // If this connection can't become connected and writable again in 5 + // seconds, it's time to tear this down. This is the case for the original + // TCP connection on passive side during a reconnect. // We don't attempt reconnect right here. This is to avoid a case where the // shutdown is intentional and reconnect is not necessary. We only reconnect // when the connection is used to Send() or Ping(). - port()->thread()->PostDelayed(RTC_FROM_HERE, reconnection_timeout(), this, - MSG_TCPCONNECTION_DELAYED_ONCLOSE); + port()->thread()->PostDelayedTask( + webrtc::ToQueuedTask(network_safety_, + [this]() { + if (pretending_to_be_writable_) { + Destroy(); + } + }), + reconnection_timeout()); } else if (!pretending_to_be_writable_) { // OnClose could be called when the underneath socket times out during the // initial connect() (i.e. `pretending_to_be_writable_` is false) . We have @@ -507,24 +519,6 @@ void TCPConnection::OnClose(rtc::AsyncPacketSocket* socket, int error) { } } -void TCPConnection::OnMessage(rtc::Message* pmsg) { - switch (pmsg->message_id) { - case MSG_TCPCONNECTION_DELAYED_ONCLOSE: - // If this connection can't become connected and writable again in 5 - // seconds, it's time to tear this down. This is the case for the original - // TCP connection on passive side during a reconnect. - if (pretending_to_be_writable_) { - Destroy(); - } - break; - case MSG_TCPCONNECTION_FAILED_CREATE_SOCKET: - FailAndPrune(); - break; - default: - Connection::OnMessage(pmsg); - } -} - void TCPConnection::MaybeReconnect() { // Only reconnect for an outgoing TCPConnection when OnClose was signaled and // no outstanding reconnect is pending. @@ -576,13 +570,13 @@ void TCPConnection::CreateOutgoingTcpSocket() { } else { RTC_LOG(LS_WARNING) << ToString() << ": Failed to create connection to " << remote_candidate().address().ToSensitiveString(); + set_state(IceCandidatePairState::FAILED); // We can't FailAndPrune directly here. FailAndPrune and deletes all // the StunRequests from the request_map_. And if this is in the stack // of Connection::Ping(), we are still using the request. // Unwind the stack and defer the FailAndPrune. - set_state(IceCandidatePairState::FAILED); - port()->thread()->Post(RTC_FROM_HERE, this, - MSG_TCPCONNECTION_FAILED_CREATE_SOCKET); + port()->thread()->PostTask( + webrtc::ToQueuedTask(network_safety_, [this]() { FailAndPrune(); })); } } diff --git a/p2p/base/tcp_port.h b/p2p/base/tcp_port.h index 932af50aa4..07d483cc3f 100644 --- a/p2p/base/tcp_port.h +++ b/p2p/base/tcp_port.h @@ -20,6 +20,7 @@ #include "p2p/base/port.h" #include "rtc_base/async_packet_socket.h" #include "rtc_base/containers/flat_map.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" namespace cricket { @@ -135,8 +136,6 @@ class TCPConnection : public Connection { rtc::AsyncPacketSocket* socket() { return socket_.get(); } - void OnMessage(rtc::Message* pmsg) override; - // Allow test cases to overwrite the default timeout period. int reconnection_timeout() const { return reconnection_timeout_; } void set_reconnection_timeout(int timeout_in_ms) { @@ -144,11 +143,6 @@ class TCPConnection : public Connection { } protected: - enum { - MSG_TCPCONNECTION_DELAYED_ONCLOSE = Connection::MSG_FIRST_AVAILABLE, - MSG_TCPCONNECTION_FAILED_CREATE_SOCKET, - }; - // Set waiting_for_stun_binding_complete_ to false to allow data packets in // addition to what Port::OnConnectionRequestResponse does. void OnConnectionRequestResponse(ConnectionRequest* req, @@ -190,6 +184,8 @@ class TCPConnection : public Connection { // Allow test case to overwrite the default timeout period. int reconnection_timeout_; + webrtc::ScopedTaskSafety network_safety_; + friend class TCPPort; }; diff --git a/pc/scenario_tests/goog_cc_test.cc b/pc/scenario_tests/goog_cc_test.cc index 82ae47b0c7..f0a30dfd86 100644 --- a/pc/scenario_tests/goog_cc_test.cc +++ b/pc/scenario_tests/goog_cc_test.cc @@ -100,6 +100,9 @@ TEST(GoogCcPeerScenarioTest, MAYBE_NoBweChangeFromVideoUnmute) { s.ProcessMessages(TimeDelta::Seconds(1)); EXPECT_GE(get_bwe(), initial_bwe); } + + caller->pc()->Close(); + callee->pc()->Close(); } } // namespace test diff --git a/test/peer_scenario/tests/remote_estimate_test.cc b/test/peer_scenario/tests/remote_estimate_test.cc index 429a5b4ef6..9190f5c92e 100644 --- a/test/peer_scenario/tests/remote_estimate_test.cc +++ b/test/peer_scenario/tests/remote_estimate_test.cc @@ -102,6 +102,8 @@ TEST(RemoteEstimateEndToEnd, AudioUsesAbsSendTimeExtension) { } }); RTC_CHECK(s.WaitAndProcess(&received_abs_send_time)); + caller->pc()->Close(); + callee->pc()->Close(); } } // namespace test } // namespace webrtc diff --git a/test/peer_scenario/tests/unsignaled_stream_test.cc b/test/peer_scenario/tests/unsignaled_stream_test.cc index 5cb0405a62..4f478b4b2a 100644 --- a/test/peer_scenario/tests/unsignaled_stream_test.cc +++ b/test/peer_scenario/tests/unsignaled_stream_test.cc @@ -254,6 +254,8 @@ TEST_P(UnsignaledStreamTest, ReplacesUnsignaledStreamOnCompletedSignaling) { }); EXPECT_TRUE(s.WaitAndProcess(&offer_exchange_done)); EXPECT_TRUE(s.WaitAndProcess(&second_sink.frame_observed_)); + caller->pc()->Close(); + callee->pc()->Close(); } INSTANTIATE_TEST_SUITE_P(