Move thread handling from source tracker.

This makes it simpler to use in more contexts.

Bug: b/364184684
Change-Id: I1b08ebd24e51ba1b3f85261eed503a78cd006fd8
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/361480
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Commit-Queue: Jakob Ivarsson‎ <jakobi@webrtc.org>
Reviewed-by: Åsa Persson <asapersson@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#42956}
This commit is contained in:
Jakob Ivarsson 2024-09-04 11:24:16 +00:00 committed by WebRTC LUCI CQ
parent 2fbaa8e3a3
commit 010c189f76
10 changed files with 58 additions and 87 deletions

View File

@ -102,7 +102,6 @@ AudioReceiveStreamImpl::AudioReceiveStreamImpl(
std::unique_ptr<voe::ChannelReceiveInterface> channel_receive)
: config_(config),
audio_state_(audio_state),
source_tracker_(&env.clock()),
channel_receive_(std::move(channel_receive)) {
RTC_LOG(LS_INFO) << "AudioReceiveStreamImpl: " << config.rtp.remote_ssrc;
RTC_DCHECK(config.decoder_factory);
@ -114,11 +113,6 @@ AudioReceiveStreamImpl::AudioReceiveStreamImpl(
// Configure bandwidth estimation.
channel_receive_->RegisterReceiverCongestionControlObjects(packet_router);
// When output is muted, ChannelReceive will directly notify the source
// tracker of "delivered" frames, so RtpReceiver information will continue to
// be updated.
channel_receive_->SetSourceTracker(&source_tracker_);
// Complete configuration.
// TODO(solenberg): Config NACK history window (which is a packet count),
// using the actual packet size for the configured codec.
@ -378,19 +372,13 @@ int AudioReceiveStreamImpl::GetBaseMinimumPlayoutDelayMs() const {
std::vector<RtpSource> AudioReceiveStreamImpl::GetSources() const {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
return source_tracker_.GetSources();
return channel_receive_->GetSources();
}
AudioMixer::Source::AudioFrameInfo
AudioReceiveStreamImpl::GetAudioFrameWithInfo(int sample_rate_hz,
AudioFrame* audio_frame) {
AudioMixer::Source::AudioFrameInfo audio_frame_info =
channel_receive_->GetAudioFrameWithInfo(sample_rate_hz, audio_frame);
if (audio_frame_info != AudioMixer::Source::AudioFrameInfo::kError &&
!audio_frame->packet_infos_.empty()) {
source_tracker_.OnFrameDelivered(audio_frame->packet_infos_);
}
return audio_frame_info;
return channel_receive_->GetAudioFrameWithInfo(sample_rate_hz, audio_frame);
}
int AudioReceiveStreamImpl::Ssrc() const {

View File

@ -25,7 +25,6 @@
#include "audio/audio_state.h"
#include "call/audio_receive_stream.h"
#include "call/syncable.h"
#include "modules/rtp_rtcp/source/source_tracker.h"
#include "rtc_base/system/no_unique_address.h"
namespace webrtc {
@ -156,7 +155,6 @@ class AudioReceiveStreamImpl final : public webrtc::AudioReceiveStreamInterface,
SequenceChecker::kDetached};
webrtc::AudioReceiveStreamInterface::Config config_;
rtc::scoped_refptr<webrtc::AudioState> audio_state_;
SourceTracker source_tracker_;
const std::unique_ptr<voe::ChannelReceiveInterface> channel_receive_;
AudioSendStream* associated_send_stream_
RTC_GUARDED_BY(packet_sequence_checker_) = nullptr;

View File

@ -132,7 +132,6 @@ struct ConfigHelper {
.WillRepeatedly(Invoke([](const std::map<int, SdpAudioFormat>& codecs) {
EXPECT_THAT(codecs, ::testing::IsEmpty());
}));
EXPECT_CALL(*channel_receive_, SetSourceTracker(_));
EXPECT_CALL(*channel_receive_, GetLocalSsrc())
.WillRepeatedly(Return(kLocalSsrc));

View File

@ -171,7 +171,7 @@ class ChannelReceive : public ChannelReceiveInterface,
int PreferredSampleRate() const override;
void SetSourceTracker(SourceTracker* source_tracker) override;
std::vector<RtpSource> GetSources() const override;
// Associate to a send channel.
// Used for obtaining RTT for a receive-only channel.
@ -240,7 +240,7 @@ class ChannelReceive : public ChannelReceiveInterface,
std::unique_ptr<ReceiveStatistics> rtp_receive_statistics_;
std::unique_ptr<ModuleRtpRtcpImpl2> rtp_rtcp_;
const uint32_t remote_ssrc_;
SourceTracker* source_tracker_ = nullptr;
SourceTracker source_tracker_ RTC_GUARDED_BY(&worker_thread_checker_);
// Info for GetSyncInfo is updated on network or worker thread, and queried on
// the worker thread.
@ -325,20 +325,18 @@ void ChannelReceive::OnReceivedPayloadData(
// Avoid inserting into NetEQ when we are not playing. Count the
// packet as discarded.
// If we have a source_tracker_, tell it that the frame has been
// "delivered". Normally, this happens in AudioReceiveStreamInterface when
// audio frames are pulled out, but when playout is muted, nothing is
// pulling frames. The downside of this approach is that frames delivered
// this way won't be delayed for playout, and therefore will be
// unsynchronized with (a) audio delay when playing and (b) any audio/video
// synchronization. But the alternative is that muting playout also stops
// the SourceTracker from updating RtpSource information.
if (source_tracker_) {
RtpPacketInfos::vector_type packet_vector = {
RtpPacketInfo(rtpHeader, receive_time)};
source_tracker_->OnFrameDelivered(RtpPacketInfos(packet_vector));
}
// Tell source_tracker_ that the frame has been "delivered". Normally, this
// happens in AudioReceiveStreamInterface when audio frames are pulled out,
// but when playout is muted, nothing is pulling frames. The downside of
// this approach is that frames delivered this way won't be delayed for
// playout, and therefore will be unsynchronized with (a) audio delay when
// playing and (b) any audio/video synchronization. But the alternative is
// that muting playout also stops the SourceTracker from updating RtpSource
// information.
RtpPacketInfos::vector_type packet_vector = {
RtpPacketInfo(rtpHeader, receive_time)};
source_tracker_.OnFrameDelivered(RtpPacketInfos(packet_vector),
env_.clock().CurrentTime());
return;
}
@ -482,7 +480,16 @@ AudioMixer::Source::AudioFrameInfo ChannelReceive::GetAudioFrameWithInfo(
}
packet_infos.push_back(std::move(new_packet_info));
}
audio_frame->packet_infos_ = RtpPacketInfos(packet_infos);
audio_frame->packet_infos_ = RtpPacketInfos(std::move(packet_infos));
if (!audio_frame->packet_infos_.empty()) {
RtpPacketInfos infos_copy = audio_frame->packet_infos_;
Timestamp delivery_time = env_.clock().CurrentTime();
worker_thread_->PostTask(
SafeTask(worker_safety_.flag(), [this, infos_copy, delivery_time]() {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
source_tracker_.OnFrameDelivered(infos_copy, delivery_time);
}));
}
++audio_frame_interval_count_;
if (audio_frame_interval_count_ >= kHistogramReportingInterval) {
@ -514,10 +521,6 @@ int ChannelReceive::PreferredSampleRate() const {
acm_receiver_.last_output_sample_rate_hz());
}
void ChannelReceive::SetSourceTracker(SourceTracker* source_tracker) {
source_tracker_ = source_tracker;
}
ChannelReceive::ChannelReceive(
const Environment& env,
NetEqFactory* neteq_factory,
@ -538,6 +541,7 @@ ChannelReceive::ChannelReceive(
worker_thread_(TaskQueueBase::Current()),
rtp_receive_statistics_(ReceiveStatistics::Create(&env_.clock())),
remote_ssrc_(remote_ssrc),
source_tracker_(&env_.clock()),
acm_receiver_(env_,
AcmConfig(neteq_factory,
decoder_factory,
@ -1102,6 +1106,11 @@ int ChannelReceive::GetRtpTimestampRateHz() const {
: acm_receiver_.last_output_sample_rate_hz();
}
std::vector<RtpSource> ChannelReceive::GetSources() const {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
return source_tracker_.GetSources();
}
} // namespace
std::unique_ptr<ChannelReceiveInterface> CreateChannelReceive(

View File

@ -148,9 +148,7 @@ class ChannelReceiveInterface : public RtpPacketSinkInterface {
virtual int PreferredSampleRate() const = 0;
// Sets the source tracker to notify about "delivered" packets when output is
// muted.
virtual void SetSourceTracker(SourceTracker* source_tracker) = 0;
virtual std::vector<RtpSource> GetSources() const = 0;
// Associate to a send channel.
// Used for obtaining RTT for a receive-only channel.

View File

@ -62,7 +62,7 @@ class MockChannelReceive : public voe::ChannelReceiveInterface {
(int sample_rate_hz, AudioFrame*),
(override));
MOCK_METHOD(int, PreferredSampleRate, (), (const, override));
MOCK_METHOD(void, SetSourceTracker, (SourceTracker*), (override));
MOCK_METHOD(std::vector<RtpSource>, GetSources, (), (const, override));
MOCK_METHOD(void,
SetAssociatedSendChannel,
(const voe::ChannelSendInterface*),

View File

@ -17,42 +17,26 @@
namespace webrtc {
SourceTracker::SourceTracker(Clock* clock)
: worker_thread_(TaskQueueBase::Current()), clock_(clock) {
RTC_DCHECK(worker_thread_);
SourceTracker::SourceTracker(Clock* clock) : clock_(clock) {
RTC_DCHECK(clock_);
}
void SourceTracker::OnFrameDelivered(RtpPacketInfos packet_infos) {
void SourceTracker::OnFrameDelivered(const RtpPacketInfos& packet_infos,
Timestamp delivery_time) {
TRACE_EVENT0("webrtc", "SourceTracker::OnFrameDelivered");
if (packet_infos.empty()) {
return;
}
Timestamp now = clock_->CurrentTime();
if (worker_thread_->IsCurrent()) {
RTC_DCHECK_RUN_ON(worker_thread_);
OnFrameDeliveredInternal(now, packet_infos);
} else {
worker_thread_->PostTask(
SafeTask(worker_safety_.flag(),
[this, packet_infos = std::move(packet_infos), now]() {
RTC_DCHECK_RUN_ON(worker_thread_);
OnFrameDeliveredInternal(now, packet_infos);
}));
if (delivery_time.IsInfinite()) {
delivery_time = clock_->CurrentTime();
}
}
void SourceTracker::OnFrameDeliveredInternal(
Timestamp now,
const RtpPacketInfos& packet_infos) {
TRACE_EVENT0("webrtc", "SourceTracker::OnFrameDelivered");
for (const RtpPacketInfo& packet_info : packet_infos) {
for (uint32_t csrc : packet_info.csrcs()) {
SourceKey key(RtpSourceType::CSRC, csrc);
SourceEntry& entry = UpdateEntry(key);
entry.timestamp = now;
entry.timestamp = delivery_time;
entry.audio_level = packet_info.audio_level();
entry.absolute_capture_time = packet_info.absolute_capture_time();
entry.local_capture_clock_offset =
@ -63,19 +47,17 @@ void SourceTracker::OnFrameDeliveredInternal(
SourceKey key(RtpSourceType::SSRC, packet_info.ssrc());
SourceEntry& entry = UpdateEntry(key);
entry.timestamp = now;
entry.timestamp = delivery_time;
entry.audio_level = packet_info.audio_level();
entry.absolute_capture_time = packet_info.absolute_capture_time();
entry.local_capture_clock_offset = packet_info.local_capture_clock_offset();
entry.rtp_timestamp = packet_info.rtp_timestamp();
}
PruneEntries(now);
PruneEntries(delivery_time);
}
std::vector<RtpSource> SourceTracker::GetSources() const {
RTC_DCHECK_RUN_ON(worker_thread_);
PruneEntries(clock_->CurrentTime());
std::vector<RtpSource> sources;

View File

@ -19,8 +19,6 @@
#include <vector>
#include "api/rtp_packet_infos.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "api/transport/rtp/rtp_source.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
@ -34,6 +32,7 @@ namespace webrtc {
// - https://w3c.github.io/webrtc-pc/#dom-rtcrtpcontributingsource
// - https://w3c.github.io/webrtc-pc/#dom-rtcrtpsynchronizationsource
//
// This class is thread unsafe.
class SourceTracker {
public:
// Amount of time before the entry associated with an update is removed. See:
@ -49,7 +48,8 @@ class SourceTracker {
// Updates the source entries when a frame is delivered to the
// RTCRtpReceiver's MediaStreamTrack.
void OnFrameDelivered(RtpPacketInfos packet_infos);
void OnFrameDelivered(const RtpPacketInfos& packet_infos,
Timestamp delivery_time = Timestamp::MinusInfinity());
// Returns an `RtpSource` for each unique SSRC and CSRC identifier updated in
// the last `kTimeoutMs` milliseconds. Entries appear in reverse chronological
@ -116,27 +116,21 @@ class SourceTracker {
SourceKeyHasher,
SourceKeyComparator>;
void OnFrameDeliveredInternal(Timestamp now,
const RtpPacketInfos& packet_infos)
RTC_RUN_ON(worker_thread_);
// Updates an entry by creating it (if it didn't previously exist) and moving
// it to the front of the list. Returns a reference to the entry.
SourceEntry& UpdateEntry(const SourceKey& key) RTC_RUN_ON(worker_thread_);
SourceEntry& UpdateEntry(const SourceKey& key);
// Removes entries that have timed out. Marked as "const" so that we can do
// pruning in getters.
void PruneEntries(Timestamp now) const RTC_RUN_ON(worker_thread_);
void PruneEntries(Timestamp now) const;
TaskQueueBase* const worker_thread_;
Clock* const clock_;
// Entries are stored in reverse chronological order (i.e. with the most
// recently updated entries appearing first). Mutability is needed for timeout
// pruning in const functions.
mutable SourceList list_ RTC_GUARDED_BY(worker_thread_);
mutable SourceMap map_ RTC_GUARDED_BY(worker_thread_);
ScopedTaskSafety worker_safety_;
mutable SourceList list_;
mutable SourceMap map_;
};
} // namespace webrtc

View File

@ -625,11 +625,11 @@ int VideoReceiveStream2::GetBaseMinimumPlayoutDelayMs() const {
}
void VideoReceiveStream2::OnFrame(const VideoFrame& video_frame) {
source_tracker_.OnFrameDelivered(video_frame.packet_infos());
config_.renderer->OnFrame(video_frame);
// TODO(bugs.webrtc.org/10739): we should set local capture clock offset for
// `video_frame.packet_infos`. But VideoFrame is const qualified here.
// TODO: bugs.webrtc.org/42220804 - we should set local capture clock offset
// for `packet_infos`.
RtpPacketInfos packet_infos = video_frame.packet_infos();
// For frame delay metrics, calculated in `OnRenderedFrame`, to better reflect
// user experience measurements must be done as close as possible to frame
@ -640,7 +640,7 @@ void VideoReceiveStream2::OnFrame(const VideoFrame& video_frame) {
// rendered" callback from the renderer.
VideoFrameMetaData frame_meta(video_frame, env_.clock().CurrentTime());
call_->worker_thread()->PostTask(
SafeTask(task_safety_.flag(), [frame_meta, this]() {
SafeTask(task_safety_.flag(), [frame_meta, packet_infos, this]() {
RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
int64_t video_playout_ntp_ms;
int64_t sync_offset_ms;
@ -652,6 +652,8 @@ void VideoReceiveStream2::OnFrame(const VideoFrame& video_frame) {
estimated_freq_khz);
}
stats_proxy_.OnRenderedFrame(frame_meta);
source_tracker_.OnFrameDelivered(packet_infos,
frame_meta.decode_timestamp);
}));
webrtc::MutexLock lock(&pending_resolution_mutex_);
@ -1047,6 +1049,7 @@ void VideoReceiveStream2::UpdatePlayoutDelays() const {
}
std::vector<webrtc::RtpSource> VideoReceiveStream2::GetSources() const {
RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
return source_tracker_.GetSources();
}

View File

@ -264,7 +264,7 @@ class VideoReceiveStream2
bool decoder_running_ RTC_GUARDED_BY(worker_sequence_checker_) = false;
bool decoder_stopped_ RTC_GUARDED_BY(decode_sequence_checker_) = true;
SourceTracker source_tracker_;
SourceTracker source_tracker_ RTC_GUARDED_BY(worker_sequence_checker_);
ReceiveStatisticsProxy stats_proxy_;
// Shared by media and rtx stream receivers, since the latter has no RtpRtcp
// module of its own.