in RtcpTransceiver implement sending rtcp sender reports

Bug: webrtc:8239
Change-Id: Id3298bf4e0eb18a3fc8072fb19416e67a126705f
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/249788
Reviewed-by: Emil Lundmark <lndmrk@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35995}
This commit is contained in:
Danil Chapovalov 2022-02-14 15:24:30 +01:00 committed by WebRTC LUCI CQ
parent 66bfd6f57d
commit f2b987377b
5 changed files with 298 additions and 37 deletions

View File

@ -377,6 +377,7 @@ rtc_library("rtcp_transceiver") {
"../../api/units:timestamp",
"../../api/video:video_bitrate_allocation",
"../../rtc_base:checks",
"../../rtc_base:divide_round",
"../../rtc_base:rtc_base_approved",
"../../rtc_base/containers:flat_map",
"../../rtc_base/task_utils:repeating_task",

View File

@ -64,6 +64,41 @@ class MediaReceiverRtcpObserver {
const VideoBitrateAllocation& allocation) {}
};
// Handles RTCP related messages for a single RTP stream (i.e. single SSRC)
class RtpStreamRtcpHandler {
public:
virtual ~RtpStreamRtcpHandler() = default;
// Statistic about sent RTP packets to propagate to RTCP sender report.
class RtpStats {
public:
RtpStats() = default;
RtpStats(const RtpStats&) = default;
RtpStats& operator=(const RtpStats&) = default;
~RtpStats() = default;
size_t num_sent_packets() const { return num_sent_packets_; }
size_t num_sent_bytes() const { return num_sent_bytes_; }
Timestamp last_capture_time() const { return last_capture_time_; }
uint32_t last_rtp_timestamp() const { return last_rtp_timestamp_; }
int last_clock_rate() const { return last_clock_rate_; }
void set_num_sent_packets(size_t v) { num_sent_packets_ = v; }
void set_num_sent_bytes(size_t v) { num_sent_bytes_ = v; }
void set_last_capture_time(Timestamp v) { last_capture_time_ = v; }
void set_last_rtp_timestamp(uint32_t v) { last_rtp_timestamp_ = v; }
void set_last_clock_rate(int v) { last_clock_rate_ = v; }
private:
size_t num_sent_packets_ = 0;
size_t num_sent_bytes_ = 0;
Timestamp last_capture_time_ = Timestamp::Zero();
uint32_t last_rtp_timestamp_ = 0;
int last_clock_rate_ = 90'000;
};
virtual RtpStats SentStats() = 0;
};
struct RtcpTransceiverConfig {
RtcpTransceiverConfig();
RtcpTransceiverConfig(const RtcpTransceiverConfig&);

View File

@ -33,6 +33,7 @@
#include "modules/rtp_rtcp/source/time_util.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/numerics/divide_round.h"
#include "rtc_base/task_utils/repeating_task.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/time_utils.h"
@ -53,6 +54,11 @@ struct RtcpTransceiverImpl::RemoteSenderState {
std::vector<MediaReceiverRtcpObserver*> observers;
};
struct RtcpTransceiverImpl::LocalSenderState {
uint32_t ssrc;
RtpStreamRtcpHandler* handler = nullptr;
};
// Helper to put several RTCP packets into lower layer datagram composing
// Compound or Reduced-Size RTCP packet, as defined by RFC 5506 section 2.
// TODO(danilchap): When in compound mode and packets are so many that several
@ -120,6 +126,32 @@ void RtcpTransceiverImpl::RemoveMediaReceiverRtcpObserver(
stored.erase(it);
}
bool RtcpTransceiverImpl::AddMediaSender(uint32_t local_ssrc,
RtpStreamRtcpHandler* handler) {
RTC_DCHECK(handler != nullptr);
LocalSenderState state;
state.ssrc = local_ssrc;
state.handler = handler;
local_senders_.push_back(state);
auto it = std::prev(local_senders_.end());
auto [unused, inserted] = local_senders_by_ssrc_.emplace(local_ssrc, it);
if (!inserted) {
local_senders_.pop_back();
return false;
}
return true;
}
bool RtcpTransceiverImpl::RemoveMediaSender(uint32_t local_ssrc) {
auto index_it = local_senders_by_ssrc_.find(local_ssrc);
if (index_it == local_senders_by_ssrc_.end()) {
return false;
}
local_senders_.erase(index_it->second);
local_senders_by_ssrc_.erase(index_it);
return true;
}
void RtcpTransceiverImpl::SetReadyToSend(bool ready) {
if (config_.schedule_periodic_compound_packets) {
if (ready_to_send_ && !ready)
@ -442,9 +474,10 @@ void RtcpTransceiverImpl::SchedulePeriodicCompoundPackets(int64_t delay_ms) {
});
}
void RtcpTransceiverImpl::FillReports(Timestamp now,
size_t reserved_bytes,
PacketSender& rtcp_sender) {
RtcpTransceiverImpl::CompoundPacketInfo RtcpTransceiverImpl::FillReports(
Timestamp now,
size_t reserved_bytes,
PacketSender& rtcp_sender) {
// Sender/receiver reports should be first in the RTCP packet.
RTC_DCHECK(rtcp_sender.IsEmpty());
@ -457,42 +490,95 @@ void RtcpTransceiverImpl::FillReports(Timestamp now,
available_bytes -= reserved_bytes;
}
static constexpr size_t kReceiverReportSizeBytes = 8;
static constexpr size_t kFullReceiverReportSizeBytes =
kReceiverReportSizeBytes +
rtcp::ReceiverReport::kMaxNumberOfReportBlocks *
rtcp::ReportBlock::kLength;
size_t max_full_receiver_reports =
available_bytes / kFullReceiverReportSizeBytes;
size_t max_report_blocks = max_full_receiver_reports *
rtcp::ReceiverReport::kMaxNumberOfReportBlocks;
size_t available_bytes_for_last_receiver_report =
available_bytes -
max_full_receiver_reports * kFullReceiverReportSizeBytes;
if (available_bytes_for_last_receiver_report >= kReceiverReportSizeBytes) {
CompoundPacketInfo result;
result.sender_ssrc = config_.feedback_ssrc;
result.has_sender_report = false;
static constexpr size_t kSenderReportSizeBytes = 28;
static constexpr size_t kFullSenderReportSizeBytes =
kSenderReportSizeBytes +
rtcp::SenderReport::kMaxNumberOfReportBlocks * rtcp::ReportBlock::kLength;
size_t max_full_sender_reports = available_bytes / kFullSenderReportSizeBytes;
size_t max_report_blocks =
max_full_sender_reports * rtcp::SenderReport::kMaxNumberOfReportBlocks;
size_t available_bytes_for_last_sender_report =
available_bytes - max_full_sender_reports * kFullSenderReportSizeBytes;
if (available_bytes_for_last_sender_report >= kSenderReportSizeBytes) {
max_report_blocks +=
(available_bytes_for_last_receiver_report - kReceiverReportSizeBytes) /
(available_bytes_for_last_sender_report - kSenderReportSizeBytes) /
rtcp::ReportBlock::kLength;
}
std::vector<rtcp::ReportBlock> report_blocks =
CreateReportBlocks(now, max_report_blocks);
// Previous calculation of max number of sender report made space for max
// number of report blocks per sender report, but if number of report blocks
// is low, more sender reports may fit in.
size_t max_sender_reports =
(available_bytes - report_blocks.size() * rtcp::ReportBlock::kLength) /
kSenderReportSizeBytes;
auto last_handled_sender_it = local_senders_.end();
auto report_block_it = report_blocks.begin();
size_t num_sender_reports = 0;
for (auto it = local_senders_.begin();
it != local_senders_.end() && num_sender_reports < max_sender_reports;
++it) {
LocalSenderState& rtp_sender = *it;
RtpStreamRtcpHandler::RtpStats stats = rtp_sender.handler->SentStats();
last_handled_sender_it = it;
rtcp::SenderReport sender_report;
sender_report.SetSenderSsrc(rtp_sender.ssrc);
sender_report.SetPacketCount(stats.num_sent_packets());
sender_report.SetOctetCount(stats.num_sent_bytes());
sender_report.SetNtp(config_.clock->ConvertTimestampToNtpTime(now));
RTC_DCHECK_GE(now, stats.last_capture_time());
sender_report.SetRtpTimestamp(
stats.last_rtp_timestamp() +
((now - stats.last_capture_time()) * stats.last_clock_rate())
.seconds());
if (report_block_it != report_blocks.end()) {
size_t num_blocks =
std::min<size_t>(rtcp::SenderReport::kMaxNumberOfReportBlocks,
report_blocks.end() - report_block_it);
std::vector<rtcp::ReportBlock> sub_blocks(report_block_it,
report_block_it + num_blocks);
sender_report.SetReportBlocks(std::move(sub_blocks));
report_block_it += num_blocks;
}
rtcp_sender.AppendPacket(sender_report);
++num_sender_reports;
if (!result.has_sender_report) {
result.has_sender_report = true;
result.sender_ssrc = rtp_sender.ssrc;
}
}
if (last_handled_sender_it != local_senders_.end()) {
// Rotate `local_senders_` so that the 1st unhandled sender become first in
// the list, and thus will be first to generate rtcp sender report for on
// the next call to `FillReports`.
local_senders_.splice(local_senders_.end(), local_senders_,
local_senders_.begin(),
std::next(last_handled_sender_it));
}
// Calculcate number of receiver reports to attach remaining report blocks to.
size_t num_receiver_reports =
(report_blocks.size() + rtcp::ReceiverReport::kMaxNumberOfReportBlocks -
1) /
rtcp::ReceiverReport::kMaxNumberOfReportBlocks;
DivideRoundUp(report_blocks.end() - report_block_it,
rtcp::ReceiverReport::kMaxNumberOfReportBlocks);
// In compund mode each RTCP packet has to start with a sender or receiver
// In compound mode each RTCP packet has to start with a sender or receiver
// report.
if (config_.rtcp_mode == RtcpMode::kCompound && num_receiver_reports == 0) {
if (config_.rtcp_mode == RtcpMode::kCompound && num_sender_reports == 0 &&
num_receiver_reports == 0) {
num_receiver_reports = 1;
}
auto report_block_it = report_blocks.begin();
for (size_t i = 0; i < num_receiver_reports; ++i) {
rtcp::ReceiverReport receiver_report;
receiver_report.SetSenderSsrc(config_.feedback_ssrc);
receiver_report.SetSenderSsrc(result.sender_ssrc);
size_t num_blocks =
std::min<size_t>(rtcp::ReceiverReport::kMaxNumberOfReportBlocks,
report_blocks.end() - report_block_it);
@ -504,6 +590,7 @@ void RtcpTransceiverImpl::FillReports(Timestamp now,
}
// All report blocks should be attached at this point.
RTC_DCHECK_EQ(report_blocks.end() - report_block_it, 0);
return result;
}
void RtcpTransceiverImpl::CreateCompoundPacket(Timestamp now,
@ -526,22 +613,18 @@ void RtcpTransceiverImpl::CreateCompoundPacket(Timestamp now,
reserved_bytes += (4 + 4 + rtcp::Rrtr::kLength);
}
FillReports(now, reserved_bytes, sender);
const uint32_t sender_ssrc = config_.feedback_ssrc;
CompoundPacketInfo result = FillReports(now, reserved_bytes, sender);
if (sdes.has_value() && !sender.IsEmpty()) {
sender.AppendPacket(*sdes);
}
if (remb_.has_value()) {
remb_->SetSenderSsrc(sender_ssrc);
remb_->SetSenderSsrc(result.sender_ssrc);
sender.AppendPacket(*remb_);
}
// TODO(bugs.webrtc.org/8239): Do not send rrtr if this packet starts with
// SenderReport instead of ReceiverReport
// when RtcpTransceiver supports rtp senders.
if (config_.non_sender_rtt_measurement) {
if (!result.has_sender_report && config_.non_sender_rtt_measurement) {
rtcp::ExtendedReports xr;
xr.SetSenderSsrc(sender_ssrc);
xr.SetSenderSsrc(result.sender_ssrc);
rtcp::Rrtr rrtr;
rrtr.SetNtp(config_.clock->ConvertTimestampToNtpTime(now));

View File

@ -11,7 +11,7 @@
#ifndef MODULES_RTP_RTCP_SOURCE_RTCP_TRANSCEIVER_IMPL_H_
#define MODULES_RTP_RTCP_SOURCE_RTCP_TRANSCEIVER_IMPL_H_
#include <map>
#include <list>
#include <memory>
#include <string>
#include <vector>
@ -48,6 +48,11 @@ class RtcpTransceiverImpl {
void RemoveMediaReceiverRtcpObserver(uint32_t remote_ssrc,
MediaReceiverRtcpObserver* observer);
// Returns false on failure, e.g. when there is already an handler for the
// `local_ssrc`.
bool AddMediaSender(uint32_t local_ssrc, RtpStreamRtcpHandler* handler);
bool RemoveMediaSender(uint32_t local_ssrc);
void SetReadyToSend(bool ready);
void ReceivePacket(rtc::ArrayView<const uint8_t> packet, Timestamp now);
@ -76,6 +81,7 @@ class RtcpTransceiverImpl {
private:
class PacketSender;
struct RemoteSenderState;
struct LocalSenderState;
void HandleReceivedPacket(const rtcp::CommonHeader& rtcp_packet_header,
Timestamp now,
@ -104,11 +110,16 @@ class RtcpTransceiverImpl {
void ReschedulePeriodicCompoundPackets();
void SchedulePeriodicCompoundPackets(int64_t delay_ms);
// Appends RTCP receiver reports with attached report blocks to the `sender`.
// Appends RTCP sender and receiver reports to the `sender`.
// Both sender and receiver reports may have attached report blocks.
// Uses up to `config_.max_packet_size - reserved_bytes`
void FillReports(Timestamp now,
size_t reserved_bytes,
PacketSender& rtcp_sender);
struct CompoundPacketInfo {
uint32_t sender_ssrc;
bool has_sender_report;
};
CompoundPacketInfo FillReports(Timestamp now,
size_t reserved_bytes,
PacketSender& rtcp_sender);
// Creates compound RTCP packet, as defined in
// https://tools.ietf.org/html/rfc5506#section-2
@ -130,6 +141,9 @@ class RtcpTransceiverImpl {
// TODO(danilchap): Remove entries from remote_senders_ that are no longer
// needed.
flat_map<uint32_t, RemoteSenderState> remote_senders_;
std::list<LocalSenderState> local_senders_;
flat_map<uint32_t, std::list<LocalSenderState>::iterator>
local_senders_by_ssrc_;
RepeatingTaskHandle periodic_task_handle_;
};

View File

@ -67,6 +67,11 @@ class MockMediaReceiverRtcpObserver : public MediaReceiverRtcpObserver {
(override));
};
class MockRtpStreamRtcpHandler : public RtpStreamRtcpHandler {
public:
MOCK_METHOD(RtpStats, SentStats, (), (override));
};
class MockNetworkLinkRtcpObserver : public NetworkLinkRtcpObserver {
public:
MOCK_METHOD(void,
@ -1344,5 +1349,128 @@ TEST(RtcpTransceiverImplTest,
rtcp_transceiver.ReceivePacket(packet.Build(), receive_time);
}
TEST(RtcpTransceiverImplTest, FailsToRegisterTwoSendersWithTheSameSsrc) {
RtcpTransceiverImpl rtcp_transceiver(DefaultTestConfig());
MockRtpStreamRtcpHandler sender1;
MockRtpStreamRtcpHandler sender2;
EXPECT_TRUE(rtcp_transceiver.AddMediaSender(/*local_ssrc=*/10001, &sender1));
EXPECT_FALSE(rtcp_transceiver.AddMediaSender(/*local_ssrc=*/10001, &sender2));
EXPECT_TRUE(rtcp_transceiver.AddMediaSender(/*local_ssrc=*/10002, &sender2));
EXPECT_TRUE(rtcp_transceiver.RemoveMediaSender(/*local_ssrc=*/10001));
EXPECT_FALSE(rtcp_transceiver.RemoveMediaSender(/*local_ssrc=*/10001));
}
TEST(RtcpTransceiverImplTest, SendsSenderReport) {
static constexpr uint32_t kFeedbackSsrc = 123;
static constexpr uint32_t kSenderSsrc = 12345;
SimulatedClock clock(100'000'000);
RtcpTransceiverConfig config;
config.clock = &clock;
config.feedback_ssrc = kFeedbackSsrc;
RtcpPacketParser rtcp_parser;
RtcpParserTransport transport(&rtcp_parser);
config.outgoing_transport = &transport;
config.schedule_periodic_compound_packets = false;
RtcpTransceiverImpl rtcp_transceiver(config);
RtpStreamRtcpHandler::RtpStats sender_stats;
sender_stats.set_num_sent_packets(10);
sender_stats.set_num_sent_bytes(1000);
sender_stats.set_last_rtp_timestamp(0x3333);
sender_stats.set_last_capture_time(clock.CurrentTime() -
TimeDelta::Seconds(2));
sender_stats.set_last_clock_rate(0x1000);
MockRtpStreamRtcpHandler sender;
ON_CALL(sender, SentStats).WillByDefault(Return(sender_stats));
rtcp_transceiver.AddMediaSender(kSenderSsrc, &sender);
rtcp_transceiver.SendCompoundPacket();
ASSERT_GT(rtcp_parser.sender_report()->num_packets(), 0);
EXPECT_EQ(rtcp_parser.sender_report()->sender_ssrc(), kSenderSsrc);
EXPECT_EQ(rtcp_parser.sender_report()->ntp(), clock.CurrentNtpTime());
EXPECT_EQ(rtcp_parser.sender_report()->rtp_timestamp(), 0x3333u + 0x2000u);
EXPECT_EQ(rtcp_parser.sender_report()->sender_packet_count(), 10u);
EXPECT_EQ(rtcp_parser.sender_report()->sender_octet_count(), 1000u);
}
TEST(RtcpTransceiverImplTest,
MaySendBothSenderReportAndReceiverReportInTheSamePacket) {
RtcpPacketParser rtcp_parser;
RtcpParserTransport transport(&rtcp_parser);
std::vector<ReportBlock> statistics_report_blocks(40);
MockReceiveStatisticsProvider receive_statistics;
EXPECT_CALL(receive_statistics, RtcpReportBlocks(/*max_blocks=*/Ge(40u)))
.WillOnce(Return(statistics_report_blocks));
RtcpTransceiverConfig config = DefaultTestConfig();
config.outgoing_transport = &transport;
config.receive_statistics = &receive_statistics;
RtcpTransceiverImpl rtcp_transceiver(config);
MockRtpStreamRtcpHandler sender;
rtcp_transceiver.AddMediaSender(/*ssrc=*/12345, &sender);
rtcp_transceiver.SendCompoundPacket();
// Expect a single RTCP packet with a sender and a receiver reports in it.
EXPECT_EQ(transport.num_packets(), 1);
ASSERT_EQ(rtcp_parser.sender_report()->num_packets(), 1);
ASSERT_EQ(rtcp_parser.receiver_report()->num_packets(), 1);
// Sender report may contain up to 31 report blocks, thus remaining 9 report
// block should be attached to the receiver report.
EXPECT_THAT(rtcp_parser.sender_report()->report_blocks(), SizeIs(31));
EXPECT_THAT(rtcp_parser.receiver_report()->report_blocks(), SizeIs(9));
}
TEST(RtcpTransceiverImplTest, RotatesSendersWhenAllSenderReportDoNotFit) {
// Send 6 compound packet, each should contain 5 sender reports,
// each of 6 senders should be mentioned 5 times.
static constexpr int kNumSenders = 6;
static constexpr uint32_t kSenderSsrc[kNumSenders] = {10, 20, 30, 40, 50, 60};
static constexpr int kSendersPerPacket = 5;
SimulatedClock clock(100'000'000);
// RtcpPacketParser remembers only latest block for each type, but this test
// is about sending multiple sender reports in the same packet, thus need
// a more advance parser: RtcpTranceiver
RtcpTransceiverConfig receiver_config = DefaultTestConfig();
receiver_config.clock = &clock;
RtcpTransceiverImpl rtcp_receiver(receiver_config);
// Main expectatation: all senders are spread equally across multiple packets.
NiceMock<MockMediaReceiverRtcpObserver> receiver[kNumSenders];
for (int i = 0; i < kNumSenders; ++i) {
SCOPED_TRACE(i);
EXPECT_CALL(receiver[i], OnSenderReport(kSenderSsrc[i], _, _))
.Times(kSendersPerPacket);
rtcp_receiver.AddMediaReceiverRtcpObserver(kSenderSsrc[i], &receiver[i]);
}
MockTransport transport;
EXPECT_CALL(transport, SendRtcp)
.Times(kNumSenders)
.WillRepeatedly([&](const uint8_t* data, size_t size) {
rtcp_receiver.ReceivePacket(rtc::MakeArrayView(data, size),
clock.CurrentTime());
return true;
});
RtcpTransceiverConfig config = DefaultTestConfig();
config.clock = &clock;
config.outgoing_transport = &transport;
// Limit packet to have space just for kSendersPerPacket sender reports.
// Sender report without report blocks require 28 bytes.
config.max_packet_size = kSendersPerPacket * 28;
RtcpTransceiverImpl rtcp_transceiver(config);
NiceMock<MockRtpStreamRtcpHandler> sender[kNumSenders];
for (int i = 0; i < kNumSenders; ++i) {
rtcp_transceiver.AddMediaSender(kSenderSsrc[i], &sender[i]);
}
for (int i = 1; i <= kNumSenders; ++i) {
SCOPED_TRACE(i);
rtcp_transceiver.SendCompoundPacket();
}
}
} // namespace
} // namespace webrtc