diff --git a/pc/channel_manager.cc b/pc/channel_manager.cc index b583b94e3d..a0154e9e12 100644 --- a/pc/channel_manager.cc +++ b/pc/channel_manager.cc @@ -61,18 +61,20 @@ ChannelManager::ChannelManager( ChannelManager::~ChannelManager() { RTC_DCHECK_RUN_ON(signaling_thread_); - if (media_engine_) { - // While `media_engine_` is const throughout the ChannelManager's lifetime, - // it requires destruction to happen on the worker thread. Instead of - // marking the pointer as non-const, we live with this const_cast<> in the - // destructor. - worker_thread_->Invoke(RTC_FROM_HERE, [&] { - const_cast&>(media_engine_).reset(); - }); - } - - RTC_DCHECK(voice_channels_.empty()); - RTC_DCHECK(video_channels_.empty()); + // While `media_engine_` is const throughout the ChannelManager's lifetime, + // it requires destruction to happen on the worker thread. Instead of + // marking the pointer as non-const, we live with this const_cast<> in the + // destructor. + // NOTE: Before removing this Invoke(), consider that it has a dual purpose. + // Besides resetting the media engine pointer, it also ensures that any + // potentially outstanding calls to `DestroyChannel` on the worker, will be + // completed. + worker_thread_->Invoke(RTC_FROM_HERE, [&] { + RTC_DCHECK_RUN_ON(worker_thread_); + RTC_DCHECK(voice_channels_.empty()); + RTC_DCHECK(video_channels_.empty()); + const_cast&>(media_engine_).reset(); + }); } void ChannelManager::GetSupportedAudioSendCodecs( @@ -248,11 +250,13 @@ void ChannelManager::DestroyVideoChannel(VideoChannel* channel) { void ChannelManager::DestroyChannel(ChannelInterface* channel) { RTC_DCHECK(channel); - // TODO(bugs.webrtc.org/11992): Change to either be called on the worker - // thread, or do this asynchronously on the worker. if (!worker_thread_->IsCurrent()) { - worker_thread_->Invoke(RTC_FROM_HERE, - [&] { DestroyChannel(channel); }); + // Delete the channel asynchronously on the worker thread. + // NOTE: This is made safe by a call to `worker_thread_->Invoke()` from + // the destructor, which acts as a 'flush' for any pending calls to + // DestroyChannel. If that Invoke() gets removed, we'll need to make + // adjustments here. + worker_thread_->PostTask([this, channel] { DestroyChannel(channel); }); return; } diff --git a/pc/channel_manager.h b/pc/channel_manager.h index 9e4e1bf131..a1c4efd55b 100644 --- a/pc/channel_manager.h +++ b/pc/channel_manager.h @@ -43,7 +43,7 @@ namespace cricket { // voice or just video channels. // ChannelManager also allows the application to discover what devices it has // using device manager. -class ChannelManager final : public ChannelFactoryInterface { +class ChannelManager : public ChannelFactoryInterface { public: // Returns an initialized instance of ChannelManager. // If media_engine is non-nullptr, then the returned ChannelManager instance @@ -110,7 +110,7 @@ class ChannelManager final : public ChannelFactoryInterface { // Stops recording AEC dump. void StopAecDump(); - private: + protected: ChannelManager(std::unique_ptr media_engine, bool enable_rtx, rtc::Thread* worker_thread, @@ -122,6 +122,7 @@ class ChannelManager final : public ChannelFactoryInterface { // Destroys a video channel created by CreateVideoChannel. void DestroyVideoChannel(VideoChannel* video_channel); + private: const std::unique_ptr media_engine_; // Nullable. rtc::Thread* const signaling_thread_; rtc::Thread* const worker_thread_; diff --git a/pc/rtc_stats_collector_unittest.cc b/pc/rtc_stats_collector_unittest.cc index cbb17c0732..6719d0bfee 100644 --- a/pc/rtc_stats_collector_unittest.cc +++ b/pc/rtc_stats_collector_unittest.cc @@ -531,7 +531,7 @@ class RTCStatsCollectorWrapper { rtc::scoped_refptr(local_audio_track), voice_sender_info.local_stats[0].ssrc, voice_sender_info.local_stats[0].ssrc + 10, local_stream_ids); - EXPECT_CALL(*rtp_sender, SetMediaChannel(_)); + EXPECT_CALL(*rtp_sender, SetMediaChannel(_)).WillRepeatedly(Return()); pc_->AddSender(rtp_sender); } @@ -568,7 +568,7 @@ class RTCStatsCollectorWrapper { rtc::scoped_refptr(local_video_track), video_sender_info.local_stats[0].ssrc, video_sender_info.local_stats[0].ssrc + 10, local_stream_ids); - EXPECT_CALL(*rtp_sender, SetMediaChannel(_)); + EXPECT_CALL(*rtp_sender, SetMediaChannel(_)).WillRepeatedly(Return()); pc_->AddSender(rtp_sender); } diff --git a/pc/rtp_transceiver.cc b/pc/rtp_transceiver.cc index becf8f2cca..c8afec899a 100644 --- a/pc/rtp_transceiver.cc +++ b/pc/rtp_transceiver.cc @@ -154,6 +154,8 @@ RtpTransceiver::~RtpTransceiver() { RTC_DCHECK_RUN_ON(thread_); StopInternal(); } + + RTC_CHECK(!channel_) << "Missing call to SetChannel(nullptr)?"; } void RtpTransceiver::SetChannel( @@ -161,7 +163,7 @@ void RtpTransceiver::SetChannel( std::function transport_lookup) { RTC_DCHECK_RUN_ON(thread_); // Cannot set a non-null channel on a stopped transceiver. - if (stopped_ && channel) { + if ((stopped_ && channel) || channel == channel_) { return; } @@ -180,6 +182,8 @@ void RtpTransceiver::SetChannel( signaling_thread_safety_ = PendingTaskSafetyFlag::Create(); } + cricket::ChannelInterface* channel_to_delete = nullptr; + // An alternative to this, could be to require SetChannel to be called // on the network thread. The channel object operates for the most part // on the network thread, as part of its initialization being on the network @@ -193,6 +197,7 @@ void RtpTransceiver::SetChannel( if (channel_) { channel_->SetFirstPacketReceivedCallback(nullptr); channel_->SetRtpTransport(nullptr); + channel_to_delete = channel_; } channel_ = channel; @@ -221,6 +226,12 @@ void RtpTransceiver::SetChannel( receiver->internal()->SetMediaChannel(channel_->media_channel()); } } + + // Destroy the channel, if we had one, now _after_ updating the receivers who + // might have had references to the previous channel. + if (channel_to_delete) { + channel_manager_->DestroyChannel(channel_to_delete); + } } void RtpTransceiver::AddSender( diff --git a/pc/rtp_transceiver_unittest.cc b/pc/rtp_transceiver_unittest.cc index 0122c0d64d..d63af92169 100644 --- a/pc/rtp_transceiver_unittest.cc +++ b/pc/rtp_transceiver_unittest.cc @@ -32,12 +32,24 @@ using ::testing::ReturnRef; namespace webrtc { +namespace { +class ChannelManagerForTest : public cricket::ChannelManager { + public: + ChannelManagerForTest() + : cricket::ChannelManager(std::make_unique(), + true, + rtc::Thread::Current(), + rtc::Thread::Current()) {} + + MOCK_METHOD(void, DestroyChannel, (cricket::ChannelInterface*), (override)); +}; +} // namespace + // Checks that a channel cannot be set on a stopped `RtpTransceiver`. TEST(RtpTransceiverTest, CannotSetChannelOnStoppedTransceiver) { - auto cm = cricket::ChannelManager::Create( - nullptr, true, rtc::Thread::Current(), rtc::Thread::Current()); + ChannelManagerForTest cm; const std::string content_name("my_mid"); - RtpTransceiver transceiver(cricket::MediaType::MEDIA_TYPE_AUDIO, cm.get()); + RtpTransceiver transceiver(cricket::MediaType::MEDIA_TYPE_AUDIO, &cm); cricket::MockChannelInterface channel1; EXPECT_CALL(channel1, media_type()) .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_AUDIO)); @@ -62,14 +74,18 @@ TEST(RtpTransceiverTest, CannotSetChannelOnStoppedTransceiver) { // Channel can no longer be set, so this call should be a no-op. transceiver.SetChannel(&channel2, [](const std::string&) { return nullptr; }); EXPECT_EQ(&channel1, transceiver.channel()); + + // Clear the current channel before `transceiver` goes out of scope. + EXPECT_CALL(channel1, SetFirstPacketReceivedCallback(_)); + EXPECT_CALL(cm, DestroyChannel(&channel1)).WillRepeatedly(testing::Return()); + transceiver.SetChannel(nullptr, nullptr); } // Checks that a channel can be unset on a stopped `RtpTransceiver` TEST(RtpTransceiverTest, CanUnsetChannelOnStoppedTransceiver) { - auto cm = cricket::ChannelManager::Create( - nullptr, true, rtc::Thread::Current(), rtc::Thread::Current()); + ChannelManagerForTest cm; const std::string content_name("my_mid"); - RtpTransceiver transceiver(cricket::MediaType::MEDIA_TYPE_VIDEO, cm.get()); + RtpTransceiver transceiver(cricket::MediaType::MEDIA_TYPE_VIDEO, &cm); cricket::MockChannelInterface channel; EXPECT_CALL(channel, media_type()) .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_VIDEO)); @@ -77,6 +93,7 @@ TEST(RtpTransceiverTest, CanUnsetChannelOnStoppedTransceiver) { EXPECT_CALL(channel, SetFirstPacketReceivedCallback(_)) .WillRepeatedly(testing::Return()); EXPECT_CALL(channel, SetRtpTransport(_)).WillRepeatedly(Return(true)); + EXPECT_CALL(cm, DestroyChannel(&channel)).WillRepeatedly(testing::Return()); transceiver.SetChannel(&channel, [&](const std::string& mid) { EXPECT_EQ(mid, content_name); @@ -96,20 +113,15 @@ TEST(RtpTransceiverTest, CanUnsetChannelOnStoppedTransceiver) { class RtpTransceiverUnifiedPlanTest : public ::testing::Test { public: RtpTransceiverUnifiedPlanTest() - : channel_manager_(cricket::ChannelManager::Create( - std::make_unique(), - false, - rtc::Thread::Current(), - rtc::Thread::Current())), - transceiver_(RtpSenderProxyWithInternal::Create( + : transceiver_(RtpSenderProxyWithInternal::Create( rtc::Thread::Current(), sender_), RtpReceiverProxyWithInternal::Create( rtc::Thread::Current(), rtc::Thread::Current(), receiver_), - channel_manager_.get(), - channel_manager_->GetSupportedAudioRtpHeaderExtensions(), + &channel_manager_, + channel_manager_.GetSupportedAudioRtpHeaderExtensions(), /* on_negotiation_needed= */ [] {}) {} static rtc::scoped_refptr MockReceiver() { @@ -128,7 +140,7 @@ class RtpTransceiverUnifiedPlanTest : public ::testing::Test { rtc::scoped_refptr receiver_ = MockReceiver(); rtc::scoped_refptr sender_ = MockSender(); - std::unique_ptr channel_manager_; + ChannelManagerForTest channel_manager_; RtpTransceiver transceiver_; }; @@ -153,12 +165,7 @@ TEST_F(RtpTransceiverUnifiedPlanTest, StopSetsDirection) { class RtpTransceiverTestForHeaderExtensions : public ::testing::Test { public: RtpTransceiverTestForHeaderExtensions() - : channel_manager_(cricket::ChannelManager::Create( - std::make_unique(), - false, - rtc::Thread::Current(), - rtc::Thread::Current())), - extensions_( + : extensions_( {RtpHeaderExtensionCapability("uri1", 1, RtpTransceiverDirection::kSendOnly), @@ -178,7 +185,7 @@ class RtpTransceiverTestForHeaderExtensions : public ::testing::Test { rtc::Thread::Current(), rtc::Thread::Current(), receiver_), - channel_manager_.get(), + &channel_manager_, extensions_, /* on_negotiation_needed= */ [] {}) {} @@ -196,10 +203,19 @@ class RtpTransceiverTestForHeaderExtensions : public ::testing::Test { return sender; } + void ClearChannel(cricket::MockChannelInterface& mock_channel) { + EXPECT_CALL(*sender_.get(), SetMediaChannel(nullptr)); + EXPECT_CALL(*receiver_.get(), Stop()); + EXPECT_CALL(mock_channel, SetFirstPacketReceivedCallback(_)); + EXPECT_CALL(channel_manager_, DestroyChannel(&mock_channel)) + .WillRepeatedly(testing::Return()); + transceiver_.SetChannel(nullptr, nullptr); + } + rtc::scoped_refptr receiver_ = MockReceiver(); rtc::scoped_refptr sender_ = MockSender(); - std::unique_ptr channel_manager_; + ChannelManagerForTest channel_manager_; std::vector extensions_; RtpTransceiver transceiver_; }; @@ -307,6 +323,8 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, transceiver_.SetChannel(&mock_channel, [](const std::string&) { return nullptr; }); EXPECT_THAT(transceiver_.HeaderExtensionsNegotiated(), ElementsAre()); + + ClearChannel(mock_channel); } TEST_F(RtpTransceiverTestForHeaderExtensions, ReturnsNegotiatedHdrExts) { @@ -338,6 +356,8 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, ReturnsNegotiatedHdrExts) { "uri1", 1, RtpTransceiverDirection::kSendRecv), RtpHeaderExtensionCapability( "uri2", 2, RtpTransceiverDirection::kSendRecv))); + + ClearChannel(mock_channel); } TEST_F(RtpTransceiverTestForHeaderExtensions, diff --git a/pc/sdp_offer_answer.cc b/pc/sdp_offer_answer.cc index e2be8e0116..553704c02b 100644 --- a/pc/sdp_offer_answer.cc +++ b/pc/sdp_offer_answer.cc @@ -2893,7 +2893,7 @@ RTCError SdpOfferAnswerHandler::Rollback(SdpType desc_type) { } RTC_DCHECK(transceiver->internal()->mid().has_value()); - DestroyTransceiverChannel(transceiver); + transceiver->internal()->SetChannel(nullptr, nullptr); if (signaling_state() == PeerConnectionInterface::kHaveRemoteOffer && transceiver->receiver()) { @@ -3547,7 +3547,6 @@ RTCError SdpOfferAnswerHandler::UpdateTransceiverChannel( if (content.rejected) { if (channel) { transceiver->internal()->SetChannel(nullptr, nullptr); - channel_manager()->DestroyChannel(channel); } } else { if (!channel) { @@ -4541,12 +4540,14 @@ void SdpOfferAnswerHandler::RemoveUnusedChannels( // voice channel. const cricket::ContentInfo* video_info = cricket::GetFirstVideoContent(desc); if (!video_info || video_info->rejected) { - DestroyTransceiverChannel(rtp_manager()->GetVideoTransceiver()); + rtp_manager()->GetVideoTransceiver()->internal()->SetChannel(nullptr, + nullptr); } const cricket::ContentInfo* audio_info = cricket::GetFirstAudioContent(desc); if (!audio_info || audio_info->rejected) { - DestroyTransceiverChannel(rtp_manager()->GetAudioTransceiver()); + rtp_manager()->GetAudioTransceiver()->internal()->SetChannel(nullptr, + nullptr); } const cricket::ContentInfo* data_info = cricket::GetFirstDataContent(desc); @@ -4822,29 +4823,6 @@ bool SdpOfferAnswerHandler::CreateDataChannel(const std::string& mid) { return true; } -void SdpOfferAnswerHandler::DestroyTransceiverChannel( - rtc::scoped_refptr> - transceiver) { - TRACE_EVENT0("webrtc", "SdpOfferAnswerHandler::DestroyTransceiverChannel"); - RTC_DCHECK(transceiver); - RTC_LOG_THREAD_BLOCK_COUNT(); - - cricket::ChannelInterface* channel = transceiver->internal()->channel(); - RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(0); - if (channel) { - // TODO(tommi): VideoRtpReceiver::SetMediaChannel blocks and jumps to the - // worker thread. When being set to nullptr, there are additional - // blocking calls to e.g. ClearRecordableEncodedFrameCallback which triggers - // another blocking call or Stop() for video channels. - // The channel object also needs to be de-initialized on the network thread - // so if ownership of the channel object lies with the transceiver, we could - // un-set the channel pointer and uninitialize/destruct the channel object - // at the same time, rather than in separate steps. - transceiver->internal()->SetChannel(nullptr, nullptr); - channel_manager()->DestroyChannel(channel); - } -} - void SdpOfferAnswerHandler::DestroyDataChannelTransport(RTCError error) { RTC_DCHECK_RUN_ON(signaling_thread()); const bool has_sctp = pc_->sctp_mid().has_value(); @@ -4876,12 +4854,12 @@ void SdpOfferAnswerHandler::DestroyAllChannels() { for (const auto& transceiver : list) { if (transceiver->media_type() == cricket::MEDIA_TYPE_VIDEO) { - DestroyTransceiverChannel(transceiver); + transceiver->internal()->SetChannel(nullptr, nullptr); } } for (const auto& transceiver : list) { if (transceiver->media_type() == cricket::MEDIA_TYPE_AUDIO) { - DestroyTransceiverChannel(transceiver); + transceiver->internal()->SetChannel(nullptr, nullptr); } } diff --git a/pc/sdp_offer_answer.h b/pc/sdp_offer_answer.h index e7851d8d6c..5baeaae548 100644 --- a/pc/sdp_offer_answer.h +++ b/pc/sdp_offer_answer.h @@ -533,12 +533,6 @@ class SdpOfferAnswerHandler : public SdpStateProvider, cricket::VideoChannel* CreateVideoChannel(const std::string& mid); bool CreateDataChannel(const std::string& mid); - // Destroys and clears the BaseChannel associated with the given transceiver, - // if such channel is set. - void DestroyTransceiverChannel( - rtc::scoped_refptr> - transceiver); - // Destroys the RTP data channel transport and/or the SCTP data channel // transport and clears it. void DestroyDataChannelTransport(RTCError error); diff --git a/pc/stats_collector_unittest.cc b/pc/stats_collector_unittest.cc index 5113c3ed5e..520f24c83d 100644 --- a/pc/stats_collector_unittest.cc +++ b/pc/stats_collector_unittest.cc @@ -54,6 +54,8 @@ using cricket::VideoSenderInfo; using cricket::VoiceMediaInfo; using cricket::VoiceReceiverInfo; using cricket::VoiceSenderInfo; +using ::testing::_; +using ::testing::AtMost; using ::testing::Return; using ::testing::UnorderedElementsAre; @@ -745,6 +747,9 @@ static rtc::scoped_refptr CreateMockSender( Return(track->kind() == MediaStreamTrackInterface::kAudioKind ? cricket::MEDIA_TYPE_AUDIO : cricket::MEDIA_TYPE_VIDEO)); + EXPECT_CALL(*sender, SetMediaChannel(_)).Times(AtMost(2)); + EXPECT_CALL(*sender, SetTransceiverAsStopped()).Times(AtMost(1)); + EXPECT_CALL(*sender, Stop()); return sender; } @@ -759,6 +764,9 @@ static rtc::scoped_refptr CreateMockReceiver( Return(track->kind() == MediaStreamTrackInterface::kAudioKind ? cricket::MEDIA_TYPE_AUDIO : cricket::MEDIA_TYPE_VIDEO)); + EXPECT_CALL(*receiver, SetMediaChannel(_)).Times(AtMost(1)); + EXPECT_CALL(*receiver, Stop()); + EXPECT_CALL(*receiver, StopAndEndTrack()); return receiver; } diff --git a/pc/test/fake_peer_connection_for_stats.h b/pc/test/fake_peer_connection_for_stats.h index 877949d2e0..8df9eeb032 100644 --- a/pc/test/fake_peer_connection_for_stats.h +++ b/pc/test/fake_peer_connection_for_stats.h @@ -152,6 +152,12 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase { local_streams_(StreamCollection::Create()), remote_streams_(StreamCollection::Create()) {} + ~FakePeerConnectionForStats() { + for (auto transceiver : transceivers_) { + transceiver->internal()->SetChannel(nullptr, nullptr); + } + } + rtc::scoped_refptr mutable_local_streams() { return local_streams_; } @@ -205,7 +211,8 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase { voice_channel_ = std::make_unique( worker_thread_, network_thread_, signaling_thread_, std::move(voice_media_channel), mid, kDefaultSrtpRequired, - webrtc::CryptoOptions(), &ssrc_generator_, transport_name); + webrtc::CryptoOptions(), &channel_manager_.ssrc_generator(), + transport_name); GetOrCreateFirstTransceiverOfType(cricket::MEDIA_TYPE_AUDIO) ->internal() ->SetChannel(voice_channel_.get(), @@ -223,7 +230,8 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase { video_channel_ = std::make_unique( worker_thread_, network_thread_, signaling_thread_, std::move(video_media_channel), mid, kDefaultSrtpRequired, - webrtc::CryptoOptions(), &ssrc_generator_, transport_name); + webrtc::CryptoOptions(), &channel_manager_.ssrc_generator(), + transport_name); GetOrCreateFirstTransceiverOfType(cricket::MEDIA_TYPE_VIDEO) ->internal() ->SetChannel(video_channel_.get(), @@ -394,21 +402,26 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase { } } auto transceiver = RtpTransceiverProxyWithInternal::Create( - signaling_thread_, - new RtpTransceiver(media_type, channel_manager_.get())); + signaling_thread_, new RtpTransceiver(media_type, &channel_manager_)); transceivers_.push_back(transceiver); return transceiver; } + class TestChannelManager : public cricket::ChannelManager { + public: + TestChannelManager(rtc::Thread* worker, rtc::Thread* network) + : cricket::ChannelManager(nullptr, true, worker, network) {} + + // Override DestroyChannel so that calls from the transceiver won't go to + // the default ChannelManager implementation. + void DestroyChannel(cricket::ChannelInterface*) override {} + }; + rtc::Thread* const network_thread_; rtc::Thread* const worker_thread_; rtc::Thread* const signaling_thread_; - std::unique_ptr channel_manager_ = - cricket::ChannelManager::Create(nullptr /* MediaEngineInterface */, - true, - worker_thread_, - network_thread_); + TestChannelManager channel_manager_{worker_thread_, network_thread_}; rtc::scoped_refptr local_streams_; rtc::scoped_refptr remote_streams_; @@ -432,8 +445,6 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase { local_certificates_by_transport_; std::map> remote_cert_chains_by_transport_; - - rtc::UniqueRandomIdGenerator ssrc_generator_; }; } // namespace webrtc