From 16de21696a275376326b49a8a60f5d27462fd32f Mon Sep 17 00:00:00 2001 From: Tomas Gunnarsson Date: Wed, 26 Jan 2022 10:21:57 +0100 Subject: [PATCH] Delete channel objects asynchronously from the transceiver. Move deletion of channel objects over to the RtpTransceiver instead of having it done by SdpOfferAnswer. The deletion is now also done via PostTask rather than Invoke. Bug: webrtc:11992, webrtc:13540 Change-Id: I5aff14956d5e572ca8816bbfef8739bb609b4484 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/248170 Reviewed-by: Harald Alvestrand Commit-Queue: Tomas Gunnarsson Cr-Commit-Position: refs/heads/main@{#35798} --- pc/channel_manager.cc | 36 +++++++------ pc/channel_manager.h | 5 +- pc/rtc_stats_collector_unittest.cc | 4 +- pc/rtp_transceiver.cc | 13 ++++- pc/rtp_transceiver_unittest.cc | 66 +++++++++++++++--------- pc/sdp_offer_answer.cc | 36 +++---------- pc/sdp_offer_answer.h | 6 --- pc/stats_collector_unittest.cc | 8 +++ pc/test/fake_peer_connection_for_stats.h | 33 ++++++++---- 9 files changed, 117 insertions(+), 90 deletions(-) diff --git a/pc/channel_manager.cc b/pc/channel_manager.cc index b583b94e3d..a0154e9e12 100644 --- a/pc/channel_manager.cc +++ b/pc/channel_manager.cc @@ -61,18 +61,20 @@ ChannelManager::ChannelManager( ChannelManager::~ChannelManager() { RTC_DCHECK_RUN_ON(signaling_thread_); - if (media_engine_) { - // While `media_engine_` is const throughout the ChannelManager's lifetime, - // it requires destruction to happen on the worker thread. Instead of - // marking the pointer as non-const, we live with this const_cast<> in the - // destructor. - worker_thread_->Invoke(RTC_FROM_HERE, [&] { - const_cast&>(media_engine_).reset(); - }); - } - - RTC_DCHECK(voice_channels_.empty()); - RTC_DCHECK(video_channels_.empty()); + // While `media_engine_` is const throughout the ChannelManager's lifetime, + // it requires destruction to happen on the worker thread. Instead of + // marking the pointer as non-const, we live with this const_cast<> in the + // destructor. + // NOTE: Before removing this Invoke(), consider that it has a dual purpose. + // Besides resetting the media engine pointer, it also ensures that any + // potentially outstanding calls to `DestroyChannel` on the worker, will be + // completed. + worker_thread_->Invoke(RTC_FROM_HERE, [&] { + RTC_DCHECK_RUN_ON(worker_thread_); + RTC_DCHECK(voice_channels_.empty()); + RTC_DCHECK(video_channels_.empty()); + const_cast&>(media_engine_).reset(); + }); } void ChannelManager::GetSupportedAudioSendCodecs( @@ -248,11 +250,13 @@ void ChannelManager::DestroyVideoChannel(VideoChannel* channel) { void ChannelManager::DestroyChannel(ChannelInterface* channel) { RTC_DCHECK(channel); - // TODO(bugs.webrtc.org/11992): Change to either be called on the worker - // thread, or do this asynchronously on the worker. if (!worker_thread_->IsCurrent()) { - worker_thread_->Invoke(RTC_FROM_HERE, - [&] { DestroyChannel(channel); }); + // Delete the channel asynchronously on the worker thread. + // NOTE: This is made safe by a call to `worker_thread_->Invoke()` from + // the destructor, which acts as a 'flush' for any pending calls to + // DestroyChannel. If that Invoke() gets removed, we'll need to make + // adjustments here. + worker_thread_->PostTask([this, channel] { DestroyChannel(channel); }); return; } diff --git a/pc/channel_manager.h b/pc/channel_manager.h index 9e4e1bf131..a1c4efd55b 100644 --- a/pc/channel_manager.h +++ b/pc/channel_manager.h @@ -43,7 +43,7 @@ namespace cricket { // voice or just video channels. // ChannelManager also allows the application to discover what devices it has // using device manager. -class ChannelManager final : public ChannelFactoryInterface { +class ChannelManager : public ChannelFactoryInterface { public: // Returns an initialized instance of ChannelManager. // If media_engine is non-nullptr, then the returned ChannelManager instance @@ -110,7 +110,7 @@ class ChannelManager final : public ChannelFactoryInterface { // Stops recording AEC dump. void StopAecDump(); - private: + protected: ChannelManager(std::unique_ptr media_engine, bool enable_rtx, rtc::Thread* worker_thread, @@ -122,6 +122,7 @@ class ChannelManager final : public ChannelFactoryInterface { // Destroys a video channel created by CreateVideoChannel. void DestroyVideoChannel(VideoChannel* video_channel); + private: const std::unique_ptr media_engine_; // Nullable. rtc::Thread* const signaling_thread_; rtc::Thread* const worker_thread_; diff --git a/pc/rtc_stats_collector_unittest.cc b/pc/rtc_stats_collector_unittest.cc index cbb17c0732..6719d0bfee 100644 --- a/pc/rtc_stats_collector_unittest.cc +++ b/pc/rtc_stats_collector_unittest.cc @@ -531,7 +531,7 @@ class RTCStatsCollectorWrapper { rtc::scoped_refptr(local_audio_track), voice_sender_info.local_stats[0].ssrc, voice_sender_info.local_stats[0].ssrc + 10, local_stream_ids); - EXPECT_CALL(*rtp_sender, SetMediaChannel(_)); + EXPECT_CALL(*rtp_sender, SetMediaChannel(_)).WillRepeatedly(Return()); pc_->AddSender(rtp_sender); } @@ -568,7 +568,7 @@ class RTCStatsCollectorWrapper { rtc::scoped_refptr(local_video_track), video_sender_info.local_stats[0].ssrc, video_sender_info.local_stats[0].ssrc + 10, local_stream_ids); - EXPECT_CALL(*rtp_sender, SetMediaChannel(_)); + EXPECT_CALL(*rtp_sender, SetMediaChannel(_)).WillRepeatedly(Return()); pc_->AddSender(rtp_sender); } diff --git a/pc/rtp_transceiver.cc b/pc/rtp_transceiver.cc index becf8f2cca..c8afec899a 100644 --- a/pc/rtp_transceiver.cc +++ b/pc/rtp_transceiver.cc @@ -154,6 +154,8 @@ RtpTransceiver::~RtpTransceiver() { RTC_DCHECK_RUN_ON(thread_); StopInternal(); } + + RTC_CHECK(!channel_) << "Missing call to SetChannel(nullptr)?"; } void RtpTransceiver::SetChannel( @@ -161,7 +163,7 @@ void RtpTransceiver::SetChannel( std::function transport_lookup) { RTC_DCHECK_RUN_ON(thread_); // Cannot set a non-null channel on a stopped transceiver. - if (stopped_ && channel) { + if ((stopped_ && channel) || channel == channel_) { return; } @@ -180,6 +182,8 @@ void RtpTransceiver::SetChannel( signaling_thread_safety_ = PendingTaskSafetyFlag::Create(); } + cricket::ChannelInterface* channel_to_delete = nullptr; + // An alternative to this, could be to require SetChannel to be called // on the network thread. The channel object operates for the most part // on the network thread, as part of its initialization being on the network @@ -193,6 +197,7 @@ void RtpTransceiver::SetChannel( if (channel_) { channel_->SetFirstPacketReceivedCallback(nullptr); channel_->SetRtpTransport(nullptr); + channel_to_delete = channel_; } channel_ = channel; @@ -221,6 +226,12 @@ void RtpTransceiver::SetChannel( receiver->internal()->SetMediaChannel(channel_->media_channel()); } } + + // Destroy the channel, if we had one, now _after_ updating the receivers who + // might have had references to the previous channel. + if (channel_to_delete) { + channel_manager_->DestroyChannel(channel_to_delete); + } } void RtpTransceiver::AddSender( diff --git a/pc/rtp_transceiver_unittest.cc b/pc/rtp_transceiver_unittest.cc index 0122c0d64d..d63af92169 100644 --- a/pc/rtp_transceiver_unittest.cc +++ b/pc/rtp_transceiver_unittest.cc @@ -32,12 +32,24 @@ using ::testing::ReturnRef; namespace webrtc { +namespace { +class ChannelManagerForTest : public cricket::ChannelManager { + public: + ChannelManagerForTest() + : cricket::ChannelManager(std::make_unique(), + true, + rtc::Thread::Current(), + rtc::Thread::Current()) {} + + MOCK_METHOD(void, DestroyChannel, (cricket::ChannelInterface*), (override)); +}; +} // namespace + // Checks that a channel cannot be set on a stopped `RtpTransceiver`. TEST(RtpTransceiverTest, CannotSetChannelOnStoppedTransceiver) { - auto cm = cricket::ChannelManager::Create( - nullptr, true, rtc::Thread::Current(), rtc::Thread::Current()); + ChannelManagerForTest cm; const std::string content_name("my_mid"); - RtpTransceiver transceiver(cricket::MediaType::MEDIA_TYPE_AUDIO, cm.get()); + RtpTransceiver transceiver(cricket::MediaType::MEDIA_TYPE_AUDIO, &cm); cricket::MockChannelInterface channel1; EXPECT_CALL(channel1, media_type()) .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_AUDIO)); @@ -62,14 +74,18 @@ TEST(RtpTransceiverTest, CannotSetChannelOnStoppedTransceiver) { // Channel can no longer be set, so this call should be a no-op. transceiver.SetChannel(&channel2, [](const std::string&) { return nullptr; }); EXPECT_EQ(&channel1, transceiver.channel()); + + // Clear the current channel before `transceiver` goes out of scope. + EXPECT_CALL(channel1, SetFirstPacketReceivedCallback(_)); + EXPECT_CALL(cm, DestroyChannel(&channel1)).WillRepeatedly(testing::Return()); + transceiver.SetChannel(nullptr, nullptr); } // Checks that a channel can be unset on a stopped `RtpTransceiver` TEST(RtpTransceiverTest, CanUnsetChannelOnStoppedTransceiver) { - auto cm = cricket::ChannelManager::Create( - nullptr, true, rtc::Thread::Current(), rtc::Thread::Current()); + ChannelManagerForTest cm; const std::string content_name("my_mid"); - RtpTransceiver transceiver(cricket::MediaType::MEDIA_TYPE_VIDEO, cm.get()); + RtpTransceiver transceiver(cricket::MediaType::MEDIA_TYPE_VIDEO, &cm); cricket::MockChannelInterface channel; EXPECT_CALL(channel, media_type()) .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_VIDEO)); @@ -77,6 +93,7 @@ TEST(RtpTransceiverTest, CanUnsetChannelOnStoppedTransceiver) { EXPECT_CALL(channel, SetFirstPacketReceivedCallback(_)) .WillRepeatedly(testing::Return()); EXPECT_CALL(channel, SetRtpTransport(_)).WillRepeatedly(Return(true)); + EXPECT_CALL(cm, DestroyChannel(&channel)).WillRepeatedly(testing::Return()); transceiver.SetChannel(&channel, [&](const std::string& mid) { EXPECT_EQ(mid, content_name); @@ -96,20 +113,15 @@ TEST(RtpTransceiverTest, CanUnsetChannelOnStoppedTransceiver) { class RtpTransceiverUnifiedPlanTest : public ::testing::Test { public: RtpTransceiverUnifiedPlanTest() - : channel_manager_(cricket::ChannelManager::Create( - std::make_unique(), - false, - rtc::Thread::Current(), - rtc::Thread::Current())), - transceiver_(RtpSenderProxyWithInternal::Create( + : transceiver_(RtpSenderProxyWithInternal::Create( rtc::Thread::Current(), sender_), RtpReceiverProxyWithInternal::Create( rtc::Thread::Current(), rtc::Thread::Current(), receiver_), - channel_manager_.get(), - channel_manager_->GetSupportedAudioRtpHeaderExtensions(), + &channel_manager_, + channel_manager_.GetSupportedAudioRtpHeaderExtensions(), /* on_negotiation_needed= */ [] {}) {} static rtc::scoped_refptr MockReceiver() { @@ -128,7 +140,7 @@ class RtpTransceiverUnifiedPlanTest : public ::testing::Test { rtc::scoped_refptr receiver_ = MockReceiver(); rtc::scoped_refptr sender_ = MockSender(); - std::unique_ptr channel_manager_; + ChannelManagerForTest channel_manager_; RtpTransceiver transceiver_; }; @@ -153,12 +165,7 @@ TEST_F(RtpTransceiverUnifiedPlanTest, StopSetsDirection) { class RtpTransceiverTestForHeaderExtensions : public ::testing::Test { public: RtpTransceiverTestForHeaderExtensions() - : channel_manager_(cricket::ChannelManager::Create( - std::make_unique(), - false, - rtc::Thread::Current(), - rtc::Thread::Current())), - extensions_( + : extensions_( {RtpHeaderExtensionCapability("uri1", 1, RtpTransceiverDirection::kSendOnly), @@ -178,7 +185,7 @@ class RtpTransceiverTestForHeaderExtensions : public ::testing::Test { rtc::Thread::Current(), rtc::Thread::Current(), receiver_), - channel_manager_.get(), + &channel_manager_, extensions_, /* on_negotiation_needed= */ [] {}) {} @@ -196,10 +203,19 @@ class RtpTransceiverTestForHeaderExtensions : public ::testing::Test { return sender; } + void ClearChannel(cricket::MockChannelInterface& mock_channel) { + EXPECT_CALL(*sender_.get(), SetMediaChannel(nullptr)); + EXPECT_CALL(*receiver_.get(), Stop()); + EXPECT_CALL(mock_channel, SetFirstPacketReceivedCallback(_)); + EXPECT_CALL(channel_manager_, DestroyChannel(&mock_channel)) + .WillRepeatedly(testing::Return()); + transceiver_.SetChannel(nullptr, nullptr); + } + rtc::scoped_refptr receiver_ = MockReceiver(); rtc::scoped_refptr sender_ = MockSender(); - std::unique_ptr channel_manager_; + ChannelManagerForTest channel_manager_; std::vector extensions_; RtpTransceiver transceiver_; }; @@ -307,6 +323,8 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, transceiver_.SetChannel(&mock_channel, [](const std::string&) { return nullptr; }); EXPECT_THAT(transceiver_.HeaderExtensionsNegotiated(), ElementsAre()); + + ClearChannel(mock_channel); } TEST_F(RtpTransceiverTestForHeaderExtensions, ReturnsNegotiatedHdrExts) { @@ -338,6 +356,8 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, ReturnsNegotiatedHdrExts) { "uri1", 1, RtpTransceiverDirection::kSendRecv), RtpHeaderExtensionCapability( "uri2", 2, RtpTransceiverDirection::kSendRecv))); + + ClearChannel(mock_channel); } TEST_F(RtpTransceiverTestForHeaderExtensions, diff --git a/pc/sdp_offer_answer.cc b/pc/sdp_offer_answer.cc index e2be8e0116..553704c02b 100644 --- a/pc/sdp_offer_answer.cc +++ b/pc/sdp_offer_answer.cc @@ -2893,7 +2893,7 @@ RTCError SdpOfferAnswerHandler::Rollback(SdpType desc_type) { } RTC_DCHECK(transceiver->internal()->mid().has_value()); - DestroyTransceiverChannel(transceiver); + transceiver->internal()->SetChannel(nullptr, nullptr); if (signaling_state() == PeerConnectionInterface::kHaveRemoteOffer && transceiver->receiver()) { @@ -3547,7 +3547,6 @@ RTCError SdpOfferAnswerHandler::UpdateTransceiverChannel( if (content.rejected) { if (channel) { transceiver->internal()->SetChannel(nullptr, nullptr); - channel_manager()->DestroyChannel(channel); } } else { if (!channel) { @@ -4541,12 +4540,14 @@ void SdpOfferAnswerHandler::RemoveUnusedChannels( // voice channel. const cricket::ContentInfo* video_info = cricket::GetFirstVideoContent(desc); if (!video_info || video_info->rejected) { - DestroyTransceiverChannel(rtp_manager()->GetVideoTransceiver()); + rtp_manager()->GetVideoTransceiver()->internal()->SetChannel(nullptr, + nullptr); } const cricket::ContentInfo* audio_info = cricket::GetFirstAudioContent(desc); if (!audio_info || audio_info->rejected) { - DestroyTransceiverChannel(rtp_manager()->GetAudioTransceiver()); + rtp_manager()->GetAudioTransceiver()->internal()->SetChannel(nullptr, + nullptr); } const cricket::ContentInfo* data_info = cricket::GetFirstDataContent(desc); @@ -4822,29 +4823,6 @@ bool SdpOfferAnswerHandler::CreateDataChannel(const std::string& mid) { return true; } -void SdpOfferAnswerHandler::DestroyTransceiverChannel( - rtc::scoped_refptr> - transceiver) { - TRACE_EVENT0("webrtc", "SdpOfferAnswerHandler::DestroyTransceiverChannel"); - RTC_DCHECK(transceiver); - RTC_LOG_THREAD_BLOCK_COUNT(); - - cricket::ChannelInterface* channel = transceiver->internal()->channel(); - RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(0); - if (channel) { - // TODO(tommi): VideoRtpReceiver::SetMediaChannel blocks and jumps to the - // worker thread. When being set to nullptr, there are additional - // blocking calls to e.g. ClearRecordableEncodedFrameCallback which triggers - // another blocking call or Stop() for video channels. - // The channel object also needs to be de-initialized on the network thread - // so if ownership of the channel object lies with the transceiver, we could - // un-set the channel pointer and uninitialize/destruct the channel object - // at the same time, rather than in separate steps. - transceiver->internal()->SetChannel(nullptr, nullptr); - channel_manager()->DestroyChannel(channel); - } -} - void SdpOfferAnswerHandler::DestroyDataChannelTransport(RTCError error) { RTC_DCHECK_RUN_ON(signaling_thread()); const bool has_sctp = pc_->sctp_mid().has_value(); @@ -4876,12 +4854,12 @@ void SdpOfferAnswerHandler::DestroyAllChannels() { for (const auto& transceiver : list) { if (transceiver->media_type() == cricket::MEDIA_TYPE_VIDEO) { - DestroyTransceiverChannel(transceiver); + transceiver->internal()->SetChannel(nullptr, nullptr); } } for (const auto& transceiver : list) { if (transceiver->media_type() == cricket::MEDIA_TYPE_AUDIO) { - DestroyTransceiverChannel(transceiver); + transceiver->internal()->SetChannel(nullptr, nullptr); } } diff --git a/pc/sdp_offer_answer.h b/pc/sdp_offer_answer.h index e7851d8d6c..5baeaae548 100644 --- a/pc/sdp_offer_answer.h +++ b/pc/sdp_offer_answer.h @@ -533,12 +533,6 @@ class SdpOfferAnswerHandler : public SdpStateProvider, cricket::VideoChannel* CreateVideoChannel(const std::string& mid); bool CreateDataChannel(const std::string& mid); - // Destroys and clears the BaseChannel associated with the given transceiver, - // if such channel is set. - void DestroyTransceiverChannel( - rtc::scoped_refptr> - transceiver); - // Destroys the RTP data channel transport and/or the SCTP data channel // transport and clears it. void DestroyDataChannelTransport(RTCError error); diff --git a/pc/stats_collector_unittest.cc b/pc/stats_collector_unittest.cc index 5113c3ed5e..520f24c83d 100644 --- a/pc/stats_collector_unittest.cc +++ b/pc/stats_collector_unittest.cc @@ -54,6 +54,8 @@ using cricket::VideoSenderInfo; using cricket::VoiceMediaInfo; using cricket::VoiceReceiverInfo; using cricket::VoiceSenderInfo; +using ::testing::_; +using ::testing::AtMost; using ::testing::Return; using ::testing::UnorderedElementsAre; @@ -745,6 +747,9 @@ static rtc::scoped_refptr CreateMockSender( Return(track->kind() == MediaStreamTrackInterface::kAudioKind ? cricket::MEDIA_TYPE_AUDIO : cricket::MEDIA_TYPE_VIDEO)); + EXPECT_CALL(*sender, SetMediaChannel(_)).Times(AtMost(2)); + EXPECT_CALL(*sender, SetTransceiverAsStopped()).Times(AtMost(1)); + EXPECT_CALL(*sender, Stop()); return sender; } @@ -759,6 +764,9 @@ static rtc::scoped_refptr CreateMockReceiver( Return(track->kind() == MediaStreamTrackInterface::kAudioKind ? cricket::MEDIA_TYPE_AUDIO : cricket::MEDIA_TYPE_VIDEO)); + EXPECT_CALL(*receiver, SetMediaChannel(_)).Times(AtMost(1)); + EXPECT_CALL(*receiver, Stop()); + EXPECT_CALL(*receiver, StopAndEndTrack()); return receiver; } diff --git a/pc/test/fake_peer_connection_for_stats.h b/pc/test/fake_peer_connection_for_stats.h index 877949d2e0..8df9eeb032 100644 --- a/pc/test/fake_peer_connection_for_stats.h +++ b/pc/test/fake_peer_connection_for_stats.h @@ -152,6 +152,12 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase { local_streams_(StreamCollection::Create()), remote_streams_(StreamCollection::Create()) {} + ~FakePeerConnectionForStats() { + for (auto transceiver : transceivers_) { + transceiver->internal()->SetChannel(nullptr, nullptr); + } + } + rtc::scoped_refptr mutable_local_streams() { return local_streams_; } @@ -205,7 +211,8 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase { voice_channel_ = std::make_unique( worker_thread_, network_thread_, signaling_thread_, std::move(voice_media_channel), mid, kDefaultSrtpRequired, - webrtc::CryptoOptions(), &ssrc_generator_, transport_name); + webrtc::CryptoOptions(), &channel_manager_.ssrc_generator(), + transport_name); GetOrCreateFirstTransceiverOfType(cricket::MEDIA_TYPE_AUDIO) ->internal() ->SetChannel(voice_channel_.get(), @@ -223,7 +230,8 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase { video_channel_ = std::make_unique( worker_thread_, network_thread_, signaling_thread_, std::move(video_media_channel), mid, kDefaultSrtpRequired, - webrtc::CryptoOptions(), &ssrc_generator_, transport_name); + webrtc::CryptoOptions(), &channel_manager_.ssrc_generator(), + transport_name); GetOrCreateFirstTransceiverOfType(cricket::MEDIA_TYPE_VIDEO) ->internal() ->SetChannel(video_channel_.get(), @@ -394,21 +402,26 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase { } } auto transceiver = RtpTransceiverProxyWithInternal::Create( - signaling_thread_, - new RtpTransceiver(media_type, channel_manager_.get())); + signaling_thread_, new RtpTransceiver(media_type, &channel_manager_)); transceivers_.push_back(transceiver); return transceiver; } + class TestChannelManager : public cricket::ChannelManager { + public: + TestChannelManager(rtc::Thread* worker, rtc::Thread* network) + : cricket::ChannelManager(nullptr, true, worker, network) {} + + // Override DestroyChannel so that calls from the transceiver won't go to + // the default ChannelManager implementation. + void DestroyChannel(cricket::ChannelInterface*) override {} + }; + rtc::Thread* const network_thread_; rtc::Thread* const worker_thread_; rtc::Thread* const signaling_thread_; - std::unique_ptr channel_manager_ = - cricket::ChannelManager::Create(nullptr /* MediaEngineInterface */, - true, - worker_thread_, - network_thread_); + TestChannelManager channel_manager_{worker_thread_, network_thread_}; rtc::scoped_refptr local_streams_; rtc::scoped_refptr remote_streams_; @@ -432,8 +445,6 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase { local_certificates_by_transport_; std::map> remote_cert_chains_by_transport_; - - rtc::UniqueRandomIdGenerator ssrc_generator_; }; } // namespace webrtc