diff --git a/audio/audio_receive_stream.cc b/audio/audio_receive_stream.cc index d6f6140fae..03dd4c0eef 100644 --- a/audio/audio_receive_stream.cc +++ b/audio/audio_receive_stream.cc @@ -341,6 +341,7 @@ bool AudioReceiveStream::SetMinimumPlayoutDelay(int delay_ms) { } void AudioReceiveStream::AssociateSendStream(AudioSendStream* send_stream) { + // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. RTC_DCHECK_RUN_ON(&worker_thread_checker_); channel_receive_->SetAssociatedSendChannel( send_stream ? send_stream->GetChannel() : nullptr); @@ -362,6 +363,8 @@ const webrtc::AudioReceiveStream::Config& AudioReceiveStream::config() const { const AudioSendStream* AudioReceiveStream::GetAssociatedSendStreamForTesting() const { + // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread or + // remove test method and |associated_send_stream_| variable. RTC_DCHECK_RUN_ON(&worker_thread_checker_); return associated_send_stream_; } diff --git a/audio/channel_receive.cc b/audio/channel_receive.cc index a8015c8637..5c2b91803a 100644 --- a/audio/channel_receive.cc +++ b/audio/channel_receive.cc @@ -787,6 +787,7 @@ int ChannelReceive::ResendPackets(const uint16_t* sequence_numbers, void ChannelReceive::SetAssociatedSendChannel( const ChannelSendInterface* channel) { + // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. RTC_DCHECK(worker_thread_checker_.IsCurrent()); MutexLock lock(&assoc_send_channel_lock_); associated_send_channel_ = channel; diff --git a/call/call.cc b/call/call.cc index f20f4b5c41..46bf52862f 100644 --- a/call/call.cc +++ b/call/call.cc @@ -335,15 +335,18 @@ class Call final : public webrtc::Call, NetworkState audio_network_state_; NetworkState video_network_state_; + // TODO(bugs.webrtc.org/11993): Move aggregate_network_up_ over to the + // network thread. bool aggregate_network_up_ RTC_GUARDED_BY(worker_thread_); // Audio, Video, and FlexFEC receive streams are owned by the client that // creates them. + // TODO(bugs.webrtc.org/11993): Move audio_receive_streams_, + // video_receive_streams_ and sync_stream_mapping_ over to the network thread. std::set audio_receive_streams_ RTC_GUARDED_BY(worker_thread_); std::set video_receive_streams_ RTC_GUARDED_BY(worker_thread_); - std::map sync_stream_mapping_ RTC_GUARDED_BY(worker_thread_); @@ -378,6 +381,9 @@ class Call final : public webrtc::Call, // send side BWE are negotiated. const bool use_send_side_bwe; }; + + // TODO(bugs.webrtc.org/11993): Move receive_rtp_config_ over to the + // network thread. std::map receive_rtp_config_ RTC_GUARDED_BY(worker_thread_); @@ -800,6 +806,8 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream( audio_send_ssrcs_.end()); audio_send_ssrcs_[config.rtp.ssrc] = send_stream; + // TODO(bugs.webrtc.org/11993): call AssociateSendStream and + // UpdateAggregateNetworkState asynchronously on the network thread. for (AudioReceiveStream* stream : audio_receive_streams_) { if (stream->config().rtp.local_ssrc == config.rtp.ssrc) { stream->AssociateSendStream(send_stream); @@ -807,6 +815,7 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream( } UpdateAggregateNetworkState(); + return send_stream; } @@ -825,6 +834,8 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) { size_t num_deleted = audio_send_ssrcs_.erase(ssrc); RTC_DCHECK_EQ(1, num_deleted); + // TODO(bugs.webrtc.org/11993): call AssociateSendStream and + // UpdateAggregateNetworkState asynchronously on the network thread. for (AudioReceiveStream* stream : audio_receive_streams_) { if (stream->config().rtp.local_ssrc == ssrc) { stream->AssociateSendStream(nullptr); @@ -832,6 +843,7 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) { } UpdateAggregateNetworkState(); + delete send_stream; } @@ -842,11 +854,19 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream( EnsureStarted(); event_log_->Log(std::make_unique( CreateRtcLogStreamConfig(config))); + + // TODO(bugs.webrtc.org/11993): Move the registration between |receive_stream| + // and |audio_receiver_controller_| out of AudioReceiveStream construction and + // set it up asynchronously on the network thread (the registration and + // |audio_receiver_controller_| need to live on the network thread). AudioReceiveStream* receive_stream = new AudioReceiveStream( clock_, &audio_receiver_controller_, transport_send_ptr_->packet_router(), module_process_thread_->process_thread(), config_.neteq_factory, config, config_.audio_state, event_log_); + // TODO(bugs.webrtc.org/11993): Update the below on the network thread. + // We could possibly set up the audio_receiver_controller_ association up + // as part of the async setup. receive_rtp_config_.emplace(config.rtp.remote_ssrc, ReceiveRtpConfig(config)); audio_receive_streams_.insert(receive_stream); @@ -873,8 +893,12 @@ void Call::DestroyAudioReceiveStream( uint32_t ssrc = config.rtp.remote_ssrc; receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) ->RemoveStream(ssrc); + + // TODO(bugs.webrtc.org/11993): Access the map, rtp config, call ConfigureSync + // and UpdateAggregateNetworkState on the network thread. audio_receive_streams_.erase(audio_receive_stream); const std::string& sync_group = audio_receive_stream->config().sync_group; + const auto it = sync_stream_mapping_.find(sync_group); if (it != sync_stream_mapping_.end() && it->second == audio_receive_stream) { sync_stream_mapping_.erase(it); @@ -883,6 +907,9 @@ void Call::DestroyAudioReceiveStream( receive_rtp_config_.erase(ssrc); UpdateAggregateNetworkState(); + // TODO(bugs.webrtc.org/11993): Consider if deleting |audio_receive_stream| + // on the network thread would be better or if we'd need to tear down the + // state in two phases. delete audio_receive_stream; } @@ -995,13 +1022,15 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream( EnsureStarted(); - TaskQueueBase* current = GetCurrentTaskQueueOrThread(); - RTC_CHECK(current); + // TODO(bugs.webrtc.org/11993): Move the registration between |receive_stream| + // and |video_receiver_controller_| out of VideoReceiveStream2 construction + // and set it up asynchronously on the network thread (the registration and + // |video_receiver_controller_| need to live on the network thread). VideoReceiveStream2* receive_stream = new VideoReceiveStream2( - task_queue_factory_, current, &video_receiver_controller_, num_cpu_cores_, - transport_send_ptr_->packet_router(), std::move(configuration), - module_process_thread_->process_thread(), call_stats_.get(), clock_, - new VCMTiming(clock_)); + task_queue_factory_, worker_thread_, &video_receiver_controller_, + num_cpu_cores_, transport_send_ptr_->packet_router(), + std::move(configuration), module_process_thread_->process_thread(), + call_stats_.get(), clock_, new VCMTiming(clock_)); const webrtc::VideoReceiveStream::Config& config = receive_stream->config(); if (config.rtp.rtx_ssrc) { @@ -1134,34 +1163,54 @@ const WebRtcKeyValueConfig& Call::trials() const { } void Call::SignalChannelNetworkState(MediaType media, NetworkState state) { - RTC_DCHECK_RUN_ON(worker_thread_); - switch (media) { - case MediaType::AUDIO: - audio_network_state_ = state; - break; - case MediaType::VIDEO: - video_network_state_ = state; - break; - case MediaType::ANY: - case MediaType::DATA: - RTC_NOTREACHED(); - break; - } + RTC_DCHECK_RUN_ON(network_thread_); + RTC_DCHECK(media == MediaType::AUDIO || media == MediaType::VIDEO); - UpdateAggregateNetworkState(); - for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) { - video_receive_stream->SignalNetworkState(video_network_state_); + auto closure = [this, media, state]() { + // TODO(bugs.webrtc.org/11993): Move this over to the network thread. + RTC_DCHECK_RUN_ON(worker_thread_); + if (media == MediaType::AUDIO) { + audio_network_state_ = state; + } else { + RTC_DCHECK_EQ(media, MediaType::VIDEO); + video_network_state_ = state; + } + + // TODO(tommi): Is it necessary to always do this, including if there + // was no change in state? + UpdateAggregateNetworkState(); + + // TODO(tommi): Is it right to do this if media == AUDIO? + for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) { + video_receive_stream->SignalNetworkState(video_network_state_); + } + }; + + if (network_thread_ == worker_thread_) { + closure(); + } else { + // TODO(bugs.webrtc.org/11993): Remove workaround when we no longer need to + // post to the worker thread. + worker_thread_->PostTask(ToQueuedTask(task_safety_, std::move(closure))); } } void Call::OnAudioTransportOverheadChanged(int transport_overhead_per_packet) { - RTC_DCHECK_RUN_ON(worker_thread_); - for (auto& kv : audio_send_ssrcs_) { - kv.second->SetTransportOverhead(transport_overhead_per_packet); - } + RTC_DCHECK_RUN_ON(network_thread_); + worker_thread_->PostTask( + ToQueuedTask(task_safety_, [this, transport_overhead_per_packet]() { + // TODO(bugs.webrtc.org/11993): Move this over to the network thread. + RTC_DCHECK_RUN_ON(worker_thread_); + for (auto& kv : audio_send_ssrcs_) { + kv.second->SetTransportOverhead(transport_overhead_per_packet); + } + })); } void Call::UpdateAggregateNetworkState() { + // TODO(bugs.webrtc.org/11993): Move this over to the network thread. + // RTC_DCHECK_RUN_ON(network_thread_); + RTC_DCHECK_RUN_ON(worker_thread_); bool have_audio = @@ -1241,6 +1290,7 @@ void Call::OnAllocationLimitsChanged(BitrateAllocationLimits limits) { } void Call::ConfigureSync(const std::string& sync_group) { + // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. // Set sync only if there was no previous one. if (sync_group.empty()) return; @@ -1452,6 +1502,9 @@ void Call::DeliverPacketAsync(MediaType media_type, } void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) { + // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. + // This method is called synchronously via |OnRtpPacket()| (see DeliverRtp) + // on the same thread. RTC_DCHECK_RUN_ON(worker_thread_); RtpPacketReceived parsed_packet; if (!parsed_packet.Parse(packet, length)) diff --git a/media/engine/webrtc_video_engine.cc b/media/engine/webrtc_video_engine.cc index 28ed365b38..1f93be6809 100644 --- a/media/engine/webrtc_video_engine.cc +++ b/media/engine/webrtc_video_engine.cc @@ -1803,7 +1803,7 @@ void WebRtcVideoChannel::BackfillBufferedPackets( } void WebRtcVideoChannel::OnReadyToSend(bool ready) { - RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK_RUN_ON(&network_thread_checker_); RTC_LOG(LS_VERBOSE) << "OnReadyToSend: " << (ready ? "Ready." : "Not ready."); call_->SignalChannelNetworkState( webrtc::MediaType::VIDEO, @@ -1813,11 +1813,15 @@ void WebRtcVideoChannel::OnReadyToSend(bool ready) { void WebRtcVideoChannel::OnNetworkRouteChanged( const std::string& transport_name, const rtc::NetworkRoute& network_route) { - RTC_DCHECK_RUN_ON(&thread_checker_); - call_->GetTransportControllerSend()->OnNetworkRouteChanged(transport_name, - network_route); - call_->GetTransportControllerSend()->OnTransportOverheadChanged( - network_route.packet_overhead); + RTC_DCHECK_RUN_ON(&network_thread_checker_); + worker_thread_->PostTask(ToQueuedTask( + task_safety_, [this, name = transport_name, route = network_route] { + RTC_DCHECK_RUN_ON(&thread_checker_); + webrtc::RtpTransportControllerSendInterface* transport = + call_->GetTransportControllerSend(); + transport->OnNetworkRouteChanged(name, route); + transport->OnTransportOverheadChanged(route.packet_overhead); + })); } void WebRtcVideoChannel::SetInterface(NetworkInterface* iface) { diff --git a/media/engine/webrtc_voice_engine.cc b/media/engine/webrtc_voice_engine.cc index 9efef3aefc..3243418e35 100644 --- a/media/engine/webrtc_voice_engine.cc +++ b/media/engine/webrtc_voice_engine.cc @@ -2290,10 +2290,15 @@ void WebRtcVoiceMediaChannel::OnPacketReceived(rtc::CopyOnWriteBuffer packet, void WebRtcVoiceMediaChannel::OnNetworkRouteChanged( const std::string& transport_name, const rtc::NetworkRoute& network_route) { - RTC_DCHECK_RUN_ON(worker_thread_); - call_->GetTransportControllerSend()->OnNetworkRouteChanged(transport_name, - network_route); + RTC_DCHECK_RUN_ON(&network_thread_checker_); + call_->OnAudioTransportOverheadChanged(network_route.packet_overhead); + + worker_thread_->PostTask(ToQueuedTask( + task_safety_, [this, name = transport_name, route = network_route] { + RTC_DCHECK_RUN_ON(worker_thread_); + call_->GetTransportControllerSend()->OnNetworkRouteChanged(name, route); + })); } bool WebRtcVoiceMediaChannel::MuteStream(uint32_t ssrc, bool muted) { @@ -2335,7 +2340,7 @@ bool WebRtcVoiceMediaChannel::SetMaxSendBitrate(int bps) { } void WebRtcVoiceMediaChannel::OnReadyToSend(bool ready) { - RTC_DCHECK_RUN_ON(worker_thread_); + RTC_DCHECK_RUN_ON(&network_thread_checker_); RTC_LOG(LS_VERBOSE) << "OnReadyToSend: " << (ready ? "Ready." : "Not ready."); call_->SignalChannelNetworkState( webrtc::MediaType::AUDIO, diff --git a/pc/channel.cc b/pc/channel.cc index b672a96539..16e226384a 100644 --- a/pc/channel.cc +++ b/pc/channel.cc @@ -369,7 +369,7 @@ void BaseChannel::OnWritableState(bool writable) { void BaseChannel::OnNetworkRouteChanged( absl::optional network_route) { - RTC_LOG(LS_INFO) << "Network route for " << ToString() << " was changed."; + RTC_LOG(LS_INFO) << "Network route changed for " << ToString(); RTC_DCHECK_RUN_ON(network_thread()); rtc::NetworkRoute new_route; @@ -380,10 +380,7 @@ void BaseChannel::OnNetworkRouteChanged( // use the same transport name and MediaChannel::OnNetworkRouteChanged cannot // work correctly. Intentionally leave it broken to simplify the code and // encourage the users to stop using non-muxing RTCP. - worker_thread_->PostTask(ToQueuedTask(alive_, [this, new_route] { - RTC_DCHECK_RUN_ON(worker_thread()); - media_channel_->OnNetworkRouteChanged(transport_name_, new_route); - })); + media_channel_->OnNetworkRouteChanged(transport_name_, new_route); } sigslot::signal1& BaseChannel::SignalFirstPacketReceived() { @@ -399,10 +396,8 @@ sigslot::signal1& BaseChannel::SignalSentPacket() { } void BaseChannel::OnTransportReadyToSend(bool ready) { - worker_thread_->PostTask(ToQueuedTask(alive_, [this, ready] { - RTC_DCHECK_RUN_ON(worker_thread()); - media_channel_->OnReadyToSend(ready); - })); + RTC_DCHECK_RUN_ON(network_thread()); + media_channel_->OnReadyToSend(ready); } bool BaseChannel::SendPacket(bool rtcp, diff --git a/pc/channel_unittest.cc b/pc/channel_unittest.cc index f2e93d69ea..4a0a6b4a15 100644 --- a/pc/channel_unittest.cc +++ b/pc/channel_unittest.cc @@ -1205,11 +1205,13 @@ class ChannelTest : public ::testing::Test, public sigslot::has_slots<> { CreateChannels(0, 0); EXPECT_FALSE(media_channel1_->ready_to_send()); - channel1_->OnTransportReadyToSend(true); + network_thread_->PostTask( + RTC_FROM_HERE, [this] { channel1_->OnTransportReadyToSend(true); }); WaitForThreads(); EXPECT_TRUE(media_channel1_->ready_to_send()); - channel1_->OnTransportReadyToSend(false); + network_thread_->PostTask( + RTC_FROM_HERE, [this] { channel1_->OnTransportReadyToSend(false); }); WaitForThreads(); EXPECT_FALSE(media_channel1_->ready_to_send()); }