diff --git a/audio/audio_receive_stream.cc b/audio/audio_receive_stream.cc index cdbab66995..ef69e732ab 100644 --- a/audio/audio_receive_stream.cc +++ b/audio/audio_receive_stream.cc @@ -91,7 +91,6 @@ std::unique_ptr CreateChannelReceive( AudioReceiveStream::AudioReceiveStream( Clock* clock, - RtpStreamReceiverControllerInterface* receiver_controller, PacketRouter* packet_router, ProcessThread* module_process_thread, NetEqFactory* neteq_factory, @@ -99,7 +98,6 @@ AudioReceiveStream::AudioReceiveStream( const rtc::scoped_refptr& audio_state, webrtc::RtcEventLog* event_log) : AudioReceiveStream(clock, - receiver_controller, packet_router, config, audio_state, @@ -113,7 +111,6 @@ AudioReceiveStream::AudioReceiveStream( AudioReceiveStream::AudioReceiveStream( Clock* clock, - RtpStreamReceiverControllerInterface* receiver_controller, PacketRouter* packet_router, const webrtc::AudioReceiveStream::Config& config, const rtc::scoped_refptr& audio_state, @@ -129,7 +126,8 @@ AudioReceiveStream::AudioReceiveStream( RTC_DCHECK(audio_state_); RTC_DCHECK(channel_receive_); - RTC_DCHECK(receiver_controller); + network_thread_checker_.Detach(); + RTC_DCHECK(packet_router); // Configure bandwidth estimation. channel_receive_->RegisterReceiverCongestionControlObjects(packet_router); @@ -139,10 +137,6 @@ AudioReceiveStream::AudioReceiveStream( // be updated. channel_receive_->SetSourceTracker(&source_tracker_); - // Register with transport. - rtp_stream_receiver_ = receiver_controller->CreateReceiver( - config.rtp.remote_ssrc, channel_receive_.get()); - // Complete configuration. // TODO(solenberg): Config NACK history window (which is a packet count), // using the actual packet size for the configured codec. @@ -161,6 +155,19 @@ AudioReceiveStream::~AudioReceiveStream() { channel_receive_->ResetReceiverCongestionControlObjects(); } +void AudioReceiveStream::RegisterWithTransport( + RtpStreamReceiverControllerInterface* receiver_controller) { + RTC_DCHECK_RUN_ON(&network_thread_checker_); + RTC_DCHECK(!rtp_stream_receiver_); + rtp_stream_receiver_ = receiver_controller->CreateReceiver( + config_.rtp.remote_ssrc, channel_receive_.get()); +} + +void AudioReceiveStream::UnregisterFromTransport() { + RTC_DCHECK_RUN_ON(&network_thread_checker_); + rtp_stream_receiver_.reset(); +} + void AudioReceiveStream::Reconfigure( const webrtc::AudioReceiveStream::Config& config) { RTC_DCHECK(worker_thread_checker_.IsCurrent()); @@ -388,8 +395,7 @@ bool AudioReceiveStream::SetMinimumPlayoutDelay(int delay_ms) { } void AudioReceiveStream::AssociateSendStream(AudioSendStream* send_stream) { - // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. - RTC_DCHECK_RUN_ON(&worker_thread_checker_); + RTC_DCHECK_RUN_ON(&network_thread_checker_); channel_receive_->SetAssociatedSendChannel( send_stream ? send_stream->GetChannel() : nullptr); associated_send_stream_ = send_stream; @@ -410,9 +416,7 @@ const webrtc::AudioReceiveStream::Config& AudioReceiveStream::config() const { const AudioSendStream* AudioReceiveStream::GetAssociatedSendStreamForTesting() const { - // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread or - // remove test method and |associated_send_stream_| variable. - RTC_DCHECK_RUN_ON(&worker_thread_checker_); + RTC_DCHECK_RUN_ON(&network_thread_checker_); return associated_send_stream_; } diff --git a/audio/audio_receive_stream.h b/audio/audio_receive_stream.h index 769578ba3b..40749cc950 100644 --- a/audio/audio_receive_stream.h +++ b/audio/audio_receive_stream.h @@ -22,6 +22,7 @@ #include "call/audio_receive_stream.h" #include "call/syncable.h" #include "modules/rtp_rtcp/source/source_tracker.h" +#include "rtc_base/system/no_unique_address.h" #include "system_wrappers/include/clock.h" namespace webrtc { @@ -44,7 +45,6 @@ class AudioReceiveStream final : public webrtc::AudioReceiveStream, public Syncable { public: AudioReceiveStream(Clock* clock, - RtpStreamReceiverControllerInterface* receiver_controller, PacketRouter* packet_router, ProcessThread* module_process_thread, NetEqFactory* neteq_factory, @@ -54,7 +54,6 @@ class AudioReceiveStream final : public webrtc::AudioReceiveStream, // For unit tests, which need to supply a mock channel receive. AudioReceiveStream( Clock* clock, - RtpStreamReceiverControllerInterface* receiver_controller, PacketRouter* packet_router, const webrtc::AudioReceiveStream::Config& config, const rtc::scoped_refptr& audio_state, @@ -65,8 +64,22 @@ class AudioReceiveStream final : public webrtc::AudioReceiveStream, AudioReceiveStream(const AudioReceiveStream&) = delete; AudioReceiveStream& operator=(const AudioReceiveStream&) = delete; + // Destruction happens on the worker thread. Prior to destruction the caller + // must ensure that a registration with the transport has been cleared. See + // `RegisterWithTransport` for details. + // TODO(tommi): As a further improvement to this, performing the full + // destruction on the network thread could be made the default. ~AudioReceiveStream() override; + // Called on the network thread to register/unregister with the network + // transport. + void RegisterWithTransport( + RtpStreamReceiverControllerInterface* receiver_controller); + // If registration has previously been done (via `RegisterWithTransport`) then + // `UnregisterFromTransport` must be called prior to destruction, on the + // network thread. + void UnregisterFromTransport(); + // webrtc::AudioReceiveStream implementation. void Reconfigure(const webrtc::AudioReceiveStream::Config& config) override; void Start() override; @@ -104,16 +117,26 @@ class AudioReceiveStream final : public webrtc::AudioReceiveStream, private: AudioState* audio_state() const; - SequenceChecker worker_thread_checker_; + RTC_NO_UNIQUE_ADDRESS SequenceChecker worker_thread_checker_; + // TODO(bugs.webrtc.org/11993): This checker conceptually represents + // operations that belong to the network thread. The Call class is currently + // moving towards handling network packets on the network thread and while + // that work is ongoing, this checker may in practice represent the worker + // thread, but still serves as a mechanism of grouping together concepts + // that belong to the network thread. Once the packets are fully delivered + // on the network thread, this comment will be deleted. + RTC_NO_UNIQUE_ADDRESS SequenceChecker network_thread_checker_; webrtc::AudioReceiveStream::Config config_; rtc::scoped_refptr audio_state_; SourceTracker source_tracker_; const std::unique_ptr channel_receive_; - AudioSendStream* associated_send_stream_ = nullptr; + AudioSendStream* associated_send_stream_ + RTC_GUARDED_BY(network_thread_checker_) = nullptr; bool playing_ RTC_GUARDED_BY(worker_thread_checker_) = false; - std::unique_ptr rtp_stream_receiver_; + std::unique_ptr rtp_stream_receiver_ + RTC_GUARDED_BY(network_thread_checker_); }; } // namespace internal } // namespace webrtc diff --git a/audio/audio_receive_stream_unittest.cc b/audio/audio_receive_stream_unittest.cc index 72244dd84a..59a1f2f5be 100644 --- a/audio/audio_receive_stream_unittest.cc +++ b/audio/audio_receive_stream_unittest.cc @@ -121,11 +121,12 @@ struct ConfigHelper { } std::unique_ptr CreateAudioReceiveStream() { - return std::unique_ptr( - new internal::AudioReceiveStream( - Clock::GetRealTimeClock(), &rtp_stream_receiver_controller_, - &packet_router_, stream_config_, audio_state_, &event_log_, - std::unique_ptr(channel_receive_))); + auto ret = std::make_unique( + Clock::GetRealTimeClock(), &packet_router_, stream_config_, + audio_state_, &event_log_, + std::unique_ptr(channel_receive_)); + ret->RegisterWithTransport(&rtp_stream_receiver_controller_); + return ret; } AudioReceiveStream::Config& config() { return stream_config_; } @@ -199,6 +200,7 @@ TEST(AudioReceiveStreamTest, ConstructDestruct) { for (bool use_null_audio_processing : {false, true}) { ConfigHelper helper(use_null_audio_processing); auto recv_stream = helper.CreateAudioReceiveStream(); + recv_stream->UnregisterFromTransport(); } } @@ -212,6 +214,7 @@ TEST(AudioReceiveStreamTest, ReceiveRtcpPacket) { ReceivedRTCPPacket(&rtcp_packet[0], rtcp_packet.size())) .WillOnce(Return()); recv_stream->DeliverRtcp(&rtcp_packet[0], rtcp_packet.size()); + recv_stream->UnregisterFromTransport(); } } @@ -276,6 +279,7 @@ TEST(AudioReceiveStreamTest, GetStats) { EXPECT_EQ(kCallStats.capture_start_ntp_time_ms_, stats.capture_start_ntp_time_ms); EXPECT_EQ(kPlayoutNtpTimestampMs, stats.estimated_playout_ntp_timestamp_ms); + recv_stream->UnregisterFromTransport(); } } @@ -286,6 +290,7 @@ TEST(AudioReceiveStreamTest, SetGain) { EXPECT_CALL(*helper.channel_receive(), SetChannelOutputVolumeScaling(FloatEq(0.765f))); recv_stream->SetGain(0.765f); + recv_stream->UnregisterFromTransport(); } } @@ -317,6 +322,9 @@ TEST(AudioReceiveStreamTest, StreamsShouldBeAddedToMixerOnceOnStart) { // Stop stream before it is being destructed. recv_stream2->Stop(); + + recv_stream1->UnregisterFromTransport(); + recv_stream2->UnregisterFromTransport(); } } @@ -325,6 +333,7 @@ TEST(AudioReceiveStreamTest, ReconfigureWithSameConfig) { ConfigHelper helper(use_null_audio_processing); auto recv_stream = helper.CreateAudioReceiveStream(); recv_stream->Reconfigure(helper.config()); + recv_stream->UnregisterFromTransport(); } } @@ -348,6 +357,7 @@ TEST(AudioReceiveStreamTest, ReconfigureWithUpdatedConfig) { EXPECT_CALL(channel_receive, SetReceiveCodecs(new_config.decoder_map)); recv_stream->Reconfigure(new_config); + recv_stream->UnregisterFromTransport(); } } @@ -369,6 +379,7 @@ TEST(AudioReceiveStreamTest, ReconfigureWithFrameDecryptor) { new_config_1.frame_decryptor = mock_frame_decryptor_1; new_config_1.crypto_options.sframe.require_frame_encryption = true; recv_stream->Reconfigure(new_config_1); + recv_stream->UnregisterFromTransport(); } } diff --git a/audio/channel_receive.cc b/audio/channel_receive.cc index fd33dbdf24..415da3445b 100644 --- a/audio/channel_receive.cc +++ b/audio/channel_receive.cc @@ -48,6 +48,7 @@ #include "rtc_base/numerics/safe_minmax.h" #include "rtc_base/race_checker.h" #include "rtc_base/synchronization/mutex.h" +#include "rtc_base/system/no_unique_address.h" #include "rtc_base/time_utils.h" #include "system_wrappers/include/metrics.h" @@ -200,7 +201,8 @@ class ChannelReceive : public ChannelReceiveInterface { // we know about. The goal is to eventually split up voe::ChannelReceive into // parts with single-threaded semantics, and thereby reduce the need for // locks. - SequenceChecker worker_thread_checker_; + RTC_NO_UNIQUE_ADDRESS SequenceChecker worker_thread_checker_; + RTC_NO_UNIQUE_ADDRESS SequenceChecker network_thread_checker_; // Methods accessed from audio and video threads are checked for sequential- // only access. We don't necessarily own and control these threads, so thread @@ -267,7 +269,7 @@ class ChannelReceive : public ChannelReceiveInterface { float _outputGain RTC_GUARDED_BY(volume_settings_mutex_); const ChannelSendInterface* associated_send_channel_ - RTC_GUARDED_BY(worker_thread_checker_); + RTC_GUARDED_BY(network_thread_checker_); PacketRouter* packet_router_ = nullptr; @@ -525,6 +527,8 @@ ChannelReceive::ChannelReceive( RTC_DCHECK(module_process_thread_); RTC_DCHECK(audio_device_module); + network_thread_checker_.Detach(); + acm_receiver_.ResetInitialDelay(); acm_receiver_.SetMinimumDelay(0); acm_receiver_.SetMaximumDelay(0); @@ -857,8 +861,7 @@ int ChannelReceive::ResendPackets(const uint16_t* sequence_numbers, void ChannelReceive::SetAssociatedSendChannel( const ChannelSendInterface* channel) { - // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. - RTC_DCHECK_RUN_ON(&worker_thread_checker_); + RTC_DCHECK_RUN_ON(&network_thread_checker_); associated_send_channel_ = channel; } @@ -1039,7 +1042,7 @@ int ChannelReceive::GetRtpTimestampRateHz() const { } int64_t ChannelReceive::GetRTT() const { - RTC_DCHECK_RUN_ON(&worker_thread_checker_); + RTC_DCHECK_RUN_ON(&network_thread_checker_); std::vector report_blocks = rtp_rtcp_->GetLatestReportBlockData(); diff --git a/call/call.cc b/call/call.cc index de30d6540f..5835412f33 100644 --- a/call/call.cc +++ b/call/call.cc @@ -923,15 +923,16 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream( event_log_->Log(std::make_unique( CreateRtcLogStreamConfig(config))); - // TODO(bugs.webrtc.org/11993): Move the registration between |receive_stream| - // and |audio_receiver_controller_| out of AudioReceiveStream construction and - // set it up asynchronously on the network thread (the registration and - // |audio_receiver_controller_| need to live on the network thread). AudioReceiveStream* receive_stream = new AudioReceiveStream( - clock_, &audio_receiver_controller_, transport_send_->packet_router(), + clock_, transport_send_->packet_router(), module_process_thread_->process_thread(), config_.neteq_factory, config, config_.audio_state, event_log_); + // TODO(bugs.webrtc.org/11993): Make the registration on the network thread + // (asynchronously). The registration and `audio_receiver_controller_` need + // to live on the network thread. + receive_stream->RegisterWithTransport(&audio_receiver_controller_); + // TODO(bugs.webrtc.org/11993): Update the below on the network thread. // We could possibly set up the audio_receiver_controller_ association up // as part of the async setup. @@ -963,7 +964,9 @@ void Call::DestroyAudioReceiveStream( ->RemoveStream(ssrc); // TODO(bugs.webrtc.org/11993): Access the map, rtp config, call ConfigureSync - // and UpdateAggregateNetworkState on the network thread. + // and UpdateAggregateNetworkState on the network thread. The call to + // `UnregisterFromTransport` should also happen on the network thread. + audio_receive_stream->UnregisterFromTransport(); audio_receive_streams_.erase(audio_receive_stream); const std::string& sync_group = audio_receive_stream->config().sync_group;