From 980cadd02c7384397a41c0e334e9f329f3cc5c65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Spr=C3=A5ng?= Date: Thu, 25 Jun 2020 09:58:35 +0200 Subject: [PATCH] Revert "Lets PacingController call PacketRouter directly." MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 848ea9f0d3678118cb8926a2898454e5a4df58ae. Reason for revert: Part of changes that may cause deadlock Original change's description: > Lets PacingController call PacketRouter directly. > > Since locking model has been cleaned up, PacingController can now call > PacketRouter directly - without having to go via PacedSender or > TaskQueuePacedSender. > > Bug: webrtc:10809 > Change-Id: I181f04167d677c35395286f8b246aefb4c3e7ec7 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/175909 > Reviewed-by: Sebastian Jansson > Commit-Queue: Erik Språng > Cr-Commit-Position: refs/heads/master@{#31342} TBR=sprang@webrtc.org,srte@webrtc.org # Not skipping CQ checks because original CL landed > 1 day ago. Bug: webrtc:10809 Change-Id: I1d7d5217a03a51555b130ec5c2dd6a992b6e489e Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/178021 Reviewed-by: Erik Språng Reviewed-by: Sebastian Jansson Commit-Queue: Erik Språng Cr-Commit-Position: refs/heads/master@{#31563} --- modules/pacing/paced_sender.cc | 12 ++++- modules/pacing/paced_sender.h | 12 ++++- modules/pacing/paced_sender_unittest.cc | 2 +- modules/pacing/pacing_controller.cc | 4 +- modules/pacing/pacing_controller.h | 4 +- modules/pacing/pacing_controller_unittest.cc | 46 +++++++++---------- modules/pacing/packet_router.cc | 8 ++-- modules/pacing/packet_router.h | 13 +++--- modules/pacing/packet_router_unittest.cc | 19 ++++---- modules/pacing/task_queue_paced_sender.cc | 14 +++++- modules/pacing/task_queue_paced_sender.h | 14 +++++- .../task_queue_paced_sender_unittest.cc | 2 +- 12 files changed, 96 insertions(+), 54 deletions(-) diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index a0e76761e7..8d9c0b39fe 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -41,11 +41,12 @@ PacedSender::PacedSender(Clock* clock, ? PacingController::ProcessMode::kDynamic : PacingController::ProcessMode::kPeriodic), pacing_controller_(clock, - packet_router, + static_cast(this), event_log, field_trials, process_mode_), clock_(clock), + packet_router_(packet_router), process_thread_(process_thread) { if (process_thread_) process_thread_->RegisterModule(&module_proxy_, RTC_FROM_HERE); @@ -204,4 +205,13 @@ void PacedSender::SetQueueTimeLimit(TimeDelta limit) { MaybeWakupProcessThread(); } +void PacedSender::SendRtpPacket(std::unique_ptr packet, + const PacedPacketInfo& cluster_info) { + packet_router_->SendPacket(std::move(packet), cluster_info); +} + +std::vector> PacedSender::GeneratePadding( + DataSize size) { + return packet_router_->GeneratePadding(size.bytes()); +} } // namespace webrtc diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h index cc83b403ba..16137dfcd6 100644 --- a/modules/pacing/paced_sender.h +++ b/modules/pacing/paced_sender.h @@ -43,7 +43,8 @@ class RtcEventLog; // updating dependencies. class PacedSender : public Module, public RtpPacketPacer, - public RtpPacketSender { + public RtpPacketSender, + private PacingController::PacketSender { public: // Expected max pacer delay in ms. If ExpectedQueueTime() is higher than // this value, the packet producers should wait (eg drop frames rather than @@ -139,6 +140,14 @@ class PacedSender : public Module, // In dynamic process mode, refreshes the next process time. void MaybeWakupProcessThread(); + // Methods implementing PacedSenderController:PacketSender. + void SendRtpPacket(std::unique_ptr packet, + const PacedPacketInfo& cluster_info) override + RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); + + std::vector> GeneratePadding( + DataSize size) override RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); + // Private implementation of Module to not expose those implementation details // publicly and control when the class is registered/deregistered. class ModuleProxy : public Module { @@ -162,6 +171,7 @@ class PacedSender : public Module, PacingController pacing_controller_ RTC_GUARDED_BY(critsect_); Clock* const clock_; + PacketRouter* const packet_router_; ProcessThread* const process_thread_; }; } // namespace webrtc diff --git a/modules/pacing/paced_sender_unittest.cc b/modules/pacing/paced_sender_unittest.cc index 53cc1c42ed..dcbe7d5655 100644 --- a/modules/pacing/paced_sender_unittest.cc +++ b/modules/pacing/paced_sender_unittest.cc @@ -46,7 +46,7 @@ class MockCallback : public PacketRouter { (override)); MOCK_METHOD(std::vector>, GeneratePadding, - (DataSize target_size), + (size_t target_size_bytes), (override)); }; diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index 07e265b0da..7c52306843 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -440,7 +440,7 @@ void PacingController::ProcessPackets() { for (auto& packet : keepalive_packets) { keepalive_data_sent += DataSize::Bytes(packet->payload_size() + packet->padding_size()); - packet_sender_->SendPacket(std::move(packet), PacedPacketInfo()); + packet_sender_->SendRtpPacket(std::move(packet), PacedPacketInfo()); } OnPaddingSent(keepalive_data_sent); } @@ -559,7 +559,7 @@ void PacingController::ProcessPackets() { packet_size += DataSize::Bytes(rtp_packet->headers_size()) + transport_overhead_per_packet_; } - packet_sender_->SendPacket(std::move(rtp_packet), pacing_info); + packet_sender_->SendRtpPacket(std::move(rtp_packet), pacing_info); data_sent += packet_size; diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h index 6e361aebb4..20d2539e45 100644 --- a/modules/pacing/pacing_controller.h +++ b/modules/pacing/pacing_controller.h @@ -55,8 +55,8 @@ class PacingController { class PacketSender { public: virtual ~PacketSender() = default; - virtual void SendPacket(std::unique_ptr packet, - const PacedPacketInfo& cluster_info) = 0; + virtual void SendRtpPacket(std::unique_ptr packet, + const PacedPacketInfo& cluster_info) = 0; virtual std::vector> GeneratePadding( DataSize size) = 0; }; diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc index bc4d47333e..e7a61f75e4 100644 --- a/modules/pacing/pacing_controller_unittest.cc +++ b/modules/pacing/pacing_controller_unittest.cc @@ -69,8 +69,8 @@ std::unique_ptr BuildPacket(RtpPacketMediaType type, // methods that focus on core aspects. class MockPacingControllerCallback : public PacingController::PacketSender { public: - void SendPacket(std::unique_ptr packet, - const PacedPacketInfo& cluster_info) override { + void SendRtpPacket(std::unique_ptr packet, + const PacedPacketInfo& cluster_info) override { SendPacket(packet->Ssrc(), packet->SequenceNumber(), packet->capture_time_ms(), packet->packet_type() == RtpPacketMediaType::kRetransmission, @@ -104,7 +104,7 @@ class MockPacingControllerCallback : public PacingController::PacketSender { class MockPacketSender : public PacingController::PacketSender { public: MOCK_METHOD(void, - SendPacket, + SendRtpPacket, (std::unique_ptr packet, const PacedPacketInfo& cluster_info), (override)); @@ -120,8 +120,8 @@ class PacingControllerPadding : public PacingController::PacketSender { PacingControllerPadding() : padding_sent_(0), total_bytes_sent_(0) {} - void SendPacket(std::unique_ptr packet, - const PacedPacketInfo& pacing_info) override { + void SendRtpPacket(std::unique_ptr packet, + const PacedPacketInfo& pacing_info) override { total_bytes_sent_ += packet->payload_size(); } @@ -151,8 +151,8 @@ class PacingControllerProbing : public PacingController::PacketSender { public: PacingControllerProbing() : packets_sent_(0), padding_sent_(0) {} - void SendPacket(std::unique_ptr packet, - const PacedPacketInfo& pacing_info) override { + void SendRtpPacket(std::unique_ptr packet, + const PacedPacketInfo& pacing_info) override { if (packet->packet_type() != RtpPacketMediaType::kPadding) { ++packets_sent_; } @@ -1575,7 +1575,7 @@ TEST_P(PacingControllerTest, ProbeClusterId) { // First probing cluster. EXPECT_CALL(callback, - SendPacket(_, Field(&PacedPacketInfo::probe_cluster_id, 0))) + SendRtpPacket(_, Field(&PacedPacketInfo::probe_cluster_id, 0))) .Times(5); for (int i = 0; i < 5; ++i) { @@ -1584,7 +1584,7 @@ TEST_P(PacingControllerTest, ProbeClusterId) { // Second probing cluster. EXPECT_CALL(callback, - SendPacket(_, Field(&PacedPacketInfo::probe_cluster_id, 1))) + SendRtpPacket(_, Field(&PacedPacketInfo::probe_cluster_id, 1))) .Times(5); for (int i = 0; i < 5; ++i) { @@ -1602,7 +1602,7 @@ TEST_P(PacingControllerTest, ProbeClusterId) { return padding_packets; }); bool non_probe_packet_seen = false; - EXPECT_CALL(callback, SendPacket) + EXPECT_CALL(callback, SendRtpPacket) .WillOnce([&](std::unique_ptr packet, const PacedPacketInfo& cluster_info) { EXPECT_EQ(cluster_info.probe_cluster_id, kNotAProbe); @@ -1632,23 +1632,23 @@ TEST_P(PacingControllerTest, OwnedPacketPrioritizedOnType) { ::testing::InSequence seq; EXPECT_CALL( callback, - SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kAudioSsrc)), _)); - EXPECT_CALL( - callback, - SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _)); + SendRtpPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kAudioSsrc)), _)); + EXPECT_CALL(callback, + SendRtpPacket( + Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _)); // FEC and video actually have the same priority, so will come out in // insertion order. + EXPECT_CALL(callback, + SendRtpPacket( + Pointee(Property(&RtpPacketToSend::Ssrc, kFlexFecSsrc)), _)); EXPECT_CALL( callback, - SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kFlexFecSsrc)), _)); - EXPECT_CALL( - callback, - SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoSsrc)), _)); + SendRtpPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoSsrc)), _)); - EXPECT_CALL( - callback, - SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _)); + EXPECT_CALL(callback, + SendRtpPacket( + Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _)); while (pacer_->QueueSizePackets() > 0) { if (PeriodicProcess()) { @@ -1683,7 +1683,7 @@ TEST_P(PacingControllerTest, SmallFirstProbePacket) { size_t packets_sent = 0; bool media_seen = false; - EXPECT_CALL(callback, SendPacket) + EXPECT_CALL(callback, SendRtpPacket) .Times(::testing::AnyNumber()) .WillRepeatedly([&](std::unique_ptr packet, const PacedPacketInfo& cluster_info) { @@ -1821,7 +1821,7 @@ TEST_P(PacingControllerTest, for (bool account_for_audio : {false, true}) { uint16_t sequence_number = 1234; MockPacketSender callback; - EXPECT_CALL(callback, SendPacket).Times(::testing::AnyNumber()); + EXPECT_CALL(callback, SendRtpPacket).Times(::testing::AnyNumber()); pacer_ = std::make_unique(&clock_, &callback, nullptr, nullptr, GetParam()); pacer_->SetAccountForAudioPackets(account_for_audio); diff --git a/modules/pacing/packet_router.cc b/modules/pacing/packet_router.cc index e75b5a337a..3569738cdf 100644 --- a/modules/pacing/packet_router.cc +++ b/modules/pacing/packet_router.cc @@ -174,9 +174,9 @@ void PacketRouter::SendPacket(std::unique_ptr packet, } std::vector> PacketRouter::GeneratePadding( - DataSize size) { + size_t target_size_bytes) { TRACE_EVENT1(TRACE_DISABLED_BY_DEFAULT("webrtc"), - "PacketRouter::GeneratePadding", "bytes", size.bytes()); + "PacketRouter::GeneratePadding", "bytes", target_size_bytes); MutexLock lock(&modules_mutex_); // First try on the last rtp module to have sent media. This increases the @@ -188,7 +188,7 @@ std::vector> PacketRouter::GeneratePadding( std::vector> padding_packets; if (last_send_module_ != nullptr && last_send_module_->SupportsRtxPayloadPadding()) { - padding_packets = last_send_module_->GeneratePadding(size.bytes()); + padding_packets = last_send_module_->GeneratePadding(target_size_bytes); } if (padding_packets.empty()) { @@ -197,7 +197,7 @@ std::vector> PacketRouter::GeneratePadding( // be taken into account by the bandwidth estimator, e.g. in FF. for (RtpRtcpInterface* rtp_module : send_modules_list_) { if (rtp_module->SupportsPadding()) { - padding_packets = rtp_module->GeneratePadding(size.bytes()); + padding_packets = rtp_module->GeneratePadding(target_size_bytes); if (!padding_packets.empty()) { last_send_module_ = rtp_module; break; diff --git a/modules/pacing/packet_router.h b/modules/pacing/packet_router.h index 73837f2ffe..379ec20f20 100644 --- a/modules/pacing/packet_router.h +++ b/modules/pacing/packet_router.h @@ -21,7 +21,6 @@ #include #include "api/transport/network_types.h" -#include "modules/pacing/pacing_controller.h" #include "modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "modules/rtp_rtcp/source/rtcp_packet.h" @@ -41,8 +40,7 @@ class RtpRtcpInterface; // (receiver report). For the latter case, we also keep track of the // receive modules. class PacketRouter : public RemoteBitrateObserver, - public TransportFeedbackSenderInterface, - public PacingController::PacketSender { + public TransportFeedbackSenderInterface { public: PacketRouter(); explicit PacketRouter(uint16_t start_transport_seq); @@ -55,10 +53,11 @@ class PacketRouter : public RemoteBitrateObserver, bool remb_candidate); void RemoveReceiveRtpModule(RtcpFeedbackSenderInterface* rtcp_sender); - void SendPacket(std::unique_ptr packet, - const PacedPacketInfo& cluster_info) override; - std::vector> GeneratePadding( - DataSize size) override; + virtual void SendPacket(std::unique_ptr packet, + const PacedPacketInfo& cluster_info); + + virtual std::vector> GeneratePadding( + size_t target_size_bytes); uint16_t CurrentTransportSequenceNumber() const; diff --git a/modules/pacing/packet_router_unittest.cc b/modules/pacing/packet_router_unittest.cc index 10cf98b3dd..6af7529e86 100644 --- a/modules/pacing/packet_router_unittest.cc +++ b/modules/pacing/packet_router_unittest.cc @@ -68,7 +68,7 @@ class PacketRouterTest : public ::testing::Test { }; TEST_F(PacketRouterTest, Sanity_NoModuleRegistered_GeneratePadding) { - constexpr DataSize bytes = DataSize::Bytes(300); + constexpr size_t bytes = 300; const PacedPacketInfo paced_info(1, kProbeMinProbes, kProbeMinBytes); EXPECT_TRUE(packet_router_.GeneratePadding(bytes).empty()); @@ -122,8 +122,7 @@ TEST_F(PacketRouterTest, GeneratePaddingPrioritizesRtx) { return std::vector>( kExpectedPaddingPackets); }); - auto generated_padding = - packet_router_.GeneratePadding(DataSize::Bytes(kPaddingSize)); + auto generated_padding = packet_router_.GeneratePadding(kPaddingSize); EXPECT_EQ(generated_padding.size(), kExpectedPaddingPackets); packet_router_.RemoveSendRtpModule(&rtp_1); @@ -160,7 +159,7 @@ TEST_F(PacketRouterTest, GeneratePaddingPrioritizesVideo) { packet_router_.AddSendRtpModule(&audio_module, false); EXPECT_CALL(audio_module, GeneratePadding(kPaddingSize)) .WillOnce(generate_padding); - packet_router_.GeneratePadding(DataSize::Bytes(kPaddingSize)); + packet_router_.GeneratePadding(kPaddingSize); // Add the video module, this should now be prioritized since we cannot // guarantee that audio packets will be included in the BWE. @@ -168,7 +167,7 @@ TEST_F(PacketRouterTest, GeneratePaddingPrioritizesVideo) { EXPECT_CALL(audio_module, GeneratePadding).Times(0); EXPECT_CALL(video_module, GeneratePadding(kPaddingSize)) .WillOnce(generate_padding); - packet_router_.GeneratePadding(DataSize::Bytes(kPaddingSize)); + packet_router_.GeneratePadding(kPaddingSize); // Remove and the add audio module again. Module order shouldn't matter; // video should still be prioritized. @@ -177,14 +176,14 @@ TEST_F(PacketRouterTest, GeneratePaddingPrioritizesVideo) { EXPECT_CALL(audio_module, GeneratePadding).Times(0); EXPECT_CALL(video_module, GeneratePadding(kPaddingSize)) .WillOnce(generate_padding); - packet_router_.GeneratePadding(DataSize::Bytes(kPaddingSize)); + packet_router_.GeneratePadding(kPaddingSize); // Remove and the video module, we should fall back to padding on the // audio module again. packet_router_.RemoveSendRtpModule(&video_module); EXPECT_CALL(audio_module, GeneratePadding(kPaddingSize)) .WillOnce(generate_padding); - packet_router_.GeneratePadding(DataSize::Bytes(kPaddingSize)); + packet_router_.GeneratePadding(kPaddingSize); packet_router_.RemoveSendRtpModule(&audio_module); } @@ -244,7 +243,7 @@ TEST_F(PacketRouterTest, PadsOnLastActiveMediaStream) { packets.push_back(BuildRtpPacket(kSsrc2)); return packets; }); - packet_router_.GeneratePadding(DataSize::Bytes(kPaddingBytes)); + packet_router_.GeneratePadding(kPaddingBytes); // Send media on first module. Padding should be sent on that module. packet_router_.SendPacket(BuildRtpPacket(kSsrc1), PacedPacketInfo()); @@ -256,7 +255,7 @@ TEST_F(PacketRouterTest, PadsOnLastActiveMediaStream) { packets.push_back(BuildRtpPacket(kSsrc1)); return packets; }); - packet_router_.GeneratePadding(DataSize::Bytes(kPaddingBytes)); + packet_router_.GeneratePadding(kPaddingBytes); // Send media on second module. Padding should be sent there. packet_router_.SendPacket(BuildRtpPacket(kSsrc2), PacedPacketInfo()); @@ -286,7 +285,7 @@ TEST_F(PacketRouterTest, PadsOnLastActiveMediaStream) { for (int i = 0; i < 2; ++i) { last_send_module = nullptr; - packet_router_.GeneratePadding(DataSize::Bytes(kPaddingBytes)); + packet_router_.GeneratePadding(kPaddingBytes); EXPECT_NE(last_send_module, nullptr); packet_router_.RemoveSendRtpModule(last_send_module); } diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc index db748f30b4..531e9d6ad3 100644 --- a/modules/pacing/task_queue_paced_sender.cc +++ b/modules/pacing/task_queue_paced_sender.cc @@ -39,8 +39,9 @@ TaskQueuePacedSender::TaskQueuePacedSender( TimeDelta hold_back_window) : clock_(clock), hold_back_window_(hold_back_window), + packet_router_(packet_router), pacing_controller_(clock, - packet_router, + static_cast(this), event_log, field_trials, PacingController::ProcessMode::kDynamic), @@ -237,6 +238,17 @@ void TaskQueuePacedSender::MaybeProcessPackets( MaybeUpdateStats(false); } +std::vector> +TaskQueuePacedSender::GeneratePadding(DataSize size) { + return packet_router_->GeneratePadding(size.bytes()); +} + +void TaskQueuePacedSender::SendRtpPacket( + std::unique_ptr packet, + const PacedPacketInfo& cluster_info) { + packet_router_->SendPacket(std::move(packet), cluster_info); +} + void TaskQueuePacedSender::MaybeUpdateStats(bool is_scheduled_call) { if (is_shutdown_) { if (is_scheduled_call) { diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h index 9787b8beee..71b3be27e6 100644 --- a/modules/pacing/task_queue_paced_sender.h +++ b/modules/pacing/task_queue_paced_sender.h @@ -39,7 +39,9 @@ namespace webrtc { class Clock; class RtcEventLog; -class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { +class TaskQueuePacedSender : public RtpPacketPacer, + public RtpPacketSender, + private PacingController::PacketSender { public: // The |hold_back_window| parameter sets a lower bound on time to sleep if // there is currently a pacer queue and packets can't immediately be @@ -127,11 +129,21 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { // method again with desired (finite) scheduled process time. void MaybeProcessPackets(Timestamp scheduled_process_time); + // Methods implementing PacedSenderController:PacketSender. + + void SendRtpPacket(std::unique_ptr packet, + const PacedPacketInfo& cluster_info) override + RTC_RUN_ON(task_queue_); + + std::vector> GeneratePadding( + DataSize size) override RTC_RUN_ON(task_queue_); + void MaybeUpdateStats(bool is_scheduled_call) RTC_RUN_ON(task_queue_); Stats GetStats() const; Clock* const clock_; const TimeDelta hold_back_window_; + PacketRouter* const packet_router_ RTC_GUARDED_BY(task_queue_); PacingController pacing_controller_ RTC_GUARDED_BY(task_queue_); // We want only one (valid) delayed process task in flight at a time. diff --git a/modules/pacing/task_queue_paced_sender_unittest.cc b/modules/pacing/task_queue_paced_sender_unittest.cc index 876cd96cfd..ab6a24ba42 100644 --- a/modules/pacing/task_queue_paced_sender_unittest.cc +++ b/modules/pacing/task_queue_paced_sender_unittest.cc @@ -45,7 +45,7 @@ class MockPacketRouter : public PacketRouter { (override)); MOCK_METHOD(std::vector>, GeneratePadding, - (DataSize target_size), + (size_t target_size_bytes), (override)); };