Prune packets from pacer when sending deactivated via bitrate allocation

Fixed: webrtc:368059232
Change-Id: I62a1b02db0da27cf5ebf24b9b57e8989c1d0664d
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/363280
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#43076}
This commit is contained in:
Danil Chapovalov 2024-09-24 11:22:55 +02:00 committed by WebRTC LUCI CQ
parent 965f134b2b
commit 0f02728dd4
5 changed files with 145 additions and 24 deletions

View File

@ -246,6 +246,7 @@ rtc_library("rtp_sender") {
"../rtc_base/system:no_unique_address",
"../rtc_base/task_utils:repeating_task",
"//third_party/abseil-cpp/absl/algorithm:container",
"//third_party/abseil-cpp/absl/base:nullability",
"//third_party/abseil-cpp/absl/container:inlined_vector",
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/strings:string_view",

View File

@ -167,8 +167,8 @@ RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender(
rtc::scoped_refptr<FrameTransformerInterface> frame_transformer) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
video_rtp_senders_.push_back(std::make_unique<RtpVideoSender>(
env_, suspended_ssrcs, states, rtp_config, rtcp_report_interval_ms,
send_transport, observers,
env_, task_queue_, suspended_ssrcs, states, rtp_config,
rtcp_report_interval_ms, send_transport, observers,
// TODO(holmer): Remove this circular dependency by injecting
// the parts of RtpTransportControllerSendInterface that are really used.
this, &retransmission_rate_limiter_, std::move(fec_controller),

View File

@ -387,6 +387,7 @@ bool IsFirstFrameOfACodedVideoSequence(
RtpVideoSender::RtpVideoSender(
const Environment& env,
absl::Nonnull<TaskQueueBase*> transport_queue,
const std::map<uint32_t, RtpState>& suspended_ssrcs,
const std::map<uint32_t, RtpPayloadState>& states,
const RtpConfig& rtp_config,
@ -404,6 +405,7 @@ RtpVideoSender::RtpVideoSender(
env.field_trials().Lookup("WebRTC-Video-UseFrameRateForOverhead"),
"Enabled")),
has_packet_feedback_(TransportSeqNumExtensionConfigured(rtp_config)),
transport_queue_(*transport_queue),
active_(false),
fec_controller_(std::move(fec_controller)),
fec_allowed_(true),
@ -428,7 +430,10 @@ RtpVideoSender::RtpVideoSender(
transport_overhead_bytes_per_packet_(0),
encoder_target_rate_bps_(0),
frame_counts_(rtp_config.ssrcs.size()),
frame_count_observer_(observers.frame_count_observer) {
frame_count_observer_(observers.frame_count_observer),
safety_(PendingTaskSafetyFlag::CreateAttachedToTaskQueue(
/*alive=*/true,
transport_queue)) {
transport_checker_.Detach();
RTC_DCHECK_EQ(rtp_config_.ssrcs.size(), rtp_streams_.size());
if (has_packet_feedback_)
@ -493,8 +498,7 @@ RtpVideoSender::RtpVideoSender(
RtpVideoSender::~RtpVideoSender() {
RTC_DCHECK_RUN_ON(&transport_checker_);
SetActiveModulesLocked(
/*sending=*/false);
SetActiveModulesLocked(/*sending=*/false);
}
void RtpVideoSender::SetSending(bool enabled) {
@ -512,15 +516,8 @@ void RtpVideoSender::SetActiveModulesLocked(bool sending) {
return;
}
active_ = sending;
for (size_t i = 0; i < rtp_streams_.size(); ++i) {
RtpRtcpInterface& rtp_module = *rtp_streams_[i].rtp_rtcp;
rtp_module.SetSendingStatus(sending);
rtp_module.SetSendingMediaStatus(sending);
if (sending) {
transport_->RegisterSendingRtpStream(rtp_module);
} else {
transport_->DeRegisterSendingRtpStream(rtp_module);
}
for (const RtpStreamSender& stream : rtp_streams_) {
SetModuleIsActive(sending, *stream.rtp_rtcp);
}
auto* feedback_provider = transport_->GetStreamFeedbackProvider();
if (!sending) {
@ -530,6 +527,21 @@ void RtpVideoSender::SetActiveModulesLocked(bool sending) {
}
}
void RtpVideoSender::SetModuleIsActive(bool sending,
RtpRtcpInterface& rtp_module) {
if (rtp_module.SendingMedia() == sending) {
return;
}
rtp_module.SetSendingStatus(sending);
rtp_module.SetSendingMediaStatus(sending);
if (sending) {
transport_->RegisterSendingRtpStream(rtp_module);
} else {
transport_->DeRegisterSendingRtpStream(rtp_module);
}
}
bool RtpVideoSender::IsActive() {
RTC_DCHECK_RUN_ON(&transport_checker_);
MutexLock lock(&mutex_);
@ -654,6 +666,7 @@ void RtpVideoSender::OnBitrateAllocationUpdated(
}
}
}
void RtpVideoSender::OnVideoLayersAllocationUpdated(
const VideoLayersAllocation& allocation) {
MutexLock lock(&mutex_);
@ -663,15 +676,28 @@ void RtpVideoSender::OnVideoLayersAllocationUpdated(
stream_allocation.rtp_stream_index = i;
rtp_streams_[i].sender_video->SetVideoLayersAllocation(
std::move(stream_allocation));
// Only send video frames on the rtp module if the encoder is configured
// to send. This is to prevent stray frames to be sent after an encoder
// has been reconfigured.
rtp_streams_[i].rtp_rtcp->SetSendingMediaStatus(
absl::c_any_of(allocation.active_spatial_layers,
[&i](const VideoLayersAllocation::SpatialLayer layer) {
return layer.rtp_stream_index == static_cast<int>(i);
}));
}
// Only send video frames on the rtp module if the encoder is configured
// to send. This is to prevent stray frames to be sent after an encoder
// has been reconfigured.
// Reconfiguration of the RtpRtcp modules must happen on the transport queue
// to avoid races with batch sending of packets.
std::vector<bool> sending(rtp_streams_.size(), false);
for (const VideoLayersAllocation::SpatialLayer& layer :
allocation.active_spatial_layers) {
if (layer.rtp_stream_index < static_cast<int>(sending.size())) {
sending[layer.rtp_stream_index] = true;
}
}
transport_queue_.PostTask(
SafeTask(safety_.flag(), [this, sending = std::move(sending)] {
RTC_DCHECK_RUN_ON(&transport_checker_);
RTC_CHECK_EQ(sending.size(), rtp_streams_.size());
for (size_t i = 0; i < sending.size(); ++i) {
SetModuleIsActive(sending[i], *rtp_streams_[i].rtp_rtcp);
}
}));
}
}

View File

@ -18,6 +18,7 @@
#include <optional>
#include <vector>
#include "absl/base/nullability.h"
#include "api/array_view.h"
#include "api/call/bitrate_allocation.h"
#include "api/call/transport.h"
@ -27,6 +28,8 @@
#include "api/frame_transformer_interface.h"
#include "api/scoped_refptr.h"
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/data_rate.h"
#include "api/units/data_size.h"
#include "api/units/frequency.h"
@ -84,6 +87,7 @@ class RtpVideoSender : public RtpVideoSenderInterface,
// Rtp modules are assumed to be sorted in simulcast index order.
RtpVideoSender(
const Environment& env,
absl::Nonnull<TaskQueueBase*> transport_queue,
const std::map<uint32_t, RtpState>& suspended_ssrcs,
const std::map<uint32_t, RtpPayloadState>& states,
const RtpConfig& rtp_config,
@ -173,6 +177,8 @@ class RtpVideoSender : public RtpVideoSenderInterface,
DataSize packet_size,
DataSize overhead_per_packet,
Frequency framerate) const;
void SetModuleIsActive(bool sending, RtpRtcpInterface& rtp_module)
RTC_RUN_ON(transport_checker_);
const Environment env_;
const bool use_frame_rate_for_overhead_;
@ -181,6 +187,7 @@ class RtpVideoSender : public RtpVideoSenderInterface,
// Semantically equivalent to checking for `transport_->GetWorkerQueue()`
// but some tests need to be updated to call from the correct context.
RTC_NO_UNIQUE_ADDRESS SequenceChecker transport_checker_;
TaskQueueBase& transport_queue_;
// TODO(bugs.webrtc.org/13517): Remove mutex_ once RtpVideoSender runs on the
// transport task queue.
@ -218,6 +225,8 @@ class RtpVideoSender : public RtpVideoSenderInterface,
// This map is set at construction time and never changed, but it's
// non-trivial to make it properly const.
std::map<uint32_t, RtpRtcpInterface*> ssrc_to_rtp_module_;
ScopedTaskSafety safety_;
};
} // namespace webrtc

View File

@ -75,6 +75,8 @@ namespace webrtc {
namespace {
using ::testing::_;
using ::testing::Ge;
using ::testing::IsEmpty;
using ::testing::NiceMock;
using ::testing::SaveArg;
using ::testing::SizeIs;
@ -175,8 +177,9 @@ class RtpVideoSenderTestFixture {
transport_controller_.EnsureStarted();
std::map<uint32_t, RtpState> suspended_ssrcs;
router_ = std::make_unique<RtpVideoSender>(
env_, suspended_ssrcs, suspended_payload_states, config_.rtp,
config_.rtcp_report_interval_ms, &transport_,
env_, time_controller_.GetMainThread(), suspended_ssrcs,
suspended_payload_states, config_.rtp, config_.rtcp_report_interval_ms,
&transport_,
CreateObservers(&encoder_feedback_, &stats_proxy_, &stats_proxy_,
&stats_proxy_, frame_count_observer, &stats_proxy_),
&transport_controller_, &retransmission_rate_limiter_,
@ -348,6 +351,7 @@ TEST(RtpVideoSenderTest,
// Only rtp stream index 0 is configured to send a stream.
test.router()->OnVideoLayersAllocationUpdated(
{.active_spatial_layers = {{.rtp_stream_index = 0}}});
test.AdvanceTime(TimeDelta::Millis(33));
EXPECT_EQ(EncodedImageCallback::Result::OK,
test.router()->OnEncodedImage(encoded_image_1, &codec_info).error);
EXPECT_NE(EncodedImageCallback::Result::OK,
@ -1339,6 +1343,87 @@ TEST(RtpVideoSenderTest, ClearsPendingPacketsOnInactivation) {
EXPECT_NE(sent_packets[0].Timestamp(), first_frame_timestamp);
}
TEST(RtpVideoSenderTest,
ClearsPendingPacketsOnInactivationWithLayerAllocation) {
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {}, kPayloadType, {});
test.SetSending(true);
RtpHeaderExtensionMap extensions;
extensions.Register<RtpDependencyDescriptorExtension>(
kDependencyDescriptorExtensionId);
std::vector<RtpPacket> sent_packets;
ON_CALL(test.transport(), SendRtp)
.WillByDefault([&](rtc::ArrayView<const uint8_t> packet,
const PacketOptions& options) {
sent_packets.emplace_back(&extensions);
EXPECT_TRUE(sent_packets.back().Parse(packet));
return true;
});
// Set a very low bitrate.
test.router()->OnBitrateUpdated(
CreateBitrateAllocationUpdate(/*rate_bps=*/10'000),
/*framerate=*/30);
// Create and send a large keyframe.
constexpr uint8_t kImage[10'000] = {};
EncodedImage encoded_image;
encoded_image.SetSimulcastIndex(0);
encoded_image.SetRtpTimestamp(1);
encoded_image.capture_time_ms_ = 2;
encoded_image._frameType = VideoFrameType::kVideoFrameKey;
encoded_image.SetEncodedData(
EncodedImageBuffer::Create(kImage, std::size(kImage)));
EXPECT_EQ(test.router()
->OnEncodedImage(encoded_image, /*codec_specific=*/nullptr)
.error,
EncodedImageCallback::Result::OK);
// Advance time a small amount, check that sent data is only part of the
// image.
test.AdvanceTime(TimeDelta::Millis(5));
DataSize transmitted_payload = DataSize::Zero();
for (const RtpPacket& packet : sent_packets) {
transmitted_payload += DataSize::Bytes(packet.payload_size());
// Make sure we don't see the end of the frame.
EXPECT_FALSE(packet.Marker());
}
EXPECT_GT(transmitted_payload, DataSize::Zero());
EXPECT_LT(transmitted_payload, DataSize::Bytes(std::size(kImage)) / 3);
// Record the RTP timestamp of the first frame.
const uint32_t first_frame_timestamp = sent_packets[0].Timestamp();
sent_packets.clear();
// Disable the 1st sending module and advance time slightly. No packets should
// be sent.
test.router()->OnVideoLayersAllocationUpdated(
{.active_spatial_layers = {{.rtp_stream_index = 1}}});
test.AdvanceTime(TimeDelta::Millis(20));
EXPECT_THAT(sent_packets, IsEmpty());
// Reactive the send module - any packets should have been removed, so nothing
// should be transmitted.
test.router()->OnVideoLayersAllocationUpdated(
{.active_spatial_layers = {{.rtp_stream_index = 0},
{.rtp_stream_index = 1}}});
test.AdvanceTime(TimeDelta::Millis(33));
EXPECT_THAT(sent_packets, IsEmpty());
// Send a new frame.
encoded_image.SetRtpTimestamp(3);
encoded_image.capture_time_ms_ = 4;
EXPECT_EQ(test.router()
->OnEncodedImage(encoded_image, /*codec_specific=*/nullptr)
.error,
EncodedImageCallback::Result::OK);
test.AdvanceTime(TimeDelta::Millis(33));
// Advance time, check we get new packets - but only for the second frame.
ASSERT_THAT(sent_packets, SizeIs(Ge(1)));
EXPECT_NE(sent_packets[0].Timestamp(), first_frame_timestamp);
}
// Integration test verifying that when retransmission mode is set to
// kRetransmitBaseLayer,only base layer is retransmitted.
TEST(RtpVideoSenderTest, RetransmitsBaseLayerOnly) {