Move network thread invokes for initialization for media channels, out.

Remove Init_w and Deinit(), both of which were wrappers around Invoke()
calls from the worker thread to the network thread.

Instead, replace them with Init_n() and Deinit_n() that are currently*
required to be called by external code in order to associate/disassociate
the channels with the transport.

This CL mostly moves things around in order to prepare for upcoming
changes, but it does change channel destruction in the following way:
- When destroying channels, we don't block the worker thread anymore
  while uninitialization happens on the network thread. Previously
  both signal and worker threads were blocked during the
  uninitialization in the ChannelManager.

* In an upcoming CL, Init_n() and Deinit_n() will be called internally
  from a different method that's always called on the network thread
  when a channel is associated/disassociated with a transceiver. When
  we're there, we will have removed several invokes that currently are
  a part of constructing/destructing channel objects.

Bug: webrtc:11992
Change-Id: Ibc30447a40749ceb36d37834b0cfc5c5ea60e895
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/246502
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35707}
This commit is contained in:
Tomas Gunnarsson 2022-01-17 11:25:21 +01:00 committed by WebRTC LUCI CQ
parent cc9b7ec740
commit 1933d3b677
5 changed files with 86 additions and 38 deletions

View File

@ -183,34 +183,29 @@ void BaseChannel::DisconnectFromRtpTransport_n() {
rtp_transport_->SignalNetworkRouteChanged.disconnect(this);
rtp_transport_->SignalWritableState.disconnect(this);
rtp_transport_->SignalSentPacket.disconnect(this);
rtp_transport_ = nullptr;
}
void BaseChannel::Init_w(webrtc::RtpTransportInternal* rtp_transport) {
RTC_DCHECK_RUN_ON(worker_thread());
void BaseChannel::Init_n(webrtc::RtpTransportInternal* rtp_transport) {
// Set the transport before we call SetInterface() since setting the interface
// pointer will call us back to set transport options.
SetRtpTransport(rtp_transport);
network_thread_->Invoke<void>(RTC_FROM_HERE, [this, rtp_transport] {
SetRtpTransport(rtp_transport);
// Both RTP and RTCP channels should be set, we can call SetInterface on
// the media channel and it can set network options.
RTC_DCHECK(!media_channel_->HasNetworkInterface());
media_channel_->SetInterface(this);
});
// Both RTP and RTCP channels should be set, we can call SetInterface on
// the media channel and it can set network options.
RTC_DCHECK(!media_channel_->HasNetworkInterface());
media_channel_->SetInterface(this);
}
void BaseChannel::Deinit() {
RTC_DCHECK_RUN_ON(worker_thread());
void BaseChannel::Deinit_n() {
// Packets arrive on the network thread, processing packets calls virtual
// functions, so need to stop this process in Deinit that is called in
// derived classes destructor.
network_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
RTC_DCHECK_RUN_ON(network_thread());
media_channel_->SetInterface(/*iface=*/nullptr);
if (rtp_transport_) {
DisconnectFromRtpTransport_n();
}
RTC_DCHECK(!network_initialized());
});
media_channel_->SetInterface(/*iface=*/nullptr);
if (rtp_transport_) {
DisconnectFromRtpTransport_n();
}
RTC_DCHECK(!network_initialized());
}
bool BaseChannel::SetRtpTransport(webrtc::RtpTransportInternal* rtp_transport) {
@ -234,7 +229,7 @@ bool BaseChannel::SetRtpTransport(webrtc::RtpTransportInternal* rtp_transport) {
if (!ConnectToRtpTransport_n()) {
return false;
}
OnTransportReadyToSend(rtp_transport_->IsReadyToSend());
media_channel_->OnReadyToSend(rtp_transport_->IsReadyToSend());
UpdateWritableState_n();
// Set the cached socket options.
@ -322,6 +317,7 @@ int BaseChannel::SetOption(SocketType type,
rtc::Socket::Option opt,
int value) {
RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(network_initialized());
RTC_DCHECK(rtp_transport_);
switch (type) {
case ST_RTP:
@ -338,6 +334,7 @@ int BaseChannel::SetOption(SocketType type,
void BaseChannel::OnWritableState(bool writable) {
RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(network_initialized());
if (writable) {
ChannelWritable_n();
} else {
@ -347,9 +344,11 @@ void BaseChannel::OnWritableState(bool writable) {
void BaseChannel::OnNetworkRouteChanged(
absl::optional<rtc::NetworkRoute> network_route) {
RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(network_initialized());
RTC_LOG(LS_INFO) << "Network route changed for " << ToString();
RTC_DCHECK_RUN_ON(network_thread());
rtc::NetworkRoute new_route;
if (network_route) {
new_route = *(network_route);
@ -365,11 +364,19 @@ void BaseChannel::SetFirstPacketReceivedCallback(
std::function<void()> callback) {
RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(!on_first_packet_received_ || !callback);
// TODO(bugs.webrtc.org/11992): Rename SetFirstPacketReceivedCallback to
// something that indicates network thread initialization/uninitialization and
// call Init_n() / Deinit_n() respectively.
// if (!callback)
// Deinit_n();
on_first_packet_received_ = std::move(callback);
}
void BaseChannel::OnTransportReadyToSend(bool ready) {
RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(network_initialized());
media_channel_->OnReadyToSend(ready);
}
@ -377,6 +384,7 @@ bool BaseChannel::SendPacket(bool rtcp,
rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options) {
RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(network_initialized());
TRACE_EVENT0("webrtc", "BaseChannel::SendPacket");
// Until all the code is migrated to use RtpPacketType instead of bool.
@ -420,6 +428,7 @@ bool BaseChannel::SendPacket(bool rtcp,
void BaseChannel::OnRtpPacket(const webrtc::RtpPacketReceived& parsed_packet) {
RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(network_initialized());
if (on_first_packet_received_) {
on_first_packet_received_();
@ -824,7 +833,6 @@ VoiceChannel::~VoiceChannel() {
TRACE_EVENT0("webrtc", "VoiceChannel::~VoiceChannel");
// this can't be done in the base class, since it calls a virtual
DisableMedia_w();
Deinit();
}
void VoiceChannel::UpdateMediaSendRecvState_w() {
@ -947,7 +955,6 @@ VideoChannel::~VideoChannel() {
TRACE_EVENT0("webrtc", "VideoChannel::~VideoChannel");
// this can't be done in the base class, since it calls a virtual
DisableMedia_w();
Deinit();
}
void VideoChannel::UpdateMediaSendRecvState_w() {

View File

@ -86,10 +86,6 @@ struct CryptoParams;
// and methods with _s suffix on signaling thread.
// Network and worker threads may be the same thread.
//
// WARNING! SUBCLASSES MUST CALL Deinit() IN THEIR DESTRUCTORS!
// This is required to avoid a data race between the destructor modifying the
// vtable, and the media channel's thread using BaseChannel as the
// NetworkInterface.
class BaseChannel : public ChannelInterface,
// TODO(tommi): Remove has_slots inheritance.
@ -114,11 +110,10 @@ class BaseChannel : public ChannelInterface,
webrtc::CryptoOptions crypto_options,
rtc::UniqueRandomIdGenerator* ssrc_generator);
virtual ~BaseChannel();
virtual void Init_w(webrtc::RtpTransportInternal* rtp_transport);
// Deinit may be called multiple times and is simply ignored if it's already
// done.
void Deinit();
void Init_n(webrtc::RtpTransportInternal* rtp_transport)
RTC_RUN_ON(network_thread());
void Deinit_n() RTC_RUN_ON(network_thread());
rtc::Thread* worker_thread() const { return worker_thread_; }
rtc::Thread* network_thread() const { return network_thread_; }

View File

@ -176,7 +176,10 @@ VoiceChannel* ChannelManager::CreateVoiceChannel(
absl::WrapUnique(media_channel), content_name, srtp_required,
crypto_options, ssrc_generator);
voice_channel->Init_w(rtp_transport);
network_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
RTC_DCHECK_RUN_ON(voice_channel->network_thread());
voice_channel->Init_n(rtp_transport);
});
VoiceChannel* voice_channel_ptr = voice_channel.get();
voice_channels_.push_back(std::move(voice_channel));
@ -187,6 +190,11 @@ void ChannelManager::DestroyVoiceChannel(VoiceChannel* voice_channel) {
TRACE_EVENT0("webrtc", "ChannelManager::DestroyVoiceChannel");
RTC_DCHECK(voice_channel);
network_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
RTC_DCHECK_RUN_ON(voice_channel->network_thread());
voice_channel->Deinit_n();
});
if (!worker_thread_->IsCurrent()) {
worker_thread_->Invoke<void>(RTC_FROM_HERE,
[&] { DestroyVoiceChannel(voice_channel); });
@ -240,7 +248,10 @@ VideoChannel* ChannelManager::CreateVideoChannel(
absl::WrapUnique(media_channel), content_name, srtp_required,
crypto_options, ssrc_generator);
video_channel->Init_w(rtp_transport);
network_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
RTC_DCHECK_RUN_ON(video_channel->network_thread());
video_channel->Init_n(rtp_transport);
});
VideoChannel* video_channel_ptr = video_channel.get();
video_channels_.push_back(std::move(video_channel));
@ -251,6 +262,11 @@ void ChannelManager::DestroyVideoChannel(VideoChannel* video_channel) {
TRACE_EVENT0("webrtc", "ChannelManager::DestroyVideoChannel");
RTC_DCHECK(video_channel);
network_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
RTC_DCHECK_RUN_ON(video_channel->network_thread());
video_channel->Deinit_n();
});
if (!worker_thread_->IsCurrent()) {
worker_thread_->Invoke<void>(RTC_FROM_HERE,
[&] { DestroyVideoChannel(video_channel); });

View File

@ -126,8 +126,10 @@ class ChannelTest : public ::testing::Test, public sigslot::has_slots<> {
~ChannelTest() {
if (network_thread_) {
network_thread_->Invoke<void>(
RTC_FROM_HERE, [this]() { network_thread_safety_->SetNotAlive(); });
network_thread_->Invoke<void>(RTC_FROM_HERE, [this]() {
network_thread_safety_->SetNotAlive();
DeinitChannels();
});
}
}
@ -271,6 +273,22 @@ class ChannelTest : public ::testing::Test, public sigslot::has_slots<> {
}
}
// Unininitializes the channels on the network thread.
void DeinitChannels() {
if (!channel1_ && !channel2_)
return;
network_thread_->Invoke<void>(RTC_FROM_HERE, [this]() {
if (channel1_) {
RTC_DCHECK_RUN_ON(channel1_->network_thread());
channel1_->Deinit_n();
}
if (channel2_) {
RTC_DCHECK_RUN_ON(channel2_->network_thread());
channel2_->Deinit_n();
}
});
}
std::unique_ptr<webrtc::RtpTransport> CreateUnencryptedTransport(
rtc::PacketTransportInternal* rtp_packet_transport,
rtc::PacketTransportInternal* rtcp_packet_transport) {
@ -918,6 +936,9 @@ class ChannelTest : public ::testing::Test, public sigslot::has_slots<> {
EXPECT_TRUE(SendAccept());
SendRtp1();
SendRtp2();
DeinitChannels();
// Do not wait, destroy channels.
channel1_.reset(nullptr);
channel2_.reset(nullptr);
@ -1427,7 +1448,10 @@ std::unique_ptr<cricket::VoiceChannel> ChannelTest<VoiceTraits>::CreateChannel(
worker_thread, network_thread, signaling_thread, std::move(ch),
cricket::CN_AUDIO, (flags & DTLS) != 0, webrtc::CryptoOptions(),
&ssrc_generator_);
channel->Init_w(rtp_transport);
network_thread->Invoke<void>(RTC_FROM_HERE, [&]() {
RTC_DCHECK_RUN_ON(channel->network_thread());
channel->Init_n(rtp_transport);
});
return channel;
}
@ -1510,7 +1534,10 @@ std::unique_ptr<cricket::VideoChannel> ChannelTest<VideoTraits>::CreateChannel(
worker_thread, network_thread, signaling_thread, std::move(ch),
cricket::CN_VIDEO, (flags & DTLS) != 0, webrtc::CryptoOptions(),
&ssrc_generator_);
channel->Init_w(rtp_transport);
network_thread->Invoke<void>(RTC_FROM_HERE, [&]() {
RTC_DCHECK_RUN_ON(channel->network_thread());
channel->Init_n(rtp_transport);
});
return channel;
}

View File

@ -4711,11 +4711,14 @@ cricket::VoiceChannel* SdpOfferAnswerHandler::CreateVoiceChannel(
if (!channel_manager()->media_engine())
return nullptr;
// TODO(tommi): Avoid hop to network thread.
RtpTransportInternal* rtp_transport = pc_->GetRtpTransport(mid);
// TODO(bugs.webrtc.org/11992): CreateVoiceChannel internally switches to the
// worker thread. We shouldn't be using the `call_ptr_` hack here but simply
// be on the worker thread and use `call_` (update upstream code).
// TODO(tommi): This hops to the worker and from the worker to the network
// thread (blocking both signal and worker).
return channel_manager()->CreateVoiceChannel(
pc_->call_ptr(), pc_->configuration()->media_config, rtp_transport,
signaling_thread(), mid, pc_->SrtpRequired(), pc_->GetCryptoOptions(),