From d48a2b14e7545d0a0778df753e062075c044e2a1 Mon Sep 17 00:00:00 2001 From: Tomas Gunnarsson Date: Tue, 2 Feb 2021 17:57:36 +0100 Subject: [PATCH] Prepare to avoid hops to worker for network events. This moves the thread hop for network events, from BaseChannel and into Call. The reason for this is to move the control over those hops (including DeliverPacket[Async]) into the same class where the state is held that is affected by those hops. Once that's done, we can start moving the relevant network state over to the network thread and eventually remove the hops. I'm also adding several TODOs for tracking future steps and give developers a heads up. Bug: webrtc:11993 Change-Id: Ice7ee3b5b6893532df52039324293979196d341d Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/204800 Commit-Queue: Tommi Reviewed-by: Niels Moller Cr-Commit-Position: refs/heads/master@{#33138} --- audio/audio_receive_stream.cc | 3 + audio/channel_receive.cc | 1 + call/call.cc | 107 +++++++++++++++++++++------- media/engine/webrtc_video_engine.cc | 12 ++-- media/engine/webrtc_voice_engine.cc | 4 +- pc/channel.cc | 13 ++-- pc/channel_unittest.cc | 6 +- 7 files changed, 100 insertions(+), 46 deletions(-) diff --git a/audio/audio_receive_stream.cc b/audio/audio_receive_stream.cc index d6f6140fae..03dd4c0eef 100644 --- a/audio/audio_receive_stream.cc +++ b/audio/audio_receive_stream.cc @@ -341,6 +341,7 @@ bool AudioReceiveStream::SetMinimumPlayoutDelay(int delay_ms) { } void AudioReceiveStream::AssociateSendStream(AudioSendStream* send_stream) { + // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. RTC_DCHECK_RUN_ON(&worker_thread_checker_); channel_receive_->SetAssociatedSendChannel( send_stream ? send_stream->GetChannel() : nullptr); @@ -362,6 +363,8 @@ const webrtc::AudioReceiveStream::Config& AudioReceiveStream::config() const { const AudioSendStream* AudioReceiveStream::GetAssociatedSendStreamForTesting() const { + // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread or + // remove test method and |associated_send_stream_| variable. RTC_DCHECK_RUN_ON(&worker_thread_checker_); return associated_send_stream_; } diff --git a/audio/channel_receive.cc b/audio/channel_receive.cc index a8015c8637..5c2b91803a 100644 --- a/audio/channel_receive.cc +++ b/audio/channel_receive.cc @@ -787,6 +787,7 @@ int ChannelReceive::ResendPackets(const uint16_t* sequence_numbers, void ChannelReceive::SetAssociatedSendChannel( const ChannelSendInterface* channel) { + // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. RTC_DCHECK(worker_thread_checker_.IsCurrent()); MutexLock lock(&assoc_send_channel_lock_); associated_send_channel_ = channel; diff --git a/call/call.cc b/call/call.cc index f20f4b5c41..46bf52862f 100644 --- a/call/call.cc +++ b/call/call.cc @@ -335,15 +335,18 @@ class Call final : public webrtc::Call, NetworkState audio_network_state_; NetworkState video_network_state_; + // TODO(bugs.webrtc.org/11993): Move aggregate_network_up_ over to the + // network thread. bool aggregate_network_up_ RTC_GUARDED_BY(worker_thread_); // Audio, Video, and FlexFEC receive streams are owned by the client that // creates them. + // TODO(bugs.webrtc.org/11993): Move audio_receive_streams_, + // video_receive_streams_ and sync_stream_mapping_ over to the network thread. std::set audio_receive_streams_ RTC_GUARDED_BY(worker_thread_); std::set video_receive_streams_ RTC_GUARDED_BY(worker_thread_); - std::map sync_stream_mapping_ RTC_GUARDED_BY(worker_thread_); @@ -378,6 +381,9 @@ class Call final : public webrtc::Call, // send side BWE are negotiated. const bool use_send_side_bwe; }; + + // TODO(bugs.webrtc.org/11993): Move receive_rtp_config_ over to the + // network thread. std::map receive_rtp_config_ RTC_GUARDED_BY(worker_thread_); @@ -800,6 +806,8 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream( audio_send_ssrcs_.end()); audio_send_ssrcs_[config.rtp.ssrc] = send_stream; + // TODO(bugs.webrtc.org/11993): call AssociateSendStream and + // UpdateAggregateNetworkState asynchronously on the network thread. for (AudioReceiveStream* stream : audio_receive_streams_) { if (stream->config().rtp.local_ssrc == config.rtp.ssrc) { stream->AssociateSendStream(send_stream); @@ -807,6 +815,7 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream( } UpdateAggregateNetworkState(); + return send_stream; } @@ -825,6 +834,8 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) { size_t num_deleted = audio_send_ssrcs_.erase(ssrc); RTC_DCHECK_EQ(1, num_deleted); + // TODO(bugs.webrtc.org/11993): call AssociateSendStream and + // UpdateAggregateNetworkState asynchronously on the network thread. for (AudioReceiveStream* stream : audio_receive_streams_) { if (stream->config().rtp.local_ssrc == ssrc) { stream->AssociateSendStream(nullptr); @@ -832,6 +843,7 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) { } UpdateAggregateNetworkState(); + delete send_stream; } @@ -842,11 +854,19 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream( EnsureStarted(); event_log_->Log(std::make_unique( CreateRtcLogStreamConfig(config))); + + // TODO(bugs.webrtc.org/11993): Move the registration between |receive_stream| + // and |audio_receiver_controller_| out of AudioReceiveStream construction and + // set it up asynchronously on the network thread (the registration and + // |audio_receiver_controller_| need to live on the network thread). AudioReceiveStream* receive_stream = new AudioReceiveStream( clock_, &audio_receiver_controller_, transport_send_ptr_->packet_router(), module_process_thread_->process_thread(), config_.neteq_factory, config, config_.audio_state, event_log_); + // TODO(bugs.webrtc.org/11993): Update the below on the network thread. + // We could possibly set up the audio_receiver_controller_ association up + // as part of the async setup. receive_rtp_config_.emplace(config.rtp.remote_ssrc, ReceiveRtpConfig(config)); audio_receive_streams_.insert(receive_stream); @@ -873,8 +893,12 @@ void Call::DestroyAudioReceiveStream( uint32_t ssrc = config.rtp.remote_ssrc; receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) ->RemoveStream(ssrc); + + // TODO(bugs.webrtc.org/11993): Access the map, rtp config, call ConfigureSync + // and UpdateAggregateNetworkState on the network thread. audio_receive_streams_.erase(audio_receive_stream); const std::string& sync_group = audio_receive_stream->config().sync_group; + const auto it = sync_stream_mapping_.find(sync_group); if (it != sync_stream_mapping_.end() && it->second == audio_receive_stream) { sync_stream_mapping_.erase(it); @@ -883,6 +907,9 @@ void Call::DestroyAudioReceiveStream( receive_rtp_config_.erase(ssrc); UpdateAggregateNetworkState(); + // TODO(bugs.webrtc.org/11993): Consider if deleting |audio_receive_stream| + // on the network thread would be better or if we'd need to tear down the + // state in two phases. delete audio_receive_stream; } @@ -995,13 +1022,15 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream( EnsureStarted(); - TaskQueueBase* current = GetCurrentTaskQueueOrThread(); - RTC_CHECK(current); + // TODO(bugs.webrtc.org/11993): Move the registration between |receive_stream| + // and |video_receiver_controller_| out of VideoReceiveStream2 construction + // and set it up asynchronously on the network thread (the registration and + // |video_receiver_controller_| need to live on the network thread). VideoReceiveStream2* receive_stream = new VideoReceiveStream2( - task_queue_factory_, current, &video_receiver_controller_, num_cpu_cores_, - transport_send_ptr_->packet_router(), std::move(configuration), - module_process_thread_->process_thread(), call_stats_.get(), clock_, - new VCMTiming(clock_)); + task_queue_factory_, worker_thread_, &video_receiver_controller_, + num_cpu_cores_, transport_send_ptr_->packet_router(), + std::move(configuration), module_process_thread_->process_thread(), + call_stats_.get(), clock_, new VCMTiming(clock_)); const webrtc::VideoReceiveStream::Config& config = receive_stream->config(); if (config.rtp.rtx_ssrc) { @@ -1134,34 +1163,54 @@ const WebRtcKeyValueConfig& Call::trials() const { } void Call::SignalChannelNetworkState(MediaType media, NetworkState state) { - RTC_DCHECK_RUN_ON(worker_thread_); - switch (media) { - case MediaType::AUDIO: - audio_network_state_ = state; - break; - case MediaType::VIDEO: - video_network_state_ = state; - break; - case MediaType::ANY: - case MediaType::DATA: - RTC_NOTREACHED(); - break; - } + RTC_DCHECK_RUN_ON(network_thread_); + RTC_DCHECK(media == MediaType::AUDIO || media == MediaType::VIDEO); - UpdateAggregateNetworkState(); - for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) { - video_receive_stream->SignalNetworkState(video_network_state_); + auto closure = [this, media, state]() { + // TODO(bugs.webrtc.org/11993): Move this over to the network thread. + RTC_DCHECK_RUN_ON(worker_thread_); + if (media == MediaType::AUDIO) { + audio_network_state_ = state; + } else { + RTC_DCHECK_EQ(media, MediaType::VIDEO); + video_network_state_ = state; + } + + // TODO(tommi): Is it necessary to always do this, including if there + // was no change in state? + UpdateAggregateNetworkState(); + + // TODO(tommi): Is it right to do this if media == AUDIO? + for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) { + video_receive_stream->SignalNetworkState(video_network_state_); + } + }; + + if (network_thread_ == worker_thread_) { + closure(); + } else { + // TODO(bugs.webrtc.org/11993): Remove workaround when we no longer need to + // post to the worker thread. + worker_thread_->PostTask(ToQueuedTask(task_safety_, std::move(closure))); } } void Call::OnAudioTransportOverheadChanged(int transport_overhead_per_packet) { - RTC_DCHECK_RUN_ON(worker_thread_); - for (auto& kv : audio_send_ssrcs_) { - kv.second->SetTransportOverhead(transport_overhead_per_packet); - } + RTC_DCHECK_RUN_ON(network_thread_); + worker_thread_->PostTask( + ToQueuedTask(task_safety_, [this, transport_overhead_per_packet]() { + // TODO(bugs.webrtc.org/11993): Move this over to the network thread. + RTC_DCHECK_RUN_ON(worker_thread_); + for (auto& kv : audio_send_ssrcs_) { + kv.second->SetTransportOverhead(transport_overhead_per_packet); + } + })); } void Call::UpdateAggregateNetworkState() { + // TODO(bugs.webrtc.org/11993): Move this over to the network thread. + // RTC_DCHECK_RUN_ON(network_thread_); + RTC_DCHECK_RUN_ON(worker_thread_); bool have_audio = @@ -1241,6 +1290,7 @@ void Call::OnAllocationLimitsChanged(BitrateAllocationLimits limits) { } void Call::ConfigureSync(const std::string& sync_group) { + // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. // Set sync only if there was no previous one. if (sync_group.empty()) return; @@ -1452,6 +1502,9 @@ void Call::DeliverPacketAsync(MediaType media_type, } void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) { + // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. + // This method is called synchronously via |OnRtpPacket()| (see DeliverRtp) + // on the same thread. RTC_DCHECK_RUN_ON(worker_thread_); RtpPacketReceived parsed_packet; if (!parsed_packet.Parse(packet, length)) diff --git a/media/engine/webrtc_video_engine.cc b/media/engine/webrtc_video_engine.cc index 161c075674..244a1a92cc 100644 --- a/media/engine/webrtc_video_engine.cc +++ b/media/engine/webrtc_video_engine.cc @@ -1803,7 +1803,7 @@ void WebRtcVideoChannel::BackfillBufferedPackets( } void WebRtcVideoChannel::OnReadyToSend(bool ready) { - RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK_RUN_ON(&network_thread_checker_); RTC_LOG(LS_VERBOSE) << "OnReadyToSend: " << (ready ? "Ready." : "Not ready."); call_->SignalChannelNetworkState( webrtc::MediaType::VIDEO, @@ -1813,11 +1813,11 @@ void WebRtcVideoChannel::OnReadyToSend(bool ready) { void WebRtcVideoChannel::OnNetworkRouteChanged( const std::string& transport_name, const rtc::NetworkRoute& network_route) { - RTC_DCHECK_RUN_ON(&thread_checker_); - call_->GetTransportControllerSend()->OnNetworkRouteChanged(transport_name, - network_route); - call_->GetTransportControllerSend()->OnTransportOverheadChanged( - network_route.packet_overhead); + RTC_DCHECK_RUN_ON(&network_thread_checker_); + webrtc::RtpTransportControllerSendInterface* transport = + call_->GetTransportControllerSend(); + transport->OnNetworkRouteChanged(transport_name, network_route); + transport->OnTransportOverheadChanged(network_route.packet_overhead); } void WebRtcVideoChannel::SetInterface(NetworkInterface* iface) { diff --git a/media/engine/webrtc_voice_engine.cc b/media/engine/webrtc_voice_engine.cc index 9efef3aefc..0df96f3d1c 100644 --- a/media/engine/webrtc_voice_engine.cc +++ b/media/engine/webrtc_voice_engine.cc @@ -2290,7 +2290,7 @@ void WebRtcVoiceMediaChannel::OnPacketReceived(rtc::CopyOnWriteBuffer packet, void WebRtcVoiceMediaChannel::OnNetworkRouteChanged( const std::string& transport_name, const rtc::NetworkRoute& network_route) { - RTC_DCHECK_RUN_ON(worker_thread_); + RTC_DCHECK_RUN_ON(&network_thread_checker_); call_->GetTransportControllerSend()->OnNetworkRouteChanged(transport_name, network_route); call_->OnAudioTransportOverheadChanged(network_route.packet_overhead); @@ -2335,7 +2335,7 @@ bool WebRtcVoiceMediaChannel::SetMaxSendBitrate(int bps) { } void WebRtcVoiceMediaChannel::OnReadyToSend(bool ready) { - RTC_DCHECK_RUN_ON(worker_thread_); + RTC_DCHECK_RUN_ON(&network_thread_checker_); RTC_LOG(LS_VERBOSE) << "OnReadyToSend: " << (ready ? "Ready." : "Not ready."); call_->SignalChannelNetworkState( webrtc::MediaType::AUDIO, diff --git a/pc/channel.cc b/pc/channel.cc index b672a96539..16e226384a 100644 --- a/pc/channel.cc +++ b/pc/channel.cc @@ -369,7 +369,7 @@ void BaseChannel::OnWritableState(bool writable) { void BaseChannel::OnNetworkRouteChanged( absl::optional network_route) { - RTC_LOG(LS_INFO) << "Network route for " << ToString() << " was changed."; + RTC_LOG(LS_INFO) << "Network route changed for " << ToString(); RTC_DCHECK_RUN_ON(network_thread()); rtc::NetworkRoute new_route; @@ -380,10 +380,7 @@ void BaseChannel::OnNetworkRouteChanged( // use the same transport name and MediaChannel::OnNetworkRouteChanged cannot // work correctly. Intentionally leave it broken to simplify the code and // encourage the users to stop using non-muxing RTCP. - worker_thread_->PostTask(ToQueuedTask(alive_, [this, new_route] { - RTC_DCHECK_RUN_ON(worker_thread()); - media_channel_->OnNetworkRouteChanged(transport_name_, new_route); - })); + media_channel_->OnNetworkRouteChanged(transport_name_, new_route); } sigslot::signal1& BaseChannel::SignalFirstPacketReceived() { @@ -399,10 +396,8 @@ sigslot::signal1& BaseChannel::SignalSentPacket() { } void BaseChannel::OnTransportReadyToSend(bool ready) { - worker_thread_->PostTask(ToQueuedTask(alive_, [this, ready] { - RTC_DCHECK_RUN_ON(worker_thread()); - media_channel_->OnReadyToSend(ready); - })); + RTC_DCHECK_RUN_ON(network_thread()); + media_channel_->OnReadyToSend(ready); } bool BaseChannel::SendPacket(bool rtcp, diff --git a/pc/channel_unittest.cc b/pc/channel_unittest.cc index f2e93d69ea..4a0a6b4a15 100644 --- a/pc/channel_unittest.cc +++ b/pc/channel_unittest.cc @@ -1205,11 +1205,13 @@ class ChannelTest : public ::testing::Test, public sigslot::has_slots<> { CreateChannels(0, 0); EXPECT_FALSE(media_channel1_->ready_to_send()); - channel1_->OnTransportReadyToSend(true); + network_thread_->PostTask( + RTC_FROM_HERE, [this] { channel1_->OnTransportReadyToSend(true); }); WaitForThreads(); EXPECT_TRUE(media_channel1_->ready_to_send()); - channel1_->OnTransportReadyToSend(false); + network_thread_->PostTask( + RTC_FROM_HERE, [this] { channel1_->OnTransportReadyToSend(false); }); WaitForThreads(); EXPECT_FALSE(media_channel1_->ready_to_send()); }