From 02df2eb1de61994e4a02e4a5eb010726933ffa39 Mon Sep 17 00:00:00 2001 From: Tommi Date: Mon, 31 May 2021 12:57:53 +0200 Subject: [PATCH] Split AudioStream initialization into worker / network steps. This is in preparation for actually doing this initialization differently in the Call class. This CL takes the registration steps that are inherently network thread associated and makes them separate from the ctor/dtor. Bug: webrtc:11993 Change-Id: Ice2e16c108e0c302157534a3aa2b46738aaa7a93 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/220608 Commit-Queue: Tommi Reviewed-by: Niels Moller Cr-Commit-Position: refs/heads/master@{#34163} --- audio/audio_receive_stream.cc | 30 +++++++++++++---------- audio/audio_receive_stream.h | 33 ++++++++++++++++++++++---- audio/audio_receive_stream_unittest.cc | 21 ++++++++++++---- audio/channel_receive.cc | 13 ++++++---- call/call.cc | 15 +++++++----- 5 files changed, 78 insertions(+), 34 deletions(-) diff --git a/audio/audio_receive_stream.cc b/audio/audio_receive_stream.cc index cdbab66995..ef69e732ab 100644 --- a/audio/audio_receive_stream.cc +++ b/audio/audio_receive_stream.cc @@ -91,7 +91,6 @@ std::unique_ptr CreateChannelReceive( AudioReceiveStream::AudioReceiveStream( Clock* clock, - RtpStreamReceiverControllerInterface* receiver_controller, PacketRouter* packet_router, ProcessThread* module_process_thread, NetEqFactory* neteq_factory, @@ -99,7 +98,6 @@ AudioReceiveStream::AudioReceiveStream( const rtc::scoped_refptr& audio_state, webrtc::RtcEventLog* event_log) : AudioReceiveStream(clock, - receiver_controller, packet_router, config, audio_state, @@ -113,7 +111,6 @@ AudioReceiveStream::AudioReceiveStream( AudioReceiveStream::AudioReceiveStream( Clock* clock, - RtpStreamReceiverControllerInterface* receiver_controller, PacketRouter* packet_router, const webrtc::AudioReceiveStream::Config& config, const rtc::scoped_refptr& audio_state, @@ -129,7 +126,8 @@ AudioReceiveStream::AudioReceiveStream( RTC_DCHECK(audio_state_); RTC_DCHECK(channel_receive_); - RTC_DCHECK(receiver_controller); + network_thread_checker_.Detach(); + RTC_DCHECK(packet_router); // Configure bandwidth estimation. channel_receive_->RegisterReceiverCongestionControlObjects(packet_router); @@ -139,10 +137,6 @@ AudioReceiveStream::AudioReceiveStream( // be updated. channel_receive_->SetSourceTracker(&source_tracker_); - // Register with transport. - rtp_stream_receiver_ = receiver_controller->CreateReceiver( - config.rtp.remote_ssrc, channel_receive_.get()); - // Complete configuration. // TODO(solenberg): Config NACK history window (which is a packet count), // using the actual packet size for the configured codec. @@ -161,6 +155,19 @@ AudioReceiveStream::~AudioReceiveStream() { channel_receive_->ResetReceiverCongestionControlObjects(); } +void AudioReceiveStream::RegisterWithTransport( + RtpStreamReceiverControllerInterface* receiver_controller) { + RTC_DCHECK_RUN_ON(&network_thread_checker_); + RTC_DCHECK(!rtp_stream_receiver_); + rtp_stream_receiver_ = receiver_controller->CreateReceiver( + config_.rtp.remote_ssrc, channel_receive_.get()); +} + +void AudioReceiveStream::UnregisterFromTransport() { + RTC_DCHECK_RUN_ON(&network_thread_checker_); + rtp_stream_receiver_.reset(); +} + void AudioReceiveStream::Reconfigure( const webrtc::AudioReceiveStream::Config& config) { RTC_DCHECK(worker_thread_checker_.IsCurrent()); @@ -388,8 +395,7 @@ bool AudioReceiveStream::SetMinimumPlayoutDelay(int delay_ms) { } void AudioReceiveStream::AssociateSendStream(AudioSendStream* send_stream) { - // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. - RTC_DCHECK_RUN_ON(&worker_thread_checker_); + RTC_DCHECK_RUN_ON(&network_thread_checker_); channel_receive_->SetAssociatedSendChannel( send_stream ? send_stream->GetChannel() : nullptr); associated_send_stream_ = send_stream; @@ -410,9 +416,7 @@ const webrtc::AudioReceiveStream::Config& AudioReceiveStream::config() const { const AudioSendStream* AudioReceiveStream::GetAssociatedSendStreamForTesting() const { - // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread or - // remove test method and |associated_send_stream_| variable. - RTC_DCHECK_RUN_ON(&worker_thread_checker_); + RTC_DCHECK_RUN_ON(&network_thread_checker_); return associated_send_stream_; } diff --git a/audio/audio_receive_stream.h b/audio/audio_receive_stream.h index 769578ba3b..40749cc950 100644 --- a/audio/audio_receive_stream.h +++ b/audio/audio_receive_stream.h @@ -22,6 +22,7 @@ #include "call/audio_receive_stream.h" #include "call/syncable.h" #include "modules/rtp_rtcp/source/source_tracker.h" +#include "rtc_base/system/no_unique_address.h" #include "system_wrappers/include/clock.h" namespace webrtc { @@ -44,7 +45,6 @@ class AudioReceiveStream final : public webrtc::AudioReceiveStream, public Syncable { public: AudioReceiveStream(Clock* clock, - RtpStreamReceiverControllerInterface* receiver_controller, PacketRouter* packet_router, ProcessThread* module_process_thread, NetEqFactory* neteq_factory, @@ -54,7 +54,6 @@ class AudioReceiveStream final : public webrtc::AudioReceiveStream, // For unit tests, which need to supply a mock channel receive. AudioReceiveStream( Clock* clock, - RtpStreamReceiverControllerInterface* receiver_controller, PacketRouter* packet_router, const webrtc::AudioReceiveStream::Config& config, const rtc::scoped_refptr& audio_state, @@ -65,8 +64,22 @@ class AudioReceiveStream final : public webrtc::AudioReceiveStream, AudioReceiveStream(const AudioReceiveStream&) = delete; AudioReceiveStream& operator=(const AudioReceiveStream&) = delete; + // Destruction happens on the worker thread. Prior to destruction the caller + // must ensure that a registration with the transport has been cleared. See + // `RegisterWithTransport` for details. + // TODO(tommi): As a further improvement to this, performing the full + // destruction on the network thread could be made the default. ~AudioReceiveStream() override; + // Called on the network thread to register/unregister with the network + // transport. + void RegisterWithTransport( + RtpStreamReceiverControllerInterface* receiver_controller); + // If registration has previously been done (via `RegisterWithTransport`) then + // `UnregisterFromTransport` must be called prior to destruction, on the + // network thread. + void UnregisterFromTransport(); + // webrtc::AudioReceiveStream implementation. void Reconfigure(const webrtc::AudioReceiveStream::Config& config) override; void Start() override; @@ -104,16 +117,26 @@ class AudioReceiveStream final : public webrtc::AudioReceiveStream, private: AudioState* audio_state() const; - SequenceChecker worker_thread_checker_; + RTC_NO_UNIQUE_ADDRESS SequenceChecker worker_thread_checker_; + // TODO(bugs.webrtc.org/11993): This checker conceptually represents + // operations that belong to the network thread. The Call class is currently + // moving towards handling network packets on the network thread and while + // that work is ongoing, this checker may in practice represent the worker + // thread, but still serves as a mechanism of grouping together concepts + // that belong to the network thread. Once the packets are fully delivered + // on the network thread, this comment will be deleted. + RTC_NO_UNIQUE_ADDRESS SequenceChecker network_thread_checker_; webrtc::AudioReceiveStream::Config config_; rtc::scoped_refptr audio_state_; SourceTracker source_tracker_; const std::unique_ptr channel_receive_; - AudioSendStream* associated_send_stream_ = nullptr; + AudioSendStream* associated_send_stream_ + RTC_GUARDED_BY(network_thread_checker_) = nullptr; bool playing_ RTC_GUARDED_BY(worker_thread_checker_) = false; - std::unique_ptr rtp_stream_receiver_; + std::unique_ptr rtp_stream_receiver_ + RTC_GUARDED_BY(network_thread_checker_); }; } // namespace internal } // namespace webrtc diff --git a/audio/audio_receive_stream_unittest.cc b/audio/audio_receive_stream_unittest.cc index 72244dd84a..59a1f2f5be 100644 --- a/audio/audio_receive_stream_unittest.cc +++ b/audio/audio_receive_stream_unittest.cc @@ -121,11 +121,12 @@ struct ConfigHelper { } std::unique_ptr CreateAudioReceiveStream() { - return std::unique_ptr( - new internal::AudioReceiveStream( - Clock::GetRealTimeClock(), &rtp_stream_receiver_controller_, - &packet_router_, stream_config_, audio_state_, &event_log_, - std::unique_ptr(channel_receive_))); + auto ret = std::make_unique( + Clock::GetRealTimeClock(), &packet_router_, stream_config_, + audio_state_, &event_log_, + std::unique_ptr(channel_receive_)); + ret->RegisterWithTransport(&rtp_stream_receiver_controller_); + return ret; } AudioReceiveStream::Config& config() { return stream_config_; } @@ -199,6 +200,7 @@ TEST(AudioReceiveStreamTest, ConstructDestruct) { for (bool use_null_audio_processing : {false, true}) { ConfigHelper helper(use_null_audio_processing); auto recv_stream = helper.CreateAudioReceiveStream(); + recv_stream->UnregisterFromTransport(); } } @@ -212,6 +214,7 @@ TEST(AudioReceiveStreamTest, ReceiveRtcpPacket) { ReceivedRTCPPacket(&rtcp_packet[0], rtcp_packet.size())) .WillOnce(Return()); recv_stream->DeliverRtcp(&rtcp_packet[0], rtcp_packet.size()); + recv_stream->UnregisterFromTransport(); } } @@ -276,6 +279,7 @@ TEST(AudioReceiveStreamTest, GetStats) { EXPECT_EQ(kCallStats.capture_start_ntp_time_ms_, stats.capture_start_ntp_time_ms); EXPECT_EQ(kPlayoutNtpTimestampMs, stats.estimated_playout_ntp_timestamp_ms); + recv_stream->UnregisterFromTransport(); } } @@ -286,6 +290,7 @@ TEST(AudioReceiveStreamTest, SetGain) { EXPECT_CALL(*helper.channel_receive(), SetChannelOutputVolumeScaling(FloatEq(0.765f))); recv_stream->SetGain(0.765f); + recv_stream->UnregisterFromTransport(); } } @@ -317,6 +322,9 @@ TEST(AudioReceiveStreamTest, StreamsShouldBeAddedToMixerOnceOnStart) { // Stop stream before it is being destructed. recv_stream2->Stop(); + + recv_stream1->UnregisterFromTransport(); + recv_stream2->UnregisterFromTransport(); } } @@ -325,6 +333,7 @@ TEST(AudioReceiveStreamTest, ReconfigureWithSameConfig) { ConfigHelper helper(use_null_audio_processing); auto recv_stream = helper.CreateAudioReceiveStream(); recv_stream->Reconfigure(helper.config()); + recv_stream->UnregisterFromTransport(); } } @@ -348,6 +357,7 @@ TEST(AudioReceiveStreamTest, ReconfigureWithUpdatedConfig) { EXPECT_CALL(channel_receive, SetReceiveCodecs(new_config.decoder_map)); recv_stream->Reconfigure(new_config); + recv_stream->UnregisterFromTransport(); } } @@ -369,6 +379,7 @@ TEST(AudioReceiveStreamTest, ReconfigureWithFrameDecryptor) { new_config_1.frame_decryptor = mock_frame_decryptor_1; new_config_1.crypto_options.sframe.require_frame_encryption = true; recv_stream->Reconfigure(new_config_1); + recv_stream->UnregisterFromTransport(); } } diff --git a/audio/channel_receive.cc b/audio/channel_receive.cc index fd33dbdf24..415da3445b 100644 --- a/audio/channel_receive.cc +++ b/audio/channel_receive.cc @@ -48,6 +48,7 @@ #include "rtc_base/numerics/safe_minmax.h" #include "rtc_base/race_checker.h" #include "rtc_base/synchronization/mutex.h" +#include "rtc_base/system/no_unique_address.h" #include "rtc_base/time_utils.h" #include "system_wrappers/include/metrics.h" @@ -200,7 +201,8 @@ class ChannelReceive : public ChannelReceiveInterface { // we know about. The goal is to eventually split up voe::ChannelReceive into // parts with single-threaded semantics, and thereby reduce the need for // locks. - SequenceChecker worker_thread_checker_; + RTC_NO_UNIQUE_ADDRESS SequenceChecker worker_thread_checker_; + RTC_NO_UNIQUE_ADDRESS SequenceChecker network_thread_checker_; // Methods accessed from audio and video threads are checked for sequential- // only access. We don't necessarily own and control these threads, so thread @@ -267,7 +269,7 @@ class ChannelReceive : public ChannelReceiveInterface { float _outputGain RTC_GUARDED_BY(volume_settings_mutex_); const ChannelSendInterface* associated_send_channel_ - RTC_GUARDED_BY(worker_thread_checker_); + RTC_GUARDED_BY(network_thread_checker_); PacketRouter* packet_router_ = nullptr; @@ -525,6 +527,8 @@ ChannelReceive::ChannelReceive( RTC_DCHECK(module_process_thread_); RTC_DCHECK(audio_device_module); + network_thread_checker_.Detach(); + acm_receiver_.ResetInitialDelay(); acm_receiver_.SetMinimumDelay(0); acm_receiver_.SetMaximumDelay(0); @@ -857,8 +861,7 @@ int ChannelReceive::ResendPackets(const uint16_t* sequence_numbers, void ChannelReceive::SetAssociatedSendChannel( const ChannelSendInterface* channel) { - // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. - RTC_DCHECK_RUN_ON(&worker_thread_checker_); + RTC_DCHECK_RUN_ON(&network_thread_checker_); associated_send_channel_ = channel; } @@ -1039,7 +1042,7 @@ int ChannelReceive::GetRtpTimestampRateHz() const { } int64_t ChannelReceive::GetRTT() const { - RTC_DCHECK_RUN_ON(&worker_thread_checker_); + RTC_DCHECK_RUN_ON(&network_thread_checker_); std::vector report_blocks = rtp_rtcp_->GetLatestReportBlockData(); diff --git a/call/call.cc b/call/call.cc index de30d6540f..5835412f33 100644 --- a/call/call.cc +++ b/call/call.cc @@ -923,15 +923,16 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream( event_log_->Log(std::make_unique( CreateRtcLogStreamConfig(config))); - // TODO(bugs.webrtc.org/11993): Move the registration between |receive_stream| - // and |audio_receiver_controller_| out of AudioReceiveStream construction and - // set it up asynchronously on the network thread (the registration and - // |audio_receiver_controller_| need to live on the network thread). AudioReceiveStream* receive_stream = new AudioReceiveStream( - clock_, &audio_receiver_controller_, transport_send_->packet_router(), + clock_, transport_send_->packet_router(), module_process_thread_->process_thread(), config_.neteq_factory, config, config_.audio_state, event_log_); + // TODO(bugs.webrtc.org/11993): Make the registration on the network thread + // (asynchronously). The registration and `audio_receiver_controller_` need + // to live on the network thread. + receive_stream->RegisterWithTransport(&audio_receiver_controller_); + // TODO(bugs.webrtc.org/11993): Update the below on the network thread. // We could possibly set up the audio_receiver_controller_ association up // as part of the async setup. @@ -963,7 +964,9 @@ void Call::DestroyAudioReceiveStream( ->RemoveStream(ssrc); // TODO(bugs.webrtc.org/11993): Access the map, rtp config, call ConfigureSync - // and UpdateAggregateNetworkState on the network thread. + // and UpdateAggregateNetworkState on the network thread. The call to + // `UnregisterFromTransport` should also happen on the network thread. + audio_receive_stream->UnregisterFromTransport(); audio_receive_streams_.erase(audio_receive_stream); const std::string& sync_group = audio_receive_stream->config().sync_group;