diff --git a/api/call/transport.h b/api/call/transport.h index 8bff28825d..387ce8d15b 100644 --- a/api/call/transport.h +++ b/api/call/transport.h @@ -36,6 +36,10 @@ struct PacketOptions { bool is_retransmit = false; bool included_in_feedback = false; bool included_in_allocation = false; + // Whether this packet can be part of a packet batch at lower levels. + bool batchable = false; + // Whether this packet is the last of a batch. + bool last_packet_in_batch = false; }; class Transport { diff --git a/call/call_config.h b/call/call_config.h index 6df4ab7ed4..918c077435 100644 --- a/call/call_config.h +++ b/call/call_config.h @@ -81,6 +81,9 @@ struct CallConfig { // The burst interval of the pacer, see TaskQueuePacedSender constructor. absl::optional pacer_burst_interval; + + // Enables send packet batching from the egress RTP sender. + bool enable_send_packet_batching = false; }; } // namespace webrtc diff --git a/call/rtp_config.h b/call/rtp_config.h index 0cc9466a9f..a01a902ba9 100644 --- a/call/rtp_config.h +++ b/call/rtp_config.h @@ -159,6 +159,9 @@ struct RtpConfig { // RTCP CNAME, see RFC 3550. std::string c_name; + // Enables send packet batching from the egress RTP sender. + bool enable_send_packet_batching = false; + bool IsMediaSsrc(uint32_t ssrc) const; bool IsRtxSsrc(uint32_t ssrc) const; bool IsFlexfecSsrc(uint32_t ssrc) const; diff --git a/call/rtp_video_sender.cc b/call/rtp_video_sender.cc index 9108e83a13..27040f369e 100644 --- a/call/rtp_video_sender.cc +++ b/call/rtp_video_sender.cc @@ -235,6 +235,8 @@ std::vector CreateRtpStreamSenders( configuration.extmap_allow_mixed = rtp_config.extmap_allow_mixed; configuration.rtcp_report_interval_ms = rtcp_report_interval_ms; configuration.field_trials = &trials; + configuration.enable_send_packet_batching = + rtp_config.enable_send_packet_batching; std::vector rtp_streams; diff --git a/media/base/media_channel_impl.cc b/media/base/media_channel_impl.cc index ca2a117fd1..0de04b6b23 100644 --- a/media/base/media_channel_impl.cc +++ b/media/base/media_channel_impl.cc @@ -203,6 +203,8 @@ void MediaChannel::SendRtp(const uint8_t* data, [this, packet_id = options.packet_id, included_in_feedback = options.included_in_feedback, included_in_allocation = options.included_in_allocation, + batchable = options.batchable, + last_packet_in_batch = options.last_packet_in_batch, packet = rtc::CopyOnWriteBuffer(data, len, kMaxRtpPacketLen)]() mutable { rtc::PacketOptions rtc_options; rtc_options.packet_id = packet_id; @@ -213,6 +215,8 @@ void MediaChannel::SendRtp(const uint8_t* data, included_in_feedback; rtc_options.info_signaled_after_sent.included_in_allocation = included_in_allocation; + rtc_options.batchable = batchable; + rtc_options.last_packet_in_batch = last_packet_in_batch; SendPacket(&packet, rtc_options); }; diff --git a/media/base/media_config.h b/media/base/media_config.h index b383c9aa3d..782770569c 100644 --- a/media/base/media_config.h +++ b/media/base/media_config.h @@ -62,6 +62,9 @@ struct MediaConfig { // Time interval between RTCP report for video int rtcp_report_interval_ms = 1000; + + // Enables send packet batching from the egress RTP sender. + bool enable_send_packet_batching = false; } video; // Audio-specific config. @@ -82,6 +85,8 @@ struct MediaConfig { video.experiment_cpu_load_estimator == o.video.experiment_cpu_load_estimator && video.rtcp_report_interval_ms == o.video.rtcp_report_interval_ms && + video.enable_send_packet_batching == + o.video.enable_send_packet_batching && audio.rtcp_report_interval_ms == o.audio.rtcp_report_interval_ms; } diff --git a/media/engine/webrtc_video_engine.cc b/media/engine/webrtc_video_engine.cc index cc1edc8ec8..04056da7b9 100644 --- a/media/engine/webrtc_video_engine.cc +++ b/media/engine/webrtc_video_engine.cc @@ -1386,6 +1386,8 @@ bool WebRtcVideoChannel::AddSendStream(const StreamParams& sp) { config.crypto_options = crypto_options_; config.rtp.extmap_allow_mixed = ExtmapAllowMixed(); config.rtcp_report_interval_ms = video_config_.rtcp_report_interval_ms; + config.rtp.enable_send_packet_batching = + video_config_.enable_send_packet_batching; WebRtcVideoSendStream* stream = new WebRtcVideoSendStream( call_, sp, std::move(config), default_send_options_, diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn index fb36f92761..ea80c8c819 100644 --- a/modules/pacing/BUILD.gn +++ b/modules/pacing/BUILD.gn @@ -62,6 +62,7 @@ rtc_library("pacing") { "../utility:utility", ] absl_deps = [ + "//third_party/abseil-cpp/absl/cleanup", "//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/strings", "//third_party/abseil-cpp/absl/types:optional", diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index a526fc1362..495aaebffa 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -15,6 +15,7 @@ #include #include +#include "absl/cleanup/cleanup.h" #include "absl/strings/match.h" #include "modules/pacing/bitrate_prober.h" #include "modules/pacing/interval_budget.h" @@ -374,6 +375,9 @@ Timestamp PacingController::NextSendTime() const { } void PacingController::ProcessPackets() { + absl::Cleanup cleanup = [packet_sender = packet_sender_] { + packet_sender->OnBatchComplete(); + }; const Timestamp now = CurrentTime(); Timestamp target_send_time = now; diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h index b0d802b48f..2145868a62 100644 --- a/modules/pacing/pacing_controller.h +++ b/modules/pacing/pacing_controller.h @@ -52,6 +52,8 @@ class PacingController { virtual std::vector> FetchFec() = 0; virtual std::vector> GeneratePadding( DataSize size) = 0; + // TODO(bugs.webrtc.org/1439830): Make pure virtual once subclasses adapt. + virtual void OnBatchComplete() {} // TODO(bugs.webrtc.org/11340): Make pure virtual once downstream projects // have been updated. diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc index 2c5f8e9b07..ade71cd5f5 100644 --- a/modules/pacing/pacing_controller_unittest.cc +++ b/modules/pacing/pacing_controller_unittest.cc @@ -138,6 +138,7 @@ class MockPacingControllerCallback : public PacingController::PacketSender { GetRtxSsrcForMedia, (uint32_t), (const, override)); + MOCK_METHOD(void, OnBatchComplete, (), (override)); }; // Mock callback implementing the raw api. @@ -165,6 +166,7 @@ class MockPacketSender : public PacingController::PacketSender { GetRtxSsrcForMedia, (uint32_t), (const, override)); + MOCK_METHOD(void, OnBatchComplete, (), (override)); }; class PacingControllerPadding : public PacingController::PacketSender { @@ -202,6 +204,8 @@ class PacingControllerPadding : public PacingController::PacketSender { return absl::nullopt; } + void OnBatchComplete() override {} + size_t padding_sent() { return padding_sent_; } size_t total_bytes_sent() { return total_bytes_sent_; } @@ -260,6 +264,7 @@ class PacingControllerProbing : public PacingController::PacketSender { absl::optional GetRtxSsrcForMedia(uint32_t) const override { return absl::nullopt; } + void OnBatchComplete() override {} int packets_sent() const { return packets_sent_; } int padding_packets_sent() const { return padding_packets_sent_; } @@ -1454,6 +1459,13 @@ TEST_F(PacingControllerTest, PaddingOveruse) { pacer->ProcessPackets(); } +TEST_F(PacingControllerTest, ProvidesOnBatchCompleteToPacketSender) { + MockPacketSender callback; + auto pacer = std::make_unique(&clock_, &callback, trials_); + EXPECT_CALL(callback, OnBatchComplete); + pacer->ProcessPackets(); +} + TEST_F(PacingControllerTest, ProbeClusterId) { MockPacketSender callback; uint32_t ssrc = 12346; diff --git a/modules/pacing/packet_router.cc b/modules/pacing/packet_router.cc index a25f69225a..135b618bfa 100644 --- a/modules/pacing/packet_router.cc +++ b/modules/pacing/packet_router.cc @@ -90,6 +90,7 @@ void PacketRouter::RemoveSendRtpModuleFromMap(uint32_t ssrc) { auto it = send_modules_map_.find(ssrc); RTC_DCHECK(it != send_modules_map_.end()); send_modules_list_.remove(it->second); + RTC_CHECK(modules_used_in_current_batch_.empty()); send_modules_map_.erase(it); } @@ -166,6 +167,7 @@ void PacketRouter::SendPacket(std::unique_ptr packet, RTC_LOG(LS_WARNING) << "Failed to send packet, rejected by RTP module."; return; } + modules_used_in_current_batch_.insert(rtp_module); // Sending succeeded. @@ -184,6 +186,16 @@ void PacketRouter::SendPacket(std::unique_ptr packet, } } +void PacketRouter::OnBatchComplete() { + RTC_DCHECK_RUN_ON(&thread_checker_); + TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"), + "PacketRouter::OnBatchComplete"); + for (auto& module : modules_used_in_current_batch_) { + module->OnBatchComplete(); + } + modules_used_in_current_batch_.clear(); +} + std::vector> PacketRouter::FetchFec() { RTC_DCHECK_RUN_ON(&thread_checker_); std::vector> fec_packets = diff --git a/modules/pacing/packet_router.h b/modules/pacing/packet_router.h index 805f422112..61779f49e5 100644 --- a/modules/pacing/packet_router.h +++ b/modules/pacing/packet_router.h @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -62,6 +63,7 @@ class PacketRouter : public PacingController::PacketSender { uint32_t ssrc, rtc::ArrayView sequence_numbers) override; absl::optional GetRtxSsrcForMedia(uint32_t ssrc) const override; + void OnBatchComplete() override; uint16_t CurrentTransportSequenceNumber() const; @@ -108,6 +110,8 @@ class PacketRouter : public PacingController::PacketSender { std::vector> pending_fec_packets_ RTC_GUARDED_BY(thread_checker_); + std::set modules_used_in_current_batch_ + RTC_GUARDED_BY(thread_checker_); }; } // namespace webrtc #endif // MODULES_PACING_PACKET_ROUTER_H_ diff --git a/modules/pacing/packet_router_unittest.cc b/modules/pacing/packet_router_unittest.cc index 2528129d6e..7604de6fba 100644 --- a/modules/pacing/packet_router_unittest.cc +++ b/modules/pacing/packet_router_unittest.cc @@ -252,6 +252,7 @@ TEST_F(PacketRouterTest, PadsOnLastActiveMediaStream) { // If the last active module is removed, and no module sends media before // the next padding request, and arbitrary module will be selected. + packet_router_.OnBatchComplete(); packet_router_.RemoveSendRtpModule(&rtp_2); // Send on and then remove all remaining modules. @@ -301,6 +302,7 @@ TEST_F(PacketRouterTest, AllocatesTransportSequenceNumbers) { packet_router.CurrentTransportSequenceNumber()); } + packet_router.OnBatchComplete(); packet_router.RemoveSendRtpModule(&rtp_1); } @@ -346,7 +348,7 @@ TEST_F(PacketRouterTest, SendPacketWithoutTransportSequenceNumbers) { _)) .WillOnce(Return(true)); packet_router_.SendPacket(std::move(packet), PacedPacketInfo()); - + packet_router_.OnBatchComplete(); packet_router_.RemoveSendRtpModule(&rtp_1); } @@ -390,6 +392,7 @@ TEST_F(PacketRouterTest, SendPacketAssignsTransportSequenceNumbers) { .WillOnce(Return(true)); packet_router_.SendPacket(std::move(packet), PacedPacketInfo()); + packet_router_.OnBatchComplete(); packet_router_.RemoveSendRtpModule(&rtp_1); packet_router_.RemoveSendRtpModule(&rtp_2); } @@ -430,6 +433,7 @@ TEST_F(PacketRouterTest, DoesNotIncrementTransportSequenceNumberOnSendFailure) { .WillOnce(Return(true)); packet_router_.SendPacket(std::move(packet), PacedPacketInfo()); + packet_router_.OnBatchComplete(); packet_router_.RemoveSendRtpModule(&rtp); } @@ -495,6 +499,24 @@ TEST_F(PacketRouterTest, ReportsRtxSsrc) { packet_router_.RemoveSendRtpModule(&rtp_2); } +TEST_F(PacketRouterTest, RoutesBatchCompleteToActiveModules) { + NiceMock rtp_1; + NiceMock rtp_2; + constexpr uint32_t kSsrc1 = 4711; + constexpr uint32_t kSsrc2 = 1234; + ON_CALL(rtp_1, SSRC).WillByDefault(Return(kSsrc1)); + ON_CALL(rtp_2, SSRC).WillByDefault(Return(kSsrc2)); + packet_router_.AddSendRtpModule(&rtp_1, false); + packet_router_.AddSendRtpModule(&rtp_2, false); + EXPECT_CALL(rtp_1, TrySendPacket).WillOnce(Return(true)); + packet_router_.SendPacket(BuildRtpPacket(kSsrc1), PacedPacketInfo()); + EXPECT_CALL(rtp_1, OnBatchComplete); + EXPECT_CALL(rtp_2, OnBatchComplete).Times(0); + packet_router_.OnBatchComplete(); + packet_router_.RemoveSendRtpModule(&rtp_1); + packet_router_.RemoveSendRtpModule(&rtp_2); +} + #if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID) using PacketRouterDeathTest = PacketRouterTest; TEST_F(PacketRouterDeathTest, DoubleRegistrationOfSendModuleDisallowed) { diff --git a/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h b/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h index 52c3fe148c..e1e787a284 100644 --- a/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h +++ b/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h @@ -88,6 +88,7 @@ class MockRtpRtcpInterface : public RtpRtcpInterface { (std::unique_ptr packet, const PacedPacketInfo& pacing_info), (override)); + MOCK_METHOD(void, OnBatchComplete, (), (override)); MOCK_METHOD(void, SetFecProtectionParams, (const FecProtectionParams& delta_params, diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl.h b/modules/rtp_rtcp/source/rtp_rtcp_impl.h index f9d7c7234c..509036d1f6 100644 --- a/modules/rtp_rtcp/source/rtp_rtcp_impl.h +++ b/modules/rtp_rtcp/source/rtp_rtcp_impl.h @@ -135,6 +135,8 @@ class ABSL_DEPRECATED("") ModuleRtpRtcpImpl bool TrySendPacket(std::unique_ptr packet, const PacedPacketInfo& pacing_info) override; + void OnBatchComplete() override {} + void SetFecProtectionParams(const FecProtectionParams& delta_params, const FecProtectionParams& key_params) override; diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc index 79ae08d2ea..f5627c2fcd 100644 --- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc +++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc @@ -344,6 +344,11 @@ bool ModuleRtpRtcpImpl2::TrySendPacket(std::unique_ptr packet, return true; } +void ModuleRtpRtcpImpl2::OnBatchComplete() { + RTC_DCHECK(rtp_sender_); + rtp_sender_->packet_sender.OnBatchComplete(); +} + void ModuleRtpRtcpImpl2::SetFecProtectionParams( const FecProtectionParams& delta_params, const FecProtectionParams& key_params) { diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h index 214ef0406c..147cd33800 100644 --- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h +++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h @@ -146,6 +146,7 @@ class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface, bool TrySendPacket(std::unique_ptr packet, const PacedPacketInfo& pacing_info) override; + void OnBatchComplete() override; void SetFecProtectionParams(const FecProtectionParams& delta_params, const FecProtectionParams& key_params) override; diff --git a/modules/rtp_rtcp/source/rtp_rtcp_interface.h b/modules/rtp_rtcp/source/rtp_rtcp_interface.h index 92f1a5bbe6..89c6d4646a 100644 --- a/modules/rtp_rtcp/source/rtp_rtcp_interface.h +++ b/modules/rtp_rtcp/source/rtp_rtcp_interface.h @@ -148,6 +148,9 @@ class RtpRtcpInterface : public RtcpFeedbackSenderInterface { // not negotiated. If the RID and Repaired RID extensions are not // registered, the RID will not be sent. std::string rid; + + // Enables send packet batching from the egress RTP sender. + bool enable_send_packet_batching = false; }; // Stats for RTCP sender reports (SR) for a specific SSRC. @@ -317,6 +320,10 @@ class RtpRtcpInterface : public RtcpFeedbackSenderInterface { virtual bool TrySendPacket(std::unique_ptr packet, const PacedPacketInfo& pacing_info) = 0; + // Notifies that a batch of packet sends is completed. The implementation can + // use this to optimize packet sending. + virtual void OnBatchComplete() = 0; + // Update the FEC protection parameters to use for delta- and key-frames. // Only used when deferred FEC is active. virtual void SetFecProtectionParams( diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.cc b/modules/rtp_rtcp/source/rtp_sender_egress.cc index d6052f8db3..fdc2792793 100644 --- a/modules/rtp_rtcp/source/rtp_sender_egress.cc +++ b/modules/rtp_rtcp/source/rtp_sender_egress.cc @@ -66,7 +66,8 @@ void RtpSenderEgress::NonPacedPacketSender::PrepareForSend( RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config, RtpPacketHistory* packet_history) - : worker_queue_(TaskQueueBase::Current()), + : enable_send_packet_batching_(config.enable_send_packet_batching), + worker_queue_(TaskQueueBase::Current()), ssrc_(config.local_media_ssrc), rtx_ssrc_(config.rtx_send_ssrc), flexfec_ssrc_(config.fec_generator ? config.fec_generator->FecSsrc() @@ -76,9 +77,7 @@ RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config, packet_history_(packet_history), transport_(config.outgoing_transport), event_log_(config.event_log), -#if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE is_audio_(config.audio), -#endif need_rtp_packet_infos_(config.need_rtp_packet_infos), fec_generator_(config.fec_generator), transport_feedback_observer_(config.transport_feedback_callback), @@ -139,13 +138,13 @@ void RtpSenderEgress::SendPacket(std::unique_ptr packet, RTC_DCHECK(packet->retransmitted_sequence_number().has_value()); } - const uint32_t packet_ssrc = packet->Ssrc(); const Timestamp now = clock_->CurrentTime(); #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE - worker_queue_->PostTask(SafeTask( - task_safety_.flag(), - [this, now, packet_ssrc]() { BweTestLoggingPlot(now, packet_ssrc); })); + worker_queue_->PostTask(SafeTask(task_safety_.flag(), + [this, now, packet_ssrc = packet->Ssrc()]() { + BweTestLoggingPlot(now, packet_ssrc); + })); #endif if (need_rtp_packet_infos_ && @@ -225,6 +224,26 @@ void RtpSenderEgress::SendPacket(std::unique_ptr packet, } } + auto compound_packet = Packet{std::move(packet), pacing_info, now}; + if (enable_send_packet_batching_ && !is_audio_) { + packets_to_send_.push_back(std::move(compound_packet)); + } else { + CompleteSendPacket(compound_packet, false); + } +} + +void RtpSenderEgress::OnBatchComplete() { + RTC_DCHECK_RUN_ON(&pacer_checker_); + for (auto& packet : packets_to_send_) { + CompleteSendPacket(packet, &packet == &packets_to_send_.back()); + } + packets_to_send_.clear(); +} + +void RtpSenderEgress::CompleteSendPacket(const Packet& compound_packet, + bool last_in_batch) { + auto& [packet, pacing_info, now] = compound_packet; + const bool is_media = packet->packet_type() == RtpPacketMediaType::kAudio || packet->packet_type() == RtpPacketMediaType::kVideo; @@ -246,12 +265,14 @@ void RtpSenderEgress::SendPacket(std::unique_ptr packet, options.additional_data = packet->additional_data(); + const uint32_t packet_ssrc = packet->Ssrc(); if (packet->packet_type() != RtpPacketMediaType::kPadding && packet->packet_type() != RtpPacketMediaType::kRetransmission) { UpdateDelayStatistics(packet->capture_time(), now, packet_ssrc); UpdateOnSendPacket(options.packet_id, packet->capture_time(), packet_ssrc); } - + options.batchable = enable_send_packet_batching_ && !is_audio_; + options.last_packet_in_batch = last_in_batch; const bool send_success = SendPacketToNetwork(*packet, options, pacing_info); // Put packet in retransmission history or update pending status even if @@ -279,9 +300,9 @@ void RtpSenderEgress::SendPacket(std::unique_ptr packet, // TODO(bugs.webrtc.org/137439): clean up task posting when the combined // network/worker project launches. if (TaskQueueBase::Current() != worker_queue_) { - worker_queue_->PostTask( - SafeTask(task_safety_.flag(), [this, now, packet_ssrc, packet_type, - counter = std::move(counter), size]() { + worker_queue_->PostTask(SafeTask( + task_safety_.flag(), [this, now = now, packet_ssrc, packet_type, + counter = std::move(counter), size]() { RTC_DCHECK_RUN_ON(worker_queue_); UpdateRtpStats(now, packet_ssrc, packet_type, std::move(counter), size); diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.h b/modules/rtp_rtcp/source/rtp_sender_egress.h index 4b7b50233b..accdeb15d1 100644 --- a/modules/rtp_rtcp/source/rtp_sender_egress.h +++ b/modules/rtp_rtcp/source/rtp_sender_egress.h @@ -67,6 +67,7 @@ class RtpSenderEgress { void SendPacket(std::unique_ptr packet, const PacedPacketInfo& pacing_info) RTC_LOCKS_EXCLUDED(lock_); + void OnBatchComplete(); uint32_t Ssrc() const { return ssrc_; } absl::optional RtxSsrc() const { return rtx_ssrc_; } absl::optional FlexFecSsrc() const { return flexfec_ssrc_; } @@ -100,6 +101,13 @@ class RtpSenderEgress { rtc::ArrayView sequence_numbers); private: + struct Packet { + std::unique_ptr rtp_packet; + PacedPacketInfo info; + Timestamp now; + }; + void CompleteSendPacket(const Packet& compound_packet, bool last_in_batch) + RTC_LOCKS_EXCLUDED(lock_) RTC_RUN_ON(pacer_checker_); RtpSendRates GetSendRatesLocked(Timestamp now) const RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); bool HasCorrectSsrc(const RtpPacketToSend& packet) const; @@ -128,6 +136,7 @@ class RtpSenderEgress { // Called on a timer, once a second, on the worker_queue_. void PeriodicUpdate(); + const bool enable_send_packet_batching_; TaskQueueBase* const worker_queue_; RTC_NO_UNIQUE_ADDRESS SequenceChecker pacer_checker_; const uint32_t ssrc_; @@ -138,9 +147,7 @@ class RtpSenderEgress { RtpPacketHistory* const packet_history_; Transport* const transport_; RtcEventLog* const event_log_; -#if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE const bool is_audio_; -#endif const bool need_rtp_packet_infos_; VideoFecGenerator* const fec_generator_ RTC_GUARDED_BY(pacer_checker_); absl::optional last_sent_seq_ RTC_GUARDED_BY(pacer_checker_); @@ -178,6 +185,7 @@ class RtpSenderEgress { const std::unique_ptr rtp_sequence_number_map_ RTC_GUARDED_BY(worker_queue_); RepeatingTaskHandle update_task_ RTC_GUARDED_BY(worker_queue_); + std::vector packets_to_send_ RTC_GUARDED_BY(pacer_checker_); ScopedTaskSafety task_safety_; }; diff --git a/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc b/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc index 504a467c9b..75f18354ec 100644 --- a/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc +++ b/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc @@ -35,9 +35,10 @@ namespace webrtc { namespace { using ::testing::_; +using ::testing::AllOf; using ::testing::Field; +using ::testing::InSequence; using ::testing::NiceMock; -using ::testing::Optional; using ::testing::StrictMock; constexpr Timestamp kStartTime = Timestamp::Millis(123456789); @@ -96,12 +97,14 @@ class TestTransport : public Transport { public: explicit TestTransport(RtpHeaderExtensionMap* extensions) : total_data_sent_(DataSize::Zero()), extensions_(extensions) {} + MOCK_METHOD(void, SentRtp, (const PacketOptions& options), ()); bool SendRtp(const uint8_t* packet, size_t length, const PacketOptions& options) override { total_data_sent_ += DataSize::Bytes(length); last_packet_.emplace(rtc::MakeArrayView(packet, length), options, extensions_); + SentRtp(options); return true; } @@ -133,6 +136,7 @@ class RtpSenderEgressTest : public ::testing::Test { RtpRtcpInterface::Configuration DefaultConfig() { RtpRtcpInterface::Configuration config; + config.audio = false; config.clock = clock_; config.outgoing_transport = &transport_; config.local_media_ssrc = kSsrc; @@ -175,7 +179,7 @@ class RtpSenderEgressTest : public ::testing::Test { NiceMock send_packet_observer_; NiceMock feedback_observer_; RtpHeaderExtensionMap header_extensions_; - TestTransport transport_; + NiceMock transport_; RtpPacketHistory packet_history_; test::ExplicitKeyValueConfig trials_; uint16_t sequence_number_; @@ -209,6 +213,43 @@ TEST_F(RtpSenderEgressTest, TransportFeedbackObserverGetsCorrectByteCount) { sender->SendPacket(std::move(packet), PacedPacketInfo()); } +TEST_F(RtpSenderEgressTest, SendsPacketsOneByOneWhenNotBatching) { + std::unique_ptr sender = CreateRtpSenderEgress(); + EXPECT_CALL(transport_, + SentRtp(AllOf(Field(&PacketOptions::last_packet_in_batch, false), + Field(&PacketOptions::batchable, false)))); + sender->SendPacket(BuildRtpPacket(), PacedPacketInfo()); +} + +TEST_F(RtpSenderEgressTest, SendsPacketsOneByOneWhenBatchingWithAudio) { + auto config = DefaultConfig(); + config.enable_send_packet_batching = true; + config.audio = true; + auto sender = std::make_unique(config, &packet_history_); + EXPECT_CALL(transport_, + SentRtp(AllOf(Field(&PacketOptions::last_packet_in_batch, false), + Field(&PacketOptions::batchable, false)))) + .Times(2); + sender->SendPacket(BuildRtpPacket(), PacedPacketInfo()); + sender->SendPacket(BuildRtpPacket(), PacedPacketInfo()); +} + +TEST_F(RtpSenderEgressTest, CollectsPacketsWhenBatchingWithVideo) { + auto config = DefaultConfig(); + config.enable_send_packet_batching = true; + auto sender = std::make_unique(config, &packet_history_); + sender->SendPacket(BuildRtpPacket(), PacedPacketInfo()); + sender->SendPacket(BuildRtpPacket(), PacedPacketInfo()); + InSequence s; + EXPECT_CALL(transport_, + SentRtp(AllOf(Field(&PacketOptions::last_packet_in_batch, false), + Field(&PacketOptions::batchable, true)))); + EXPECT_CALL(transport_, + SentRtp(AllOf(Field(&PacketOptions::last_packet_in_batch, true), + Field(&PacketOptions::batchable, true)))); + sender->OnBatchComplete(); +} + TEST_F(RtpSenderEgressTest, PacketOptionsIsRetransmitSetByPacketType) { std::unique_ptr sender = CreateRtpSenderEgress(); diff --git a/rtc_base/async_packet_socket.h b/rtc_base/async_packet_socket.h index 90f2a13945..ad9a79e6f0 100644 --- a/rtc_base/async_packet_socket.h +++ b/rtc_base/async_packet_socket.h @@ -54,6 +54,12 @@ struct RTC_EXPORT PacketOptions { PacketTimeUpdateParams packet_time_params; // PacketInfo is passed to SentPacket when signaling this packet is sent. PacketInfo info_signaled_after_sent; + // True if this is a batchable packet. Batchable packets are collected at low + // levels and sent first when their AsyncPacketSocket receives a + // OnSendBatchComplete call. + bool batchable = false; + // True if this is the last packet of a batch. + bool last_packet_in_batch = false; }; // Provides the ability to receive packets asynchronously. Sends are not