Ensure RtpSenderEgress run on worker queue

VoipCore still use RtpSenderEgress::NonPacedPacketSender, therefore
packets sent using NonPacedPacketSender::EnqueuePackets are proxied
to the worker thead.
When NonPacedPacketSender is used, the Pacer already guarantee that packets are sent on the worker queue.

Lock is removed from RtpSenderEgress and instead calls must be made on
the worker thread.


Bug: webrtc:15209
Change-Id: Iaf03377ad8a037ecedbbe588a4c1e8e4eadacd81
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/306960
Reviewed-by: Erik Språng <sprang@webrtc.org>
Reviewed-by: Jakob Ivarsson‎ <jakobi@webrtc.org>
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#40252}
This commit is contained in:
Per K 2023-05-31 15:38:26 +02:00 committed by WebRTC LUCI CQ
parent 2b5beb98dd
commit 48c44e3543
8 changed files with 154 additions and 164 deletions

View File

@ -70,6 +70,8 @@ if (rtc_include_tests) {
"../../../api/audio_codecs:builtin_audio_decoder_factory",
"../../../api/audio_codecs:builtin_audio_encoder_factory",
"../../../api/task_queue:default_task_queue_factory",
"../../../api/units:time_delta",
"../../../api/units:timestamp",
"../../../modules/audio_mixer:audio_mixer_test_utils",
"../../../modules/rtp_rtcp:rtp_rtcp",
"../../../rtc_base:logging",
@ -77,6 +79,7 @@ if (rtc_include_tests) {
"../../../test:mock_transport",
"../../../test:run_loop",
"../../../test:test_support",
"../../../test/time_controller:time_controller",
]
}
@ -88,6 +91,8 @@ if (rtc_include_tests) {
"../../../api:transport_api",
"../../../api/audio_codecs:builtin_audio_encoder_factory",
"../../../api/task_queue:default_task_queue_factory",
"../../../api/units:time_delta",
"../../../api/units:timestamp",
"../../../modules/audio_mixer:audio_mixer_test_utils",
"../../../modules/rtp_rtcp:rtp_rtcp",
"../../../modules/rtp_rtcp:rtp_rtcp_format",
@ -96,6 +101,7 @@ if (rtc_include_tests) {
"../../../test:mock_transport",
"../../../test:run_loop",
"../../../test:test_support",
"../../../test/time_controller:time_controller",
]
}
}

View File

@ -13,6 +13,8 @@
#include "api/audio_codecs/builtin_audio_encoder_factory.h"
#include "api/call/transport.h"
#include "api/task_queue/default_task_queue_factory.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "modules/audio_mixer/sine_wave_generator.h"
#include "modules/rtp_rtcp/source/rtp_packet_received.h"
#include "modules/rtp_rtcp/source/rtp_rtcp_impl2.h"
@ -22,6 +24,7 @@
#include "test/gtest.h"
#include "test/mock_transport.h"
#include "test/run_loop.h"
#include "test/time_controller/simulated_time_controller.h"
namespace webrtc {
namespace {
@ -57,18 +60,18 @@ class AudioEgressTest : public ::testing::Test {
static constexpr uint32_t kRemoteSsrc = 0xDEADBEEF;
const SdpAudioFormat kPcmuFormat = {"pcmu", 8000, 1};
AudioEgressTest()
: fake_clock_(kStartTime), wave_generator_(1000.0, kAudioLevel) {
task_queue_factory_ = CreateDefaultTaskQueueFactory();
AudioEgressTest() : wave_generator_(1000.0, kAudioLevel) {
encoder_factory_ = CreateBuiltinAudioEncoderFactory();
}
// Prepare test on audio egress by using PCMu codec with specific
// sequence number and its status to be running.
void SetUp() override {
rtp_rtcp_ = CreateRtpStack(&fake_clock_, &transport_, kRemoteSsrc);
egress_ = std::make_unique<AudioEgress>(rtp_rtcp_.get(), &fake_clock_,
task_queue_factory_.get());
rtp_rtcp_ =
CreateRtpStack(time_controller_.GetClock(), &transport_, kRemoteSsrc);
egress_ = std::make_unique<AudioEgress>(
rtp_rtcp_.get(), time_controller_.GetClock(),
time_controller_.GetTaskQueueFactory());
constexpr int kPcmuPayload = 0;
egress_->SetEncoder(kPcmuPayload, kPcmuFormat,
encoder_factory_->MakeAudioEncoder(
@ -100,14 +103,10 @@ class AudioEgressTest : public ::testing::Test {
return frame;
}
test::RunLoop run_loop_;
// SimulatedClock doesn't directly affect this testcase as the the
// AudioFrame's timestamp is driven by GetAudioFrame.
SimulatedClock fake_clock_;
GlobalSimulatedTimeController time_controller_{Timestamp::Micros(kStartTime)};
NiceMock<MockTransport> transport_;
SineWaveGenerator wave_generator_;
std::unique_ptr<ModuleRtpRtcpImpl2> rtp_rtcp_;
std::unique_ptr<TaskQueueFactory> task_queue_factory_;
rtc::scoped_refptr<AudioEncoderFactory> encoder_factory_;
std::unique_ptr<AudioEgress> egress_;
};
@ -138,7 +137,7 @@ TEST_F(AudioEgressTest, ProcessAudioWithMute) {
// Two 10 ms audio frames will result in rtp packet with ptime 20.
for (size_t i = 0; i < kExpected * 2; i++) {
egress_->SendAudioData(GetAudioFrame(i));
fake_clock_.AdvanceTimeMilliseconds(10);
time_controller_.AdvanceTime(TimeDelta::Millis(10));
}
event.Wait(TimeDelta::Seconds(1));
@ -174,7 +173,7 @@ TEST_F(AudioEgressTest, ProcessAudioWithSineWave) {
// Two 10 ms audio frames will result in rtp packet with ptime 20.
for (size_t i = 0; i < kExpected * 2; i++) {
egress_->SendAudioData(GetAudioFrame(i));
fake_clock_.AdvanceTimeMilliseconds(10);
time_controller_.AdvanceTime(TimeDelta::Millis(10));
}
event.Wait(TimeDelta::Seconds(1));
@ -208,7 +207,7 @@ TEST_F(AudioEgressTest, SkipAudioEncodingAfterStopSend) {
// Two 10 ms audio frames will result in rtp packet with ptime 20.
for (size_t i = 0; i < kExpected * 2; i++) {
egress_->SendAudioData(GetAudioFrame(i));
fake_clock_.AdvanceTimeMilliseconds(10);
time_controller_.AdvanceTime(TimeDelta::Millis(10));
}
event.Wait(TimeDelta::Seconds(1));
@ -222,7 +221,7 @@ TEST_F(AudioEgressTest, SkipAudioEncodingAfterStopSend) {
// result in crahses or sanitizer errors due to remaining data.
for (size_t i = 0; i < kExpected * 2; i++) {
egress_->SendAudioData(GetAudioFrame(i));
fake_clock_.AdvanceTimeMilliseconds(10);
time_controller_.AdvanceTime(TimeDelta::Millis(10));
}
}
@ -284,7 +283,7 @@ TEST_F(AudioEgressTest, SendDTMF) {
// Two 10 ms audio frames will result in rtp packet with ptime 20.
for (size_t i = 0; i < kExpected * 2; i++) {
egress_->SendAudioData(GetAudioFrame(i));
fake_clock_.AdvanceTimeMilliseconds(10);
time_controller_.AdvanceTime(TimeDelta::Millis(10));
}
event.Wait(TimeDelta::Seconds(1));
@ -309,7 +308,7 @@ TEST_F(AudioEgressTest, TestAudioInputLevelAndEnergyDuration) {
// Two 10 ms audio frames will result in rtp packet with ptime 20.
for (size_t i = 0; i < kExpected * 2; i++) {
egress_->SendAudioData(GetAudioFrame(i));
fake_clock_.AdvanceTimeMilliseconds(10);
time_controller_.AdvanceTime(TimeDelta::Millis(10));
}
event.Wait(/*give_up_after=*/TimeDelta::Seconds(1));

View File

@ -14,6 +14,7 @@
#include "api/audio_codecs/builtin_audio_encoder_factory.h"
#include "api/call/transport.h"
#include "api/task_queue/default_task_queue_factory.h"
#include "api/units/time_delta.h"
#include "audio/voip/audio_egress.h"
#include "modules/audio_mixer/sine_wave_generator.h"
#include "modules/rtp_rtcp/source/rtp_rtcp_impl2.h"
@ -23,6 +24,7 @@
#include "test/gtest.h"
#include "test/mock_transport.h"
#include "test/run_loop.h"
#include "test/time_controller/simulated_time_controller.h"
namespace webrtc {
namespace {
@ -37,12 +39,12 @@ class AudioIngressTest : public ::testing::Test {
public:
const SdpAudioFormat kPcmuFormat = {"pcmu", 8000, 1};
AudioIngressTest()
: fake_clock_(123456789), wave_generator_(1000.0, kAudioLevel) {
receive_statistics_ = ReceiveStatistics::Create(&fake_clock_);
AudioIngressTest() : wave_generator_(1000.0, kAudioLevel) {
receive_statistics_ =
ReceiveStatistics::Create(time_controller_.GetClock());
RtpRtcpInterface::Configuration rtp_config;
rtp_config.clock = &fake_clock_;
rtp_config.clock = time_controller_.GetClock();
rtp_config.audio = true;
rtp_config.receive_statistics = receive_statistics_.get();
rtp_config.rtcp_report_interval_ms = 5000;
@ -53,20 +55,20 @@ class AudioIngressTest : public ::testing::Test {
rtp_rtcp_->SetSendingMediaStatus(false);
rtp_rtcp_->SetRTCPStatus(RtcpMode::kCompound);
task_queue_factory_ = CreateDefaultTaskQueueFactory();
encoder_factory_ = CreateBuiltinAudioEncoderFactory();
decoder_factory_ = CreateBuiltinAudioDecoderFactory();
}
void SetUp() override {
constexpr int kPcmuPayload = 0;
ingress_ = std::make_unique<AudioIngress>(rtp_rtcp_.get(), &fake_clock_,
receive_statistics_.get(),
decoder_factory_);
ingress_ = std::make_unique<AudioIngress>(
rtp_rtcp_.get(), time_controller_.GetClock(), receive_statistics_.get(),
decoder_factory_);
ingress_->SetReceiveCodecs({{kPcmuPayload, kPcmuFormat}});
egress_ = std::make_unique<AudioEgress>(rtp_rtcp_.get(), &fake_clock_,
task_queue_factory_.get());
egress_ = std::make_unique<AudioEgress>(
rtp_rtcp_.get(), time_controller_.GetClock(),
time_controller_.GetTaskQueueFactory());
egress_->SetEncoder(kPcmuPayload, kPcmuFormat,
encoder_factory_->MakeAudioEncoder(
kPcmuPayload, kPcmuFormat, absl::nullopt));
@ -93,15 +95,13 @@ class AudioIngressTest : public ::testing::Test {
return frame;
}
test::RunLoop run_loop_;
SimulatedClock fake_clock_;
GlobalSimulatedTimeController time_controller_{Timestamp::Micros(123456789)};
SineWaveGenerator wave_generator_;
NiceMock<MockTransport> transport_;
std::unique_ptr<ReceiveStatistics> receive_statistics_;
std::unique_ptr<ModuleRtpRtcpImpl2> rtp_rtcp_;
rtc::scoped_refptr<AudioEncoderFactory> encoder_factory_;
rtc::scoped_refptr<AudioDecoderFactory> decoder_factory_;
std::unique_ptr<TaskQueueFactory> task_queue_factory_;
std::unique_ptr<AudioIngress> ingress_;
std::unique_ptr<AudioEgress> egress_;
};
@ -122,7 +122,8 @@ TEST_F(AudioIngressTest, GetAudioFrameAfterRtpReceived) {
EXPECT_CALL(transport_, SendRtp).WillRepeatedly(Invoke(handle_rtp));
egress_->SendAudioData(GetAudioFrame(0));
egress_->SendAudioData(GetAudioFrame(1));
event.Wait(TimeDelta::Seconds(1));
time_controller_.AdvanceTime(TimeDelta::Zero());
ASSERT_TRUE(event.Wait(TimeDelta::Seconds(1)));
AudioFrame audio_frame;
EXPECT_EQ(
@ -153,7 +154,7 @@ TEST_F(AudioIngressTest, TestSpeechOutputLevelAndEnergyDuration) {
EXPECT_CALL(transport_, SendRtp).WillRepeatedly(Invoke(handle_rtp));
for (int i = 0; i < kNumRtp * 2; i++) {
egress_->SendAudioData(GetAudioFrame(i));
fake_clock_.AdvanceTimeMilliseconds(10);
time_controller_.AdvanceTime(TimeDelta::Millis(10));
}
event.Wait(/*give_up_after=*/TimeDelta::Seconds(1));
@ -182,7 +183,8 @@ TEST_F(AudioIngressTest, PreferredSampleRate) {
EXPECT_CALL(transport_, SendRtp).WillRepeatedly(Invoke(handle_rtp));
egress_->SendAudioData(GetAudioFrame(0));
egress_->SendAudioData(GetAudioFrame(1));
event.Wait(TimeDelta::Seconds(1));
time_controller_.AdvanceTime(TimeDelta::Zero());
ASSERT_TRUE(event.Wait(TimeDelta::Seconds(1)));
AudioFrame audio_frame;
EXPECT_EQ(
@ -212,7 +214,7 @@ TEST_F(AudioIngressTest, GetMutedAudioFrameAfterRtpReceivedAndStopPlay) {
EXPECT_CALL(transport_, SendRtp).WillRepeatedly(Invoke(handle_rtp));
for (int i = 0; i < kNumRtp * 2; i++) {
egress_->SendAudioData(GetAudioFrame(i));
fake_clock_.AdvanceTimeMilliseconds(10);
time_controller_.AdvanceTime(TimeDelta::Millis(10));
}
event.Wait(/*give_up_after=*/TimeDelta::Seconds(1));

View File

@ -64,6 +64,7 @@ RtpPacketHistory::PaddingMode GetPaddingMode(
} // namespace
ModuleRtpRtcpImpl2::RtpSenderContext::RtpSenderContext(
TaskQueueBase& worker_queue,
const RtpRtcpInterface::Configuration& config)
: packet_history(config.clock, GetPaddingMode(config.field_trials)),
sequencer(config.local_media_ssrc,
@ -71,7 +72,7 @@ ModuleRtpRtcpImpl2::RtpSenderContext::RtpSenderContext(
/*require_marker_before_media_padding=*/!config.audio,
config.clock),
packet_sender(config, &packet_history),
non_paced_sender(&packet_sender, &sequencer),
non_paced_sender(worker_queue, &packet_sender, &sequencer),
packet_generator(
config,
&packet_history,
@ -94,7 +95,8 @@ ModuleRtpRtcpImpl2::ModuleRtpRtcpImpl2(const Configuration& configuration)
RTC_DCHECK(worker_queue_);
rtcp_thread_checker_.Detach();
if (!configuration.receiver_only) {
rtp_sender_ = std::make_unique<RtpSenderContext>(configuration);
rtp_sender_ =
std::make_unique<RtpSenderContext>(*worker_queue_, configuration);
rtp_sender_->sequencing_checker.Detach();
// Make sure rtcp sender use same timestamp offset as rtp sender.
rtcp_sender_.SetTimestampOffset(
@ -259,7 +261,9 @@ RTCPSender::FeedbackState ModuleRtpRtcpImpl2::GetFeedbackState() {
state.media_bytes_sent = rtp_stats.transmitted.payload_bytes +
rtx_stats.transmitted.payload_bytes;
state.send_bitrate =
rtp_sender_->packet_sender.GetSendRates().Sum().bps<uint32_t>();
rtp_sender_->packet_sender.GetSendRates(clock_->CurrentTime())
.Sum()
.bps<uint32_t>();
}
state.receiver = &rtcp_receiver_;
@ -310,9 +314,9 @@ bool ModuleRtpRtcpImpl2::OnSendingRtpFrame(uint32_t timestamp,
int64_t capture_time_ms,
int payload_type,
bool force_sender_report) {
if (!Sending())
if (!Sending()) {
return false;
}
// TODO(bugs.webrtc.org/12873): Migrate this method and it's users to use
// optional Timestamps.
absl::optional<Timestamp> capture_time;
@ -322,11 +326,20 @@ bool ModuleRtpRtcpImpl2::OnSendingRtpFrame(uint32_t timestamp,
absl::optional<int> payload_type_optional;
if (payload_type >= 0)
payload_type_optional = payload_type;
rtcp_sender_.SetLastRtpTime(timestamp, capture_time, payload_type_optional);
// Make sure an RTCP report isn't queued behind a key frame.
if (rtcp_sender_.TimeToSendRTCPReport(force_sender_report))
rtcp_sender_.SendRTCP(GetFeedbackState(), kRtcpReport);
auto closure = [this, timestamp, capture_time, payload_type_optional,
force_sender_report] {
RTC_DCHECK_RUN_ON(worker_queue_);
rtcp_sender_.SetLastRtpTime(timestamp, capture_time, payload_type_optional);
// Make sure an RTCP report isn't queued behind a key frame.
if (rtcp_sender_.TimeToSendRTCPReport(force_sender_report))
rtcp_sender_.SendRTCP(GetFeedbackState(), kRtcpReport);
};
if (worker_queue_->IsCurrent()) {
closure();
} else {
worker_queue_->PostTask(SafeTask(task_safety_.flag(), std::move(closure)));
}
return true;
}
@ -645,9 +658,8 @@ void ModuleRtpRtcpImpl2::SetLocalSsrc(uint32_t local_ssrc) {
}
RtpSendRates ModuleRtpRtcpImpl2::GetSendRates() const {
// Typically called on the `rtp_transport_queue_` owned by an
// RtpTransportControllerSendInterface instance.
return rtp_sender_->packet_sender.GetSendRates();
RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
return rtp_sender_->packet_sender.GetSendRates(clock_->CurrentTime());
}
void ModuleRtpRtcpImpl2::OnRequestSendReport() {

View File

@ -252,7 +252,8 @@ class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface,
FRIEND_TEST_ALL_PREFIXES(RtpRtcpImpl2Test, RttForReceiverOnly);
struct RtpSenderContext {
explicit RtpSenderContext(const RtpRtcpInterface::Configuration& config);
explicit RtpSenderContext(TaskQueueBase& worker_queue,
const RtpRtcpInterface::Configuration& config);
// Storage of packets, for retransmissions and padding, if applicable.
RtpPacketHistory packet_history;
SequenceChecker sequencing_checker;

View File

@ -16,6 +16,7 @@
#include <utility>
#include "absl/strings/match.h"
#include "api/units/timestamp.h"
#include "logging/rtc_event_log/events/rtc_event_rtp_packet_outgoing.h"
#include "rtc_base/logging.h"
@ -30,15 +31,29 @@ constexpr TimeDelta kUpdateInterval =
} // namespace
RtpSenderEgress::NonPacedPacketSender::NonPacedPacketSender(
TaskQueueBase& worker_queue,
RtpSenderEgress* sender,
PacketSequencer* sequencer)
: transport_sequence_number_(0), sender_(sender), sequencer_(sequencer) {
: worker_queue_(worker_queue),
transport_sequence_number_(0),
sender_(sender),
sequencer_(sequencer) {
RTC_DCHECK(sequencer);
}
RtpSenderEgress::NonPacedPacketSender::~NonPacedPacketSender() = default;
RtpSenderEgress::NonPacedPacketSender::~NonPacedPacketSender() {
RTC_DCHECK_RUN_ON(&worker_queue_);
}
void RtpSenderEgress::NonPacedPacketSender::EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
if (!worker_queue_.IsCurrent()) {
worker_queue_.PostTask(SafeTask(
task_safety_.flag(), [this, packets = std::move(packets)]() mutable {
EnqueuePackets(std::move(packets));
}));
return;
}
RTC_DCHECK_RUN_ON(&worker_queue_);
for (auto& packet : packets) {
PrepareForSend(packet.get());
sender_->SendPacket(std::move(packet), PacedPacketInfo());
@ -51,6 +66,7 @@ void RtpSenderEgress::NonPacedPacketSender::EnqueuePackets(
void RtpSenderEgress::NonPacedPacketSender::PrepareForSend(
RtpPacketToSend* packet) {
RTC_DCHECK_RUN_ON(&worker_queue_);
// Assign sequence numbers, but not for flexfec which is already running on
// an internally maintained sequence number series.
if (packet->Ssrc() != sender_->FlexFecSsrc()) {
@ -97,7 +113,6 @@ RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config,
kRtpSequenceNumberMapMaxEntries)
: nullptr) {
RTC_DCHECK(worker_queue_);
pacer_checker_.Detach();
if (bitrate_callback_) {
update_task_ = RepeatingTaskHandle::DelayedStart(worker_queue_,
kUpdateInterval, [this]() {
@ -114,7 +129,7 @@ RtpSenderEgress::~RtpSenderEgress() {
void RtpSenderEgress::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& pacing_info) {
RTC_DCHECK_RUN_ON(&pacer_checker_);
RTC_DCHECK_RUN_ON(worker_queue_);
RTC_DCHECK(packet);
if (packet->Ssrc() == ssrc_ &&
@ -139,30 +154,17 @@ void RtpSenderEgress::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
}
const Timestamp now = clock_->CurrentTime();
#if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
worker_queue_->PostTask(SafeTask(task_safety_.flag(),
[this, now, packet_ssrc = packet->Ssrc()]() {
BweTestLoggingPlot(now, packet_ssrc);
}));
BweTestLoggingPlot(now, packet->Ssrc());
#endif
if (need_rtp_packet_infos_ &&
packet->packet_type() == RtpPacketToSend::Type::kVideo) {
worker_queue_->PostTask(SafeTask(
task_safety_.flag(),
[this, packet_timestamp = packet->Timestamp(),
is_first_packet_of_frame = packet->is_first_packet_of_frame(),
is_last_packet_of_frame = packet->Marker(),
sequence_number = packet->SequenceNumber()]() {
RTC_DCHECK_RUN_ON(worker_queue_);
// Last packet of a frame, add it to sequence number info map.
const uint32_t timestamp = packet_timestamp - timestamp_offset_;
rtp_sequence_number_map_->InsertPacket(
sequence_number,
RtpSequenceNumberMap::Info(timestamp, is_first_packet_of_frame,
is_last_packet_of_frame));
}));
// Last packet of a frame, add it to sequence number info map.
const uint32_t timestamp = packet->Timestamp() - timestamp_offset_;
rtp_sequence_number_map_->InsertPacket(
packet->SequenceNumber(),
RtpSequenceNumberMap::Info(
timestamp, packet->is_first_packet_of_frame(), packet->Marker()));
}
if (fec_generator_ && packet->fec_protect_packet()) {
@ -171,10 +173,7 @@ void RtpSenderEgress::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
RTC_DCHECK(packet->packet_type() == RtpPacketMediaType::kVideo);
absl::optional<std::pair<FecProtectionParams, FecProtectionParams>>
new_fec_params;
{
MutexLock lock(&lock_);
new_fec_params.swap(pending_fec_params_);
}
new_fec_params.swap(pending_fec_params_);
if (new_fec_params) {
fec_generator_->SetProtectionParameters(new_fec_params->first,
new_fec_params->second);
@ -234,7 +233,7 @@ void RtpSenderEgress::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
}
void RtpSenderEgress::OnBatchComplete() {
RTC_DCHECK_RUN_ON(&pacer_checker_);
RTC_DCHECK_RUN_ON(worker_queue_);
for (auto& packet : packets_to_send_) {
CompleteSendPacket(packet, &packet == &packets_to_send_.back());
}
@ -243,16 +242,14 @@ void RtpSenderEgress::OnBatchComplete() {
void RtpSenderEgress::CompleteSendPacket(const Packet& compound_packet,
bool last_in_batch) {
RTC_DCHECK_RUN_ON(worker_queue_);
auto& [packet, pacing_info, now] = compound_packet;
const bool is_media = packet->packet_type() == RtpPacketMediaType::kAudio ||
packet->packet_type() == RtpPacketMediaType::kVideo;
PacketOptions options;
{
MutexLock lock(&lock_);
options.included_in_allocation = force_part_of_allocation_;
}
options.included_in_allocation = force_part_of_allocation_;
// Downstream code actually uses this flag to distinguish between media and
// everything else.
@ -298,29 +295,12 @@ void RtpSenderEgress::CompleteSendPacket(const Packet& compound_packet,
RtpPacketMediaType packet_type = *packet->packet_type();
RtpPacketCounter counter(*packet);
size_t size = packet->size();
// TODO(crbug.com/1373439): clean up task posting when the combined
// network/worker project launches.
if (TaskQueueBase::Current() != worker_queue_) {
worker_queue_->PostTask(SafeTask(
task_safety_.flag(), [this, now = now, packet_ssrc, packet_type,
counter = std::move(counter), size]() {
RTC_DCHECK_RUN_ON(worker_queue_);
UpdateRtpStats(now, packet_ssrc, packet_type, std::move(counter),
size);
}));
} else {
RTC_DCHECK_RUN_ON(worker_queue_);
UpdateRtpStats(now, packet_ssrc, packet_type, std::move(counter), size);
}
UpdateRtpStats(now, packet_ssrc, packet_type, std::move(counter), size);
}
}
RtpSendRates RtpSenderEgress::GetSendRates() const {
MutexLock lock(&lock_);
return GetSendRatesLocked(clock_->CurrentTime());
}
RtpSendRates RtpSenderEgress::GetSendRatesLocked(Timestamp now) const {
RtpSendRates RtpSenderEgress::GetSendRates(Timestamp now) const {
RTC_DCHECK_RUN_ON(worker_queue_);
RtpSendRates current_rates;
for (size_t i = 0; i < kNumMediaTypes; ++i) {
RtpPacketMediaType type = static_cast<RtpPacketMediaType>(i);
@ -332,26 +312,24 @@ RtpSendRates RtpSenderEgress::GetSendRatesLocked(Timestamp now) const {
void RtpSenderEgress::GetDataCounters(StreamDataCounters* rtp_stats,
StreamDataCounters* rtx_stats) const {
// TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are
// only touched on the worker thread.
MutexLock lock(&lock_);
RTC_DCHECK_RUN_ON(worker_queue_);
*rtp_stats = rtp_stats_;
*rtx_stats = rtx_rtp_stats_;
}
void RtpSenderEgress::ForceIncludeSendPacketsInAllocation(
bool part_of_allocation) {
MutexLock lock(&lock_);
RTC_DCHECK_RUN_ON(worker_queue_);
force_part_of_allocation_ = part_of_allocation;
}
bool RtpSenderEgress::MediaHasBeenSent() const {
RTC_DCHECK_RUN_ON(&pacer_checker_);
RTC_DCHECK_RUN_ON(worker_queue_);
return media_has_been_sent_;
}
void RtpSenderEgress::SetMediaHasBeenSent(bool media_sent) {
RTC_DCHECK_RUN_ON(&pacer_checker_);
RTC_DCHECK_RUN_ON(worker_queue_);
media_has_been_sent_ = media_sent;
}
@ -387,15 +365,13 @@ std::vector<RtpSequenceNumberMap::Info> RtpSenderEgress::GetSentRtpPacketInfos(
void RtpSenderEgress::SetFecProtectionParameters(
const FecProtectionParams& delta_params,
const FecProtectionParams& key_params) {
// TODO(sprang): Post task to pacer queue instead, one pacer is fully
// migrated to a task queue.
MutexLock lock(&lock_);
RTC_DCHECK_RUN_ON(worker_queue_);
pending_fec_params_.emplace(delta_params, key_params);
}
std::vector<std::unique_ptr<RtpPacketToSend>>
RtpSenderEgress::FetchFecPackets() {
RTC_DCHECK_RUN_ON(&pacer_checker_);
RTC_DCHECK_RUN_ON(worker_queue_);
if (fec_generator_) {
return fec_generator_->GetFecPackets();
}
@ -404,7 +380,7 @@ RtpSenderEgress::FetchFecPackets() {
void RtpSenderEgress::OnAbortedRetransmissions(
rtc::ArrayView<const uint16_t> sequence_numbers) {
RTC_DCHECK_RUN_ON(&pacer_checker_);
RTC_DCHECK_RUN_ON(worker_queue_);
// Mark aborted retransmissions as sent, rather than leaving them in
// a 'pending' state - otherwise they can not be requested again and
// will not be cleared until the history has reached its max size.
@ -469,13 +445,13 @@ void RtpSenderEgress::AddPacketToTransportFeedback(
void RtpSenderEgress::UpdateDelayStatistics(Timestamp capture_time,
Timestamp now,
uint32_t ssrc) {
RTC_DCHECK_RUN_ON(worker_queue_);
if (!send_side_delay_observer_ || capture_time.IsInfinite())
return;
TimeDelta avg_delay = TimeDelta::Zero();
TimeDelta max_delay = TimeDelta::Zero();
{
MutexLock lock(&lock_);
// Compute the max and average of the recent capture-to-send delays.
// The time complexity of the current approach depends on the distribution
// of the delay values. This could be done more efficiently.
@ -524,6 +500,7 @@ void RtpSenderEgress::UpdateDelayStatistics(Timestamp capture_time,
}
void RtpSenderEgress::RecomputeMaxSendDelay() {
RTC_DCHECK_RUN_ON(worker_queue_);
max_delay_it_ = send_delays_.begin();
for (auto it = send_delays_.begin(); it != send_delays_.end(); ++it) {
if (it->second >= max_delay_it_->second) {
@ -545,6 +522,7 @@ void RtpSenderEgress::UpdateOnSendPacket(int packet_id,
bool RtpSenderEgress::SendPacketToNetwork(const RtpPacketToSend& packet,
const PacketOptions& options,
const PacedPacketInfo& pacing_info) {
RTC_DCHECK_RUN_ON(worker_queue_);
int bytes_sent = -1;
if (transport_) {
bytes_sent = transport_->SendRtp(packet.data(), packet.size(), options)
@ -573,32 +551,27 @@ void RtpSenderEgress::UpdateRtpStats(Timestamp now,
// TODO(bugs.webrtc.org/11581): send_rates_ should be touched only on the
// worker thread.
RtpSendRates send_rates;
{
MutexLock lock(&lock_);
// TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are
// only touched on the worker thread.
StreamDataCounters* counters =
packet_ssrc == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_;
StreamDataCounters* counters =
packet_ssrc == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_;
counters->MaybeSetFirstPacketTime(now);
counters->MaybeSetFirstPacketTime(now);
if (packet_type == RtpPacketMediaType::kForwardErrorCorrection) {
counters->fec.Add(counter);
} else if (packet_type == RtpPacketMediaType::kRetransmission) {
counters->retransmitted.Add(counter);
}
if (packet_type == RtpPacketMediaType::kForwardErrorCorrection) {
counters->fec.Add(counter);
} else if (packet_type == RtpPacketMediaType::kRetransmission) {
counters->retransmitted.Add(counter);
}
counters->transmitted.Add(counter);
send_rates_[static_cast<size_t>(packet_type)].Update(packet_size, now.ms());
if (bitrate_callback_) {
send_rates = GetSendRatesLocked(now);
send_rates = GetSendRates(now);
}
if (rtp_stats_callback_) {
rtp_stats_callback_->DataCountersUpdated(*counters, packet_ssrc);
}
}
// The bitrate_callback_ and rtp_stats_callback_ pointers in practice point
// to the same object, so these callbacks could be consolidated into one.
@ -612,7 +585,7 @@ void RtpSenderEgress::UpdateRtpStats(Timestamp now,
void RtpSenderEgress::PeriodicUpdate() {
RTC_DCHECK_RUN_ON(worker_queue_);
RTC_DCHECK(bitrate_callback_);
RtpSendRates send_rates = GetSendRates();
RtpSendRates send_rates = GetSendRates(clock_->CurrentTime());
bitrate_callback_->Notify(
send_rates.Sum().bps(),
send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_);
@ -622,7 +595,7 @@ void RtpSenderEgress::PeriodicUpdate() {
void RtpSenderEgress::BweTestLoggingPlot(Timestamp now, uint32_t packet_ssrc) {
RTC_DCHECK_RUN_ON(worker_queue_);
const auto rates = GetSendRates();
const auto rates = GetSendRates(now);
if (is_audio_) {
BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "AudioTotBitrate_kbps", now.ms(),
rates.Sum().kbps(), packet_ssrc);
@ -638,5 +611,4 @@ void RtpSenderEgress::BweTestLoggingPlot(Timestamp now, uint32_t packet_ssrc) {
}
}
#endif // BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
} // namespace webrtc

View File

@ -46,7 +46,9 @@ class RtpSenderEgress {
// without passing through an actual paced sender.
class NonPacedPacketSender : public RtpPacketSender {
public:
NonPacedPacketSender(RtpSenderEgress* sender, PacketSequencer* sequencer);
NonPacedPacketSender(TaskQueueBase& worker_queue,
RtpSenderEgress* sender,
PacketSequencer* sequencer);
virtual ~NonPacedPacketSender();
void EnqueuePackets(
@ -56,9 +58,11 @@ class RtpSenderEgress {
private:
void PrepareForSend(RtpPacketToSend* packet);
TaskQueueBase& worker_queue_;
uint16_t transport_sequence_number_;
RtpSenderEgress* const sender_;
PacketSequencer* sequencer_;
ScopedTaskSafety task_safety_;
};
RtpSenderEgress(const RtpRtcpInterface::Configuration& config,
@ -66,22 +70,21 @@ class RtpSenderEgress {
~RtpSenderEgress();
void SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& pacing_info) RTC_LOCKS_EXCLUDED(lock_);
const PacedPacketInfo& pacing_info);
void OnBatchComplete();
uint32_t Ssrc() const { return ssrc_; }
absl::optional<uint32_t> RtxSsrc() const { return rtx_ssrc_; }
absl::optional<uint32_t> FlexFecSsrc() const { return flexfec_ssrc_; }
RtpSendRates GetSendRates() const RTC_LOCKS_EXCLUDED(lock_);
RtpSendRates GetSendRates(Timestamp now) const;
void GetDataCounters(StreamDataCounters* rtp_stats,
StreamDataCounters* rtx_stats) const
RTC_LOCKS_EXCLUDED(lock_);
StreamDataCounters* rtx_stats) const;
void ForceIncludeSendPacketsInAllocation(bool part_of_allocation)
RTC_LOCKS_EXCLUDED(lock_);
bool MediaHasBeenSent() const RTC_LOCKS_EXCLUDED(lock_);
void SetMediaHasBeenSent(bool media_sent) RTC_LOCKS_EXCLUDED(lock_);
void SetTimestampOffset(uint32_t timestamp) RTC_LOCKS_EXCLUDED(lock_);
void ForceIncludeSendPacketsInAllocation(bool part_of_allocation);
bool MediaHasBeenSent() const;
void SetMediaHasBeenSent(bool media_sent);
void SetTimestampOffset(uint32_t timestamp);
// For each sequence number in `sequence_number`, recall the last RTP packet
// which bore it - its timestamp and whether it was the first and/or last
@ -89,8 +92,7 @@ class RtpSenderEgress {
// recalled, return a vector with all of them (in corresponding order).
// If any could not be recalled, return an empty vector.
std::vector<RtpSequenceNumberMap::Info> GetSentRtpPacketInfos(
rtc::ArrayView<const uint16_t> sequence_numbers) const
RTC_LOCKS_EXCLUDED(lock_);
rtc::ArrayView<const uint16_t> sequence_numbers) const;
void SetFecProtectionParameters(const FecProtectionParams& delta_params,
const FecProtectionParams& key_params);
@ -106,10 +108,7 @@ class RtpSenderEgress {
PacedPacketInfo info;
Timestamp now;
};
void CompleteSendPacket(const Packet& compound_packet, bool last_in_batch)
RTC_LOCKS_EXCLUDED(lock_) RTC_RUN_ON(pacer_checker_);
RtpSendRates GetSendRatesLocked(Timestamp now) const
RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
void CompleteSendPacket(const Packet& compound_packet, bool last_in_batch);
bool HasCorrectSsrc(const RtpPacketToSend& packet) const;
void AddPacketToTransportFeedback(uint16_t packet_id,
const RtpPacketToSend& packet,
@ -117,13 +116,12 @@ class RtpSenderEgress {
void UpdateDelayStatistics(Timestamp capture_time,
Timestamp now,
uint32_t ssrc);
void RecomputeMaxSendDelay() RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
void RecomputeMaxSendDelay();
void UpdateOnSendPacket(int packet_id, Timestamp capture_time, uint32_t ssrc);
// Sends packet on to `transport_`, leaving the RTP module.
bool SendPacketToNetwork(const RtpPacketToSend& packet,
const PacketOptions& options,
const PacedPacketInfo& pacing_info);
void UpdateRtpStats(Timestamp now,
uint32_t packet_ssrc,
RtpPacketMediaType packet_type,
@ -138,20 +136,19 @@ class RtpSenderEgress {
const bool enable_send_packet_batching_;
TaskQueueBase* const worker_queue_;
RTC_NO_UNIQUE_ADDRESS SequenceChecker pacer_checker_;
const uint32_t ssrc_;
const absl::optional<uint32_t> rtx_ssrc_;
const absl::optional<uint32_t> flexfec_ssrc_;
const bool populate_network2_timestamp_;
Clock* const clock_;
RtpPacketHistory* const packet_history_;
RtpPacketHistory* const packet_history_ RTC_GUARDED_BY(worker_queue_);
Transport* const transport_;
RtcEventLog* const event_log_;
const bool is_audio_;
const bool need_rtp_packet_infos_;
VideoFecGenerator* const fec_generator_ RTC_GUARDED_BY(pacer_checker_);
absl::optional<uint16_t> last_sent_seq_ RTC_GUARDED_BY(pacer_checker_);
absl::optional<uint16_t> last_sent_rtx_seq_ RTC_GUARDED_BY(pacer_checker_);
VideoFecGenerator* const fec_generator_ RTC_GUARDED_BY(worker_queue_);
absl::optional<uint16_t> last_sent_seq_ RTC_GUARDED_BY(worker_queue_);
absl::optional<uint16_t> last_sent_rtx_seq_ RTC_GUARDED_BY(worker_queue_);
TransportFeedbackObserver* const transport_feedback_observer_;
SendSideDelayObserver* const send_side_delay_observer_;
@ -159,24 +156,23 @@ class RtpSenderEgress {
StreamDataCountersCallback* const rtp_stats_callback_;
BitrateStatisticsObserver* const bitrate_callback_;
mutable Mutex lock_;
bool media_has_been_sent_ RTC_GUARDED_BY(pacer_checker_);
bool force_part_of_allocation_ RTC_GUARDED_BY(lock_);
bool media_has_been_sent_ RTC_GUARDED_BY(worker_queue_);
bool force_part_of_allocation_ RTC_GUARDED_BY(worker_queue_);
uint32_t timestamp_offset_ RTC_GUARDED_BY(worker_queue_);
// Maps capture time to send-side delay. Send-side delay is the difference
// between transmission time and capture time.
std::map<Timestamp, TimeDelta> send_delays_ RTC_GUARDED_BY(lock_);
std::map<Timestamp, TimeDelta> send_delays_ RTC_GUARDED_BY(worker_queue_);
std::map<Timestamp, TimeDelta>::const_iterator max_delay_it_
RTC_GUARDED_BY(lock_);
RTC_GUARDED_BY(worker_queue_);
// The sum of delays over a kSendSideDelayWindowMs sliding window.
TimeDelta sum_delays_ RTC_GUARDED_BY(lock_);
StreamDataCounters rtp_stats_ RTC_GUARDED_BY(lock_);
StreamDataCounters rtx_rtp_stats_ RTC_GUARDED_BY(lock_);
TimeDelta sum_delays_ RTC_GUARDED_BY(worker_queue_);
StreamDataCounters rtp_stats_ RTC_GUARDED_BY(worker_queue_);
StreamDataCounters rtx_rtp_stats_ RTC_GUARDED_BY(worker_queue_);
// One element per value in RtpPacketMediaType, with index matching value.
std::vector<RateStatistics> send_rates_ RTC_GUARDED_BY(lock_);
std::vector<RateStatistics> send_rates_ RTC_GUARDED_BY(worker_queue_);
absl::optional<std::pair<FecProtectionParams, FecProtectionParams>>
pending_fec_params_ RTC_GUARDED_BY(lock_);
pending_fec_params_ RTC_GUARDED_BY(worker_queue_);
// Maps sent packets' sequence numbers to a tuple consisting of:
// 1. The timestamp, without the randomizing offset mandated by the RFC.
@ -185,7 +181,7 @@ class RtpSenderEgress {
const std::unique_ptr<RtpSequenceNumberMap> rtp_sequence_number_map_
RTC_GUARDED_BY(worker_queue_);
RepeatingTaskHandle update_task_ RTC_GUARDED_BY(worker_queue_);
std::vector<Packet> packets_to_send_ RTC_GUARDED_BY(pacer_checker_);
std::vector<Packet> packets_to_send_ RTC_GUARDED_BY(worker_queue_);
ScopedTaskSafety task_safety_;
};

View File

@ -465,7 +465,9 @@ TEST_F(RtpSenderEgressTest, ReportsFecRate) {
}
EXPECT_NEAR(
(sender->GetSendRates()[RtpPacketMediaType::kForwardErrorCorrection])
(sender->GetSendRates(
time_controller_.GetClock()
->CurrentTime())[RtpPacketMediaType::kForwardErrorCorrection])
.bps(),
(total_fec_data_sent / (kTimeBetweenPackets * kNumPackets)).bps(), 500);
}