Remove pending packets from the pacer when an RTP module is removed.

This CL adds functionality to remove packets matching a given SSRC from
the pacer queue, and calls that with any SSRCs used by an RTP module
when that module is removed.

Bug: chromium:1395081
Change-Id: I13c0285ddca600e784ad04a806727a508ede6dcc
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/287124
Reviewed-by: Jakob Ivarsson‎ <jakobi@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org>
Reviewed-by: Philip Eliasson <philipel@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38880}
This commit is contained in:
Erik Språng 2022-12-09 21:38:44 +01:00 committed by WebRTC LUCI CQ
parent b02a8f5a7c
commit 1b11b58b56
14 changed files with 296 additions and 44 deletions

View File

@ -262,6 +262,11 @@ class RtpPacketSenderProxy : public RtpPacketSender {
rtp_packet_pacer_->EnqueuePackets(std::move(packets));
}
void RemovePacketsForSsrc(uint32_t ssrc) override {
MutexLock lock(&mutex_);
rtp_packet_pacer_->RemovePacketsForSsrc(ssrc);
}
private:
SequenceChecker thread_checker_;
Mutex mutex_;
@ -565,6 +570,7 @@ void ChannelSend::StopSend() {
RTC_DCHECK(packet_router_);
packet_router_->RemoveSendRtpModule(rtp_rtcp_.get());
rtp_packet_pacer_proxy_->RemovePacketsForSsrc(rtp_rtcp_->SSRC());
}
void ChannelSend::SetEncoder(int payload_type,

View File

@ -515,6 +515,17 @@ void RtpVideoSender::SetActiveModulesLocked(
// prevent any stray packets in the pacer from asynchronously arriving
// to a disabled module.
transport_->packet_router()->RemoveSendRtpModule(&rtp_module);
// Clear the pacer queue of any packets pertaining to this module.
transport_->packet_sender()->RemovePacketsForSsrc(rtp_module.SSRC());
if (rtp_module.RtxSsrc().has_value()) {
transport_->packet_sender()->RemovePacketsForSsrc(
*rtp_module.RtxSsrc());
}
if (rtp_module.FlexfecSsrc().has_value()) {
transport_->packet_sender()->RemovePacketsForSsrc(
*rtp_module.FlexfecSsrc());
}
}
// If set to false this module won't send media.

View File

@ -1098,4 +1098,81 @@ TEST(RtpVideoSenderTest, OverheadIsSubtractedFromTargetBitrate) {
}
}
TEST(RtpVideoSenderTest, ClearsPendingPacketsOnInactivation) {
RtpVideoSenderTestFixture test({kSsrc1}, {kRtxSsrc1}, kPayloadType, {});
test.SetActiveModules({true});
RtpHeaderExtensionMap extensions;
extensions.Register<RtpDependencyDescriptorExtension>(
kDependencyDescriptorExtensionId);
std::vector<RtpPacket> sent_packets;
ON_CALL(test.transport(), SendRtp)
.WillByDefault([&](const uint8_t* packet, size_t length,
const PacketOptions& options) {
sent_packets.emplace_back(&extensions);
EXPECT_TRUE(sent_packets.back().Parse(packet, length));
return true;
});
// Set a very low bitrate.
test.router()->OnBitrateUpdated(
CreateBitrateAllocationUpdate(/*rate_bps=*/30'000),
/*framerate=*/30);
// Create and send a large keyframe.
const size_t kImageSizeBytes = 10000;
constexpr uint8_t kPayload[kImageSizeBytes] = {'a'};
EncodedImage encoded_image;
encoded_image.SetTimestamp(1);
encoded_image.capture_time_ms_ = 2;
encoded_image._frameType = VideoFrameType::kVideoFrameKey;
encoded_image.SetEncodedData(
EncodedImageBuffer::Create(kPayload, sizeof(kPayload)));
EXPECT_EQ(test.router()
->OnEncodedImage(encoded_image, /*codec_specific=*/nullptr)
.error,
EncodedImageCallback::Result::OK);
// Advance time a small amount, check that sent data is only part of the
// image.
test.AdvanceTime(TimeDelta::Millis(5));
DataSize transmittedPayload = DataSize::Zero();
for (const RtpPacket& packet : sent_packets) {
transmittedPayload += DataSize::Bytes(packet.payload_size());
// Make sure we don't see the end of the frame.
EXPECT_FALSE(packet.Marker());
}
EXPECT_GT(transmittedPayload, DataSize::Zero());
EXPECT_LT(transmittedPayload, DataSize::Bytes(kImageSizeBytes / 4));
// Record the RTP timestamp of the first frame.
const uint32_t first_frame_timestamp = sent_packets[0].Timestamp();
sent_packets.clear();
// Disable the sending module and advance time slightly. No packets should be
// sent.
test.SetActiveModules({false});
test.AdvanceTime(TimeDelta::Millis(20));
EXPECT_TRUE(sent_packets.empty());
// Reactive the send module - any packets should have been removed, so nothing
// should be transmitted.
test.SetActiveModules({true});
test.AdvanceTime(TimeDelta::Millis(33));
EXPECT_TRUE(sent_packets.empty());
// Send a new frame.
encoded_image.SetTimestamp(3);
encoded_image.capture_time_ms_ = 4;
EXPECT_EQ(test.router()
->OnEncodedImage(encoded_image, /*codec_specific=*/nullptr)
.error,
EncodedImageCallback::Result::OK);
test.AdvanceTime(TimeDelta::Millis(33));
// Advance time, check we get new packets - but only for the second frame.
EXPECT_FALSE(sent_packets.empty());
EXPECT_NE(sent_packets[0].Timestamp(), first_frame_timestamp);
}
} // namespace webrtc

View File

@ -148,6 +148,10 @@ void PacingController::SetCircuitBreakerThreshold(int num_iterations) {
circuit_breaker_threshold_ = num_iterations;
}
void PacingController::RemovePacketsForSsrc(uint32_t ssrc) {
packet_queue_.RemovePacketsForSsrc(ssrc);
}
bool PacingController::IsProbing() const {
return prober_.is_probing();
}

View File

@ -166,6 +166,9 @@ class PacingController {
// is considered erroneous to exceed.
void SetCircuitBreakerThreshold(int num_iterations);
// Remove any pending packets matching this SSRC from the packet queue.
void RemovePacketsForSsrc(uint32_t ssrc);
private:
TimeDelta UpdateTimeAndGetElapsed(Timestamp now);
bool ShouldSendKeepalive(Timestamp now) const;

View File

@ -60,7 +60,7 @@ bool PrioritizedPacketQueue::StreamQueue::EnqueuePacket(QueuedPacket packet,
}
PrioritizedPacketQueue::QueuedPacket
PrioritizedPacketQueue::StreamQueue::DequePacket(int priority_level) {
PrioritizedPacketQueue::StreamQueue::DequeuePacket(int priority_level) {
RTC_DCHECK(!packets_[priority_level].empty());
QueuedPacket packet = std::move(packets_[priority_level].front());
packets_[priority_level].pop_front();
@ -91,6 +91,16 @@ Timestamp PrioritizedPacketQueue::StreamQueue::LastEnqueueTime() const {
return last_enqueue_time_;
}
std::array<std::deque<PrioritizedPacketQueue::QueuedPacket>,
PrioritizedPacketQueue::kNumPriorityLevels>
PrioritizedPacketQueue::StreamQueue::DequeueAll() {
std::array<std::deque<QueuedPacket>, kNumPriorityLevels> packets_by_prio;
for (int i = 0; i < kNumPriorityLevels; ++i) {
packets_by_prio[i].swap(packets_[i]);
}
return packets_by_prio;
}
PrioritizedPacketQueue::PrioritizedPacketQueue(Timestamp creation_time)
: queue_time_sum_(TimeDelta::Zero()),
pause_time_sum_(TimeDelta::Zero()),
@ -162,54 +172,16 @@ std::unique_ptr<RtpPacketToSend> PrioritizedPacketQueue::Pop() {
RTC_DCHECK_GE(top_active_prio_level_, 0);
StreamQueue& stream_queue = *streams_by_prio_[top_active_prio_level_].front();
QueuedPacket packet = stream_queue.DequePacket(top_active_prio_level_);
--size_packets_;
RTC_DCHECK(packet.packet->packet_type().has_value());
RtpPacketMediaType packet_type = packet.packet->packet_type().value();
--size_packets_per_media_type_[static_cast<size_t>(packet_type)];
RTC_DCHECK_GE(size_packets_per_media_type_[static_cast<size_t>(packet_type)],
0);
size_payload_ -= packet.PacketSize();
// Calculate the total amount of time spent by this packet in the queue
// while in a non-paused state. Note that the `pause_time_sum_ms_` was
// subtracted from `packet.enqueue_time_ms` when the packet was pushed, and
// by subtracting it now we effectively remove the time spent in in the
// queue while in a paused state.
TimeDelta time_in_non_paused_state =
last_update_time_ - packet.enqueue_time - pause_time_sum_;
queue_time_sum_ -= time_in_non_paused_state;
// Set the time spent in the send queue, which is the per-packet equivalent of
// totalPacketSendDelay. The notion of being paused is an implementation
// detail that we do not want to expose, so it makes sense to report the
// metric excluding the pause time. This also avoids spikes in the metric.
// https://w3c.github.io/webrtc-stats/#dom-rtcoutboundrtpstreamstats-totalpacketsenddelay
packet.packet->set_time_in_send_queue(time_in_non_paused_state);
RTC_DCHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());
RTC_CHECK(packet.enqueue_time_iterator != enqueue_times_.end());
enqueue_times_.erase(packet.enqueue_time_iterator);
QueuedPacket packet = stream_queue.DequeuePacket(top_active_prio_level_);
DequeuePacketInternal(packet);
// Remove StreamQueue from head of fifo-queue for this prio level, and
// and add it to the end if it still has packets.
streams_by_prio_[top_active_prio_level_].pop_front();
if (stream_queue.HasPacketsAtPrio(top_active_prio_level_)) {
streams_by_prio_[top_active_prio_level_].push_back(&stream_queue);
} else if (streams_by_prio_[top_active_prio_level_].empty()) {
// No stream queues have packets at this prio level, find top priority
// that is not empty.
if (size_packets_ == 0) {
top_active_prio_level_ = -1;
} else {
for (int i = 0; i < kNumPriorityLevels; ++i) {
if (!streams_by_prio_[i].empty()) {
top_active_prio_level_ = i;
break;
}
}
}
} else {
MaybeUpdateTopPrioLevel();
}
return std::move(packet.packet);
@ -276,4 +248,96 @@ void PrioritizedPacketQueue::SetPauseState(bool paused, Timestamp now) {
paused_ = paused;
}
void PrioritizedPacketQueue::RemovePacketsForSsrc(uint32_t ssrc) {
auto kv = streams_.find(ssrc);
if (kv != streams_.end()) {
// Dequeue all packets from the queue for this SSRC.
StreamQueue& queue = *kv->second;
std::array<std::deque<QueuedPacket>, kNumPriorityLevels> packets_by_prio =
queue.DequeueAll();
for (int i = 0; i < kNumPriorityLevels; ++i) {
std::deque<QueuedPacket>& packet_queue = packets_by_prio[i];
if (packet_queue.empty()) {
continue;
}
// First erase all packets at this prio level.
while (!packet_queue.empty()) {
QueuedPacket packet = std::move(packet_queue.front());
packet_queue.pop_front();
DequeuePacketInternal(packet);
}
// Next, deregister this `StreamQueue` from the round-robin tables.
RTC_DCHECK(!streams_by_prio_[i].empty());
if (streams_by_prio_[i].size() == 1) {
// This is the last and only queue that had packets for this prio level.
// Update the global top prio level if neccessary.
RTC_DCHECK(streams_by_prio_[i].front() == &queue);
streams_by_prio_[i].pop_front();
if (i == top_active_prio_level_) {
MaybeUpdateTopPrioLevel();
}
} else {
// More than stream had packets at this prio level, filter this one out.
std::deque<StreamQueue*> filtered_queue;
for (StreamQueue* queue_ptr : streams_by_prio_[i]) {
if (queue_ptr != &queue) {
filtered_queue.push_back(queue_ptr);
}
}
streams_by_prio_[i].swap(filtered_queue);
}
}
}
}
void PrioritizedPacketQueue::DequeuePacketInternal(QueuedPacket& packet) {
--size_packets_;
RTC_DCHECK(packet.packet->packet_type().has_value());
RtpPacketMediaType packet_type = packet.packet->packet_type().value();
--size_packets_per_media_type_[static_cast<size_t>(packet_type)];
RTC_DCHECK_GE(size_packets_per_media_type_[static_cast<size_t>(packet_type)],
0);
size_payload_ -= packet.PacketSize();
// Calculate the total amount of time spent by this packet in the queue
// while in a non-paused state. Note that the `pause_time_sum_ms_` was
// subtracted from `packet.enqueue_time_ms` when the packet was pushed, and
// by subtracting it now we effectively remove the time spent in in the
// queue while in a paused state.
TimeDelta time_in_non_paused_state =
last_update_time_ - packet.enqueue_time - pause_time_sum_;
queue_time_sum_ -= time_in_non_paused_state;
// Set the time spent in the send queue, which is the per-packet equivalent of
// totalPacketSendDelay. The notion of being paused is an implementation
// detail that we do not want to expose, so it makes sense to report the
// metric excluding the pause time. This also avoids spikes in the metric.
// https://w3c.github.io/webrtc-stats/#dom-rtcoutboundrtpstreamstats-totalpacketsenddelay
packet.packet->set_time_in_send_queue(time_in_non_paused_state);
RTC_DCHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());
RTC_CHECK(packet.enqueue_time_iterator != enqueue_times_.end());
enqueue_times_.erase(packet.enqueue_time_iterator);
}
void PrioritizedPacketQueue::MaybeUpdateTopPrioLevel() {
if (streams_by_prio_[top_active_prio_level_].empty()) {
// No stream queues have packets at this prio level, find top priority
// that is not empty.
if (size_packets_ == 0) {
top_active_prio_level_ = -1;
} else {
for (int i = 0; i < kNumPriorityLevels; ++i) {
if (!streams_by_prio_[i].empty()) {
top_active_prio_level_ = i;
break;
}
}
}
}
}
} // namespace webrtc

View File

@ -13,10 +13,12 @@
#include <stddef.h>
#include <array>
#include <deque>
#include <list>
#include <memory>
#include <unordered_map>
#include <vector>
#include "api/units/data_size.h"
#include "api/units/time_delta.h"
@ -80,6 +82,9 @@ class PrioritizedPacketQueue {
// Set the pause state, while `paused` is true queuing time is not counted.
void SetPauseState(bool paused, Timestamp now);
// Remove any packets matching the given SSRC.
void RemovePacketsForSsrc(uint32_t ssrc);
private:
static constexpr int kNumPriorityLevels = 4;
@ -107,18 +112,27 @@ class PrioritizedPacketQueue {
// count for that priority level went from zero to non-zero.
bool EnqueuePacket(QueuedPacket packet, int priority_level);
QueuedPacket DequePacket(int priority_level);
QueuedPacket DequeuePacket(int priority_level);
bool HasPacketsAtPrio(int priority_level) const;
bool IsEmpty() const;
Timestamp LeadingPacketEnqueueTime(int priority_level) const;
Timestamp LastEnqueueTime() const;
std::array<std::deque<QueuedPacket>, kNumPriorityLevels> DequeueAll();
private:
std::deque<QueuedPacket> packets_[kNumPriorityLevels];
Timestamp last_enqueue_time_;
};
// Remove the packet from the internal state, e.g. queue time / size etc.
void DequeuePacketInternal(QueuedPacket& packet);
// Check if the queue pointed to by `top_active_prio_level_` is empty and
// if so move it to the lowest non-empty index.
void MaybeUpdateTopPrioLevel();
// Cumulative sum, over all packets, of time spent in the queue.
TimeDelta queue_time_sum_;
// Cumulative sum of time the queue has spent in a paused state.

View File

@ -306,4 +306,58 @@ TEST(PrioritizedPacketQueue,
}
}
TEST(PrioritizedPacketQueue, ClearsPackets) {
Timestamp now = Timestamp::Zero();
PrioritizedPacketQueue queue(now);
const uint32_t kSsrc = 1;
// Add two packets of each type, all using the same SSRC.
int sequence_number = 0;
for (size_t i = 0; i < kNumMediaTypes; ++i) {
queue.Push(now, CreatePacket(static_cast<RtpPacketMediaType>(i),
sequence_number++, kSsrc));
queue.Push(now, CreatePacket(static_cast<RtpPacketMediaType>(i),
sequence_number++, kSsrc));
}
EXPECT_EQ(queue.SizeInPackets(), 2 * int{kNumMediaTypes});
// Remove all of them.
queue.RemovePacketsForSsrc(kSsrc);
EXPECT_TRUE(queue.Empty());
}
TEST(PrioritizedPacketQueue, ClearPacketsAffectsOnlySpecifiedSsrc) {
Timestamp now = Timestamp::Zero();
PrioritizedPacketQueue queue(now);
const uint32_t kRemovingSsrc = 1;
const uint32_t kStayingSsrc = 2;
// Add an audio packet and a retransmission for the SSRC we will remove,
// ensuring they are first in line.
queue.Push(
now, CreatePacket(RtpPacketMediaType::kAudio, /*seq=*/1, kRemovingSsrc));
queue.Push(now, CreatePacket(RtpPacketMediaType::kRetransmission, /*seq=*/2,
kRemovingSsrc));
// Add a video packet and a retransmission for the SSRC that will remain.
// The retransmission packets now both have pointers to their respective qeues
// from the same prio level.
queue.Push(now,
CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/3, kStayingSsrc));
queue.Push(now, CreatePacket(RtpPacketMediaType::kRetransmission, /*seq=*/4,
kStayingSsrc));
EXPECT_EQ(queue.SizeInPackets(), 4);
// Clear the first two packets.
queue.RemovePacketsForSsrc(kRemovingSsrc);
EXPECT_EQ(queue.SizeInPackets(), 2);
// We should get the single remaining retransmission first, then the video
// packet.
EXPECT_EQ(queue.Pop()->SequenceNumber(), 4);
EXPECT_EQ(queue.Pop()->SequenceNumber(), 3);
EXPECT_TRUE(queue.Empty());
}
} // namespace webrtc

View File

@ -181,6 +181,14 @@ void TaskQueuePacedSender::EnqueuePackets(
}));
}
void TaskQueuePacedSender::RemovePacketsForSsrc(uint32_t ssrc) {
task_queue_.RunOrPost([this, ssrc]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.RemovePacketsForSsrc(ssrc);
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}
void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) {
task_queue_.RunOrPost([this, account_for_audio]() {
RTC_DCHECK_RUN_ON(&task_queue_);

View File

@ -70,6 +70,8 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
// PacingController::PacketSender::SendPacket() when it's time to send.
void EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) override;
// Remove any pending packets matching this SSRC from the packet queue.
void RemovePacketsForSsrc(uint32_t ssrc) override;
// Methods implementing RtpPacketPacer.

View File

@ -28,6 +28,11 @@ class RtpPacketSender {
// packets and the current target send rate.
virtual void EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) = 0;
// Clear any pending packets with the given SSRC from the queue.
// TODO(crbug.com/1395081): Make pure virtual when downstream code has been
// updated.
virtual void RemovePacketsForSsrc(uint32_t ssrc) {}
};
} // namespace webrtc

View File

@ -43,6 +43,7 @@ class DEPRECATED_RtpSenderEgress {
void EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) override;
void RemovePacketsForSsrc(uint32_t ssrc) override {}
private:
uint16_t transport_sequence_number_;

View File

@ -49,6 +49,8 @@ class RtpSenderEgress {
void EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) override;
// Since we don't pace packets, there's no pending packets to remove.
void RemovePacketsForSsrc(uint32_t ssrc) override {}
private:
void PrepareForSend(RtpPacketToSend* packet);

View File

@ -102,6 +102,7 @@ class MockRtpPacketPacer : public RtpPacketSender {
EnqueuePackets,
(std::vector<std::unique_ptr<RtpPacketToSend>>),
(override));
MOCK_METHOD(void, RemovePacketsForSsrc, (uint32_t), (override));
};
} // namespace