Revert "Fix data race for config_ in AudioSendStream"

This reverts commit 51e5c4b0f47926e2586d809e47dc60fe4812b782.

Reason for revert: Speculatively reverting because WebRTC fails to
roll due to a DCHECK in audio_send_stream.cc in a web platform test
and this is the only CL on the blamelist that touches that file.

Original change's description:
> Fix data race for config_ in AudioSendStream
>
> config_ was written and read on different threads without sync. This CL
> moves config access on worker_thread_ with all other required fields.
> It keeps only bitrate allocator accessed from worker_queue_, because
> it is used from it in other classes and supposed to be single threaded.
>
> Bug: None
> Change-Id: I23ece4dc8b09b41a8c589412bedd36d63b76cbc5
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/203267
> Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
> Reviewed-by: Niels Moller <nisse@webrtc.org>
> Reviewed-by: Per Åhgren <peah@webrtc.org>
> Reviewed-by: Harald Alvestrand <hta@webrtc.org>
> Commit-Queue: Artem Titov <titovartem@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#33125}

TBR=danilchap@webrtc.org,peah@webrtc.org,nisse@webrtc.org,hta@webrtc.org,titovartem@webrtc.org

# Initially not skipping CQ checks because original CL landed > 1 day
# ago. Adding NOTRY now because of ios_sim_x64_dbg_ios12 issues.
NOTRY=True

Bug: None
Change-Id: I33355198fca96faad7ac77538c7bd31425f46ebe
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/205340
Commit-Queue: Henrik Boström <hbos@webrtc.org>
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33142}
This commit is contained in:
Henrik Boström 2021-02-02 15:02:25 +00:00 committed by Commit Bot
parent 0e3cb9fb20
commit 76a1041f0f
2 changed files with 86 additions and 114 deletions

View File

@ -168,14 +168,13 @@ AudioSendStream::AudioSendStream(
RTC_DCHECK(rtp_rtcp_module_);
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
ConfigureStream(config, true);
UpdateCachedTargetAudioBitrateConstraints();
pacer_thread_checker_.Detach();
}
AudioSendStream::~AudioSendStream() {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
RTC_DCHECK(worker_thread_checker_.IsCurrent());
RTC_LOG(LS_INFO) << "~AudioSendStream: " << config_.rtp.ssrc;
RTC_DCHECK(!sending_);
channel_send_->ResetSenderCongestionControlObjects();
@ -187,13 +186,13 @@ AudioSendStream::~AudioSendStream() {
}
const webrtc::AudioSendStream::Config& AudioSendStream::GetConfig() const {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
RTC_DCHECK(worker_thread_checker_.IsCurrent());
return config_;
}
void AudioSendStream::Reconfigure(
const webrtc::AudioSendStream::Config& new_config) {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
RTC_DCHECK(worker_thread_checker_.IsCurrent());
ConfigureStream(new_config, false);
}
@ -352,22 +351,20 @@ void AudioSendStream::ConfigureStream(
}
channel_send_->CallEncoder([this](AudioEncoder* encoder) {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
if (!encoder) {
return;
}
frame_length_range_ = encoder->GetFrameLengthRange();
UpdateCachedTargetAudioBitrateConstraints();
worker_queue_->PostTask(
[this, length_range = encoder->GetFrameLengthRange()] {
RTC_DCHECK_RUN_ON(worker_queue_);
frame_length_range_ = length_range;
});
});
if (sending_) {
ReconfigureBitrateObserver(new_config);
}
config_ = new_config;
if (!first_time) {
UpdateCachedTargetAudioBitrateConstraints();
}
}
void AudioSendStream::Start() {
@ -382,7 +379,13 @@ void AudioSendStream::Start() {
if (send_side_bwe_with_overhead_)
rtp_transport_->IncludeOverheadInPacedSender();
rtp_rtcp_module_->SetAsPartOfAllocation(true);
ConfigureBitrateObserver();
rtc::Event thread_sync_event;
worker_queue_->PostTask([&] {
RTC_DCHECK_RUN_ON(worker_queue_);
ConfigureBitrateObserver();
thread_sync_event.Set();
});
thread_sync_event.Wait(rtc::Event::kForever);
} else {
rtp_rtcp_module_->SetAsPartOfAllocation(false);
}
@ -393,7 +396,7 @@ void AudioSendStream::Start() {
}
void AudioSendStream::Stop() {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
RTC_DCHECK(worker_thread_checker_.IsCurrent());
if (!sending_) {
return;
}
@ -428,14 +431,14 @@ bool AudioSendStream::SendTelephoneEvent(int payload_type,
int payload_frequency,
int event,
int duration_ms) {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
RTC_DCHECK(worker_thread_checker_.IsCurrent());
channel_send_->SetSendTelephoneEventPayloadType(payload_type,
payload_frequency);
return channel_send_->SendTelephoneEventOutband(event, duration_ms);
}
void AudioSendStream::SetMuted(bool muted) {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
RTC_DCHECK(worker_thread_checker_.IsCurrent());
channel_send_->SetInputMute(muted);
}
@ -445,7 +448,7 @@ webrtc::AudioSendStream::Stats AudioSendStream::GetStats() const {
webrtc::AudioSendStream::Stats AudioSendStream::GetStats(
bool has_remote_tracks) const {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
RTC_DCHECK(worker_thread_checker_.IsCurrent());
webrtc::AudioSendStream::Stats stats;
stats.local_ssrc = config_.rtp.ssrc;
stats.target_bitrate_bps = channel_send_->GetBitrate();
@ -506,14 +509,12 @@ webrtc::AudioSendStream::Stats AudioSendStream::GetStats(
void AudioSendStream::DeliverRtcp(const uint8_t* packet, size_t length) {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
channel_send_->ReceivedRTCPPacket(packet, length);
{
worker_queue_->PostTask([&]() {
// Poll if overhead has changed, which it can do if ack triggers us to stop
// sending mid/rid.
MutexLock lock(&overhead_per_packet_lock_);
UpdateOverheadForEncoder();
}
UpdateCachedTargetAudioBitrateConstraints();
});
}
uint32_t AudioSendStream::OnBitrateUpdated(BitrateAllocationUpdate update) {
@ -522,11 +523,9 @@ uint32_t AudioSendStream::OnBitrateUpdated(BitrateAllocationUpdate update) {
// Pick a target bitrate between the constraints. Overrules the allocator if
// it 1) allocated a bitrate of zero to disable the stream or 2) allocated a
// higher than max to allow for e.g. extra FEC.
RTC_DCHECK(cached_constraints_.has_value());
update.target_bitrate.Clamp(cached_constraints_->min,
cached_constraints_->max);
update.stable_target_bitrate.Clamp(cached_constraints_->min,
cached_constraints_->max);
auto constraints = GetMinMaxBitrateConstraints();
update.target_bitrate.Clamp(constraints.min, constraints.max);
update.stable_target_bitrate.Clamp(constraints.min, constraints.max);
channel_send_->OnBitrateAllocation(update);
@ -537,17 +536,13 @@ uint32_t AudioSendStream::OnBitrateUpdated(BitrateAllocationUpdate update) {
void AudioSendStream::SetTransportOverhead(
int transport_overhead_per_packet_bytes) {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
{
MutexLock lock(&overhead_per_packet_lock_);
transport_overhead_per_packet_bytes_ = transport_overhead_per_packet_bytes;
UpdateOverheadForEncoder();
}
UpdateCachedTargetAudioBitrateConstraints();
RTC_DCHECK(worker_thread_checker_.IsCurrent());
MutexLock lock(&overhead_per_packet_lock_);
transport_overhead_per_packet_bytes_ = transport_overhead_per_packet_bytes;
UpdateOverheadForEncoder();
}
void AudioSendStream::UpdateOverheadForEncoder() {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
size_t overhead_per_packet_bytes = GetPerPacketOverheadBytes();
if (overhead_per_packet_ == overhead_per_packet_bytes) {
return;
@ -557,11 +552,19 @@ void AudioSendStream::UpdateOverheadForEncoder() {
channel_send_->CallEncoder([&](AudioEncoder* encoder) {
encoder->OnReceivedOverhead(overhead_per_packet_bytes);
});
if (total_packet_overhead_bytes_ != overhead_per_packet_bytes) {
total_packet_overhead_bytes_ = overhead_per_packet_bytes;
if (registered_with_allocator_) {
ConfigureBitrateObserver();
auto update_task = [this, overhead_per_packet_bytes] {
RTC_DCHECK_RUN_ON(worker_queue_);
if (total_packet_overhead_bytes_ != overhead_per_packet_bytes) {
total_packet_overhead_bytes_ = overhead_per_packet_bytes;
if (registered_with_allocator_) {
ConfigureBitrateObserver();
}
}
};
if (worker_queue_->IsCurrent()) {
update_task();
} else {
worker_queue_->PostTask(update_task);
}
}
@ -599,6 +602,7 @@ const internal::AudioState* AudioSendStream::audio_state() const {
void AudioSendStream::StoreEncoderProperties(int sample_rate_hz,
size_t num_channels) {
RTC_DCHECK(worker_thread_checker_.IsCurrent());
encoder_sample_rate_hz_ = sample_rate_hz;
encoder_num_channels_ = num_channels;
if (sending_) {
@ -796,6 +800,7 @@ void AudioSendStream::ReconfigureCNG(const Config& new_config) {
void AudioSendStream::ReconfigureBitrateObserver(
const webrtc::AudioSendStream::Config& new_config) {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
// Since the Config's default is for both of these to be -1, this test will
// allow us to configure the bitrate observer if the new config has bitrate
// limits set, but would only have us call RemoveBitrateObserver if we were
@ -814,13 +819,20 @@ void AudioSendStream::ReconfigureBitrateObserver(
rtp_transport_->AccountForAudioPacketsInPacedSender(true);
if (send_side_bwe_with_overhead_)
rtp_transport_->IncludeOverheadInPacedSender();
// We may get a callback immediately as the observer is registered, so
// make sure the bitrate limits in config_ are up-to-date.
config_.min_bitrate_bps = new_config.min_bitrate_bps;
config_.max_bitrate_bps = new_config.max_bitrate_bps;
rtc::Event thread_sync_event;
worker_queue_->PostTask([&] {
RTC_DCHECK_RUN_ON(worker_queue_);
// We may get a callback immediately as the observer is registered, so
// make
// sure the bitrate limits in config_ are up-to-date.
config_.min_bitrate_bps = new_config.min_bitrate_bps;
config_.max_bitrate_bps = new_config.max_bitrate_bps;
config_.bitrate_priority = new_config.bitrate_priority;
ConfigureBitrateObserver();
config_.bitrate_priority = new_config.bitrate_priority;
ConfigureBitrateObserver();
thread_sync_event.Set();
});
thread_sync_event.Wait(rtc::Event::kForever);
rtp_rtcp_module_->SetAsPartOfAllocation(true);
} else {
rtp_transport_->AccountForAudioPacketsInPacedSender(false);
@ -854,25 +866,22 @@ void AudioSendStream::ConfigureBitrateObserver() {
if (allocation_settings_.priority_bitrate_raw)
priority_bitrate = *allocation_settings_.priority_bitrate_raw;
worker_queue_->PostTask([this, constraints, priority_bitrate,
config_bitrate_priority = config_.bitrate_priority] {
RTC_DCHECK_RUN_ON(worker_queue_);
bitrate_allocator_->AddObserver(
this,
MediaStreamAllocationConfig{
constraints.min.bps<uint32_t>(), constraints.max.bps<uint32_t>(), 0,
priority_bitrate.bps(), true,
allocation_settings_.bitrate_priority.value_or(
config_bitrate_priority)});
});
bitrate_allocator_->AddObserver(
this,
MediaStreamAllocationConfig{
constraints.min.bps<uint32_t>(), constraints.max.bps<uint32_t>(), 0,
priority_bitrate.bps(), true,
allocation_settings_.bitrate_priority.value_or(
config_.bitrate_priority)});
registered_with_allocator_ = true;
}
void AudioSendStream::RemoveBitrateObserver() {
registered_with_allocator_ = false;
RTC_DCHECK(worker_thread_checker_.IsCurrent());
rtc::Event thread_sync_event;
worker_queue_->PostTask([this, &thread_sync_event] {
RTC_DCHECK_RUN_ON(worker_queue_);
registered_with_allocator_ = false;
bitrate_allocator_->RemoveObserver(this);
thread_sync_event.Set();
});
@ -918,24 +927,5 @@ void AudioSendStream::RegisterCngPayloadType(int payload_type,
int clockrate_hz) {
channel_send_->RegisterCngPayloadType(payload_type, clockrate_hz);
}
void AudioSendStream::UpdateCachedTargetAudioBitrateConstraints() {
if (config_.min_bitrate_bps == -1 || config_.max_bitrate_bps == -1) {
// |config_| is invalid to evaluate constraints.
return;
}
if (!frame_length_range_.has_value()) {
// |frame_length_range_| have to have value to be able to evaluate
// constraints.
return;
}
AudioSendStream::TargetAudioBitrateConstraints new_constraints =
GetMinMaxBitrateConstraints();
worker_queue_->PostTask([this, new_constraints]() {
RTC_DCHECK_RUN_ON(worker_queue_);
cached_constraints_ = new_constraints;
});
}
} // namespace internal
} // namespace webrtc

View File

@ -24,8 +24,8 @@
#include "rtc_base/experiments/struct_parameters_parser.h"
#include "rtc_base/race_checker.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/thread_checker.h"
namespace webrtc {
class RtcEventLog;
@ -121,29 +121,22 @@ class AudioSendStream final : public webrtc::AudioSendStream,
internal::AudioState* audio_state();
const internal::AudioState* audio_state() const;
void StoreEncoderProperties(int sample_rate_hz, size_t num_channels)
RTC_RUN_ON(worker_thread_checker_);
void StoreEncoderProperties(int sample_rate_hz, size_t num_channels);
void ConfigureStream(const Config& new_config, bool first_time)
RTC_RUN_ON(worker_thread_checker_);
bool SetupSendCodec(const Config& new_config)
RTC_RUN_ON(worker_thread_checker_);
bool ReconfigureSendCodec(const Config& new_config)
RTC_RUN_ON(worker_thread_checker_);
void ReconfigureANA(const Config& new_config)
RTC_RUN_ON(worker_thread_checker_);
void ReconfigureCNG(const Config& new_config)
RTC_RUN_ON(worker_thread_checker_);
void ReconfigureBitrateObserver(const Config& new_config)
RTC_RUN_ON(worker_thread_checker_);
void ConfigureStream(const Config& new_config, bool first_time);
bool SetupSendCodec(const Config& new_config);
bool ReconfigureSendCodec(const Config& new_config);
void ReconfigureANA(const Config& new_config);
void ReconfigureCNG(const Config& new_config);
void ReconfigureBitrateObserver(const Config& new_config);
void ConfigureBitrateObserver() RTC_RUN_ON(worker_thread_checker_);
void RemoveBitrateObserver() RTC_RUN_ON(worker_thread_checker_);
void ConfigureBitrateObserver() RTC_RUN_ON(worker_queue_);
void RemoveBitrateObserver();
// Returns bitrate constraints, maybe including overhead when enabled by
// field trial.
TargetAudioBitrateConstraints GetMinMaxBitrateConstraints() const
RTC_RUN_ON(worker_thread_checker_);
RTC_RUN_ON(worker_queue_);
// Sets per-packet overhead on encoded (for ANA) based on current known values
// of transport and packetization overheads.
@ -154,16 +147,11 @@ class AudioSendStream final : public webrtc::AudioSendStream,
size_t GetPerPacketOverheadBytes() const
RTC_EXCLUSIVE_LOCKS_REQUIRED(overhead_per_packet_lock_);
void RegisterCngPayloadType(int payload_type, int clockrate_hz)
RTC_RUN_ON(worker_thread_checker_);
void UpdateCachedTargetAudioBitrateConstraints()
RTC_RUN_ON(worker_thread_checker_);
void RegisterCngPayloadType(int payload_type, int clockrate_hz);
Clock* clock_;
SequenceChecker worker_thread_checker_;
SequenceChecker pacer_thread_checker_;
rtc::ThreadChecker worker_thread_checker_;
rtc::ThreadChecker pacer_thread_checker_;
rtc::RaceChecker audio_capture_race_checker_;
rtc::TaskQueue* worker_queue_;
@ -173,16 +161,15 @@ class AudioSendStream final : public webrtc::AudioSendStream,
const bool send_side_bwe_with_overhead_;
const AudioAllocationConfig allocation_settings_;
webrtc::AudioSendStream::Config config_
RTC_GUARDED_BY(worker_thread_checker_);
webrtc::AudioSendStream::Config config_;
rtc::scoped_refptr<webrtc::AudioState> audio_state_;
const std::unique_ptr<voe::ChannelSendInterface> channel_send_;
RtcEventLog* const event_log_;
const bool use_legacy_overhead_calculation_;
int encoder_sample_rate_hz_ RTC_GUARDED_BY(worker_thread_checker_) = 0;
size_t encoder_num_channels_ RTC_GUARDED_BY(worker_thread_checker_) = 0;
bool sending_ RTC_GUARDED_BY(worker_thread_checker_) = false;
int encoder_sample_rate_hz_ = 0;
size_t encoder_num_channels_ = 0;
bool sending_ = false;
mutable Mutex audio_level_lock_;
// Keeps track of audio level, total audio energy and total samples duration.
// https://w3c.github.io/webrtc-stats/#dom-rtcaudiohandlerstats-totalaudioenergy
@ -190,9 +177,6 @@ class AudioSendStream final : public webrtc::AudioSendStream,
BitrateAllocatorInterface* const bitrate_allocator_
RTC_GUARDED_BY(worker_queue_);
// Constrains cached to be accessed from |worker_queue_|.
absl::optional<AudioSendStream::TargetAudioBitrateConstraints>
cached_constraints_ RTC_GUARDED_BY(worker_queue_) = absl::nullopt;
RtpTransportControllerSendInterface* const rtp_transport_;
RtpRtcpInterface* const rtp_rtcp_module_;
@ -221,12 +205,10 @@ class AudioSendStream final : public webrtc::AudioSendStream,
size_t transport_overhead_per_packet_bytes_
RTC_GUARDED_BY(overhead_per_packet_lock_) = 0;
bool registered_with_allocator_ RTC_GUARDED_BY(worker_thread_checker_) =
false;
size_t total_packet_overhead_bytes_ RTC_GUARDED_BY(worker_thread_checker_) =
0;
bool registered_with_allocator_ RTC_GUARDED_BY(worker_queue_) = false;
size_t total_packet_overhead_bytes_ RTC_GUARDED_BY(worker_queue_) = 0;
absl::optional<std::pair<TimeDelta, TimeDelta>> frame_length_range_
RTC_GUARDED_BY(worker_thread_checker_);
RTC_GUARDED_BY(worker_queue_);
};
} // namespace internal
} // namespace webrtc