From 1933d3b677f5f597f03cbea87750f70be60bb069 Mon Sep 17 00:00:00 2001 From: Tomas Gunnarsson Date: Mon, 17 Jan 2022 11:25:21 +0100 Subject: [PATCH] Move network thread invokes for initialization for media channels, out. Remove Init_w and Deinit(), both of which were wrappers around Invoke() calls from the worker thread to the network thread. Instead, replace them with Init_n() and Deinit_n() that are currently* required to be called by external code in order to associate/disassociate the channels with the transport. This CL mostly moves things around in order to prepare for upcoming changes, but it does change channel destruction in the following way: - When destroying channels, we don't block the worker thread anymore while uninitialization happens on the network thread. Previously both signal and worker threads were blocked during the uninitialization in the ChannelManager. * In an upcoming CL, Init_n() and Deinit_n() will be called internally from a different method that's always called on the network thread when a channel is associated/disassociated with a transceiver. When we're there, we will have removed several invokes that currently are a part of constructing/destructing channel objects. Bug: webrtc:11992 Change-Id: Ibc30447a40749ceb36d37834b0cfc5c5ea60e895 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/246502 Reviewed-by: Harald Alvestrand Commit-Queue: Tomas Gunnarsson Cr-Commit-Position: refs/heads/main@{#35707} --- pc/channel.cc | 55 ++++++++++++++++++++++++------------------ pc/channel.h | 11 +++------ pc/channel_manager.cc | 20 +++++++++++++-- pc/channel_unittest.cc | 35 ++++++++++++++++++++++++--- pc/sdp_offer_answer.cc | 3 +++ 5 files changed, 86 insertions(+), 38 deletions(-) diff --git a/pc/channel.cc b/pc/channel.cc index 95e702667d..fa1cddf1d9 100644 --- a/pc/channel.cc +++ b/pc/channel.cc @@ -183,34 +183,29 @@ void BaseChannel::DisconnectFromRtpTransport_n() { rtp_transport_->SignalNetworkRouteChanged.disconnect(this); rtp_transport_->SignalWritableState.disconnect(this); rtp_transport_->SignalSentPacket.disconnect(this); + rtp_transport_ = nullptr; } -void BaseChannel::Init_w(webrtc::RtpTransportInternal* rtp_transport) { - RTC_DCHECK_RUN_ON(worker_thread()); +void BaseChannel::Init_n(webrtc::RtpTransportInternal* rtp_transport) { + // Set the transport before we call SetInterface() since setting the interface + // pointer will call us back to set transport options. + SetRtpTransport(rtp_transport); - network_thread_->Invoke(RTC_FROM_HERE, [this, rtp_transport] { - SetRtpTransport(rtp_transport); - // Both RTP and RTCP channels should be set, we can call SetInterface on - // the media channel and it can set network options. - RTC_DCHECK(!media_channel_->HasNetworkInterface()); - media_channel_->SetInterface(this); - }); + // Both RTP and RTCP channels should be set, we can call SetInterface on + // the media channel and it can set network options. + RTC_DCHECK(!media_channel_->HasNetworkInterface()); + media_channel_->SetInterface(this); } -void BaseChannel::Deinit() { - RTC_DCHECK_RUN_ON(worker_thread()); +void BaseChannel::Deinit_n() { // Packets arrive on the network thread, processing packets calls virtual // functions, so need to stop this process in Deinit that is called in // derived classes destructor. - network_thread_->Invoke(RTC_FROM_HERE, [&] { - RTC_DCHECK_RUN_ON(network_thread()); - media_channel_->SetInterface(/*iface=*/nullptr); - - if (rtp_transport_) { - DisconnectFromRtpTransport_n(); - } - RTC_DCHECK(!network_initialized()); - }); + media_channel_->SetInterface(/*iface=*/nullptr); + if (rtp_transport_) { + DisconnectFromRtpTransport_n(); + } + RTC_DCHECK(!network_initialized()); } bool BaseChannel::SetRtpTransport(webrtc::RtpTransportInternal* rtp_transport) { @@ -234,7 +229,7 @@ bool BaseChannel::SetRtpTransport(webrtc::RtpTransportInternal* rtp_transport) { if (!ConnectToRtpTransport_n()) { return false; } - OnTransportReadyToSend(rtp_transport_->IsReadyToSend()); + media_channel_->OnReadyToSend(rtp_transport_->IsReadyToSend()); UpdateWritableState_n(); // Set the cached socket options. @@ -322,6 +317,7 @@ int BaseChannel::SetOption(SocketType type, rtc::Socket::Option opt, int value) { RTC_DCHECK_RUN_ON(network_thread()); + RTC_DCHECK(network_initialized()); RTC_DCHECK(rtp_transport_); switch (type) { case ST_RTP: @@ -338,6 +334,7 @@ int BaseChannel::SetOption(SocketType type, void BaseChannel::OnWritableState(bool writable) { RTC_DCHECK_RUN_ON(network_thread()); + RTC_DCHECK(network_initialized()); if (writable) { ChannelWritable_n(); } else { @@ -347,9 +344,11 @@ void BaseChannel::OnWritableState(bool writable) { void BaseChannel::OnNetworkRouteChanged( absl::optional network_route) { + RTC_DCHECK_RUN_ON(network_thread()); + RTC_DCHECK(network_initialized()); + RTC_LOG(LS_INFO) << "Network route changed for " << ToString(); - RTC_DCHECK_RUN_ON(network_thread()); rtc::NetworkRoute new_route; if (network_route) { new_route = *(network_route); @@ -365,11 +364,19 @@ void BaseChannel::SetFirstPacketReceivedCallback( std::function callback) { RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK(!on_first_packet_received_ || !callback); + + // TODO(bugs.webrtc.org/11992): Rename SetFirstPacketReceivedCallback to + // something that indicates network thread initialization/uninitialization and + // call Init_n() / Deinit_n() respectively. + // if (!callback) + // Deinit_n(); + on_first_packet_received_ = std::move(callback); } void BaseChannel::OnTransportReadyToSend(bool ready) { RTC_DCHECK_RUN_ON(network_thread()); + RTC_DCHECK(network_initialized()); media_channel_->OnReadyToSend(ready); } @@ -377,6 +384,7 @@ bool BaseChannel::SendPacket(bool rtcp, rtc::CopyOnWriteBuffer* packet, const rtc::PacketOptions& options) { RTC_DCHECK_RUN_ON(network_thread()); + RTC_DCHECK(network_initialized()); TRACE_EVENT0("webrtc", "BaseChannel::SendPacket"); // Until all the code is migrated to use RtpPacketType instead of bool. @@ -420,6 +428,7 @@ bool BaseChannel::SendPacket(bool rtcp, void BaseChannel::OnRtpPacket(const webrtc::RtpPacketReceived& parsed_packet) { RTC_DCHECK_RUN_ON(network_thread()); + RTC_DCHECK(network_initialized()); if (on_first_packet_received_) { on_first_packet_received_(); @@ -824,7 +833,6 @@ VoiceChannel::~VoiceChannel() { TRACE_EVENT0("webrtc", "VoiceChannel::~VoiceChannel"); // this can't be done in the base class, since it calls a virtual DisableMedia_w(); - Deinit(); } void VoiceChannel::UpdateMediaSendRecvState_w() { @@ -947,7 +955,6 @@ VideoChannel::~VideoChannel() { TRACE_EVENT0("webrtc", "VideoChannel::~VideoChannel"); // this can't be done in the base class, since it calls a virtual DisableMedia_w(); - Deinit(); } void VideoChannel::UpdateMediaSendRecvState_w() { diff --git a/pc/channel.h b/pc/channel.h index 6c2fa1d6e3..7d7aeb6aea 100644 --- a/pc/channel.h +++ b/pc/channel.h @@ -86,10 +86,6 @@ struct CryptoParams; // and methods with _s suffix on signaling thread. // Network and worker threads may be the same thread. // -// WARNING! SUBCLASSES MUST CALL Deinit() IN THEIR DESTRUCTORS! -// This is required to avoid a data race between the destructor modifying the -// vtable, and the media channel's thread using BaseChannel as the -// NetworkInterface. class BaseChannel : public ChannelInterface, // TODO(tommi): Remove has_slots inheritance. @@ -114,11 +110,10 @@ class BaseChannel : public ChannelInterface, webrtc::CryptoOptions crypto_options, rtc::UniqueRandomIdGenerator* ssrc_generator); virtual ~BaseChannel(); - virtual void Init_w(webrtc::RtpTransportInternal* rtp_transport); - // Deinit may be called multiple times and is simply ignored if it's already - // done. - void Deinit(); + void Init_n(webrtc::RtpTransportInternal* rtp_transport) + RTC_RUN_ON(network_thread()); + void Deinit_n() RTC_RUN_ON(network_thread()); rtc::Thread* worker_thread() const { return worker_thread_; } rtc::Thread* network_thread() const { return network_thread_; } diff --git a/pc/channel_manager.cc b/pc/channel_manager.cc index 525371fddc..7f1cd982a4 100644 --- a/pc/channel_manager.cc +++ b/pc/channel_manager.cc @@ -176,7 +176,10 @@ VoiceChannel* ChannelManager::CreateVoiceChannel( absl::WrapUnique(media_channel), content_name, srtp_required, crypto_options, ssrc_generator); - voice_channel->Init_w(rtp_transport); + network_thread_->Invoke(RTC_FROM_HERE, [&] { + RTC_DCHECK_RUN_ON(voice_channel->network_thread()); + voice_channel->Init_n(rtp_transport); + }); VoiceChannel* voice_channel_ptr = voice_channel.get(); voice_channels_.push_back(std::move(voice_channel)); @@ -187,6 +190,11 @@ void ChannelManager::DestroyVoiceChannel(VoiceChannel* voice_channel) { TRACE_EVENT0("webrtc", "ChannelManager::DestroyVoiceChannel"); RTC_DCHECK(voice_channel); + network_thread_->Invoke(RTC_FROM_HERE, [&] { + RTC_DCHECK_RUN_ON(voice_channel->network_thread()); + voice_channel->Deinit_n(); + }); + if (!worker_thread_->IsCurrent()) { worker_thread_->Invoke(RTC_FROM_HERE, [&] { DestroyVoiceChannel(voice_channel); }); @@ -240,7 +248,10 @@ VideoChannel* ChannelManager::CreateVideoChannel( absl::WrapUnique(media_channel), content_name, srtp_required, crypto_options, ssrc_generator); - video_channel->Init_w(rtp_transport); + network_thread_->Invoke(RTC_FROM_HERE, [&] { + RTC_DCHECK_RUN_ON(video_channel->network_thread()); + video_channel->Init_n(rtp_transport); + }); VideoChannel* video_channel_ptr = video_channel.get(); video_channels_.push_back(std::move(video_channel)); @@ -251,6 +262,11 @@ void ChannelManager::DestroyVideoChannel(VideoChannel* video_channel) { TRACE_EVENT0("webrtc", "ChannelManager::DestroyVideoChannel"); RTC_DCHECK(video_channel); + network_thread_->Invoke(RTC_FROM_HERE, [&] { + RTC_DCHECK_RUN_ON(video_channel->network_thread()); + video_channel->Deinit_n(); + }); + if (!worker_thread_->IsCurrent()) { worker_thread_->Invoke(RTC_FROM_HERE, [&] { DestroyVideoChannel(video_channel); }); diff --git a/pc/channel_unittest.cc b/pc/channel_unittest.cc index 4818267f18..42497b86df 100644 --- a/pc/channel_unittest.cc +++ b/pc/channel_unittest.cc @@ -126,8 +126,10 @@ class ChannelTest : public ::testing::Test, public sigslot::has_slots<> { ~ChannelTest() { if (network_thread_) { - network_thread_->Invoke( - RTC_FROM_HERE, [this]() { network_thread_safety_->SetNotAlive(); }); + network_thread_->Invoke(RTC_FROM_HERE, [this]() { + network_thread_safety_->SetNotAlive(); + DeinitChannels(); + }); } } @@ -271,6 +273,22 @@ class ChannelTest : public ::testing::Test, public sigslot::has_slots<> { } } + // Unininitializes the channels on the network thread. + void DeinitChannels() { + if (!channel1_ && !channel2_) + return; + network_thread_->Invoke(RTC_FROM_HERE, [this]() { + if (channel1_) { + RTC_DCHECK_RUN_ON(channel1_->network_thread()); + channel1_->Deinit_n(); + } + if (channel2_) { + RTC_DCHECK_RUN_ON(channel2_->network_thread()); + channel2_->Deinit_n(); + } + }); + } + std::unique_ptr CreateUnencryptedTransport( rtc::PacketTransportInternal* rtp_packet_transport, rtc::PacketTransportInternal* rtcp_packet_transport) { @@ -918,6 +936,9 @@ class ChannelTest : public ::testing::Test, public sigslot::has_slots<> { EXPECT_TRUE(SendAccept()); SendRtp1(); SendRtp2(); + + DeinitChannels(); + // Do not wait, destroy channels. channel1_.reset(nullptr); channel2_.reset(nullptr); @@ -1427,7 +1448,10 @@ std::unique_ptr ChannelTest::CreateChannel( worker_thread, network_thread, signaling_thread, std::move(ch), cricket::CN_AUDIO, (flags & DTLS) != 0, webrtc::CryptoOptions(), &ssrc_generator_); - channel->Init_w(rtp_transport); + network_thread->Invoke(RTC_FROM_HERE, [&]() { + RTC_DCHECK_RUN_ON(channel->network_thread()); + channel->Init_n(rtp_transport); + }); return channel; } @@ -1510,7 +1534,10 @@ std::unique_ptr ChannelTest::CreateChannel( worker_thread, network_thread, signaling_thread, std::move(ch), cricket::CN_VIDEO, (flags & DTLS) != 0, webrtc::CryptoOptions(), &ssrc_generator_); - channel->Init_w(rtp_transport); + network_thread->Invoke(RTC_FROM_HERE, [&]() { + RTC_DCHECK_RUN_ON(channel->network_thread()); + channel->Init_n(rtp_transport); + }); return channel; } diff --git a/pc/sdp_offer_answer.cc b/pc/sdp_offer_answer.cc index 49389d6c7f..1721e41d2a 100644 --- a/pc/sdp_offer_answer.cc +++ b/pc/sdp_offer_answer.cc @@ -4711,11 +4711,14 @@ cricket::VoiceChannel* SdpOfferAnswerHandler::CreateVoiceChannel( if (!channel_manager()->media_engine()) return nullptr; + // TODO(tommi): Avoid hop to network thread. RtpTransportInternal* rtp_transport = pc_->GetRtpTransport(mid); // TODO(bugs.webrtc.org/11992): CreateVoiceChannel internally switches to the // worker thread. We shouldn't be using the `call_ptr_` hack here but simply // be on the worker thread and use `call_` (update upstream code). + // TODO(tommi): This hops to the worker and from the worker to the network + // thread (blocking both signal and worker). return channel_manager()->CreateVoiceChannel( pc_->call_ptr(), pc_->configuration()->media_config, rtp_transport, signaling_thread(), mid, pc_->SrtpRequired(), pc_->GetCryptoOptions(),