webrtc_m130/modules/pacing/paced_sender.cc
Erik Språng 739a5b3692 Refactors BitrateProber with unit types and absolute probe time.
Using unit types improves readability and some conversion in PacedSender
can be removed.

TimeUntilNextProbe() is replaced by NextProbeTime(), so returning an
absolute time rather than a delta. This fits better with the upcoming
TaskQueue based pacer, and is also what is already stored internally
in BitrateProber.

Bug: webrtc:10809
Change-Id: I5a4e289d2b53e99d3c0a2f4b36a966dba759d5cf
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/158743
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29670}
2019-10-31 15:34:39 +00:00

180 lines
5.7 KiB
C++

/*
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/pacing/paced_sender.h"
#include <algorithm>
#include <utility>
#include <vector>
#include "absl/memory/memory.h"
#include "api/rtc_event_log/rtc_event_log.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/checks.h"
#include "rtc_base/location.h"
#include "rtc_base/logging.h"
#include "rtc_base/time_utils.h"
#include "system_wrappers/include/clock.h"
namespace webrtc {
// Out-of-line definitions of PacedSender's static constants.
// NOTE(review): judging by the name, the queue length limit is expressed in
// milliseconds (i.e. 2 seconds) - confirm against the header.
const int64_t PacedSender::kMaxQueueLengthMs = 2000;
// NOTE(review): presumably a default factor applied to a target bitrate to
// obtain the pacing rate; it is not used in this file - confirm at call sites.
const float PacedSender::kDefaultPaceMultiplier = 2.5f;
// Constructs the pacing controller with `this` as its packet sender, stores
// the collaborators, and registers the module proxy with `process_thread`
// when one is supplied.
PacedSender::PacedSender(Clock* clock,
                         PacketRouter* packet_router,
                         RtcEventLog* event_log,
                         const WebRtcKeyValueConfig* field_trials,
                         ProcessThread* process_thread)
    : pacing_controller_(clock,
                         static_cast<PacingController::PacketSender*>(this),
                         event_log,
                         field_trials),
      clock_(clock),
      packet_router_(packet_router),
      process_thread_(process_thread) {
  // Without a process thread there is nothing to register with.
  if (!process_thread_) {
    return;
  }
  process_thread_->RegisterModule(&module_proxy_, RTC_FROM_HERE);
}
// Undoes the registration performed in the constructor, if there was one.
PacedSender::~PacedSender() {
  if (!process_thread_) {
    return;
  }
  process_thread_->DeRegisterModule(&module_proxy_);
}
// Forwards the probe cluster request to the pacing controller while holding
// `critsect_`.
void PacedSender::CreateProbeCluster(DataRate bitrate, int cluster_id) {
  rtc::CritScope lock(&critsect_);
  pacing_controller_.CreateProbeCluster(bitrate, cluster_id);
}
// Pauses the pacing controller under the lock, then wakes the process thread
// (if any) after the lock has been released.
void PacedSender::Pause() {
  {
    rtc::CritScope lock(&critsect_);
    pacing_controller_.Pause();
  }

  if (!process_thread_) {
    return;
  }
  // Waking the process thread makes it call TimeUntilNextProcess() again, so
  // it picks up the new (longer) estimate for when Process() should run.
  process_thread_->WakeUp(&module_proxy_);
}
// Resumes the pacing controller under the lock, then wakes the process thread
// (if any) after the lock has been released.
void PacedSender::Resume() {
  {
    rtc::CritScope lock(&critsect_);
    pacing_controller_.Resume();
  }

  if (!process_thread_) {
    return;
  }
  // Waking the process thread makes it call TimeUntilNextProcess() again, so
  // the estimate for when Process() should run is refreshed.
  process_thread_->WakeUp(&module_proxy_);
}
// Passes the congestion window size on to the pacing controller while holding
// `critsect_`.
void PacedSender::SetCongestionWindow(DataSize congestion_window_size) {
  rtc::CritScope lock(&critsect_);
  pacing_controller_.SetCongestionWindow(congestion_window_size);
}
// Passes the outstanding data size on to the pacing controller while holding
// `critsect_`.
void PacedSender::UpdateOutstandingData(DataSize outstanding_data) {
  rtc::CritScope lock(&critsect_);
  pacing_controller_.UpdateOutstandingData(outstanding_data);
}
// Passes the pacing and padding rates on to the pacing controller while
// holding `critsect_`.
void PacedSender::SetPacingRates(DataRate pacing_rate, DataRate padding_rate) {
  rtc::CritScope lock(&critsect_);
  pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
}
void PacedSender::EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
rtc::CritScope cs(&critsect_);
for (auto& packet : packets) {
pacing_controller_.EnqueuePacket(std::move(packet));
}
}
// Passes the audio accounting flag on to the pacing controller while holding
// `critsect_`.
void PacedSender::SetAccountForAudioPackets(bool account_for_audio) {
  rtc::CritScope lock(&critsect_);
  pacing_controller_.SetAccountForAudioPackets(account_for_audio);
}
// Returns the pacing controller's expected queue time, read while holding
// `critsect_`.
TimeDelta PacedSender::ExpectedQueueTime() const {
  rtc::CritScope lock(&critsect_);
  TimeDelta expected_queue_time = pacing_controller_.ExpectedQueueTime();
  return expected_queue_time;
}
// Returns the pacing controller's queue size, read while holding `critsect_`.
DataSize PacedSender::QueueSizeData() const {
  rtc::CritScope lock(&critsect_);
  DataSize queue_size = pacing_controller_.QueueSizeData();
  return queue_size;
}
// Returns the pacing controller's first-sent-packet time (an optional value),
// read while holding `critsect_`.
absl::optional<Timestamp> PacedSender::FirstSentPacketTime() const {
  rtc::CritScope lock(&critsect_);
  absl::optional<Timestamp> first_sent_time =
      pacing_controller_.FirstSentPacketTime();
  return first_sent_time;
}
// Returns the pacing controller's oldest-packet wait time, read while holding
// `critsect_`.
TimeDelta PacedSender::OldestPacketWaitTime() const {
  rtc::CritScope lock(&critsect_);
  TimeDelta oldest_wait_time = pacing_controller_.OldestPacketWaitTime();
  return oldest_wait_time;
}
int64_t PacedSender::TimeUntilNextProcess() {
rtc::CritScope cs(&critsect_);
// When paused we wake up every 500 ms to send a padding packet to ensure
// we won't get stuck in the paused state due to no feedback being received.
TimeDelta elapsed_time = pacing_controller_.TimeElapsedSinceLastProcess();
if (pacing_controller_.IsPaused()) {
return std::max(PacingController::kPausedProcessInterval - elapsed_time,
TimeDelta::Zero())
.ms();
}
Timestamp next_probe = pacing_controller_.NextProbeTime();
if (next_probe != Timestamp::PlusInfinity()) {
return std::max(TimeDelta::Zero(), next_probe - clock_->CurrentTime()).ms();
}
const TimeDelta min_packet_limit = TimeDelta::ms(5);
return std::max(min_packet_limit - elapsed_time, TimeDelta::Zero()).ms();
}
// Runs one round of packet processing on the pacing controller while holding
// `critsect_`.
void PacedSender::Process() {
  rtc::CritScope lock(&critsect_);
  pacing_controller_.ProcessPackets();
}
// Logs the process thread pointer that was passed in and checks that it is
// either null or the same thread that was given to the constructor.
// NOTE(review): the check is an RTC_DCHECK, so it is presumably only active
// in DCHECK-enabled builds - confirm.
void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) {
RTC_LOG(LS_INFO) << "ProcessThreadAttached 0x" << process_thread;
RTC_DCHECK(!process_thread || process_thread == process_thread_);
}
// Passes the queue time limit on to the pacing controller while holding
// `critsect_`.
void PacedSender::SetQueueTimeLimit(TimeDelta limit) {
  rtc::CritScope lock(&critsect_);
  pacing_controller_.SetQueueTimeLimit(limit);
}
// Forwards `packet` and its `cluster_info` to the packet router.
// `critsect_` is released for the duration of the router call and re-acquired
// before returning.
// NOTE(review): this appears to assume the caller already holds `critsect_`
// (e.g. when invoked by the pacing controller from within Process()) -
// confirm against the header's lock annotations.
void PacedSender::SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) {
// Drop the lock so the router is not called with `critsect_` held.
critsect_.Leave();
packet_router_->SendPacket(std::move(packet), cluster_info);
// Restore the lock state the caller expects.
critsect_.Enter();
}
// Asks the packet router for padding packets totalling `size` (passed on in
// bytes) and returns them. `critsect_` is released for the duration of the
// router call and re-acquired before returning.
std::vector<std::unique_ptr<RtpPacketToSend>> PacedSender::GeneratePadding(
    DataSize size) {
  // Drop the lock so the router is not called with `critsect_` held.
  critsect_.Leave();
  std::vector<std::unique_ptr<RtpPacketToSend>> padding =
      packet_router_->GeneratePadding(size.bytes());
  critsect_.Enter();
  return padding;
}
} // namespace webrtc