From f2b987377b2df11731d085a5efd5001cde0a647b Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Mon, 14 Feb 2022 15:24:30 +0100 Subject: [PATCH] in RtcpTransceiver implement sending rtcp sender reports Bug: webrtc:8239 Change-Id: Id3298bf4e0eb18a3fc8072fb19416e67a126705f Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/249788 Reviewed-by: Emil Lundmark Commit-Queue: Danil Chapovalov Cr-Commit-Position: refs/heads/main@{#35995} --- modules/rtp_rtcp/BUILD.gn | 1 + .../rtp_rtcp/source/rtcp_transceiver_config.h | 35 +++++ .../rtp_rtcp/source/rtcp_transceiver_impl.cc | 147 ++++++++++++++---- .../rtp_rtcp/source/rtcp_transceiver_impl.h | 24 ++- .../source/rtcp_transceiver_impl_unittest.cc | 128 +++++++++++++++ 5 files changed, 298 insertions(+), 37 deletions(-) diff --git a/modules/rtp_rtcp/BUILD.gn b/modules/rtp_rtcp/BUILD.gn index 685fc9ff82..9922484647 100644 --- a/modules/rtp_rtcp/BUILD.gn +++ b/modules/rtp_rtcp/BUILD.gn @@ -377,6 +377,7 @@ rtc_library("rtcp_transceiver") { "../../api/units:timestamp", "../../api/video:video_bitrate_allocation", "../../rtc_base:checks", + "../../rtc_base:divide_round", "../../rtc_base:rtc_base_approved", "../../rtc_base/containers:flat_map", "../../rtc_base/task_utils:repeating_task", diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_config.h b/modules/rtp_rtcp/source/rtcp_transceiver_config.h index 73b933d0a8..0be633fc61 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_config.h +++ b/modules/rtp_rtcp/source/rtcp_transceiver_config.h @@ -64,6 +64,41 @@ class MediaReceiverRtcpObserver { const VideoBitrateAllocation& allocation) {} }; +// Handles RTCP related messages for a single RTP stream (i.e. single SSRC) +class RtpStreamRtcpHandler { + public: + virtual ~RtpStreamRtcpHandler() = default; + + // Statistic about sent RTP packets to propagate to RTCP sender report. + class RtpStats { + public: + RtpStats() = default; + RtpStats(const RtpStats&) = default; + RtpStats& operator=(const RtpStats&) = default; + ~RtpStats() = default; + + size_t num_sent_packets() const { return num_sent_packets_; } + size_t num_sent_bytes() const { return num_sent_bytes_; } + Timestamp last_capture_time() const { return last_capture_time_; } + uint32_t last_rtp_timestamp() const { return last_rtp_timestamp_; } + int last_clock_rate() const { return last_clock_rate_; } + + void set_num_sent_packets(size_t v) { num_sent_packets_ = v; } + void set_num_sent_bytes(size_t v) { num_sent_bytes_ = v; } + void set_last_capture_time(Timestamp v) { last_capture_time_ = v; } + void set_last_rtp_timestamp(uint32_t v) { last_rtp_timestamp_ = v; } + void set_last_clock_rate(int v) { last_clock_rate_ = v; } + + private: + size_t num_sent_packets_ = 0; + size_t num_sent_bytes_ = 0; + Timestamp last_capture_time_ = Timestamp::Zero(); + uint32_t last_rtp_timestamp_ = 0; + int last_clock_rate_ = 90'000; + }; + virtual RtpStats SentStats() = 0; +}; + struct RtcpTransceiverConfig { RtcpTransceiverConfig(); RtcpTransceiverConfig(const RtcpTransceiverConfig&); diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc b/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc index d918056743..94451efcd7 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc +++ b/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc @@ -33,6 +33,7 @@ #include "modules/rtp_rtcp/source/time_util.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" +#include "rtc_base/numerics/divide_round.h" #include "rtc_base/task_utils/repeating_task.h" #include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/time_utils.h" @@ -53,6 +54,11 @@ struct RtcpTransceiverImpl::RemoteSenderState { std::vector observers; }; +struct RtcpTransceiverImpl::LocalSenderState { + uint32_t ssrc; + RtpStreamRtcpHandler* handler = nullptr; +}; + // Helper to put several RTCP packets into lower layer datagram composing // Compound or Reduced-Size RTCP packet, as defined by RFC 5506 section 2. // TODO(danilchap): When in compound mode and packets are so many that several @@ -120,6 +126,32 @@ void RtcpTransceiverImpl::RemoveMediaReceiverRtcpObserver( stored.erase(it); } +bool RtcpTransceiverImpl::AddMediaSender(uint32_t local_ssrc, + RtpStreamRtcpHandler* handler) { + RTC_DCHECK(handler != nullptr); + LocalSenderState state; + state.ssrc = local_ssrc; + state.handler = handler; + local_senders_.push_back(state); + auto it = std::prev(local_senders_.end()); + auto [unused, inserted] = local_senders_by_ssrc_.emplace(local_ssrc, it); + if (!inserted) { + local_senders_.pop_back(); + return false; + } + return true; +} + +bool RtcpTransceiverImpl::RemoveMediaSender(uint32_t local_ssrc) { + auto index_it = local_senders_by_ssrc_.find(local_ssrc); + if (index_it == local_senders_by_ssrc_.end()) { + return false; + } + local_senders_.erase(index_it->second); + local_senders_by_ssrc_.erase(index_it); + return true; +} + void RtcpTransceiverImpl::SetReadyToSend(bool ready) { if (config_.schedule_periodic_compound_packets) { if (ready_to_send_ && !ready) @@ -442,9 +474,10 @@ void RtcpTransceiverImpl::SchedulePeriodicCompoundPackets(int64_t delay_ms) { }); } -void RtcpTransceiverImpl::FillReports(Timestamp now, - size_t reserved_bytes, - PacketSender& rtcp_sender) { +RtcpTransceiverImpl::CompoundPacketInfo RtcpTransceiverImpl::FillReports( + Timestamp now, + size_t reserved_bytes, + PacketSender& rtcp_sender) { // Sender/receiver reports should be first in the RTCP packet. RTC_DCHECK(rtcp_sender.IsEmpty()); @@ -457,42 +490,95 @@ void RtcpTransceiverImpl::FillReports(Timestamp now, available_bytes -= reserved_bytes; } - static constexpr size_t kReceiverReportSizeBytes = 8; - static constexpr size_t kFullReceiverReportSizeBytes = - kReceiverReportSizeBytes + - rtcp::ReceiverReport::kMaxNumberOfReportBlocks * - rtcp::ReportBlock::kLength; - size_t max_full_receiver_reports = - available_bytes / kFullReceiverReportSizeBytes; - size_t max_report_blocks = max_full_receiver_reports * - rtcp::ReceiverReport::kMaxNumberOfReportBlocks; - size_t available_bytes_for_last_receiver_report = - available_bytes - - max_full_receiver_reports * kFullReceiverReportSizeBytes; - if (available_bytes_for_last_receiver_report >= kReceiverReportSizeBytes) { + CompoundPacketInfo result; + result.sender_ssrc = config_.feedback_ssrc; + result.has_sender_report = false; + + static constexpr size_t kSenderReportSizeBytes = 28; + static constexpr size_t kFullSenderReportSizeBytes = + kSenderReportSizeBytes + + rtcp::SenderReport::kMaxNumberOfReportBlocks * rtcp::ReportBlock::kLength; + size_t max_full_sender_reports = available_bytes / kFullSenderReportSizeBytes; + size_t max_report_blocks = + max_full_sender_reports * rtcp::SenderReport::kMaxNumberOfReportBlocks; + size_t available_bytes_for_last_sender_report = + available_bytes - max_full_sender_reports * kFullSenderReportSizeBytes; + if (available_bytes_for_last_sender_report >= kSenderReportSizeBytes) { max_report_blocks += - (available_bytes_for_last_receiver_report - kReceiverReportSizeBytes) / + (available_bytes_for_last_sender_report - kSenderReportSizeBytes) / rtcp::ReportBlock::kLength; } std::vector report_blocks = CreateReportBlocks(now, max_report_blocks); + // Previous calculation of max number of sender report made space for max + // number of report blocks per sender report, but if number of report blocks + // is low, more sender reports may fit in. + size_t max_sender_reports = + (available_bytes - report_blocks.size() * rtcp::ReportBlock::kLength) / + kSenderReportSizeBytes; + auto last_handled_sender_it = local_senders_.end(); + auto report_block_it = report_blocks.begin(); + size_t num_sender_reports = 0; + for (auto it = local_senders_.begin(); + it != local_senders_.end() && num_sender_reports < max_sender_reports; + ++it) { + LocalSenderState& rtp_sender = *it; + RtpStreamRtcpHandler::RtpStats stats = rtp_sender.handler->SentStats(); + + last_handled_sender_it = it; + rtcp::SenderReport sender_report; + sender_report.SetSenderSsrc(rtp_sender.ssrc); + sender_report.SetPacketCount(stats.num_sent_packets()); + sender_report.SetOctetCount(stats.num_sent_bytes()); + sender_report.SetNtp(config_.clock->ConvertTimestampToNtpTime(now)); + RTC_DCHECK_GE(now, stats.last_capture_time()); + sender_report.SetRtpTimestamp( + stats.last_rtp_timestamp() + + ((now - stats.last_capture_time()) * stats.last_clock_rate()) + .seconds()); + if (report_block_it != report_blocks.end()) { + size_t num_blocks = + std::min(rtcp::SenderReport::kMaxNumberOfReportBlocks, + report_blocks.end() - report_block_it); + std::vector sub_blocks(report_block_it, + report_block_it + num_blocks); + sender_report.SetReportBlocks(std::move(sub_blocks)); + report_block_it += num_blocks; + } + rtcp_sender.AppendPacket(sender_report); + ++num_sender_reports; + + if (!result.has_sender_report) { + result.has_sender_report = true; + result.sender_ssrc = rtp_sender.ssrc; + } + } + if (last_handled_sender_it != local_senders_.end()) { + // Rotate `local_senders_` so that the 1st unhandled sender become first in + // the list, and thus will be first to generate rtcp sender report for on + // the next call to `FillReports`. + local_senders_.splice(local_senders_.end(), local_senders_, + local_senders_.begin(), + std::next(last_handled_sender_it)); + } + + // Calculcate number of receiver reports to attach remaining report blocks to. size_t num_receiver_reports = - (report_blocks.size() + rtcp::ReceiverReport::kMaxNumberOfReportBlocks - - 1) / - rtcp::ReceiverReport::kMaxNumberOfReportBlocks; + DivideRoundUp(report_blocks.end() - report_block_it, + rtcp::ReceiverReport::kMaxNumberOfReportBlocks); - // In compund mode each RTCP packet has to start with a sender or receiver + // In compound mode each RTCP packet has to start with a sender or receiver // report. - if (config_.rtcp_mode == RtcpMode::kCompound && num_receiver_reports == 0) { + if (config_.rtcp_mode == RtcpMode::kCompound && num_sender_reports == 0 && + num_receiver_reports == 0) { num_receiver_reports = 1; } - auto report_block_it = report_blocks.begin(); for (size_t i = 0; i < num_receiver_reports; ++i) { rtcp::ReceiverReport receiver_report; - receiver_report.SetSenderSsrc(config_.feedback_ssrc); + receiver_report.SetSenderSsrc(result.sender_ssrc); size_t num_blocks = std::min(rtcp::ReceiverReport::kMaxNumberOfReportBlocks, report_blocks.end() - report_block_it); @@ -504,6 +590,7 @@ void RtcpTransceiverImpl::FillReports(Timestamp now, } // All report blocks should be attached at this point. RTC_DCHECK_EQ(report_blocks.end() - report_block_it, 0); + return result; } void RtcpTransceiverImpl::CreateCompoundPacket(Timestamp now, @@ -526,22 +613,18 @@ void RtcpTransceiverImpl::CreateCompoundPacket(Timestamp now, reserved_bytes += (4 + 4 + rtcp::Rrtr::kLength); } - FillReports(now, reserved_bytes, sender); - const uint32_t sender_ssrc = config_.feedback_ssrc; + CompoundPacketInfo result = FillReports(now, reserved_bytes, sender); if (sdes.has_value() && !sender.IsEmpty()) { sender.AppendPacket(*sdes); } if (remb_.has_value()) { - remb_->SetSenderSsrc(sender_ssrc); + remb_->SetSenderSsrc(result.sender_ssrc); sender.AppendPacket(*remb_); } - // TODO(bugs.webrtc.org/8239): Do not send rrtr if this packet starts with - // SenderReport instead of ReceiverReport - // when RtcpTransceiver supports rtp senders. - if (config_.non_sender_rtt_measurement) { + if (!result.has_sender_report && config_.non_sender_rtt_measurement) { rtcp::ExtendedReports xr; - xr.SetSenderSsrc(sender_ssrc); + xr.SetSenderSsrc(result.sender_ssrc); rtcp::Rrtr rrtr; rrtr.SetNtp(config_.clock->ConvertTimestampToNtpTime(now)); diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_impl.h b/modules/rtp_rtcp/source/rtcp_transceiver_impl.h index 1339ba1e67..ce25899bcf 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_impl.h +++ b/modules/rtp_rtcp/source/rtcp_transceiver_impl.h @@ -11,7 +11,7 @@ #ifndef MODULES_RTP_RTCP_SOURCE_RTCP_TRANSCEIVER_IMPL_H_ #define MODULES_RTP_RTCP_SOURCE_RTCP_TRANSCEIVER_IMPL_H_ -#include +#include #include #include #include @@ -48,6 +48,11 @@ class RtcpTransceiverImpl { void RemoveMediaReceiverRtcpObserver(uint32_t remote_ssrc, MediaReceiverRtcpObserver* observer); + // Returns false on failure, e.g. when there is already an handler for the + // `local_ssrc`. + bool AddMediaSender(uint32_t local_ssrc, RtpStreamRtcpHandler* handler); + bool RemoveMediaSender(uint32_t local_ssrc); + void SetReadyToSend(bool ready); void ReceivePacket(rtc::ArrayView packet, Timestamp now); @@ -76,6 +81,7 @@ class RtcpTransceiverImpl { private: class PacketSender; struct RemoteSenderState; + struct LocalSenderState; void HandleReceivedPacket(const rtcp::CommonHeader& rtcp_packet_header, Timestamp now, @@ -104,11 +110,16 @@ class RtcpTransceiverImpl { void ReschedulePeriodicCompoundPackets(); void SchedulePeriodicCompoundPackets(int64_t delay_ms); - // Appends RTCP receiver reports with attached report blocks to the `sender`. + // Appends RTCP sender and receiver reports to the `sender`. + // Both sender and receiver reports may have attached report blocks. // Uses up to `config_.max_packet_size - reserved_bytes` - void FillReports(Timestamp now, - size_t reserved_bytes, - PacketSender& rtcp_sender); + struct CompoundPacketInfo { + uint32_t sender_ssrc; + bool has_sender_report; + }; + CompoundPacketInfo FillReports(Timestamp now, + size_t reserved_bytes, + PacketSender& rtcp_sender); // Creates compound RTCP packet, as defined in // https://tools.ietf.org/html/rfc5506#section-2 @@ -130,6 +141,9 @@ class RtcpTransceiverImpl { // TODO(danilchap): Remove entries from remote_senders_ that are no longer // needed. flat_map remote_senders_; + std::list local_senders_; + flat_map::iterator> + local_senders_by_ssrc_; RepeatingTaskHandle periodic_task_handle_; }; diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_impl_unittest.cc b/modules/rtp_rtcp/source/rtcp_transceiver_impl_unittest.cc index b957173493..0a69d29df4 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_impl_unittest.cc +++ b/modules/rtp_rtcp/source/rtcp_transceiver_impl_unittest.cc @@ -67,6 +67,11 @@ class MockMediaReceiverRtcpObserver : public MediaReceiverRtcpObserver { (override)); }; +class MockRtpStreamRtcpHandler : public RtpStreamRtcpHandler { + public: + MOCK_METHOD(RtpStats, SentStats, (), (override)); +}; + class MockNetworkLinkRtcpObserver : public NetworkLinkRtcpObserver { public: MOCK_METHOD(void, @@ -1344,5 +1349,128 @@ TEST(RtcpTransceiverImplTest, rtcp_transceiver.ReceivePacket(packet.Build(), receive_time); } +TEST(RtcpTransceiverImplTest, FailsToRegisterTwoSendersWithTheSameSsrc) { + RtcpTransceiverImpl rtcp_transceiver(DefaultTestConfig()); + MockRtpStreamRtcpHandler sender1; + MockRtpStreamRtcpHandler sender2; + + EXPECT_TRUE(rtcp_transceiver.AddMediaSender(/*local_ssrc=*/10001, &sender1)); + EXPECT_FALSE(rtcp_transceiver.AddMediaSender(/*local_ssrc=*/10001, &sender2)); + EXPECT_TRUE(rtcp_transceiver.AddMediaSender(/*local_ssrc=*/10002, &sender2)); + + EXPECT_TRUE(rtcp_transceiver.RemoveMediaSender(/*local_ssrc=*/10001)); + EXPECT_FALSE(rtcp_transceiver.RemoveMediaSender(/*local_ssrc=*/10001)); +} + +TEST(RtcpTransceiverImplTest, SendsSenderReport) { + static constexpr uint32_t kFeedbackSsrc = 123; + static constexpr uint32_t kSenderSsrc = 12345; + SimulatedClock clock(100'000'000); + RtcpTransceiverConfig config; + config.clock = &clock; + config.feedback_ssrc = kFeedbackSsrc; + RtcpPacketParser rtcp_parser; + RtcpParserTransport transport(&rtcp_parser); + config.outgoing_transport = &transport; + config.schedule_periodic_compound_packets = false; + RtcpTransceiverImpl rtcp_transceiver(config); + + RtpStreamRtcpHandler::RtpStats sender_stats; + sender_stats.set_num_sent_packets(10); + sender_stats.set_num_sent_bytes(1000); + sender_stats.set_last_rtp_timestamp(0x3333); + sender_stats.set_last_capture_time(clock.CurrentTime() - + TimeDelta::Seconds(2)); + sender_stats.set_last_clock_rate(0x1000); + MockRtpStreamRtcpHandler sender; + ON_CALL(sender, SentStats).WillByDefault(Return(sender_stats)); + rtcp_transceiver.AddMediaSender(kSenderSsrc, &sender); + + rtcp_transceiver.SendCompoundPacket(); + + ASSERT_GT(rtcp_parser.sender_report()->num_packets(), 0); + EXPECT_EQ(rtcp_parser.sender_report()->sender_ssrc(), kSenderSsrc); + EXPECT_EQ(rtcp_parser.sender_report()->ntp(), clock.CurrentNtpTime()); + EXPECT_EQ(rtcp_parser.sender_report()->rtp_timestamp(), 0x3333u + 0x2000u); + EXPECT_EQ(rtcp_parser.sender_report()->sender_packet_count(), 10u); + EXPECT_EQ(rtcp_parser.sender_report()->sender_octet_count(), 1000u); +} + +TEST(RtcpTransceiverImplTest, + MaySendBothSenderReportAndReceiverReportInTheSamePacket) { + RtcpPacketParser rtcp_parser; + RtcpParserTransport transport(&rtcp_parser); + std::vector statistics_report_blocks(40); + MockReceiveStatisticsProvider receive_statistics; + EXPECT_CALL(receive_statistics, RtcpReportBlocks(/*max_blocks=*/Ge(40u))) + .WillOnce(Return(statistics_report_blocks)); + RtcpTransceiverConfig config = DefaultTestConfig(); + config.outgoing_transport = &transport; + config.receive_statistics = &receive_statistics; + RtcpTransceiverImpl rtcp_transceiver(config); + + MockRtpStreamRtcpHandler sender; + rtcp_transceiver.AddMediaSender(/*ssrc=*/12345, &sender); + + rtcp_transceiver.SendCompoundPacket(); + + // Expect a single RTCP packet with a sender and a receiver reports in it. + EXPECT_EQ(transport.num_packets(), 1); + ASSERT_EQ(rtcp_parser.sender_report()->num_packets(), 1); + ASSERT_EQ(rtcp_parser.receiver_report()->num_packets(), 1); + // Sender report may contain up to 31 report blocks, thus remaining 9 report + // block should be attached to the receiver report. + EXPECT_THAT(rtcp_parser.sender_report()->report_blocks(), SizeIs(31)); + EXPECT_THAT(rtcp_parser.receiver_report()->report_blocks(), SizeIs(9)); +} + +TEST(RtcpTransceiverImplTest, RotatesSendersWhenAllSenderReportDoNotFit) { + // Send 6 compound packet, each should contain 5 sender reports, + // each of 6 senders should be mentioned 5 times. + static constexpr int kNumSenders = 6; + static constexpr uint32_t kSenderSsrc[kNumSenders] = {10, 20, 30, 40, 50, 60}; + static constexpr int kSendersPerPacket = 5; + SimulatedClock clock(100'000'000); + // RtcpPacketParser remembers only latest block for each type, but this test + // is about sending multiple sender reports in the same packet, thus need + // a more advance parser: RtcpTranceiver + RtcpTransceiverConfig receiver_config = DefaultTestConfig(); + receiver_config.clock = &clock; + RtcpTransceiverImpl rtcp_receiver(receiver_config); + // Main expectatation: all senders are spread equally across multiple packets. + NiceMock receiver[kNumSenders]; + for (int i = 0; i < kNumSenders; ++i) { + SCOPED_TRACE(i); + EXPECT_CALL(receiver[i], OnSenderReport(kSenderSsrc[i], _, _)) + .Times(kSendersPerPacket); + rtcp_receiver.AddMediaReceiverRtcpObserver(kSenderSsrc[i], &receiver[i]); + } + + MockTransport transport; + EXPECT_CALL(transport, SendRtcp) + .Times(kNumSenders) + .WillRepeatedly([&](const uint8_t* data, size_t size) { + rtcp_receiver.ReceivePacket(rtc::MakeArrayView(data, size), + clock.CurrentTime()); + return true; + }); + RtcpTransceiverConfig config = DefaultTestConfig(); + config.clock = &clock; + config.outgoing_transport = &transport; + // Limit packet to have space just for kSendersPerPacket sender reports. + // Sender report without report blocks require 28 bytes. + config.max_packet_size = kSendersPerPacket * 28; + RtcpTransceiverImpl rtcp_transceiver(config); + NiceMock sender[kNumSenders]; + for (int i = 0; i < kNumSenders; ++i) { + rtcp_transceiver.AddMediaSender(kSenderSsrc[i], &sender[i]); + } + + for (int i = 1; i <= kNumSenders; ++i) { + SCOPED_TRACE(i); + rtcp_transceiver.SendCompoundPacket(); + } +} + } // namespace } // namespace webrtc