Introduce support for video packet batching.
This CL introduces a new feature enabling video packet send batches. The feature is enabled via PeerConnectionInterface ::RTCConfiguration ::MediaConfig ::enable_send_packet_batching. PacketOptions have been augmented with attribute "batchable" (set for all video packets) and attribute "last_packet_in_batch" which gives injected AsyncPacketSockets a chance to understand when a batch begins and ends. When the feature is on, packets are collected in RtpSenderEgress. On reception of OnBatchComplete from PacingController, RtpSenderEgress sends the collected batch, setting "last_packet_in_batch" to true in the last packet. Bug: chromium:1439830 Change-Id: I1846b9d4a8a0efd227d617691213a2e048bdc8a2 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/303720 Commit-Queue: Markus Handell <handellm@webrtc.org> Reviewed-by: Stefan Holmer <stefan@webrtc.org> Reviewed-by: Erik Språng <sprang@webrtc.org> Cr-Commit-Position: refs/heads/main@{#40012}
This commit is contained in:
parent
24f9a8b398
commit
c8c4a282a6
@ -36,6 +36,10 @@ struct PacketOptions {
|
||||
bool is_retransmit = false;
|
||||
bool included_in_feedback = false;
|
||||
bool included_in_allocation = false;
|
||||
// Whether this packet can be part of a packet batch at lower levels.
|
||||
bool batchable = false;
|
||||
// Whether this packet is the last of a batch.
|
||||
bool last_packet_in_batch = false;
|
||||
};
|
||||
|
||||
class Transport {
|
||||
|
||||
@ -81,6 +81,9 @@ struct CallConfig {
|
||||
|
||||
// The burst interval of the pacer, see TaskQueuePacedSender constructor.
|
||||
absl::optional<TimeDelta> pacer_burst_interval;
|
||||
|
||||
// Enables send packet batching from the egress RTP sender.
|
||||
bool enable_send_packet_batching = false;
|
||||
};
|
||||
|
||||
} // namespace webrtc
|
||||
|
||||
@ -159,6 +159,9 @@ struct RtpConfig {
|
||||
// RTCP CNAME, see RFC 3550.
|
||||
std::string c_name;
|
||||
|
||||
// Enables send packet batching from the egress RTP sender.
|
||||
bool enable_send_packet_batching = false;
|
||||
|
||||
bool IsMediaSsrc(uint32_t ssrc) const;
|
||||
bool IsRtxSsrc(uint32_t ssrc) const;
|
||||
bool IsFlexfecSsrc(uint32_t ssrc) const;
|
||||
|
||||
@ -235,6 +235,8 @@ std::vector<RtpStreamSender> CreateRtpStreamSenders(
|
||||
configuration.extmap_allow_mixed = rtp_config.extmap_allow_mixed;
|
||||
configuration.rtcp_report_interval_ms = rtcp_report_interval_ms;
|
||||
configuration.field_trials = &trials;
|
||||
configuration.enable_send_packet_batching =
|
||||
rtp_config.enable_send_packet_batching;
|
||||
|
||||
std::vector<RtpStreamSender> rtp_streams;
|
||||
|
||||
|
||||
@ -203,6 +203,8 @@ void MediaChannel::SendRtp(const uint8_t* data,
|
||||
[this, packet_id = options.packet_id,
|
||||
included_in_feedback = options.included_in_feedback,
|
||||
included_in_allocation = options.included_in_allocation,
|
||||
batchable = options.batchable,
|
||||
last_packet_in_batch = options.last_packet_in_batch,
|
||||
packet = rtc::CopyOnWriteBuffer(data, len, kMaxRtpPacketLen)]() mutable {
|
||||
rtc::PacketOptions rtc_options;
|
||||
rtc_options.packet_id = packet_id;
|
||||
@ -213,6 +215,8 @@ void MediaChannel::SendRtp(const uint8_t* data,
|
||||
included_in_feedback;
|
||||
rtc_options.info_signaled_after_sent.included_in_allocation =
|
||||
included_in_allocation;
|
||||
rtc_options.batchable = batchable;
|
||||
rtc_options.last_packet_in_batch = last_packet_in_batch;
|
||||
SendPacket(&packet, rtc_options);
|
||||
};
|
||||
|
||||
|
||||
@ -62,6 +62,9 @@ struct MediaConfig {
|
||||
|
||||
// Time interval between RTCP report for video
|
||||
int rtcp_report_interval_ms = 1000;
|
||||
|
||||
// Enables send packet batching from the egress RTP sender.
|
||||
bool enable_send_packet_batching = false;
|
||||
} video;
|
||||
|
||||
// Audio-specific config.
|
||||
@ -82,6 +85,8 @@ struct MediaConfig {
|
||||
video.experiment_cpu_load_estimator ==
|
||||
o.video.experiment_cpu_load_estimator &&
|
||||
video.rtcp_report_interval_ms == o.video.rtcp_report_interval_ms &&
|
||||
video.enable_send_packet_batching ==
|
||||
o.video.enable_send_packet_batching &&
|
||||
audio.rtcp_report_interval_ms == o.audio.rtcp_report_interval_ms;
|
||||
}
|
||||
|
||||
|
||||
@ -1386,6 +1386,8 @@ bool WebRtcVideoChannel::AddSendStream(const StreamParams& sp) {
|
||||
config.crypto_options = crypto_options_;
|
||||
config.rtp.extmap_allow_mixed = ExtmapAllowMixed();
|
||||
config.rtcp_report_interval_ms = video_config_.rtcp_report_interval_ms;
|
||||
config.rtp.enable_send_packet_batching =
|
||||
video_config_.enable_send_packet_batching;
|
||||
|
||||
WebRtcVideoSendStream* stream = new WebRtcVideoSendStream(
|
||||
call_, sp, std::move(config), default_send_options_,
|
||||
|
||||
@ -62,6 +62,7 @@ rtc_library("pacing") {
|
||||
"../utility:utility",
|
||||
]
|
||||
absl_deps = [
|
||||
"//third_party/abseil-cpp/absl/cleanup",
|
||||
"//third_party/abseil-cpp/absl/memory",
|
||||
"//third_party/abseil-cpp/absl/strings",
|
||||
"//third_party/abseil-cpp/absl/types:optional",
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "absl/cleanup/cleanup.h"
|
||||
#include "absl/strings/match.h"
|
||||
#include "modules/pacing/bitrate_prober.h"
|
||||
#include "modules/pacing/interval_budget.h"
|
||||
@ -374,6 +375,9 @@ Timestamp PacingController::NextSendTime() const {
|
||||
}
|
||||
|
||||
void PacingController::ProcessPackets() {
|
||||
absl::Cleanup cleanup = [packet_sender = packet_sender_] {
|
||||
packet_sender->OnBatchComplete();
|
||||
};
|
||||
const Timestamp now = CurrentTime();
|
||||
Timestamp target_send_time = now;
|
||||
|
||||
|
||||
@ -52,6 +52,8 @@ class PacingController {
|
||||
virtual std::vector<std::unique_ptr<RtpPacketToSend>> FetchFec() = 0;
|
||||
virtual std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
|
||||
DataSize size) = 0;
|
||||
// TODO(bugs.webrtc.org/1439830): Make pure virtual once subclasses adapt.
|
||||
virtual void OnBatchComplete() {}
|
||||
|
||||
// TODO(bugs.webrtc.org/11340): Make pure virtual once downstream projects
|
||||
// have been updated.
|
||||
|
||||
@ -138,6 +138,7 @@ class MockPacingControllerCallback : public PacingController::PacketSender {
|
||||
GetRtxSsrcForMedia,
|
||||
(uint32_t),
|
||||
(const, override));
|
||||
MOCK_METHOD(void, OnBatchComplete, (), (override));
|
||||
};
|
||||
|
||||
// Mock callback implementing the raw api.
|
||||
@ -165,6 +166,7 @@ class MockPacketSender : public PacingController::PacketSender {
|
||||
GetRtxSsrcForMedia,
|
||||
(uint32_t),
|
||||
(const, override));
|
||||
MOCK_METHOD(void, OnBatchComplete, (), (override));
|
||||
};
|
||||
|
||||
class PacingControllerPadding : public PacingController::PacketSender {
|
||||
@ -202,6 +204,8 @@ class PacingControllerPadding : public PacingController::PacketSender {
|
||||
return absl::nullopt;
|
||||
}
|
||||
|
||||
void OnBatchComplete() override {}
|
||||
|
||||
size_t padding_sent() { return padding_sent_; }
|
||||
size_t total_bytes_sent() { return total_bytes_sent_; }
|
||||
|
||||
@ -260,6 +264,7 @@ class PacingControllerProbing : public PacingController::PacketSender {
|
||||
absl::optional<uint32_t> GetRtxSsrcForMedia(uint32_t) const override {
|
||||
return absl::nullopt;
|
||||
}
|
||||
void OnBatchComplete() override {}
|
||||
|
||||
int packets_sent() const { return packets_sent_; }
|
||||
int padding_packets_sent() const { return padding_packets_sent_; }
|
||||
@ -1454,6 +1459,13 @@ TEST_F(PacingControllerTest, PaddingOveruse) {
|
||||
pacer->ProcessPackets();
|
||||
}
|
||||
|
||||
TEST_F(PacingControllerTest, ProvidesOnBatchCompleteToPacketSender) {
|
||||
MockPacketSender callback;
|
||||
auto pacer = std::make_unique<PacingController>(&clock_, &callback, trials_);
|
||||
EXPECT_CALL(callback, OnBatchComplete);
|
||||
pacer->ProcessPackets();
|
||||
}
|
||||
|
||||
TEST_F(PacingControllerTest, ProbeClusterId) {
|
||||
MockPacketSender callback;
|
||||
uint32_t ssrc = 12346;
|
||||
|
||||
@ -90,6 +90,7 @@ void PacketRouter::RemoveSendRtpModuleFromMap(uint32_t ssrc) {
|
||||
auto it = send_modules_map_.find(ssrc);
|
||||
RTC_DCHECK(it != send_modules_map_.end());
|
||||
send_modules_list_.remove(it->second);
|
||||
RTC_CHECK(modules_used_in_current_batch_.empty());
|
||||
send_modules_map_.erase(it);
|
||||
}
|
||||
|
||||
@ -166,6 +167,7 @@ void PacketRouter::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
|
||||
RTC_LOG(LS_WARNING) << "Failed to send packet, rejected by RTP module.";
|
||||
return;
|
||||
}
|
||||
modules_used_in_current_batch_.insert(rtp_module);
|
||||
|
||||
// Sending succeeded.
|
||||
|
||||
@ -184,6 +186,16 @@ void PacketRouter::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
|
||||
}
|
||||
}
|
||||
|
||||
void PacketRouter::OnBatchComplete() {
|
||||
RTC_DCHECK_RUN_ON(&thread_checker_);
|
||||
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
|
||||
"PacketRouter::OnBatchComplete");
|
||||
for (auto& module : modules_used_in_current_batch_) {
|
||||
module->OnBatchComplete();
|
||||
}
|
||||
modules_used_in_current_batch_.clear();
|
||||
}
|
||||
|
||||
std::vector<std::unique_ptr<RtpPacketToSend>> PacketRouter::FetchFec() {
|
||||
RTC_DCHECK_RUN_ON(&thread_checker_);
|
||||
std::vector<std::unique_ptr<RtpPacketToSend>> fec_packets =
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
|
||||
#include <list>
|
||||
#include <memory>
|
||||
#include <set>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
@ -62,6 +63,7 @@ class PacketRouter : public PacingController::PacketSender {
|
||||
uint32_t ssrc,
|
||||
rtc::ArrayView<const uint16_t> sequence_numbers) override;
|
||||
absl::optional<uint32_t> GetRtxSsrcForMedia(uint32_t ssrc) const override;
|
||||
void OnBatchComplete() override;
|
||||
|
||||
uint16_t CurrentTransportSequenceNumber() const;
|
||||
|
||||
@ -108,6 +110,8 @@ class PacketRouter : public PacingController::PacketSender {
|
||||
|
||||
std::vector<std::unique_ptr<RtpPacketToSend>> pending_fec_packets_
|
||||
RTC_GUARDED_BY(thread_checker_);
|
||||
std::set<RtpRtcpInterface*> modules_used_in_current_batch_
|
||||
RTC_GUARDED_BY(thread_checker_);
|
||||
};
|
||||
} // namespace webrtc
|
||||
#endif // MODULES_PACING_PACKET_ROUTER_H_
|
||||
|
||||
@ -252,6 +252,7 @@ TEST_F(PacketRouterTest, PadsOnLastActiveMediaStream) {
|
||||
|
||||
// If the last active module is removed, and no module sends media before
|
||||
// the next padding request, and arbitrary module will be selected.
|
||||
packet_router_.OnBatchComplete();
|
||||
packet_router_.RemoveSendRtpModule(&rtp_2);
|
||||
|
||||
// Send on and then remove all remaining modules.
|
||||
@ -301,6 +302,7 @@ TEST_F(PacketRouterTest, AllocatesTransportSequenceNumbers) {
|
||||
packet_router.CurrentTransportSequenceNumber());
|
||||
}
|
||||
|
||||
packet_router.OnBatchComplete();
|
||||
packet_router.RemoveSendRtpModule(&rtp_1);
|
||||
}
|
||||
|
||||
@ -346,7 +348,7 @@ TEST_F(PacketRouterTest, SendPacketWithoutTransportSequenceNumbers) {
|
||||
_))
|
||||
.WillOnce(Return(true));
|
||||
packet_router_.SendPacket(std::move(packet), PacedPacketInfo());
|
||||
|
||||
packet_router_.OnBatchComplete();
|
||||
packet_router_.RemoveSendRtpModule(&rtp_1);
|
||||
}
|
||||
|
||||
@ -390,6 +392,7 @@ TEST_F(PacketRouterTest, SendPacketAssignsTransportSequenceNumbers) {
|
||||
.WillOnce(Return(true));
|
||||
packet_router_.SendPacket(std::move(packet), PacedPacketInfo());
|
||||
|
||||
packet_router_.OnBatchComplete();
|
||||
packet_router_.RemoveSendRtpModule(&rtp_1);
|
||||
packet_router_.RemoveSendRtpModule(&rtp_2);
|
||||
}
|
||||
@ -430,6 +433,7 @@ TEST_F(PacketRouterTest, DoesNotIncrementTransportSequenceNumberOnSendFailure) {
|
||||
.WillOnce(Return(true));
|
||||
packet_router_.SendPacket(std::move(packet), PacedPacketInfo());
|
||||
|
||||
packet_router_.OnBatchComplete();
|
||||
packet_router_.RemoveSendRtpModule(&rtp);
|
||||
}
|
||||
|
||||
@ -495,6 +499,24 @@ TEST_F(PacketRouterTest, ReportsRtxSsrc) {
|
||||
packet_router_.RemoveSendRtpModule(&rtp_2);
|
||||
}
|
||||
|
||||
TEST_F(PacketRouterTest, RoutesBatchCompleteToActiveModules) {
|
||||
NiceMock<MockRtpRtcpInterface> rtp_1;
|
||||
NiceMock<MockRtpRtcpInterface> rtp_2;
|
||||
constexpr uint32_t kSsrc1 = 4711;
|
||||
constexpr uint32_t kSsrc2 = 1234;
|
||||
ON_CALL(rtp_1, SSRC).WillByDefault(Return(kSsrc1));
|
||||
ON_CALL(rtp_2, SSRC).WillByDefault(Return(kSsrc2));
|
||||
packet_router_.AddSendRtpModule(&rtp_1, false);
|
||||
packet_router_.AddSendRtpModule(&rtp_2, false);
|
||||
EXPECT_CALL(rtp_1, TrySendPacket).WillOnce(Return(true));
|
||||
packet_router_.SendPacket(BuildRtpPacket(kSsrc1), PacedPacketInfo());
|
||||
EXPECT_CALL(rtp_1, OnBatchComplete);
|
||||
EXPECT_CALL(rtp_2, OnBatchComplete).Times(0);
|
||||
packet_router_.OnBatchComplete();
|
||||
packet_router_.RemoveSendRtpModule(&rtp_1);
|
||||
packet_router_.RemoveSendRtpModule(&rtp_2);
|
||||
}
|
||||
|
||||
#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
|
||||
using PacketRouterDeathTest = PacketRouterTest;
|
||||
TEST_F(PacketRouterDeathTest, DoubleRegistrationOfSendModuleDisallowed) {
|
||||
|
||||
@ -88,6 +88,7 @@ class MockRtpRtcpInterface : public RtpRtcpInterface {
|
||||
(std::unique_ptr<RtpPacketToSend> packet,
|
||||
const PacedPacketInfo& pacing_info),
|
||||
(override));
|
||||
MOCK_METHOD(void, OnBatchComplete, (), (override));
|
||||
MOCK_METHOD(void,
|
||||
SetFecProtectionParams,
|
||||
(const FecProtectionParams& delta_params,
|
||||
|
||||
@ -135,6 +135,8 @@ class ABSL_DEPRECATED("") ModuleRtpRtcpImpl
|
||||
bool TrySendPacket(std::unique_ptr<RtpPacketToSend> packet,
|
||||
const PacedPacketInfo& pacing_info) override;
|
||||
|
||||
void OnBatchComplete() override {}
|
||||
|
||||
void SetFecProtectionParams(const FecProtectionParams& delta_params,
|
||||
const FecProtectionParams& key_params) override;
|
||||
|
||||
|
||||
@ -344,6 +344,11 @@ bool ModuleRtpRtcpImpl2::TrySendPacket(std::unique_ptr<RtpPacketToSend> packet,
|
||||
return true;
|
||||
}
|
||||
|
||||
void ModuleRtpRtcpImpl2::OnBatchComplete() {
|
||||
RTC_DCHECK(rtp_sender_);
|
||||
rtp_sender_->packet_sender.OnBatchComplete();
|
||||
}
|
||||
|
||||
void ModuleRtpRtcpImpl2::SetFecProtectionParams(
|
||||
const FecProtectionParams& delta_params,
|
||||
const FecProtectionParams& key_params) {
|
||||
|
||||
@ -146,6 +146,7 @@ class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface,
|
||||
|
||||
bool TrySendPacket(std::unique_ptr<RtpPacketToSend> packet,
|
||||
const PacedPacketInfo& pacing_info) override;
|
||||
void OnBatchComplete() override;
|
||||
|
||||
void SetFecProtectionParams(const FecProtectionParams& delta_params,
|
||||
const FecProtectionParams& key_params) override;
|
||||
|
||||
@ -148,6 +148,9 @@ class RtpRtcpInterface : public RtcpFeedbackSenderInterface {
|
||||
// not negotiated. If the RID and Repaired RID extensions are not
|
||||
// registered, the RID will not be sent.
|
||||
std::string rid;
|
||||
|
||||
// Enables send packet batching from the egress RTP sender.
|
||||
bool enable_send_packet_batching = false;
|
||||
};
|
||||
|
||||
// Stats for RTCP sender reports (SR) for a specific SSRC.
|
||||
@ -317,6 +320,10 @@ class RtpRtcpInterface : public RtcpFeedbackSenderInterface {
|
||||
virtual bool TrySendPacket(std::unique_ptr<RtpPacketToSend> packet,
|
||||
const PacedPacketInfo& pacing_info) = 0;
|
||||
|
||||
// Notifies that a batch of packet sends is completed. The implementation can
|
||||
// use this to optimize packet sending.
|
||||
virtual void OnBatchComplete() = 0;
|
||||
|
||||
// Update the FEC protection parameters to use for delta- and key-frames.
|
||||
// Only used when deferred FEC is active.
|
||||
virtual void SetFecProtectionParams(
|
||||
|
||||
@ -66,7 +66,8 @@ void RtpSenderEgress::NonPacedPacketSender::PrepareForSend(
|
||||
|
||||
RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config,
|
||||
RtpPacketHistory* packet_history)
|
||||
: worker_queue_(TaskQueueBase::Current()),
|
||||
: enable_send_packet_batching_(config.enable_send_packet_batching),
|
||||
worker_queue_(TaskQueueBase::Current()),
|
||||
ssrc_(config.local_media_ssrc),
|
||||
rtx_ssrc_(config.rtx_send_ssrc),
|
||||
flexfec_ssrc_(config.fec_generator ? config.fec_generator->FecSsrc()
|
||||
@ -76,9 +77,7 @@ RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config,
|
||||
packet_history_(packet_history),
|
||||
transport_(config.outgoing_transport),
|
||||
event_log_(config.event_log),
|
||||
#if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
|
||||
is_audio_(config.audio),
|
||||
#endif
|
||||
need_rtp_packet_infos_(config.need_rtp_packet_infos),
|
||||
fec_generator_(config.fec_generator),
|
||||
transport_feedback_observer_(config.transport_feedback_callback),
|
||||
@ -139,13 +138,13 @@ void RtpSenderEgress::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
|
||||
RTC_DCHECK(packet->retransmitted_sequence_number().has_value());
|
||||
}
|
||||
|
||||
const uint32_t packet_ssrc = packet->Ssrc();
|
||||
const Timestamp now = clock_->CurrentTime();
|
||||
|
||||
#if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
|
||||
worker_queue_->PostTask(SafeTask(
|
||||
task_safety_.flag(),
|
||||
[this, now, packet_ssrc]() { BweTestLoggingPlot(now, packet_ssrc); }));
|
||||
worker_queue_->PostTask(SafeTask(task_safety_.flag(),
|
||||
[this, now, packet_ssrc = packet->Ssrc()]() {
|
||||
BweTestLoggingPlot(now, packet_ssrc);
|
||||
}));
|
||||
#endif
|
||||
|
||||
if (need_rtp_packet_infos_ &&
|
||||
@ -225,6 +224,26 @@ void RtpSenderEgress::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
|
||||
}
|
||||
}
|
||||
|
||||
auto compound_packet = Packet{std::move(packet), pacing_info, now};
|
||||
if (enable_send_packet_batching_ && !is_audio_) {
|
||||
packets_to_send_.push_back(std::move(compound_packet));
|
||||
} else {
|
||||
CompleteSendPacket(compound_packet, false);
|
||||
}
|
||||
}
|
||||
|
||||
void RtpSenderEgress::OnBatchComplete() {
|
||||
RTC_DCHECK_RUN_ON(&pacer_checker_);
|
||||
for (auto& packet : packets_to_send_) {
|
||||
CompleteSendPacket(packet, &packet == &packets_to_send_.back());
|
||||
}
|
||||
packets_to_send_.clear();
|
||||
}
|
||||
|
||||
void RtpSenderEgress::CompleteSendPacket(const Packet& compound_packet,
|
||||
bool last_in_batch) {
|
||||
auto& [packet, pacing_info, now] = compound_packet;
|
||||
|
||||
const bool is_media = packet->packet_type() == RtpPacketMediaType::kAudio ||
|
||||
packet->packet_type() == RtpPacketMediaType::kVideo;
|
||||
|
||||
@ -246,12 +265,14 @@ void RtpSenderEgress::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
|
||||
|
||||
options.additional_data = packet->additional_data();
|
||||
|
||||
const uint32_t packet_ssrc = packet->Ssrc();
|
||||
if (packet->packet_type() != RtpPacketMediaType::kPadding &&
|
||||
packet->packet_type() != RtpPacketMediaType::kRetransmission) {
|
||||
UpdateDelayStatistics(packet->capture_time(), now, packet_ssrc);
|
||||
UpdateOnSendPacket(options.packet_id, packet->capture_time(), packet_ssrc);
|
||||
}
|
||||
|
||||
options.batchable = enable_send_packet_batching_ && !is_audio_;
|
||||
options.last_packet_in_batch = last_in_batch;
|
||||
const bool send_success = SendPacketToNetwork(*packet, options, pacing_info);
|
||||
|
||||
// Put packet in retransmission history or update pending status even if
|
||||
@ -279,9 +300,9 @@ void RtpSenderEgress::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
|
||||
// TODO(bugs.webrtc.org/137439): clean up task posting when the combined
|
||||
// network/worker project launches.
|
||||
if (TaskQueueBase::Current() != worker_queue_) {
|
||||
worker_queue_->PostTask(
|
||||
SafeTask(task_safety_.flag(), [this, now, packet_ssrc, packet_type,
|
||||
counter = std::move(counter), size]() {
|
||||
worker_queue_->PostTask(SafeTask(
|
||||
task_safety_.flag(), [this, now = now, packet_ssrc, packet_type,
|
||||
counter = std::move(counter), size]() {
|
||||
RTC_DCHECK_RUN_ON(worker_queue_);
|
||||
UpdateRtpStats(now, packet_ssrc, packet_type, std::move(counter),
|
||||
size);
|
||||
|
||||
@ -67,6 +67,7 @@ class RtpSenderEgress {
|
||||
|
||||
void SendPacket(std::unique_ptr<RtpPacketToSend> packet,
|
||||
const PacedPacketInfo& pacing_info) RTC_LOCKS_EXCLUDED(lock_);
|
||||
void OnBatchComplete();
|
||||
uint32_t Ssrc() const { return ssrc_; }
|
||||
absl::optional<uint32_t> RtxSsrc() const { return rtx_ssrc_; }
|
||||
absl::optional<uint32_t> FlexFecSsrc() const { return flexfec_ssrc_; }
|
||||
@ -100,6 +101,13 @@ class RtpSenderEgress {
|
||||
rtc::ArrayView<const uint16_t> sequence_numbers);
|
||||
|
||||
private:
|
||||
struct Packet {
|
||||
std::unique_ptr<RtpPacketToSend> rtp_packet;
|
||||
PacedPacketInfo info;
|
||||
Timestamp now;
|
||||
};
|
||||
void CompleteSendPacket(const Packet& compound_packet, bool last_in_batch)
|
||||
RTC_LOCKS_EXCLUDED(lock_) RTC_RUN_ON(pacer_checker_);
|
||||
RtpSendRates GetSendRatesLocked(Timestamp now) const
|
||||
RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
|
||||
bool HasCorrectSsrc(const RtpPacketToSend& packet) const;
|
||||
@ -128,6 +136,7 @@ class RtpSenderEgress {
|
||||
// Called on a timer, once a second, on the worker_queue_.
|
||||
void PeriodicUpdate();
|
||||
|
||||
const bool enable_send_packet_batching_;
|
||||
TaskQueueBase* const worker_queue_;
|
||||
RTC_NO_UNIQUE_ADDRESS SequenceChecker pacer_checker_;
|
||||
const uint32_t ssrc_;
|
||||
@ -138,9 +147,7 @@ class RtpSenderEgress {
|
||||
RtpPacketHistory* const packet_history_;
|
||||
Transport* const transport_;
|
||||
RtcEventLog* const event_log_;
|
||||
#if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
|
||||
const bool is_audio_;
|
||||
#endif
|
||||
const bool need_rtp_packet_infos_;
|
||||
VideoFecGenerator* const fec_generator_ RTC_GUARDED_BY(pacer_checker_);
|
||||
absl::optional<uint16_t> last_sent_seq_ RTC_GUARDED_BY(pacer_checker_);
|
||||
@ -178,6 +185,7 @@ class RtpSenderEgress {
|
||||
const std::unique_ptr<RtpSequenceNumberMap> rtp_sequence_number_map_
|
||||
RTC_GUARDED_BY(worker_queue_);
|
||||
RepeatingTaskHandle update_task_ RTC_GUARDED_BY(worker_queue_);
|
||||
std::vector<Packet> packets_to_send_ RTC_GUARDED_BY(pacer_checker_);
|
||||
ScopedTaskSafety task_safety_;
|
||||
};
|
||||
|
||||
|
||||
@ -35,9 +35,10 @@ namespace webrtc {
|
||||
namespace {
|
||||
|
||||
using ::testing::_;
|
||||
using ::testing::AllOf;
|
||||
using ::testing::Field;
|
||||
using ::testing::InSequence;
|
||||
using ::testing::NiceMock;
|
||||
using ::testing::Optional;
|
||||
using ::testing::StrictMock;
|
||||
|
||||
constexpr Timestamp kStartTime = Timestamp::Millis(123456789);
|
||||
@ -96,12 +97,14 @@ class TestTransport : public Transport {
|
||||
public:
|
||||
explicit TestTransport(RtpHeaderExtensionMap* extensions)
|
||||
: total_data_sent_(DataSize::Zero()), extensions_(extensions) {}
|
||||
MOCK_METHOD(void, SentRtp, (const PacketOptions& options), ());
|
||||
bool SendRtp(const uint8_t* packet,
|
||||
size_t length,
|
||||
const PacketOptions& options) override {
|
||||
total_data_sent_ += DataSize::Bytes(length);
|
||||
last_packet_.emplace(rtc::MakeArrayView(packet, length), options,
|
||||
extensions_);
|
||||
SentRtp(options);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -133,6 +136,7 @@ class RtpSenderEgressTest : public ::testing::Test {
|
||||
|
||||
RtpRtcpInterface::Configuration DefaultConfig() {
|
||||
RtpRtcpInterface::Configuration config;
|
||||
config.audio = false;
|
||||
config.clock = clock_;
|
||||
config.outgoing_transport = &transport_;
|
||||
config.local_media_ssrc = kSsrc;
|
||||
@ -175,7 +179,7 @@ class RtpSenderEgressTest : public ::testing::Test {
|
||||
NiceMock<MockSendPacketObserver> send_packet_observer_;
|
||||
NiceMock<MockTransportFeedbackObserver> feedback_observer_;
|
||||
RtpHeaderExtensionMap header_extensions_;
|
||||
TestTransport transport_;
|
||||
NiceMock<TestTransport> transport_;
|
||||
RtpPacketHistory packet_history_;
|
||||
test::ExplicitKeyValueConfig trials_;
|
||||
uint16_t sequence_number_;
|
||||
@ -209,6 +213,43 @@ TEST_F(RtpSenderEgressTest, TransportFeedbackObserverGetsCorrectByteCount) {
|
||||
sender->SendPacket(std::move(packet), PacedPacketInfo());
|
||||
}
|
||||
|
||||
TEST_F(RtpSenderEgressTest, SendsPacketsOneByOneWhenNotBatching) {
|
||||
std::unique_ptr<RtpSenderEgress> sender = CreateRtpSenderEgress();
|
||||
EXPECT_CALL(transport_,
|
||||
SentRtp(AllOf(Field(&PacketOptions::last_packet_in_batch, false),
|
||||
Field(&PacketOptions::batchable, false))));
|
||||
sender->SendPacket(BuildRtpPacket(), PacedPacketInfo());
|
||||
}
|
||||
|
||||
TEST_F(RtpSenderEgressTest, SendsPacketsOneByOneWhenBatchingWithAudio) {
|
||||
auto config = DefaultConfig();
|
||||
config.enable_send_packet_batching = true;
|
||||
config.audio = true;
|
||||
auto sender = std::make_unique<RtpSenderEgress>(config, &packet_history_);
|
||||
EXPECT_CALL(transport_,
|
||||
SentRtp(AllOf(Field(&PacketOptions::last_packet_in_batch, false),
|
||||
Field(&PacketOptions::batchable, false))))
|
||||
.Times(2);
|
||||
sender->SendPacket(BuildRtpPacket(), PacedPacketInfo());
|
||||
sender->SendPacket(BuildRtpPacket(), PacedPacketInfo());
|
||||
}
|
||||
|
||||
TEST_F(RtpSenderEgressTest, CollectsPacketsWhenBatchingWithVideo) {
|
||||
auto config = DefaultConfig();
|
||||
config.enable_send_packet_batching = true;
|
||||
auto sender = std::make_unique<RtpSenderEgress>(config, &packet_history_);
|
||||
sender->SendPacket(BuildRtpPacket(), PacedPacketInfo());
|
||||
sender->SendPacket(BuildRtpPacket(), PacedPacketInfo());
|
||||
InSequence s;
|
||||
EXPECT_CALL(transport_,
|
||||
SentRtp(AllOf(Field(&PacketOptions::last_packet_in_batch, false),
|
||||
Field(&PacketOptions::batchable, true))));
|
||||
EXPECT_CALL(transport_,
|
||||
SentRtp(AllOf(Field(&PacketOptions::last_packet_in_batch, true),
|
||||
Field(&PacketOptions::batchable, true))));
|
||||
sender->OnBatchComplete();
|
||||
}
|
||||
|
||||
TEST_F(RtpSenderEgressTest, PacketOptionsIsRetransmitSetByPacketType) {
|
||||
std::unique_ptr<RtpSenderEgress> sender = CreateRtpSenderEgress();
|
||||
|
||||
|
||||
@ -54,6 +54,12 @@ struct RTC_EXPORT PacketOptions {
|
||||
PacketTimeUpdateParams packet_time_params;
|
||||
// PacketInfo is passed to SentPacket when signaling this packet is sent.
|
||||
PacketInfo info_signaled_after_sent;
|
||||
// True if this is a batchable packet. Batchable packets are collected at low
|
||||
// levels and sent first when their AsyncPacketSocket receives a
|
||||
// OnSendBatchComplete call.
|
||||
bool batchable = false;
|
||||
// True if this is the last packet of a batch.
|
||||
bool last_packet_in_batch = false;
|
||||
};
|
||||
|
||||
// Provides the ability to receive packets asynchronously. Sends are not
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user