Stop Posting tasks when we don't need to.

Under the combined network/worker thread project, tasks
are unnecessarily posted to the same thread.

This CL reaps 90% overhead savings in sent packet feedback
as measured with Perfetto under a 49p Meet call.

The identity of the posted calls was uncovered with WebRTC/Chrome's
new location-aware tracing.

TESTED=presubmit + local Meet calls.

Bug: chromium:1373439
Change-Id: I0c43aa4de884831838747d52b21c45fd360106e8
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/295780
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Markus Handell <handellm@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39484}
This commit is contained in:
Markus Handell 2023-03-03 16:38:10 +01:00 committed by WebRTC LUCI CQ
parent 78e1388ea7
commit eb277527f0
3 changed files with 88 additions and 24 deletions

View File

@ -233,6 +233,12 @@ struct NetworkControlUpdate {
NetworkControlUpdate();
NetworkControlUpdate(const NetworkControlUpdate&);
~NetworkControlUpdate();
bool has_updates() const {
return congestion_window.has_value() || pacer_config.has_value() ||
!probe_cluster_configs.empty() || target_rate.has_value();
}
absl::optional<DataSize> congestion_window;
absl::optional<PacerConfig> pacer_config;
std::vector<ProbeClusterConfig> probe_cluster_configs;

View File

@ -181,12 +181,19 @@ void RtpTransportControllerSend::UpdateControlState() {
}
void RtpTransportControllerSend::UpdateCongestedState() {
if (auto update = GetCongestedStateUpdate()) {
is_congested_ = update.value();
pacer_.SetCongested(update.value());
}
}
absl::optional<bool> RtpTransportControllerSend::GetCongestedStateUpdate()
const {
bool congested = transport_feedback_adapter_.GetOutstandingData() >=
congestion_window_size_;
if (congested != is_congested_) {
is_congested_ = congested;
pacer_.SetCongested(congested);
}
if (congested != is_congested_)
return congested;
return absl::nullopt;
}
MaybeWorkerThread* RtpTransportControllerSend::GetWorkerQueue() {
@ -392,27 +399,73 @@ void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) {
void RtpTransportControllerSend::OnSentPacket(
const rtc::SentPacket& sent_packet) {
// Normally called on the network thread !
// TODO(bugs.webrtc.org/137439): Clarify other thread contexts calling in, and
// simplify task posting logic when the combined network/worker project
// launches.
if (TaskQueueBase::Current() != task_queue_.TaskQueueForPost()) {
// We can't use SafeTask here if we are using an owned task queue, because
// the safety flag will be destroyed when RtpTransportControllerSend is
// destroyed on the worker thread. But we must use SafeTask if we are using
// the worker thread, since the worker thread outlives
// RtpTransportControllerSend.
task_queue_.TaskQueueForPost()->PostTask(
task_queue_.MaybeSafeTask(safety_.flag(), [this, sent_packet]() {
RTC_DCHECK_RUN_ON(&task_queue_);
ProcessSentPacket(sent_packet, /*posted_to_worker=*/true);
}));
return;
}
// We can not use SafeTask here if we are using an owned task queue, because
// the safety flag will be destroyed when RtpTransportControllerSend is
// destroyed on the worker thread. But we must use SafeTask if we are using
// the worker thread, since the worker thread outlive
// RtpTransportControllerSend.
task_queue_.TaskQueueForPost()->PostTask(
task_queue_.MaybeSafeTask(safety_.flag(), [this, sent_packet]() {
RTC_DCHECK_RUN_ON(&task_queue_);
absl::optional<SentPacket> packet_msg =
transport_feedback_adapter_.ProcessSentPacket(sent_packet);
if (packet_msg) {
// Only update outstanding data if:
// 1. Packet feedback is used.
// 2. The packet has not yet received an acknowledgement.
// 3. It is not a retransmission of an earlier packet.
UpdateCongestedState();
if (controller_)
PostUpdates(controller_->OnSentPacket(*packet_msg));
}
}));
RTC_DCHECK_RUN_ON(&task_queue_);
ProcessSentPacket(sent_packet, /*posted_to_worker=*/false);
}
// RTC_RUN_ON(task_queue_)
void RtpTransportControllerSend::ProcessSentPacket(
const rtc::SentPacket& sent_packet,
bool posted_to_worker) {
absl::optional<SentPacket> packet_msg =
transport_feedback_adapter_.ProcessSentPacket(sent_packet);
if (!packet_msg)
return;
auto congestion_update = GetCongestedStateUpdate();
NetworkControlUpdate control_update;
if (controller_)
control_update = controller_->OnSentPacket(*packet_msg);
if (!congestion_update && !control_update.has_updates())
return;
if (posted_to_worker) {
ProcessSentPacketUpdates(std::move(control_update));
} else {
// TODO(bugs.webrtc.org/137439): Aim to remove downstream locks to permit
// removing this PostTask.
// At least in test situations (and possibly in production environments), we
// may get here synchronously with locks taken in PacketRouter::SendPacket.
// Because the pacer may at times synchronously re-enter
// PacketRouter::SendPacket, we need to break the chain here and PostTask to
// get out of the lock. In testing, having updates to process happens pretty
// rarely so we do not usually get here.
task_queue_.TaskQueueForPost()->PostTask(task_queue_.MaybeSafeTask(
safety_.flag(),
[this, control_update = std::move(control_update)]() mutable {
RTC_DCHECK_RUN_ON(&task_queue_);
ProcessSentPacketUpdates(std::move(control_update));
}));
}
}
// RTC_RUN_ON(task_queue_)
void RtpTransportControllerSend::ProcessSentPacketUpdates(
NetworkControlUpdate updates) {
// Only update outstanding data if:
// 1. Packet feedback is used.
// 2. The packet has not yet received an acknowledgement.
// 3. It is not a retransmission of an earlier packet.
UpdateCongestedState();
if (controller_) {
PostUpdates(std::move(updates));
}
}
void RtpTransportControllerSend::OnReceivedPacket(

View File

@ -141,6 +141,11 @@ class RtpTransportControllerSend final
void PostUpdates(NetworkControlUpdate update) RTC_RUN_ON(task_queue_);
void UpdateControlState() RTC_RUN_ON(task_queue_);
void UpdateCongestedState() RTC_RUN_ON(task_queue_);
absl::optional<bool> GetCongestedStateUpdate() const RTC_RUN_ON(task_queue_);
void ProcessSentPacket(const rtc::SentPacket& sent_packet,
bool posted_to_worker) RTC_RUN_ON(task_queue_);
void ProcessSentPacketUpdates(NetworkControlUpdate updates)
RTC_RUN_ON(task_queue_);
Clock* const clock_;
RtcEventLog* const event_log_;