Reland "Prepare to avoid hops to worker for network events."

This is a reland of d48a2b14e7545d0a0778df753e062075c044e2a1

The diff of the reland (what caused the tsan error) can be seen
by diffing patch sets 2 and 3. Essentially I missed keeping the calls
to the transport controller on the worker thread. Note to self to add
thread/sequence checks to that code so that we won't have to rely on
tsan :)

Original change's description:
> Prepare to avoid hops to worker for network events.
>
> This moves the thread hop for network events, from BaseChannel and
> into Call. The reason for this is to move the control over those hops
> (including DeliverPacket[Async]) into the same class where the state
> is held that is affected by those hops. Once that's done, we can start
> moving the relevant network state over to the network thread and
> eventually remove the hops.
>
> I'm also adding several TODOs for tracking future steps and give
> developers a heads up.
>
> Bug: webrtc:11993
> Change-Id: Ice7ee3b5b6893532df52039324293979196d341d
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/204800
> Commit-Queue: Tommi <tommi@webrtc.org>
> Reviewed-by: Niels Moller <nisse@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#33138}

Bug: webrtc:11993, webrtc:12430
Change-Id: I4fccaa418d22c2087a55bbb3ddbb25fac3b4dfcc
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/205580
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33153}
This commit is contained in:
Tomas Gunnarsson 2021-02-03 16:23:40 +01:00 committed by Commit Bot
parent 3f7990d38b
commit ad3258647e
7 changed files with 111 additions and 48 deletions

View File

@ -341,6 +341,7 @@ bool AudioReceiveStream::SetMinimumPlayoutDelay(int delay_ms) {
}
void AudioReceiveStream::AssociateSendStream(AudioSendStream* send_stream) {
// TODO(bugs.webrtc.org/11993): Expect to be called on the network thread.
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
channel_receive_->SetAssociatedSendChannel(
send_stream ? send_stream->GetChannel() : nullptr);
@ -362,6 +363,8 @@ const webrtc::AudioReceiveStream::Config& AudioReceiveStream::config() const {
const AudioSendStream* AudioReceiveStream::GetAssociatedSendStreamForTesting()
const {
// TODO(bugs.webrtc.org/11993): Expect to be called on the network thread or
// remove test method and |associated_send_stream_| variable.
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
return associated_send_stream_;
}

View File

@ -787,6 +787,7 @@ int ChannelReceive::ResendPackets(const uint16_t* sequence_numbers,
void ChannelReceive::SetAssociatedSendChannel(
const ChannelSendInterface* channel) {
// TODO(bugs.webrtc.org/11993): Expect to be called on the network thread.
RTC_DCHECK(worker_thread_checker_.IsCurrent());
MutexLock lock(&assoc_send_channel_lock_);
associated_send_channel_ = channel;

View File

@ -335,15 +335,18 @@ class Call final : public webrtc::Call,
NetworkState audio_network_state_;
NetworkState video_network_state_;
// TODO(bugs.webrtc.org/11993): Move aggregate_network_up_ over to the
// network thread.
bool aggregate_network_up_ RTC_GUARDED_BY(worker_thread_);
// Audio, Video, and FlexFEC receive streams are owned by the client that
// creates them.
// TODO(bugs.webrtc.org/11993): Move audio_receive_streams_,
// video_receive_streams_ and sync_stream_mapping_ over to the network thread.
std::set<AudioReceiveStream*> audio_receive_streams_
RTC_GUARDED_BY(worker_thread_);
std::set<VideoReceiveStream2*> video_receive_streams_
RTC_GUARDED_BY(worker_thread_);
std::map<std::string, AudioReceiveStream*> sync_stream_mapping_
RTC_GUARDED_BY(worker_thread_);
@ -378,6 +381,9 @@ class Call final : public webrtc::Call,
// send side BWE are negotiated.
const bool use_send_side_bwe;
};
// TODO(bugs.webrtc.org/11993): Move receive_rtp_config_ over to the
// network thread.
std::map<uint32_t, ReceiveRtpConfig> receive_rtp_config_
RTC_GUARDED_BY(worker_thread_);
@ -800,6 +806,8 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream(
audio_send_ssrcs_.end());
audio_send_ssrcs_[config.rtp.ssrc] = send_stream;
// TODO(bugs.webrtc.org/11993): call AssociateSendStream and
// UpdateAggregateNetworkState asynchronously on the network thread.
for (AudioReceiveStream* stream : audio_receive_streams_) {
if (stream->config().rtp.local_ssrc == config.rtp.ssrc) {
stream->AssociateSendStream(send_stream);
@ -807,6 +815,7 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream(
}
UpdateAggregateNetworkState();
return send_stream;
}
@ -825,6 +834,8 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) {
size_t num_deleted = audio_send_ssrcs_.erase(ssrc);
RTC_DCHECK_EQ(1, num_deleted);
// TODO(bugs.webrtc.org/11993): call AssociateSendStream and
// UpdateAggregateNetworkState asynchronously on the network thread.
for (AudioReceiveStream* stream : audio_receive_streams_) {
if (stream->config().rtp.local_ssrc == ssrc) {
stream->AssociateSendStream(nullptr);
@ -832,6 +843,7 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) {
}
UpdateAggregateNetworkState();
delete send_stream;
}
@ -842,11 +854,19 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream(
EnsureStarted();
event_log_->Log(std::make_unique<RtcEventAudioReceiveStreamConfig>(
CreateRtcLogStreamConfig(config)));
// TODO(bugs.webrtc.org/11993): Move the registration between |receive_stream|
// and |audio_receiver_controller_| out of AudioReceiveStream construction and
// set it up asynchronously on the network thread (the registration and
// |audio_receiver_controller_| need to live on the network thread).
AudioReceiveStream* receive_stream = new AudioReceiveStream(
clock_, &audio_receiver_controller_, transport_send_ptr_->packet_router(),
module_process_thread_->process_thread(), config_.neteq_factory, config,
config_.audio_state, event_log_);
// TODO(bugs.webrtc.org/11993): Update the below on the network thread.
// We could possibly set up the audio_receiver_controller_ association up
// as part of the async setup.
receive_rtp_config_.emplace(config.rtp.remote_ssrc, ReceiveRtpConfig(config));
audio_receive_streams_.insert(receive_stream);
@ -873,8 +893,12 @@ void Call::DestroyAudioReceiveStream(
uint32_t ssrc = config.rtp.remote_ssrc;
receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
->RemoveStream(ssrc);
// TODO(bugs.webrtc.org/11993): Access the map, rtp config, call ConfigureSync
// and UpdateAggregateNetworkState on the network thread.
audio_receive_streams_.erase(audio_receive_stream);
const std::string& sync_group = audio_receive_stream->config().sync_group;
const auto it = sync_stream_mapping_.find(sync_group);
if (it != sync_stream_mapping_.end() && it->second == audio_receive_stream) {
sync_stream_mapping_.erase(it);
@ -883,6 +907,9 @@ void Call::DestroyAudioReceiveStream(
receive_rtp_config_.erase(ssrc);
UpdateAggregateNetworkState();
// TODO(bugs.webrtc.org/11993): Consider if deleting |audio_receive_stream|
// on the network thread would be better or if we'd need to tear down the
// state in two phases.
delete audio_receive_stream;
}
@ -995,13 +1022,15 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream(
EnsureStarted();
TaskQueueBase* current = GetCurrentTaskQueueOrThread();
RTC_CHECK(current);
// TODO(bugs.webrtc.org/11993): Move the registration between |receive_stream|
// and |video_receiver_controller_| out of VideoReceiveStream2 construction
// and set it up asynchronously on the network thread (the registration and
// |video_receiver_controller_| need to live on the network thread).
VideoReceiveStream2* receive_stream = new VideoReceiveStream2(
task_queue_factory_, current, &video_receiver_controller_, num_cpu_cores_,
transport_send_ptr_->packet_router(), std::move(configuration),
module_process_thread_->process_thread(), call_stats_.get(), clock_,
new VCMTiming(clock_));
task_queue_factory_, worker_thread_, &video_receiver_controller_,
num_cpu_cores_, transport_send_ptr_->packet_router(),
std::move(configuration), module_process_thread_->process_thread(),
call_stats_.get(), clock_, new VCMTiming(clock_));
const webrtc::VideoReceiveStream::Config& config = receive_stream->config();
if (config.rtp.rtx_ssrc) {
@ -1134,34 +1163,54 @@ const WebRtcKeyValueConfig& Call::trials() const {
}
void Call::SignalChannelNetworkState(MediaType media, NetworkState state) {
RTC_DCHECK_RUN_ON(worker_thread_);
switch (media) {
case MediaType::AUDIO:
audio_network_state_ = state;
break;
case MediaType::VIDEO:
video_network_state_ = state;
break;
case MediaType::ANY:
case MediaType::DATA:
RTC_NOTREACHED();
break;
}
RTC_DCHECK_RUN_ON(network_thread_);
RTC_DCHECK(media == MediaType::AUDIO || media == MediaType::VIDEO);
UpdateAggregateNetworkState();
for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) {
video_receive_stream->SignalNetworkState(video_network_state_);
auto closure = [this, media, state]() {
// TODO(bugs.webrtc.org/11993): Move this over to the network thread.
RTC_DCHECK_RUN_ON(worker_thread_);
if (media == MediaType::AUDIO) {
audio_network_state_ = state;
} else {
RTC_DCHECK_EQ(media, MediaType::VIDEO);
video_network_state_ = state;
}
// TODO(tommi): Is it necessary to always do this, including if there
// was no change in state?
UpdateAggregateNetworkState();
// TODO(tommi): Is it right to do this if media == AUDIO?
for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) {
video_receive_stream->SignalNetworkState(video_network_state_);
}
};
if (network_thread_ == worker_thread_) {
closure();
} else {
// TODO(bugs.webrtc.org/11993): Remove workaround when we no longer need to
// post to the worker thread.
worker_thread_->PostTask(ToQueuedTask(task_safety_, std::move(closure)));
}
}
void Call::OnAudioTransportOverheadChanged(int transport_overhead_per_packet) {
RTC_DCHECK_RUN_ON(worker_thread_);
for (auto& kv : audio_send_ssrcs_) {
kv.second->SetTransportOverhead(transport_overhead_per_packet);
}
RTC_DCHECK_RUN_ON(network_thread_);
worker_thread_->PostTask(
ToQueuedTask(task_safety_, [this, transport_overhead_per_packet]() {
// TODO(bugs.webrtc.org/11993): Move this over to the network thread.
RTC_DCHECK_RUN_ON(worker_thread_);
for (auto& kv : audio_send_ssrcs_) {
kv.second->SetTransportOverhead(transport_overhead_per_packet);
}
}));
}
void Call::UpdateAggregateNetworkState() {
// TODO(bugs.webrtc.org/11993): Move this over to the network thread.
// RTC_DCHECK_RUN_ON(network_thread_);
RTC_DCHECK_RUN_ON(worker_thread_);
bool have_audio =
@ -1241,6 +1290,7 @@ void Call::OnAllocationLimitsChanged(BitrateAllocationLimits limits) {
}
void Call::ConfigureSync(const std::string& sync_group) {
// TODO(bugs.webrtc.org/11993): Expect to be called on the network thread.
// Set sync only if there was no previous one.
if (sync_group.empty())
return;
@ -1452,6 +1502,9 @@ void Call::DeliverPacketAsync(MediaType media_type,
}
void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
// TODO(bugs.webrtc.org/11993): Expect to be called on the network thread.
// This method is called synchronously via |OnRtpPacket()| (see DeliverRtp)
// on the same thread.
RTC_DCHECK_RUN_ON(worker_thread_);
RtpPacketReceived parsed_packet;
if (!parsed_packet.Parse(packet, length))

View File

@ -1803,7 +1803,7 @@ void WebRtcVideoChannel::BackfillBufferedPackets(
}
void WebRtcVideoChannel::OnReadyToSend(bool ready) {
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK_RUN_ON(&network_thread_checker_);
RTC_LOG(LS_VERBOSE) << "OnReadyToSend: " << (ready ? "Ready." : "Not ready.");
call_->SignalChannelNetworkState(
webrtc::MediaType::VIDEO,
@ -1813,11 +1813,15 @@ void WebRtcVideoChannel::OnReadyToSend(bool ready) {
void WebRtcVideoChannel::OnNetworkRouteChanged(
const std::string& transport_name,
const rtc::NetworkRoute& network_route) {
RTC_DCHECK_RUN_ON(&thread_checker_);
call_->GetTransportControllerSend()->OnNetworkRouteChanged(transport_name,
network_route);
call_->GetTransportControllerSend()->OnTransportOverheadChanged(
network_route.packet_overhead);
RTC_DCHECK_RUN_ON(&network_thread_checker_);
worker_thread_->PostTask(ToQueuedTask(
task_safety_, [this, name = transport_name, route = network_route] {
RTC_DCHECK_RUN_ON(&thread_checker_);
webrtc::RtpTransportControllerSendInterface* transport =
call_->GetTransportControllerSend();
transport->OnNetworkRouteChanged(name, route);
transport->OnTransportOverheadChanged(route.packet_overhead);
}));
}
void WebRtcVideoChannel::SetInterface(NetworkInterface* iface) {

View File

@ -2290,10 +2290,15 @@ void WebRtcVoiceMediaChannel::OnPacketReceived(rtc::CopyOnWriteBuffer packet,
void WebRtcVoiceMediaChannel::OnNetworkRouteChanged(
const std::string& transport_name,
const rtc::NetworkRoute& network_route) {
RTC_DCHECK_RUN_ON(worker_thread_);
call_->GetTransportControllerSend()->OnNetworkRouteChanged(transport_name,
network_route);
RTC_DCHECK_RUN_ON(&network_thread_checker_);
call_->OnAudioTransportOverheadChanged(network_route.packet_overhead);
worker_thread_->PostTask(ToQueuedTask(
task_safety_, [this, name = transport_name, route = network_route] {
RTC_DCHECK_RUN_ON(worker_thread_);
call_->GetTransportControllerSend()->OnNetworkRouteChanged(name, route);
}));
}
bool WebRtcVoiceMediaChannel::MuteStream(uint32_t ssrc, bool muted) {
@ -2335,7 +2340,7 @@ bool WebRtcVoiceMediaChannel::SetMaxSendBitrate(int bps) {
}
void WebRtcVoiceMediaChannel::OnReadyToSend(bool ready) {
RTC_DCHECK_RUN_ON(worker_thread_);
RTC_DCHECK_RUN_ON(&network_thread_checker_);
RTC_LOG(LS_VERBOSE) << "OnReadyToSend: " << (ready ? "Ready." : "Not ready.");
call_->SignalChannelNetworkState(
webrtc::MediaType::AUDIO,

View File

@ -369,7 +369,7 @@ void BaseChannel::OnWritableState(bool writable) {
void BaseChannel::OnNetworkRouteChanged(
absl::optional<rtc::NetworkRoute> network_route) {
RTC_LOG(LS_INFO) << "Network route for " << ToString() << " was changed.";
RTC_LOG(LS_INFO) << "Network route changed for " << ToString();
RTC_DCHECK_RUN_ON(network_thread());
rtc::NetworkRoute new_route;
@ -380,10 +380,7 @@ void BaseChannel::OnNetworkRouteChanged(
// use the same transport name and MediaChannel::OnNetworkRouteChanged cannot
// work correctly. Intentionally leave it broken to simplify the code and
// encourage the users to stop using non-muxing RTCP.
worker_thread_->PostTask(ToQueuedTask(alive_, [this, new_route] {
RTC_DCHECK_RUN_ON(worker_thread());
media_channel_->OnNetworkRouteChanged(transport_name_, new_route);
}));
media_channel_->OnNetworkRouteChanged(transport_name_, new_route);
}
sigslot::signal1<ChannelInterface*>& BaseChannel::SignalFirstPacketReceived() {
@ -399,10 +396,8 @@ sigslot::signal1<const rtc::SentPacket&>& BaseChannel::SignalSentPacket() {
}
void BaseChannel::OnTransportReadyToSend(bool ready) {
worker_thread_->PostTask(ToQueuedTask(alive_, [this, ready] {
RTC_DCHECK_RUN_ON(worker_thread());
media_channel_->OnReadyToSend(ready);
}));
RTC_DCHECK_RUN_ON(network_thread());
media_channel_->OnReadyToSend(ready);
}
bool BaseChannel::SendPacket(bool rtcp,

View File

@ -1205,11 +1205,13 @@ class ChannelTest : public ::testing::Test, public sigslot::has_slots<> {
CreateChannels(0, 0);
EXPECT_FALSE(media_channel1_->ready_to_send());
channel1_->OnTransportReadyToSend(true);
network_thread_->PostTask(
RTC_FROM_HERE, [this] { channel1_->OnTransportReadyToSend(true); });
WaitForThreads();
EXPECT_TRUE(media_channel1_->ready_to_send());
channel1_->OnTransportReadyToSend(false);
network_thread_->PostTask(
RTC_FROM_HERE, [this] { channel1_->OnTransportReadyToSend(false); });
WaitForThreads();
EXPECT_FALSE(media_channel1_->ready_to_send());
}