From 47ec157fbf11e63744df5cf8cae1fae155d9f832 Mon Sep 17 00:00:00 2001 From: Mirko Bonadei Date: Wed, 3 Feb 2021 12:08:07 +0000 Subject: [PATCH] Revert "Prepare to avoid hops to worker for network events." This reverts commit d48a2b14e7545d0a0778df753e062075c044e2a1. Reason for revert: TSan tests started to fail constantly after this CL (it looks like it is flaky and the CQ was lucky to get green). See https://ci.chromium.org/ui/p/webrtc/builders/ci/Linux%20Tsan%20v2/25042/overview. Original change's description: > Prepare to avoid hops to worker for network events. > > This moves the thread hop for network events, from BaseChannel and > into Call. The reason for this is to move the control over those hops > (including DeliverPacket[Async]) into the same class where the state > is held that is affected by those hops. Once that's done, we can start > moving the relevant network state over to the network thread and > eventually remove the hops. > > I'm also adding several TODOs for tracking future steps and give > developers a heads up. > > Bug: webrtc:11993 > Change-Id: Ice7ee3b5b6893532df52039324293979196d341d > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/204800 > Commit-Queue: Tommi > Reviewed-by: Niels Moller > Cr-Commit-Position: refs/heads/master@{#33138} TBR=nisse@webrtc.org,tommi@webrtc.org Change-Id: Id87cf9cbcc8ed58e74d755a110f0ef9dd980e298 No-Presubmit: true No-Tree-Checks: true No-Try: true Bug: webrtc:11993 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/205525 Reviewed-by: Mirko Bonadei Commit-Queue: Mirko Bonadei Cr-Commit-Position: refs/heads/master@{#33145} --- audio/audio_receive_stream.cc | 3 - audio/channel_receive.cc | 1 - call/call.cc | 103 +++++++--------------------- media/engine/webrtc_video_engine.cc | 12 ++-- media/engine/webrtc_voice_engine.cc | 4 +- pc/channel.cc | 13 ++-- pc/channel_unittest.cc | 6 +- 7 files changed, 44 insertions(+), 98 deletions(-) diff --git a/audio/audio_receive_stream.cc b/audio/audio_receive_stream.cc index 03dd4c0eef..d6f6140fae 100644 --- a/audio/audio_receive_stream.cc +++ b/audio/audio_receive_stream.cc @@ -341,7 +341,6 @@ bool AudioReceiveStream::SetMinimumPlayoutDelay(int delay_ms) { } void AudioReceiveStream::AssociateSendStream(AudioSendStream* send_stream) { - // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. RTC_DCHECK_RUN_ON(&worker_thread_checker_); channel_receive_->SetAssociatedSendChannel( send_stream ? send_stream->GetChannel() : nullptr); @@ -363,8 +362,6 @@ const webrtc::AudioReceiveStream::Config& AudioReceiveStream::config() const { const AudioSendStream* AudioReceiveStream::GetAssociatedSendStreamForTesting() const { - // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread or - // remove test method and |associated_send_stream_| variable. RTC_DCHECK_RUN_ON(&worker_thread_checker_); return associated_send_stream_; } diff --git a/audio/channel_receive.cc b/audio/channel_receive.cc index 5c2b91803a..a8015c8637 100644 --- a/audio/channel_receive.cc +++ b/audio/channel_receive.cc @@ -787,7 +787,6 @@ int ChannelReceive::ResendPackets(const uint16_t* sequence_numbers, void ChannelReceive::SetAssociatedSendChannel( const ChannelSendInterface* channel) { - // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. RTC_DCHECK(worker_thread_checker_.IsCurrent()); MutexLock lock(&assoc_send_channel_lock_); associated_send_channel_ = channel; diff --git a/call/call.cc b/call/call.cc index 46bf52862f..f20f4b5c41 100644 --- a/call/call.cc +++ b/call/call.cc @@ -335,18 +335,15 @@ class Call final : public webrtc::Call, NetworkState audio_network_state_; NetworkState video_network_state_; - // TODO(bugs.webrtc.org/11993): Move aggregate_network_up_ over to the - // network thread. bool aggregate_network_up_ RTC_GUARDED_BY(worker_thread_); // Audio, Video, and FlexFEC receive streams are owned by the client that // creates them. - // TODO(bugs.webrtc.org/11993): Move audio_receive_streams_, - // video_receive_streams_ and sync_stream_mapping_ over to the network thread. std::set audio_receive_streams_ RTC_GUARDED_BY(worker_thread_); std::set video_receive_streams_ RTC_GUARDED_BY(worker_thread_); + std::map sync_stream_mapping_ RTC_GUARDED_BY(worker_thread_); @@ -381,9 +378,6 @@ class Call final : public webrtc::Call, // send side BWE are negotiated. const bool use_send_side_bwe; }; - - // TODO(bugs.webrtc.org/11993): Move receive_rtp_config_ over to the - // network thread. std::map receive_rtp_config_ RTC_GUARDED_BY(worker_thread_); @@ -806,8 +800,6 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream( audio_send_ssrcs_.end()); audio_send_ssrcs_[config.rtp.ssrc] = send_stream; - // TODO(bugs.webrtc.org/11993): call AssociateSendStream and - // UpdateAggregateNetworkState asynchronously on the network thread. for (AudioReceiveStream* stream : audio_receive_streams_) { if (stream->config().rtp.local_ssrc == config.rtp.ssrc) { stream->AssociateSendStream(send_stream); @@ -815,7 +807,6 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream( } UpdateAggregateNetworkState(); - return send_stream; } @@ -834,8 +825,6 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) { size_t num_deleted = audio_send_ssrcs_.erase(ssrc); RTC_DCHECK_EQ(1, num_deleted); - // TODO(bugs.webrtc.org/11993): call AssociateSendStream and - // UpdateAggregateNetworkState asynchronously on the network thread. for (AudioReceiveStream* stream : audio_receive_streams_) { if (stream->config().rtp.local_ssrc == ssrc) { stream->AssociateSendStream(nullptr); @@ -843,7 +832,6 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) { } UpdateAggregateNetworkState(); - delete send_stream; } @@ -854,19 +842,11 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream( EnsureStarted(); event_log_->Log(std::make_unique( CreateRtcLogStreamConfig(config))); - - // TODO(bugs.webrtc.org/11993): Move the registration between |receive_stream| - // and |audio_receiver_controller_| out of AudioReceiveStream construction and - // set it up asynchronously on the network thread (the registration and - // |audio_receiver_controller_| need to live on the network thread). AudioReceiveStream* receive_stream = new AudioReceiveStream( clock_, &audio_receiver_controller_, transport_send_ptr_->packet_router(), module_process_thread_->process_thread(), config_.neteq_factory, config, config_.audio_state, event_log_); - // TODO(bugs.webrtc.org/11993): Update the below on the network thread. - // We could possibly set up the audio_receiver_controller_ association up - // as part of the async setup. receive_rtp_config_.emplace(config.rtp.remote_ssrc, ReceiveRtpConfig(config)); audio_receive_streams_.insert(receive_stream); @@ -893,12 +873,8 @@ void Call::DestroyAudioReceiveStream( uint32_t ssrc = config.rtp.remote_ssrc; receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) ->RemoveStream(ssrc); - - // TODO(bugs.webrtc.org/11993): Access the map, rtp config, call ConfigureSync - // and UpdateAggregateNetworkState on the network thread. audio_receive_streams_.erase(audio_receive_stream); const std::string& sync_group = audio_receive_stream->config().sync_group; - const auto it = sync_stream_mapping_.find(sync_group); if (it != sync_stream_mapping_.end() && it->second == audio_receive_stream) { sync_stream_mapping_.erase(it); @@ -907,9 +883,6 @@ void Call::DestroyAudioReceiveStream( receive_rtp_config_.erase(ssrc); UpdateAggregateNetworkState(); - // TODO(bugs.webrtc.org/11993): Consider if deleting |audio_receive_stream| - // on the network thread would be better or if we'd need to tear down the - // state in two phases. delete audio_receive_stream; } @@ -1022,15 +995,13 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream( EnsureStarted(); - // TODO(bugs.webrtc.org/11993): Move the registration between |receive_stream| - // and |video_receiver_controller_| out of VideoReceiveStream2 construction - // and set it up asynchronously on the network thread (the registration and - // |video_receiver_controller_| need to live on the network thread). + TaskQueueBase* current = GetCurrentTaskQueueOrThread(); + RTC_CHECK(current); VideoReceiveStream2* receive_stream = new VideoReceiveStream2( - task_queue_factory_, worker_thread_, &video_receiver_controller_, - num_cpu_cores_, transport_send_ptr_->packet_router(), - std::move(configuration), module_process_thread_->process_thread(), - call_stats_.get(), clock_, new VCMTiming(clock_)); + task_queue_factory_, current, &video_receiver_controller_, num_cpu_cores_, + transport_send_ptr_->packet_router(), std::move(configuration), + module_process_thread_->process_thread(), call_stats_.get(), clock_, + new VCMTiming(clock_)); const webrtc::VideoReceiveStream::Config& config = receive_stream->config(); if (config.rtp.rtx_ssrc) { @@ -1163,54 +1134,34 @@ const WebRtcKeyValueConfig& Call::trials() const { } void Call::SignalChannelNetworkState(MediaType media, NetworkState state) { - RTC_DCHECK_RUN_ON(network_thread_); - RTC_DCHECK(media == MediaType::AUDIO || media == MediaType::VIDEO); - - auto closure = [this, media, state]() { - // TODO(bugs.webrtc.org/11993): Move this over to the network thread. - RTC_DCHECK_RUN_ON(worker_thread_); - if (media == MediaType::AUDIO) { + RTC_DCHECK_RUN_ON(worker_thread_); + switch (media) { + case MediaType::AUDIO: audio_network_state_ = state; - } else { - RTC_DCHECK_EQ(media, MediaType::VIDEO); + break; + case MediaType::VIDEO: video_network_state_ = state; - } + break; + case MediaType::ANY: + case MediaType::DATA: + RTC_NOTREACHED(); + break; + } - // TODO(tommi): Is it necessary to always do this, including if there - // was no change in state? - UpdateAggregateNetworkState(); - - // TODO(tommi): Is it right to do this if media == AUDIO? - for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) { - video_receive_stream->SignalNetworkState(video_network_state_); - } - }; - - if (network_thread_ == worker_thread_) { - closure(); - } else { - // TODO(bugs.webrtc.org/11993): Remove workaround when we no longer need to - // post to the worker thread. - worker_thread_->PostTask(ToQueuedTask(task_safety_, std::move(closure))); + UpdateAggregateNetworkState(); + for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) { + video_receive_stream->SignalNetworkState(video_network_state_); } } void Call::OnAudioTransportOverheadChanged(int transport_overhead_per_packet) { - RTC_DCHECK_RUN_ON(network_thread_); - worker_thread_->PostTask( - ToQueuedTask(task_safety_, [this, transport_overhead_per_packet]() { - // TODO(bugs.webrtc.org/11993): Move this over to the network thread. - RTC_DCHECK_RUN_ON(worker_thread_); - for (auto& kv : audio_send_ssrcs_) { - kv.second->SetTransportOverhead(transport_overhead_per_packet); - } - })); + RTC_DCHECK_RUN_ON(worker_thread_); + for (auto& kv : audio_send_ssrcs_) { + kv.second->SetTransportOverhead(transport_overhead_per_packet); + } } void Call::UpdateAggregateNetworkState() { - // TODO(bugs.webrtc.org/11993): Move this over to the network thread. - // RTC_DCHECK_RUN_ON(network_thread_); - RTC_DCHECK_RUN_ON(worker_thread_); bool have_audio = @@ -1290,7 +1241,6 @@ void Call::OnAllocationLimitsChanged(BitrateAllocationLimits limits) { } void Call::ConfigureSync(const std::string& sync_group) { - // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. // Set sync only if there was no previous one. if (sync_group.empty()) return; @@ -1502,9 +1452,6 @@ void Call::DeliverPacketAsync(MediaType media_type, } void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) { - // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. - // This method is called synchronously via |OnRtpPacket()| (see DeliverRtp) - // on the same thread. RTC_DCHECK_RUN_ON(worker_thread_); RtpPacketReceived parsed_packet; if (!parsed_packet.Parse(packet, length)) diff --git a/media/engine/webrtc_video_engine.cc b/media/engine/webrtc_video_engine.cc index c75d757cef..28ed365b38 100644 --- a/media/engine/webrtc_video_engine.cc +++ b/media/engine/webrtc_video_engine.cc @@ -1803,7 +1803,7 @@ void WebRtcVideoChannel::BackfillBufferedPackets( } void WebRtcVideoChannel::OnReadyToSend(bool ready) { - RTC_DCHECK_RUN_ON(&network_thread_checker_); + RTC_DCHECK_RUN_ON(&thread_checker_); RTC_LOG(LS_VERBOSE) << "OnReadyToSend: " << (ready ? "Ready." : "Not ready."); call_->SignalChannelNetworkState( webrtc::MediaType::VIDEO, @@ -1813,11 +1813,11 @@ void WebRtcVideoChannel::OnReadyToSend(bool ready) { void WebRtcVideoChannel::OnNetworkRouteChanged( const std::string& transport_name, const rtc::NetworkRoute& network_route) { - RTC_DCHECK_RUN_ON(&network_thread_checker_); - webrtc::RtpTransportControllerSendInterface* transport = - call_->GetTransportControllerSend(); - transport->OnNetworkRouteChanged(transport_name, network_route); - transport->OnTransportOverheadChanged(network_route.packet_overhead); + RTC_DCHECK_RUN_ON(&thread_checker_); + call_->GetTransportControllerSend()->OnNetworkRouteChanged(transport_name, + network_route); + call_->GetTransportControllerSend()->OnTransportOverheadChanged( + network_route.packet_overhead); } void WebRtcVideoChannel::SetInterface(NetworkInterface* iface) { diff --git a/media/engine/webrtc_voice_engine.cc b/media/engine/webrtc_voice_engine.cc index 0df96f3d1c..9efef3aefc 100644 --- a/media/engine/webrtc_voice_engine.cc +++ b/media/engine/webrtc_voice_engine.cc @@ -2290,7 +2290,7 @@ void WebRtcVoiceMediaChannel::OnPacketReceived(rtc::CopyOnWriteBuffer packet, void WebRtcVoiceMediaChannel::OnNetworkRouteChanged( const std::string& transport_name, const rtc::NetworkRoute& network_route) { - RTC_DCHECK_RUN_ON(&network_thread_checker_); + RTC_DCHECK_RUN_ON(worker_thread_); call_->GetTransportControllerSend()->OnNetworkRouteChanged(transport_name, network_route); call_->OnAudioTransportOverheadChanged(network_route.packet_overhead); @@ -2335,7 +2335,7 @@ bool WebRtcVoiceMediaChannel::SetMaxSendBitrate(int bps) { } void WebRtcVoiceMediaChannel::OnReadyToSend(bool ready) { - RTC_DCHECK_RUN_ON(&network_thread_checker_); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_LOG(LS_VERBOSE) << "OnReadyToSend: " << (ready ? "Ready." : "Not ready."); call_->SignalChannelNetworkState( webrtc::MediaType::AUDIO, diff --git a/pc/channel.cc b/pc/channel.cc index 16e226384a..b672a96539 100644 --- a/pc/channel.cc +++ b/pc/channel.cc @@ -369,7 +369,7 @@ void BaseChannel::OnWritableState(bool writable) { void BaseChannel::OnNetworkRouteChanged( absl::optional network_route) { - RTC_LOG(LS_INFO) << "Network route changed for " << ToString(); + RTC_LOG(LS_INFO) << "Network route for " << ToString() << " was changed."; RTC_DCHECK_RUN_ON(network_thread()); rtc::NetworkRoute new_route; @@ -380,7 +380,10 @@ void BaseChannel::OnNetworkRouteChanged( // use the same transport name and MediaChannel::OnNetworkRouteChanged cannot // work correctly. Intentionally leave it broken to simplify the code and // encourage the users to stop using non-muxing RTCP. - media_channel_->OnNetworkRouteChanged(transport_name_, new_route); + worker_thread_->PostTask(ToQueuedTask(alive_, [this, new_route] { + RTC_DCHECK_RUN_ON(worker_thread()); + media_channel_->OnNetworkRouteChanged(transport_name_, new_route); + })); } sigslot::signal1& BaseChannel::SignalFirstPacketReceived() { @@ -396,8 +399,10 @@ sigslot::signal1& BaseChannel::SignalSentPacket() { } void BaseChannel::OnTransportReadyToSend(bool ready) { - RTC_DCHECK_RUN_ON(network_thread()); - media_channel_->OnReadyToSend(ready); + worker_thread_->PostTask(ToQueuedTask(alive_, [this, ready] { + RTC_DCHECK_RUN_ON(worker_thread()); + media_channel_->OnReadyToSend(ready); + })); } bool BaseChannel::SendPacket(bool rtcp, diff --git a/pc/channel_unittest.cc b/pc/channel_unittest.cc index 4a0a6b4a15..f2e93d69ea 100644 --- a/pc/channel_unittest.cc +++ b/pc/channel_unittest.cc @@ -1205,13 +1205,11 @@ class ChannelTest : public ::testing::Test, public sigslot::has_slots<> { CreateChannels(0, 0); EXPECT_FALSE(media_channel1_->ready_to_send()); - network_thread_->PostTask( - RTC_FROM_HERE, [this] { channel1_->OnTransportReadyToSend(true); }); + channel1_->OnTransportReadyToSend(true); WaitForThreads(); EXPECT_TRUE(media_channel1_->ready_to_send()); - network_thread_->PostTask( - RTC_FROM_HERE, [this] { channel1_->OnTransportReadyToSend(false); }); + channel1_->OnTransportReadyToSend(false); WaitForThreads(); EXPECT_FALSE(media_channel1_->ready_to_send()); }