From 8c8d49ea0f75e90db16412ce2f8c4bd469147012 Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Mon, 30 Oct 2017 15:21:41 +0100 Subject: [PATCH] Add periodic compound packet sending to RtcpTransceiver Bug: webrtc:8239 Change-Id: I1511db63a15e8c5101a933e55e66d3877ff963be Reviewed-on: https://webrtc-review.googlesource.com/15440 Commit-Queue: Danil Chapovalov Reviewed-by: Niels Moller Cr-Commit-Position: refs/heads/master@{#20480} --- modules/rtp_rtcp/BUILD.gn | 4 + .../source/rtcp_transceiver_config.cc | 11 +- .../rtp_rtcp/source/rtcp_transceiver_config.h | 20 ++- .../rtp_rtcp/source/rtcp_transceiver_impl.cc | 46 ++++++- .../rtp_rtcp/source/rtcp_transceiver_impl.h | 7 ++ .../source/rtcp_transceiver_impl_unittest.cc | 117 ++++++++++++++++++ 6 files changed, 199 insertions(+), 6 deletions(-) diff --git a/modules/rtp_rtcp/BUILD.gn b/modules/rtp_rtcp/BUILD.gn index b3e9f23a27..52c40e2079 100644 --- a/modules/rtp_rtcp/BUILD.gn +++ b/modules/rtp_rtcp/BUILD.gn @@ -232,6 +232,8 @@ rtc_source_set("rtcp_transceiver") { "../../api:array_view", "../../api:transport_api", "../../rtc_base:rtc_base_approved", + "../../rtc_base:rtc_task_queue", + "../../rtc_base:weak_ptr", ] } @@ -390,11 +392,13 @@ if (rtc_include_tests) { "../..:webrtc_common", "../../api:array_view", "../../api:libjingle_peerconnection_api", + "../../api:optional", "../../api:transport_api", "../../call:rtp_receiver", "../../common_video:common_video", "../../logging:rtc_event_log_api", "../../rtc_base:rtc_base_approved", + "../../rtc_base:rtc_task_queue", "../../system_wrappers:system_wrappers", "../../test:field_trial", "../../test:rtp_test_utils", diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_config.cc b/modules/rtp_rtcp/source/rtcp_transceiver_config.cc index c129d171ee..0cd0abed67 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_config.cc +++ b/modules/rtp_rtcp/source/rtcp_transceiver_config.cc @@ -43,15 +43,20 @@ bool RtcpTransceiverConfig::Validate() const { << " more than " << IP_PACKET_SIZE << " is unsupported."; return false; } - if (outgoing_transport == nullptr) { + if (!outgoing_transport) { LOG(LS_ERROR) << debug_id << "outgoing transport must be set"; return false; } - if (min_periodic_report_ms <= 0) { - LOG(LS_ERROR) << debug_id << "period " << min_periodic_report_ms + if (report_period_ms <= 0) { + LOG(LS_ERROR) << debug_id << "period " << report_period_ms << "ms between reports should be positive."; return false; } + if (schedule_periodic_compound_packets && !task_queue) { + LOG(LS_ERROR) << debug_id + << "missing task queue for periodic compound packets"; + return false; + } // TODO(danilchap): Remove or update the warning when RtcpTransceiver supports // send-only sessions. if (receive_statistics == nullptr) diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_config.h b/modules/rtp_rtcp/source/rtcp_transceiver_config.h index 187ad189d5..79f18c80c1 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_config.h +++ b/modules/rtp_rtcp/source/rtcp_transceiver_config.h @@ -13,6 +13,8 @@ #include +#include "rtc_base/task_queue.h" + namespace webrtc { class ReceiveStatisticsProvider; class Transport; @@ -43,11 +45,25 @@ struct RtcpTransceiverConfig { // Transport to send rtcp packets to. Should be set. Transport* outgoing_transport = nullptr; - // Minimum period to send receiver reports and attached messages. - int min_periodic_report_ms = 1000; + // Queue for scheduling delayed tasks, e.g. sending periodic compound packets. + rtc::TaskQueue* task_queue = nullptr; // Rtcp report block generator for outgoing receiver reports. ReceiveStatisticsProvider* receive_statistics = nullptr; + + // + // Tuning parameters. + // + // Delay before 1st periodic compound packet. + int initial_report_delay_ms = 500; + + // Period between periodic compound packets. + int report_period_ms = 1000; + + // + // Flags for features and experiments. + // + bool schedule_periodic_compound_packets = true; }; } // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc b/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc index c8db3c858b..4c4ae9567c 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc +++ b/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc @@ -21,6 +21,8 @@ #include "modules/rtp_rtcp/source/rtcp_packet/report_block.h" #include "modules/rtp_rtcp/source/rtcp_packet/sdes.h" #include "rtc_base/checks.h" +#include "rtc_base/ptr_util.h" +#include "rtc_base/task_queue.h" namespace webrtc { namespace { @@ -66,13 +68,55 @@ class PacketSender : public rtcp::RtcpPacket::PacketReadyCallback { } // namespace RtcpTransceiverImpl::RtcpTransceiverImpl(const RtcpTransceiverConfig& config) - : config_(config) { + : config_(config), ptr_factory_(this) { RTC_CHECK(config_.Validate()); + if (config_.schedule_periodic_compound_packets) + ReschedulePeriodicCompoundPackets(config_.initial_report_delay_ms); } RtcpTransceiverImpl::~RtcpTransceiverImpl() = default; void RtcpTransceiverImpl::SendCompoundPacket() { + SendPacket(); + if (config_.schedule_periodic_compound_packets) + ReschedulePeriodicCompoundPackets(config_.report_period_ms); +} + +void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets(int64_t delay_ms) { + class SendPeriodicCompoundPacket : public rtc::QueuedTask { + public: + SendPeriodicCompoundPacket(rtc::TaskQueue* task_queue, + rtc::WeakPtr ptr) + : task_queue_(task_queue), ptr_(std::move(ptr)) {} + bool Run() override { + RTC_DCHECK(task_queue_->IsCurrent()); + if (!ptr_) + return true; + ptr_->SendPacket(); + task_queue_->PostDelayedTask(rtc::WrapUnique(this), + ptr_->config_.report_period_ms); + return false; + } + + private: + rtc::TaskQueue* const task_queue_; + const rtc::WeakPtr ptr_; + }; + + RTC_DCHECK(config_.schedule_periodic_compound_packets); + RTC_DCHECK(config_.task_queue->IsCurrent()); + + // Stop existent send task if there is one. + ptr_factory_.InvalidateWeakPtrs(); + auto task = rtc::MakeUnique( + config_.task_queue, ptr_factory_.GetWeakPtr()); + if (delay_ms > 0) + config_.task_queue->PostDelayedTask(std::move(task), delay_ms); + else + config_.task_queue->PostTask(std::move(task)); +} + +void RtcpTransceiverImpl::SendPacket() { PacketSender sender(config_.outgoing_transport, config_.max_packet_size); rtcp::ReceiverReport rr; diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_impl.h b/modules/rtp_rtcp/source/rtcp_transceiver_impl.h index decb4fee95..2a9e3d359e 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_impl.h +++ b/modules/rtp_rtcp/source/rtcp_transceiver_impl.h @@ -17,6 +17,7 @@ #include "api/array_view.h" #include "modules/rtp_rtcp/source/rtcp_transceiver_config.h" #include "rtc_base/constructormagic.h" +#include "rtc_base/weak_ptr.h" namespace webrtc { // @@ -32,8 +33,14 @@ class RtcpTransceiverImpl { void SendCompoundPacket(); private: + void ReschedulePeriodicCompoundPackets(int64_t delay_ms); + // Sends RTCP packets. + void SendPacket(); + const RtcpTransceiverConfig config_; + rtc::WeakPtrFactory ptr_factory_; + RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RtcpTransceiverImpl); }; diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_impl_unittest.cc b/modules/rtp_rtcp/source/rtcp_transceiver_impl_unittest.cc index ca335ec438..9e9464468b 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_impl_unittest.cc +++ b/modules/rtp_rtcp/source/rtcp_transceiver_impl_unittest.cc @@ -13,7 +13,9 @@ #include #include "modules/rtp_rtcp/include/receive_statistics.h" +#include "rtc_base/event.h" #include "rtc_base/ptr_util.h" +#include "rtc_base/task_queue.h" #include "test/gmock.h" #include "test/gtest.h" #include "test/mock_transport.h" @@ -36,6 +38,119 @@ class MockReceiveStatisticsProvider : public webrtc::ReceiveStatisticsProvider { MOCK_METHOD1(RtcpReportBlocks, std::vector(size_t)); }; +// Helper to wait for an rtcp packet produced on a different thread/task queue. +class FakeRtcpTransport : public webrtc::Transport { + public: + FakeRtcpTransport() : sent_rtcp_(false, false) {} + bool SendRtcp(const uint8_t* data, size_t size) override { + sent_rtcp_.Set(); + return true; + } + bool SendRtp(const uint8_t*, size_t, const webrtc::PacketOptions&) override { + ADD_FAILURE() << "RtcpTransciver shouldn't send rtp packets."; + return true; + } + + // Returns true if packet was received by this transport before timeout, + bool WaitPacket(int64_t timeout_ms) { return sent_rtcp_.Wait(timeout_ms); } + + private: + rtc::Event sent_rtcp_; +}; + +// Posting delayed tasks doesn't promise high precision. +constexpr int64_t kTaskQueuePrecisionMs = 15; + +TEST(RtcpTransceiverImplTest, DelaysSendingFirstCompondPacket) { + rtc::TaskQueue queue("rtcp"); + FakeRtcpTransport transport; + RtcpTransceiverConfig config; + config.outgoing_transport = &transport; + config.initial_report_delay_ms = 10; + config.task_queue = &queue; + rtc::Optional rtcp_transceiver; + + int64_t started_ms = rtc::TimeMillis(); + queue.PostTask([&] { rtcp_transceiver.emplace(config); }); + EXPECT_TRUE(transport.WaitPacket(config.initial_report_delay_ms + + kTaskQueuePrecisionMs)); + + EXPECT_GE(rtc::TimeMillis() - started_ms, config.initial_report_delay_ms); + + // Cleanup. + rtc::Event done(false, false); + queue.PostTask([&] { + rtcp_transceiver.reset(); + done.Set(); + }); + ASSERT_TRUE(done.Wait(/*milliseconds=*/100)); +} + +TEST(RtcpTransceiverImplTest, PeriodicallySendsPackets) { + rtc::TaskQueue queue("rtcp"); + FakeRtcpTransport transport; + RtcpTransceiverConfig config; + config.outgoing_transport = &transport; + config.initial_report_delay_ms = 0; + config.report_period_ms = 10; + config.task_queue = &queue; + rtc::Optional rtcp_transceiver; + queue.PostTask([&] { rtcp_transceiver.emplace(config); }); + + EXPECT_TRUE(transport.WaitPacket(config.initial_report_delay_ms + + kTaskQueuePrecisionMs)); + int64_t time_of_1st_packet_ms = rtc::TimeMillis(); + EXPECT_TRUE( + transport.WaitPacket(config.report_period_ms + kTaskQueuePrecisionMs)); + int64_t time_of_2nd_packet_ms = rtc::TimeMillis(); + + EXPECT_GE(time_of_2nd_packet_ms - time_of_1st_packet_ms, + config.report_period_ms); + + // Cleanup. + rtc::Event done(false, false); + queue.PostTask([&] { + rtcp_transceiver.reset(); + done.Set(); + }); + ASSERT_TRUE(done.Wait(/*milliseconds=*/100)); +} + +TEST(RtcpTransceiverImplTest, SendCompoundPacketDelaysPeriodicSendPackets) { + rtc::TaskQueue queue("rtcp"); + FakeRtcpTransport transport; + RtcpTransceiverConfig config; + config.outgoing_transport = &transport; + config.initial_report_delay_ms = 0; + config.report_period_ms = 10; + config.task_queue = &queue; + rtc::Optional rtcp_transceiver; + queue.PostTask([&] { rtcp_transceiver.emplace(config); }); + + // Wait for first packet. + EXPECT_TRUE(transport.WaitPacket(config.initial_report_delay_ms + + kTaskQueuePrecisionMs)); + // Wait half-period time for next one - it shouldn't be sent. + EXPECT_FALSE(transport.WaitPacket(config.report_period_ms / 2)); + // Send packet now. + queue.PostTask([&] { rtcp_transceiver->SendCompoundPacket(); }); + EXPECT_TRUE(transport.WaitPacket(/*timeout_ms=*/1)); + int64_t time_of_non_periodic_packet_ms = rtc::TimeMillis(); + // Next packet should be sent at least after period_ms. + EXPECT_TRUE( + transport.WaitPacket(config.report_period_ms + kTaskQueuePrecisionMs)); + EXPECT_GE(rtc::TimeMillis() - time_of_non_periodic_packet_ms, + config.report_period_ms); + + // Cleanup. + rtc::Event done(false, false); + queue.PostTask([&] { + rtcp_transceiver.reset(); + done.Set(); + }); + ASSERT_TRUE(done.Wait(/*milliseconds=*/100)); +} + TEST(RtcpTransceiverImplTest, SendsMinimalCompoundPacket) { const uint32_t kSenderSsrc = 12345; MockTransport outgoing_transport; @@ -43,6 +158,7 @@ TEST(RtcpTransceiverImplTest, SendsMinimalCompoundPacket) { config.feedback_ssrc = kSenderSsrc; config.cname = "cname"; config.outgoing_transport = &outgoing_transport; + config.schedule_periodic_compound_packets = false; RtcpTransceiverImpl rtcp_transceiver(config); RtcpPacketParser rtcp_parser; @@ -78,6 +194,7 @@ TEST(RtcpTransceiverImplTest, ReceiverReportUsesReceiveStatistics) { config.feedback_ssrc = kSenderSsrc; config.outgoing_transport = &outgoing_transport; config.receive_statistics = &receive_statistics; + config.schedule_periodic_compound_packets = false; RtcpTransceiverImpl rtcp_transceiver(config); rtcp_transceiver.SendCompoundPacket();