Remove a couple of locks from ChannelReceive and add thread checks.

* Removes playing_lock_, sync_info_lock_ and video_sync_lock_.
* Also remove video_capture_thread_race_checker_ which was redundant.

Only video_sync_lock_ was actually needed. The other two aren't needed
anymore because of changes made to RtpStreamsSynchronizer class last
year (see webrtc:11489).

In the one case where we had a lock, we post a task to the thread
where the state is maintained. This task is for capturing histograms
which I'm not sure we should have been capturing on the audio thread
anyway.

Also making ChannelReceiveFrameTransformerDelegate compatible with more
tests by using TaskQueueBase instead of rtc::Thread. A number of tests
that instantiate ChannelReceive (and thereby CRFTD) set the worker
thread as a TQ and not actually an rtc::Thread instance. In those cases
CRFTD would previously have gotten a nullptr for the worker thread.

Bug: webrtc:11993
Change-Id: I59f4b2afbfedb06f241d9a613f8538adc19cd6d8
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/221364
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Henrik Lundin <henrik.lundin@webrtc.org>
Reviewed-by: Markus Handell <handellm@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34257}
This commit is contained in:
Tommi 2021-06-09 16:11:11 +02:00 committed by WebRTC LUCI CQ
parent b56a63e470
commit e2e046452a
4 changed files with 58 additions and 74 deletions

View File

@ -95,6 +95,7 @@ rtc_library("audio") {
"../rtc_base/experiments:field_trial_parser",
"../rtc_base/synchronization:mutex",
"../rtc_base/system:no_unique_address",
"../rtc_base/task_utils:pending_task_safety_flag",
"../rtc_base/task_utils:to_queued_task",
"../system_wrappers",
"../system_wrappers:field_trial",

View File

@ -21,6 +21,7 @@
#include "api/frame_transformer_interface.h"
#include "api/rtc_event_log/rtc_event_log.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_base.h"
#include "audio/audio_level.h"
#include "audio/channel_receive_frame_transformer_delegate.h"
#include "audio/channel_send.h"
@ -47,6 +48,8 @@
#include "rtc_base/race_checker.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/system/no_unique_address.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/time_utils.h"
#include "system_wrappers/include/metrics.h"
@ -177,23 +180,22 @@ class ChannelReceive : public ChannelReceiveInterface {
private:
void ReceivePacket(const uint8_t* packet,
size_t packet_length,
const RTPHeader& header);
const RTPHeader& header)
RTC_RUN_ON(worker_thread_checker_);
int ResendPackets(const uint16_t* sequence_numbers, int length);
void UpdatePlayoutTimestamp(bool rtcp, int64_t now_ms);
void UpdatePlayoutTimestamp(bool rtcp, int64_t now_ms)
RTC_RUN_ON(worker_thread_checker_);
int GetRtpTimestampRateHz() const;
int64_t GetRTT() const;
void OnReceivedPayloadData(rtc::ArrayView<const uint8_t> payload,
const RTPHeader& rtpHeader);
const RTPHeader& rtpHeader)
RTC_RUN_ON(worker_thread_checker_);
void InitFrameTransformerDelegate(
rtc::scoped_refptr<webrtc::FrameTransformerInterface> frame_transformer);
bool Playing() const {
MutexLock lock(&playing_lock_);
return playing_;
}
rtc::scoped_refptr<webrtc::FrameTransformerInterface> frame_transformer)
RTC_RUN_ON(worker_thread_checker_);
// Thread checkers document and lock usage of some methods to specific threads
// we know about. The goal is to eventually split up voe::ChannelReceive into
@ -202,17 +204,18 @@ class ChannelReceive : public ChannelReceiveInterface {
RTC_NO_UNIQUE_ADDRESS SequenceChecker worker_thread_checker_;
RTC_NO_UNIQUE_ADDRESS SequenceChecker network_thread_checker_;
TaskQueueBase* const worker_thread_;
ScopedTaskSafety worker_safety_;
// Methods accessed from audio and video threads are checked for sequential-
// only access. We don't necessarily own and control these threads, so thread
// checkers cannot be used. E.g. Chromium may transfer "ownership" from one
// audio thread to another, but access is still sequential.
rtc::RaceChecker audio_thread_race_checker_;
rtc::RaceChecker video_capture_thread_race_checker_;
Mutex callback_mutex_;
Mutex volume_settings_mutex_;
mutable Mutex playing_lock_;
bool playing_ RTC_GUARDED_BY(&playing_lock_) = false;
bool playing_ RTC_GUARDED_BY(worker_thread_checker_) = false;
RtcEventLog* const event_log_;
@ -226,11 +229,10 @@ class ChannelReceive : public ChannelReceiveInterface {
// Info for GetSyncInfo is updated on network or worker thread, and queried on
// the worker thread.
mutable Mutex sync_info_lock_;
absl::optional<uint32_t> last_received_rtp_timestamp_
RTC_GUARDED_BY(&sync_info_lock_);
RTC_GUARDED_BY(&worker_thread_checker_);
absl::optional<int64_t> last_received_rtp_system_time_ms_
RTC_GUARDED_BY(&sync_info_lock_);
RTC_GUARDED_BY(&worker_thread_checker_);
// The AcmReceiver is thread safe, using its own lock.
acm2::AcmReceiver acm_receiver_;
@ -243,15 +245,14 @@ class ChannelReceive : public ChannelReceiveInterface {
// Timestamp of the audio pulled from NetEq.
absl::optional<uint32_t> jitter_buffer_playout_timestamp_;
mutable Mutex video_sync_lock_;
uint32_t playout_timestamp_rtp_ RTC_GUARDED_BY(video_sync_lock_);
uint32_t playout_timestamp_rtp_ RTC_GUARDED_BY(worker_thread_checker_);
absl::optional<int64_t> playout_timestamp_rtp_time_ms_
RTC_GUARDED_BY(video_sync_lock_);
uint32_t playout_delay_ms_ RTC_GUARDED_BY(video_sync_lock_);
RTC_GUARDED_BY(worker_thread_checker_);
uint32_t playout_delay_ms_ RTC_GUARDED_BY(worker_thread_checker_);
absl::optional<int64_t> playout_timestamp_ntp_
RTC_GUARDED_BY(video_sync_lock_);
RTC_GUARDED_BY(worker_thread_checker_);
absl::optional<int64_t> playout_timestamp_ntp_time_ms_
RTC_GUARDED_BY(video_sync_lock_);
RTC_GUARDED_BY(worker_thread_checker_);
mutable Mutex ts_stats_lock_;
@ -288,7 +289,7 @@ class ChannelReceive : public ChannelReceiveInterface {
void ChannelReceive::OnReceivedPayloadData(
rtc::ArrayView<const uint8_t> payload,
const RTPHeader& rtpHeader) {
if (!Playing()) {
if (!playing_) {
// Avoid inserting into NetEQ when we are not playing. Count the
// packet as discarded.
@ -331,18 +332,21 @@ void ChannelReceive::InitFrameTransformerDelegate(
rtc::scoped_refptr<webrtc::FrameTransformerInterface> frame_transformer) {
RTC_DCHECK(frame_transformer);
RTC_DCHECK(!frame_transformer_delegate_);
RTC_DCHECK(worker_thread_);
RTC_DCHECK(worker_thread_->IsCurrent());
// Pass a callback to ChannelReceive::OnReceivedPayloadData, to be called by
// the delegate to receive transformed audio.
ChannelReceiveFrameTransformerDelegate::ReceiveFrameCallback
receive_audio_callback = [this](rtc::ArrayView<const uint8_t> packet,
const RTPHeader& header) {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
OnReceivedPayloadData(packet, header);
};
frame_transformer_delegate_ =
rtc::make_ref_counted<ChannelReceiveFrameTransformerDelegate>(
std::move(receive_audio_callback), std::move(frame_transformer),
rtc::Thread::Current());
worker_thread_);
frame_transformer_delegate_->Init();
}
@ -453,18 +457,19 @@ AudioMixer::Source::AudioFrameInfo ChannelReceive::GetAudioFrameWithInfo(
}
audio_frame->packet_infos_ = RtpPacketInfos(packet_infos);
{
RTC_DCHECK(worker_thread_);
worker_thread_->PostTask(ToQueuedTask(worker_safety_, [this]() {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
RTC_HISTOGRAM_COUNTS_1000("WebRTC.Audio.TargetJitterBufferDelayMs",
acm_receiver_.TargetDelayMs());
const int jitter_buffer_delay = acm_receiver_.FilteredCurrentDelayMs();
MutexLock lock(&video_sync_lock_);
RTC_HISTOGRAM_COUNTS_1000("WebRTC.Audio.ReceiverDelayEstimateMs",
jitter_buffer_delay + playout_delay_ms_);
RTC_HISTOGRAM_COUNTS_1000("WebRTC.Audio.ReceiverJitterBufferDelayMs",
jitter_buffer_delay);
RTC_HISTOGRAM_COUNTS_1000("WebRTC.Audio.ReceiverDeviceDelayMs",
playout_delay_ms_);
}
}));
return muted ? AudioMixer::Source::AudioFrameInfo::kMuted
: AudioMixer::Source::AudioFrameInfo::kNormal;
@ -499,7 +504,8 @@ ChannelReceive::ChannelReceive(
rtc::scoped_refptr<FrameDecryptorInterface> frame_decryptor,
const webrtc::CryptoOptions& crypto_options,
rtc::scoped_refptr<FrameTransformerInterface> frame_transformer)
: event_log_(rtc_event_log),
: worker_thread_(TaskQueueBase::Current()),
event_log_(rtc_event_log),
rtp_receive_statistics_(ReceiveStatistics::Create(clock)),
remote_ssrc_(remote_ssrc),
acm_receiver_(AcmConfig(neteq_factory,
@ -522,6 +528,7 @@ ChannelReceive::ChannelReceive(
frame_decryptor_(frame_decryptor),
crypto_options_(crypto_options),
absolute_capture_time_interpolator_(clock) {
RTC_DCHECK(worker_thread_);
RTC_DCHECK(module_process_thread_);
RTC_DCHECK(audio_device_module);
@ -582,13 +589,11 @@ void ChannelReceive::SetSink(AudioSinkInterface* sink) {
void ChannelReceive::StartPlayout() {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
MutexLock lock(&playing_lock_);
playing_ = true;
}
void ChannelReceive::StopPlayout() {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
MutexLock lock(&playing_lock_);
playing_ = false;
_outputAudioLevel.ResetLevelFullRange();
}
@ -616,11 +621,8 @@ void ChannelReceive::OnRtpPacket(const RtpPacketReceived& packet) {
// UpdatePlayoutTimestamp and
int64_t now_ms = rtc::TimeMillis();
{
MutexLock lock(&sync_info_lock_);
last_received_rtp_timestamp_ = packet.Timestamp();
last_received_rtp_system_time_ms_ = now_ms;
}
// Store playout timestamp for the received RTP packet
UpdatePlayoutTimestamp(false, now_ms);
@ -893,14 +895,8 @@ AudioDecodingCallStats ChannelReceive::GetDecodingCallStatistics() const {
uint32_t ChannelReceive::GetDelayEstimate() const {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
uint32_t playout_delay;
{
MutexLock lock(&video_sync_lock_);
playout_delay = playout_delay_ms_;
}
// Return the current jitter buffer delay + playout delay.
return acm_receiver_.FilteredCurrentDelayMs() + playout_delay;
return acm_receiver_.FilteredCurrentDelayMs() + playout_delay_ms_;
}
bool ChannelReceive::SetMinimumPlayoutDelay(int delay_ms) {
@ -922,21 +918,17 @@ bool ChannelReceive::SetMinimumPlayoutDelay(int delay_ms) {
bool ChannelReceive::GetPlayoutRtpTimestamp(uint32_t* rtp_timestamp,
int64_t* time_ms) const {
RTC_DCHECK_RUNS_SERIALIZED(&video_capture_thread_race_checker_);
{
MutexLock lock(&video_sync_lock_);
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
if (!playout_timestamp_rtp_time_ms_)
return false;
*rtp_timestamp = playout_timestamp_rtp_;
*time_ms = playout_timestamp_rtp_time_ms_.value();
return true;
}
}
void ChannelReceive::SetEstimatedPlayoutNtpTimestampMs(int64_t ntp_timestamp_ms,
int64_t time_ms) {
RTC_DCHECK_RUNS_SERIALIZED(&video_capture_thread_race_checker_);
MutexLock lock(&video_sync_lock_);
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
playout_timestamp_ntp_ = ntp_timestamp_ms;
playout_timestamp_ntp_time_ms_ = time_ms;
}
@ -944,7 +936,6 @@ void ChannelReceive::SetEstimatedPlayoutNtpTimestampMs(int64_t ntp_timestamp_ms,
absl::optional<int64_t>
ChannelReceive::GetCurrentEstimatedPlayoutNtpTimestampMs(int64_t now_ms) const {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
MutexLock lock(&video_sync_lock_);
if (!playout_timestamp_ntp_ || !playout_timestamp_ntp_time_ms_)
return absl::nullopt;
@ -974,24 +965,19 @@ absl::optional<Syncable::Info> ChannelReceive::GetSyncInfo() const {
return absl::nullopt;
}
{
MutexLock lock(&sync_info_lock_);
if (!last_received_rtp_timestamp_ || !last_received_rtp_system_time_ms_) {
return absl::nullopt;
}
info.latest_received_capture_timestamp = *last_received_rtp_timestamp_;
info.latest_receive_time_ms = *last_received_rtp_system_time_ms_;
}
int jitter_buffer_delay = acm_receiver_.FilteredCurrentDelayMs();
{
MutexLock lock(&video_sync_lock_);
info.current_delay_ms = jitter_buffer_delay + playout_delay_ms_;
}
return info;
}
// RTC_RUN_ON(worker_thread_checker_)
void ChannelReceive::UpdatePlayoutTimestamp(bool rtcp, int64_t now_ms) {
// TODO(bugs.webrtc.org/11993): Expect to be called exclusively on the
// network thread. Once that's done, we won't need video_sync_lock_.
@ -1018,14 +1004,11 @@ void ChannelReceive::UpdatePlayoutTimestamp(bool rtcp, int64_t now_ms) {
// Remove the playout delay.
playout_timestamp -= (delay_ms * (GetRtpTimestampRateHz() / 1000));
{
MutexLock lock(&video_sync_lock_);
if (!rtcp && playout_timestamp != playout_timestamp_rtp_) {
playout_timestamp_rtp_ = playout_timestamp;
playout_timestamp_rtp_time_ms_ = now_ms;
}
playout_delay_ms_ = delay_ms;
}
}
int ChannelReceive::GetRtpTimestampRateHz() const {

View File

@ -47,7 +47,7 @@ class TransformableAudioFrame : public TransformableAudioFrameInterface {
ChannelReceiveFrameTransformerDelegate::ChannelReceiveFrameTransformerDelegate(
ReceiveFrameCallback receive_frame_callback,
rtc::scoped_refptr<FrameTransformerInterface> frame_transformer,
rtc::Thread* channel_receive_thread)
TaskQueueBase* channel_receive_thread)
: receive_frame_callback_(receive_frame_callback),
frame_transformer_(std::move(frame_transformer)),
channel_receive_thread_(channel_receive_thread) {}

View File

@ -32,7 +32,7 @@ class ChannelReceiveFrameTransformerDelegate : public TransformedFrameCallback {
ChannelReceiveFrameTransformerDelegate(
ReceiveFrameCallback receive_frame_callback,
rtc::scoped_refptr<FrameTransformerInterface> frame_transformer,
rtc::Thread* channel_receive_thread);
TaskQueueBase* channel_receive_thread);
// Registers |this| as callback for |frame_transformer_|, to get the
// transformed frames.
@ -67,7 +67,7 @@ class ChannelReceiveFrameTransformerDelegate : public TransformedFrameCallback {
RTC_GUARDED_BY(sequence_checker_);
rtc::scoped_refptr<FrameTransformerInterface> frame_transformer_
RTC_GUARDED_BY(sequence_checker_);
rtc::Thread* channel_receive_thread_;
TaskQueueBase* const channel_receive_thread_;
};
} // namespace webrtc