Prepare packet router for flushing mechanism.

This CL adds the ability to forward aborted retransmission notifications
to specified RTP modules, as well as a way to find the RTX ssrc
associated with a media SSRC.
These will both be used by upcoming logic that can selectively flush
given streams from the pacer queue.

Bug: webrtc:11340
Change-Id: Ief3be47e4fd7dc5a1499bc21890e8979400ecb44
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/274706
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38050}
This commit is contained in:
Erik Språng 2022-09-09 15:06:51 +02:00 committed by WebRTC LUCI CQ
parent 5592a6ddad
commit 767f504875
5 changed files with 132 additions and 7 deletions

View File

@ -52,6 +52,15 @@ class PacingController {
virtual std::vector<std::unique_ptr<RtpPacketToSend>> FetchFec() = 0;
virtual std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
DataSize size) = 0;
// TODO(bugs.webrtc.org/11340): Make pure virtual once downstream projects
// have been updated.
virtual void OnAbortedRetransmissions(
uint32_t ssrc,
rtc::ArrayView<const uint16_t> sequence_numbers) {}
virtual absl::optional<uint32_t> GetRtxSsrcForMedia(uint32_t ssrc) const {
return absl::nullopt;
}
};
// Expected max pacer delay. If ExpectedQueueTime() is higher than

View File

@ -128,6 +128,14 @@ class MockPacingControllerCallback : public PacingController::PacketSender {
(),
(override));
MOCK_METHOD(size_t, SendPadding, (size_t target_size));
MOCK_METHOD(void,
OnAbortedRetransmissions,
(uint32_t, rtc::ArrayView<const uint16_t>),
(override));
MOCK_METHOD(absl::optional<uint32_t>,
GetRtxSsrcForMedia,
(uint32_t),
(const, override));
};
// Mock callback implementing the raw api.
@ -147,6 +155,14 @@ class MockPacketSender : public PacingController::PacketSender {
GeneratePadding,
(DataSize target_size),
(override));
MOCK_METHOD(void,
OnAbortedRetransmissions,
(uint32_t, rtc::ArrayView<const uint16_t>),
(override));
MOCK_METHOD(absl::optional<uint32_t>,
GetRtxSsrcForMedia,
(uint32_t),
(const, override));
};
class PacingControllerPadding : public PacingController::PacketSender {
@ -178,6 +194,12 @@ class PacingControllerPadding : public PacingController::PacketSender {
return packets;
}
void OnAbortedRetransmissions(uint32_t,
rtc::ArrayView<const uint16_t>) override {}
absl::optional<uint32_t> GetRtxSsrcForMedia(uint32_t) const override {
return absl::nullopt;
}
size_t padding_sent() { return padding_sent_; }
size_t total_bytes_sent() { return total_bytes_sent_; }
@ -220,6 +242,12 @@ class PacingControllerProbing : public PacingController::PacketSender {
return packets;
}
void OnAbortedRetransmissions(uint32_t,
rtc::ArrayView<const uint16_t>) override {}
absl::optional<uint32_t> GetRtxSsrcForMedia(uint32_t) const override {
return absl::nullopt;
}
int packets_sent() const { return packets_sent_; }
int padding_sent() const { return padding_sent_; }
int total_packets_sent() const { return packets_sent_ + padding_sent_; }

View File

@ -86,10 +86,10 @@ void PacketRouter::AddSendRtpModuleToMap(RtpRtcpInterface* rtp_module,
}
void PacketRouter::RemoveSendRtpModuleFromMap(uint32_t ssrc) {
auto kv = send_modules_map_.find(ssrc);
RTC_DCHECK(kv != send_modules_map_.end());
send_modules_list_.remove(kv->second);
send_modules_map_.erase(kv);
auto it = send_modules_map_.find(ssrc);
RTC_DCHECK(it != send_modules_map_.end());
send_modules_list_.remove(it->second);
send_modules_map_.erase(it);
}
void PacketRouter::RemoveSendRtpModule(RtpRtcpInterface* rtp_module) {
@ -151,8 +151,8 @@ void PacketRouter::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
}
uint32_t ssrc = packet->Ssrc();
auto kv = send_modules_map_.find(ssrc);
if (kv == send_modules_map_.end()) {
auto it = send_modules_map_.find(ssrc);
if (it == send_modules_map_.end()) {
RTC_LOG(LS_WARNING)
<< "Failed to send packet, matching RTP module not found "
"or transport error. SSRC = "
@ -160,7 +160,7 @@ void PacketRouter::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
return;
}
RtpRtcpInterface* rtp_module = kv->second;
RtpRtcpInterface* rtp_module = it->second;
if (!rtp_module->TrySendPacket(packet.get(), cluster_info)) {
RTC_LOG(LS_WARNING) << "Failed to send packet, rejected by RTP module.";
return;
@ -235,6 +235,27 @@ std::vector<std::unique_ptr<RtpPacketToSend>> PacketRouter::GeneratePadding(
return padding_packets;
}
void PacketRouter::OnAbortedRetransmissions(
uint32_t ssrc,
rtc::ArrayView<const uint16_t> sequence_numbers) {
MutexLock lock(&modules_mutex_);
auto it = send_modules_map_.find(ssrc);
if (it != send_modules_map_.end()) {
it->second->OnAbortedRetransmissions(sequence_numbers);
}
}
absl::optional<uint32_t> PacketRouter::GetRtxSsrcForMedia(uint32_t ssrc) const {
MutexLock lock(&modules_mutex_);
auto it = send_modules_map_.find(ssrc);
if (it != send_modules_map_.end() && it->second->SSRC() == ssrc) {
// A module is registered with the given SSRC, and that SSRC is the main
// media SSRC for that RTP module.
return it->second->RtxSsrc();
}
return absl::nullopt;
}
uint16_t PacketRouter::CurrentTransportSequenceNumber() const {
MutexLock lock(&modules_mutex_);
return transport_seq_ & 0xFFFF;

View File

@ -58,6 +58,10 @@ class PacketRouter : public PacingController::PacketSender {
std::vector<std::unique_ptr<RtpPacketToSend>> FetchFec() override;
std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
DataSize size) override;
void OnAbortedRetransmissions(
uint32_t ssrc,
rtc::ArrayView<const uint16_t> sequence_numbers) override;
absl::optional<uint32_t> GetRtxSsrcForMedia(uint32_t ssrc) const override;
uint16_t CurrentTransportSequenceNumber() const;

View File

@ -36,6 +36,7 @@ namespace {
using ::testing::_;
using ::testing::AnyNumber;
using ::testing::AtLeast;
using ::testing::ElementsAreArray;
using ::testing::Field;
using ::testing::Gt;
using ::testing::Le;
@ -436,6 +437,68 @@ TEST_F(PacketRouterTest, DoesNotIncrementTransportSequenceNumberOnSendFailure) {
packet_router_.RemoveSendRtpModule(&rtp);
}
TEST_F(PacketRouterTest, ForwardsAbortedRetransmissions) {
NiceMock<MockRtpRtcpInterface> rtp_1;
NiceMock<MockRtpRtcpInterface> rtp_2;
const uint32_t kSsrc1 = 1234;
const uint32_t kSsrc2 = 2345;
const uint32_t kInvalidSsrc = 3456;
ON_CALL(rtp_1, SSRC).WillByDefault(Return(kSsrc1));
ON_CALL(rtp_2, SSRC).WillByDefault(Return(kSsrc2));
packet_router_.AddSendRtpModule(&rtp_1, false);
packet_router_.AddSendRtpModule(&rtp_2, false);
// Sets of retransmission sequence numbers we wish to abort, per ssrc.
const uint16_t kAbortedRetransmissionsOnSsrc1[] = {17, 42};
const uint16_t kAbortedRetransmissionsOnSsrc2[] = {1337, 4711};
const uint16_t kAbortedRetransmissionsOnSsrc3[] = {123};
EXPECT_CALL(rtp_1, OnAbortedRetransmissions(
ElementsAreArray(kAbortedRetransmissionsOnSsrc1)));
EXPECT_CALL(rtp_2, OnAbortedRetransmissions(
ElementsAreArray(kAbortedRetransmissionsOnSsrc2)));
packet_router_.OnAbortedRetransmissions(kSsrc1,
kAbortedRetransmissionsOnSsrc1);
packet_router_.OnAbortedRetransmissions(kSsrc2,
kAbortedRetransmissionsOnSsrc2);
// Should be noop and not cause any issues.
packet_router_.OnAbortedRetransmissions(kInvalidSsrc,
kAbortedRetransmissionsOnSsrc3);
packet_router_.RemoveSendRtpModule(&rtp_1);
packet_router_.RemoveSendRtpModule(&rtp_2);
}
TEST_F(PacketRouterTest, ReportsRtxSsrc) {
NiceMock<MockRtpRtcpInterface> rtp_1;
NiceMock<MockRtpRtcpInterface> rtp_2;
const uint32_t kSsrc1 = 1234;
const uint32_t kRtxSsrc1 = 1235;
const uint32_t kSsrc2 = 2345;
const uint32_t kInvalidSsrc = 3456;
ON_CALL(rtp_1, SSRC).WillByDefault(Return(kSsrc1));
ON_CALL(rtp_1, RtxSsrc).WillByDefault(Return(kRtxSsrc1));
ON_CALL(rtp_2, SSRC).WillByDefault(Return(kSsrc2));
packet_router_.AddSendRtpModule(&rtp_1, false);
packet_router_.AddSendRtpModule(&rtp_2, false);
EXPECT_EQ(packet_router_.GetRtxSsrcForMedia(kSsrc1), kRtxSsrc1);
EXPECT_EQ(packet_router_.GetRtxSsrcForMedia(kRtxSsrc1), absl::nullopt);
EXPECT_EQ(packet_router_.GetRtxSsrcForMedia(kSsrc2), absl::nullopt);
EXPECT_EQ(packet_router_.GetRtxSsrcForMedia(kInvalidSsrc), absl::nullopt);
packet_router_.RemoveSendRtpModule(&rtp_1);
packet_router_.RemoveSendRtpModule(&rtp_2);
}
#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
using PacketRouterDeathTest = PacketRouterTest;
TEST_F(PacketRouterDeathTest, DoubleRegistrationOfSendModuleDisallowed) {