Split AudioStream initialization into worker / network steps.

This is in preparation for actually doing this initialization
differently in the Call class. This CL takes the registration
steps that are inherently network thread associated and makes
them separate from the ctor/dtor.

Bug: webrtc:11993
Change-Id: Ice2e16c108e0c302157534a3aa2b46738aaa7a93
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/220608
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34163}
This commit is contained in:
Tommi 2021-05-31 12:57:53 +02:00 committed by WebRTC LUCI CQ
parent 6ad542cf11
commit 02df2eb1de
5 changed files with 78 additions and 34 deletions

View File

@ -91,7 +91,6 @@ std::unique_ptr<voe::ChannelReceiveInterface> CreateChannelReceive(
AudioReceiveStream::AudioReceiveStream(
Clock* clock,
RtpStreamReceiverControllerInterface* receiver_controller,
PacketRouter* packet_router,
ProcessThread* module_process_thread,
NetEqFactory* neteq_factory,
@ -99,7 +98,6 @@ AudioReceiveStream::AudioReceiveStream(
const rtc::scoped_refptr<webrtc::AudioState>& audio_state,
webrtc::RtcEventLog* event_log)
: AudioReceiveStream(clock,
receiver_controller,
packet_router,
config,
audio_state,
@ -113,7 +111,6 @@ AudioReceiveStream::AudioReceiveStream(
AudioReceiveStream::AudioReceiveStream(
Clock* clock,
RtpStreamReceiverControllerInterface* receiver_controller,
PacketRouter* packet_router,
const webrtc::AudioReceiveStream::Config& config,
const rtc::scoped_refptr<webrtc::AudioState>& audio_state,
@ -129,7 +126,8 @@ AudioReceiveStream::AudioReceiveStream(
RTC_DCHECK(audio_state_);
RTC_DCHECK(channel_receive_);
RTC_DCHECK(receiver_controller);
network_thread_checker_.Detach();
RTC_DCHECK(packet_router);
// Configure bandwidth estimation.
channel_receive_->RegisterReceiverCongestionControlObjects(packet_router);
@ -139,10 +137,6 @@ AudioReceiveStream::AudioReceiveStream(
// be updated.
channel_receive_->SetSourceTracker(&source_tracker_);
// Register with transport.
rtp_stream_receiver_ = receiver_controller->CreateReceiver(
config.rtp.remote_ssrc, channel_receive_.get());
// Complete configuration.
// TODO(solenberg): Config NACK history window (which is a packet count),
// using the actual packet size for the configured codec.
@ -161,6 +155,19 @@ AudioReceiveStream::~AudioReceiveStream() {
channel_receive_->ResetReceiverCongestionControlObjects();
}
void AudioReceiveStream::RegisterWithTransport(
RtpStreamReceiverControllerInterface* receiver_controller) {
RTC_DCHECK_RUN_ON(&network_thread_checker_);
RTC_DCHECK(!rtp_stream_receiver_);
rtp_stream_receiver_ = receiver_controller->CreateReceiver(
config_.rtp.remote_ssrc, channel_receive_.get());
}
void AudioReceiveStream::UnregisterFromTransport() {
RTC_DCHECK_RUN_ON(&network_thread_checker_);
rtp_stream_receiver_.reset();
}
void AudioReceiveStream::Reconfigure(
const webrtc::AudioReceiveStream::Config& config) {
RTC_DCHECK(worker_thread_checker_.IsCurrent());
@ -388,8 +395,7 @@ bool AudioReceiveStream::SetMinimumPlayoutDelay(int delay_ms) {
}
void AudioReceiveStream::AssociateSendStream(AudioSendStream* send_stream) {
// TODO(bugs.webrtc.org/11993): Expect to be called on the network thread.
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
RTC_DCHECK_RUN_ON(&network_thread_checker_);
channel_receive_->SetAssociatedSendChannel(
send_stream ? send_stream->GetChannel() : nullptr);
associated_send_stream_ = send_stream;
@ -410,9 +416,7 @@ const webrtc::AudioReceiveStream::Config& AudioReceiveStream::config() const {
const AudioSendStream* AudioReceiveStream::GetAssociatedSendStreamForTesting()
const {
// TODO(bugs.webrtc.org/11993): Expect to be called on the network thread or
// remove test method and |associated_send_stream_| variable.
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
RTC_DCHECK_RUN_ON(&network_thread_checker_);
return associated_send_stream_;
}

View File

@ -22,6 +22,7 @@
#include "call/audio_receive_stream.h"
#include "call/syncable.h"
#include "modules/rtp_rtcp/source/source_tracker.h"
#include "rtc_base/system/no_unique_address.h"
#include "system_wrappers/include/clock.h"
namespace webrtc {
@ -44,7 +45,6 @@ class AudioReceiveStream final : public webrtc::AudioReceiveStream,
public Syncable {
public:
AudioReceiveStream(Clock* clock,
RtpStreamReceiverControllerInterface* receiver_controller,
PacketRouter* packet_router,
ProcessThread* module_process_thread,
NetEqFactory* neteq_factory,
@ -54,7 +54,6 @@ class AudioReceiveStream final : public webrtc::AudioReceiveStream,
// For unit tests, which need to supply a mock channel receive.
AudioReceiveStream(
Clock* clock,
RtpStreamReceiverControllerInterface* receiver_controller,
PacketRouter* packet_router,
const webrtc::AudioReceiveStream::Config& config,
const rtc::scoped_refptr<webrtc::AudioState>& audio_state,
@ -65,8 +64,22 @@ class AudioReceiveStream final : public webrtc::AudioReceiveStream,
AudioReceiveStream(const AudioReceiveStream&) = delete;
AudioReceiveStream& operator=(const AudioReceiveStream&) = delete;
// Destruction happens on the worker thread. Prior to destruction the caller
// must ensure that a registration with the transport has been cleared. See
// `RegisterWithTransport` for details.
// TODO(tommi): As a further improvement to this, performing the full
// destruction on the network thread could be made the default.
~AudioReceiveStream() override;
// Called on the network thread to register/unregister with the network
// transport.
void RegisterWithTransport(
RtpStreamReceiverControllerInterface* receiver_controller);
// If registration has previously been done (via `RegisterWithTransport`) then
// `UnregisterFromTransport` must be called prior to destruction, on the
// network thread.
void UnregisterFromTransport();
// webrtc::AudioReceiveStream implementation.
void Reconfigure(const webrtc::AudioReceiveStream::Config& config) override;
void Start() override;
@ -104,16 +117,26 @@ class AudioReceiveStream final : public webrtc::AudioReceiveStream,
private:
AudioState* audio_state() const;
SequenceChecker worker_thread_checker_;
RTC_NO_UNIQUE_ADDRESS SequenceChecker worker_thread_checker_;
// TODO(bugs.webrtc.org/11993): This checker conceptually represents
// operations that belong to the network thread. The Call class is currently
// moving towards handling network packets on the network thread and while
// that work is ongoing, this checker may in practice represent the worker
// thread, but still serves as a mechanism of grouping together concepts
// that belong to the network thread. Once the packets are fully delivered
// on the network thread, this comment will be deleted.
RTC_NO_UNIQUE_ADDRESS SequenceChecker network_thread_checker_;
webrtc::AudioReceiveStream::Config config_;
rtc::scoped_refptr<webrtc::AudioState> audio_state_;
SourceTracker source_tracker_;
const std::unique_ptr<voe::ChannelReceiveInterface> channel_receive_;
AudioSendStream* associated_send_stream_ = nullptr;
AudioSendStream* associated_send_stream_
RTC_GUARDED_BY(network_thread_checker_) = nullptr;
bool playing_ RTC_GUARDED_BY(worker_thread_checker_) = false;
std::unique_ptr<RtpStreamReceiverInterface> rtp_stream_receiver_;
std::unique_ptr<RtpStreamReceiverInterface> rtp_stream_receiver_
RTC_GUARDED_BY(network_thread_checker_);
};
} // namespace internal
} // namespace webrtc

View File

@ -121,11 +121,12 @@ struct ConfigHelper {
}
std::unique_ptr<internal::AudioReceiveStream> CreateAudioReceiveStream() {
return std::unique_ptr<internal::AudioReceiveStream>(
new internal::AudioReceiveStream(
Clock::GetRealTimeClock(), &rtp_stream_receiver_controller_,
&packet_router_, stream_config_, audio_state_, &event_log_,
std::unique_ptr<voe::ChannelReceiveInterface>(channel_receive_)));
auto ret = std::make_unique<internal::AudioReceiveStream>(
Clock::GetRealTimeClock(), &packet_router_, stream_config_,
audio_state_, &event_log_,
std::unique_ptr<voe::ChannelReceiveInterface>(channel_receive_));
ret->RegisterWithTransport(&rtp_stream_receiver_controller_);
return ret;
}
AudioReceiveStream::Config& config() { return stream_config_; }
@ -199,6 +200,7 @@ TEST(AudioReceiveStreamTest, ConstructDestruct) {
for (bool use_null_audio_processing : {false, true}) {
ConfigHelper helper(use_null_audio_processing);
auto recv_stream = helper.CreateAudioReceiveStream();
recv_stream->UnregisterFromTransport();
}
}
@ -212,6 +214,7 @@ TEST(AudioReceiveStreamTest, ReceiveRtcpPacket) {
ReceivedRTCPPacket(&rtcp_packet[0], rtcp_packet.size()))
.WillOnce(Return());
recv_stream->DeliverRtcp(&rtcp_packet[0], rtcp_packet.size());
recv_stream->UnregisterFromTransport();
}
}
@ -276,6 +279,7 @@ TEST(AudioReceiveStreamTest, GetStats) {
EXPECT_EQ(kCallStats.capture_start_ntp_time_ms_,
stats.capture_start_ntp_time_ms);
EXPECT_EQ(kPlayoutNtpTimestampMs, stats.estimated_playout_ntp_timestamp_ms);
recv_stream->UnregisterFromTransport();
}
}
@ -286,6 +290,7 @@ TEST(AudioReceiveStreamTest, SetGain) {
EXPECT_CALL(*helper.channel_receive(),
SetChannelOutputVolumeScaling(FloatEq(0.765f)));
recv_stream->SetGain(0.765f);
recv_stream->UnregisterFromTransport();
}
}
@ -317,6 +322,9 @@ TEST(AudioReceiveStreamTest, StreamsShouldBeAddedToMixerOnceOnStart) {
// Stop stream before it is being destructed.
recv_stream2->Stop();
recv_stream1->UnregisterFromTransport();
recv_stream2->UnregisterFromTransport();
}
}
@ -325,6 +333,7 @@ TEST(AudioReceiveStreamTest, ReconfigureWithSameConfig) {
ConfigHelper helper(use_null_audio_processing);
auto recv_stream = helper.CreateAudioReceiveStream();
recv_stream->Reconfigure(helper.config());
recv_stream->UnregisterFromTransport();
}
}
@ -348,6 +357,7 @@ TEST(AudioReceiveStreamTest, ReconfigureWithUpdatedConfig) {
EXPECT_CALL(channel_receive, SetReceiveCodecs(new_config.decoder_map));
recv_stream->Reconfigure(new_config);
recv_stream->UnregisterFromTransport();
}
}
@ -369,6 +379,7 @@ TEST(AudioReceiveStreamTest, ReconfigureWithFrameDecryptor) {
new_config_1.frame_decryptor = mock_frame_decryptor_1;
new_config_1.crypto_options.sframe.require_frame_encryption = true;
recv_stream->Reconfigure(new_config_1);
recv_stream->UnregisterFromTransport();
}
}

View File

@ -48,6 +48,7 @@
#include "rtc_base/numerics/safe_minmax.h"
#include "rtc_base/race_checker.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/system/no_unique_address.h"
#include "rtc_base/time_utils.h"
#include "system_wrappers/include/metrics.h"
@ -200,7 +201,8 @@ class ChannelReceive : public ChannelReceiveInterface {
// we know about. The goal is to eventually split up voe::ChannelReceive into
// parts with single-threaded semantics, and thereby reduce the need for
// locks.
SequenceChecker worker_thread_checker_;
RTC_NO_UNIQUE_ADDRESS SequenceChecker worker_thread_checker_;
RTC_NO_UNIQUE_ADDRESS SequenceChecker network_thread_checker_;
// Methods accessed from audio and video threads are checked for sequential-
// only access. We don't necessarily own and control these threads, so thread
@ -267,7 +269,7 @@ class ChannelReceive : public ChannelReceiveInterface {
float _outputGain RTC_GUARDED_BY(volume_settings_mutex_);
const ChannelSendInterface* associated_send_channel_
RTC_GUARDED_BY(worker_thread_checker_);
RTC_GUARDED_BY(network_thread_checker_);
PacketRouter* packet_router_ = nullptr;
@ -525,6 +527,8 @@ ChannelReceive::ChannelReceive(
RTC_DCHECK(module_process_thread_);
RTC_DCHECK(audio_device_module);
network_thread_checker_.Detach();
acm_receiver_.ResetInitialDelay();
acm_receiver_.SetMinimumDelay(0);
acm_receiver_.SetMaximumDelay(0);
@ -857,8 +861,7 @@ int ChannelReceive::ResendPackets(const uint16_t* sequence_numbers,
void ChannelReceive::SetAssociatedSendChannel(
const ChannelSendInterface* channel) {
// TODO(bugs.webrtc.org/11993): Expect to be called on the network thread.
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
RTC_DCHECK_RUN_ON(&network_thread_checker_);
associated_send_channel_ = channel;
}
@ -1039,7 +1042,7 @@ int ChannelReceive::GetRtpTimestampRateHz() const {
}
int64_t ChannelReceive::GetRTT() const {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
RTC_DCHECK_RUN_ON(&network_thread_checker_);
std::vector<ReportBlockData> report_blocks =
rtp_rtcp_->GetLatestReportBlockData();

View File

@ -923,15 +923,16 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream(
event_log_->Log(std::make_unique<RtcEventAudioReceiveStreamConfig>(
CreateRtcLogStreamConfig(config)));
// TODO(bugs.webrtc.org/11993): Move the registration between |receive_stream|
// and |audio_receiver_controller_| out of AudioReceiveStream construction and
// set it up asynchronously on the network thread (the registration and
// |audio_receiver_controller_| need to live on the network thread).
AudioReceiveStream* receive_stream = new AudioReceiveStream(
clock_, &audio_receiver_controller_, transport_send_->packet_router(),
clock_, transport_send_->packet_router(),
module_process_thread_->process_thread(), config_.neteq_factory, config,
config_.audio_state, event_log_);
// TODO(bugs.webrtc.org/11993): Make the registration on the network thread
// (asynchronously). The registration and `audio_receiver_controller_` need
// to live on the network thread.
receive_stream->RegisterWithTransport(&audio_receiver_controller_);
// TODO(bugs.webrtc.org/11993): Update the below on the network thread.
// We could possibly set up the audio_receiver_controller_ association up
// as part of the async setup.
@ -963,7 +964,9 @@ void Call::DestroyAudioReceiveStream(
->RemoveStream(ssrc);
// TODO(bugs.webrtc.org/11993): Access the map, rtp config, call ConfigureSync
// and UpdateAggregateNetworkState on the network thread.
// and UpdateAggregateNetworkState on the network thread. The call to
// `UnregisterFromTransport` should also happen on the network thread.
audio_receive_stream->UnregisterFromTransport();
audio_receive_streams_.erase(audio_receive_stream);
const std::string& sync_group = audio_receive_stream->config().sync_group;