Merges RtpTransportControllerSend with SendSideCongestionController.

Bug: webrtc:9586
Change-Id: I50332f2e128f107e40af7776be0ed530e20774d9
Reviewed-on: https://webrtc-review.googlesource.com/c/113183
Reviewed-by: Niels Moller <nisse@webrtc.org>
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#25922}
This commit is contained in:
Sebastian Jansson 2018-12-05 17:35:35 +01:00 committed by Commit Bot
parent 722875f72e
commit 87609be863
7 changed files with 502 additions and 1462 deletions

View File

@ -123,6 +123,7 @@ rtc_source_set("rtp_sender") {
"..:webrtc_common",
"../api:fec_controller_api",
"../api:transport_api",
"../api/transport:goog_cc",
"../api/transport:network_control",
"../api/units:data_rate",
"../api/units:time_delta",
@ -131,7 +132,8 @@ rtc_source_set("rtp_sender") {
"../api/video_codecs:video_codecs_api",
"../logging:rtc_event_log_api",
"../modules/congestion_controller",
"../modules/congestion_controller/rtp:congestion_controller",
"../modules/congestion_controller/rtp:control_handler",
"../modules/congestion_controller/rtp:transport_feedback",
"../modules/pacing",
"../modules/rtp_rtcp:rtp_rtcp",
"../modules/rtp_rtcp:rtp_rtcp_format",

View File

@ -12,14 +12,13 @@
#include "absl/memory/memory.h"
#include "absl/types/optional.h"
#include "api/transport/goog_cc_factory.h"
#include "api/transport/network_types.h"
#include "api/units/data_rate.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "call/rtp_transport_controller_send.h"
#include "call/rtp_video_sender.h"
#include "modules/congestion_controller/include/send_side_congestion_controller.h"
#include "modules/congestion_controller/rtp/include/send_side_congestion_controller.h"
#include "rtc_base/checks.h"
#include "rtc_base/location.h"
#include "rtc_base/logging.h"
@ -27,35 +26,87 @@
#include "system_wrappers/include/field_trial.h"
namespace webrtc {
class RtpTransportControllerSend::PeriodicTask : public rtc::QueuedTask {
public:
virtual void Stop() = 0;
};
namespace {
static const int64_t kRetransmitWindowSizeMs = 500;
static const size_t kMaxOverheadBytes = 500;
const char kTaskQueueExperiment[] = "WebRTC-TaskQueueCongestionControl";
using TaskQueueController = webrtc::webrtc_cc::SendSideCongestionController;
std::unique_ptr<SendSideCongestionControllerInterface> CreateController(
Clock* clock,
rtc::TaskQueue* task_queue,
webrtc::RtcEventLog* event_log,
PacedSender* pacer,
const BitrateConstraints& bitrate_config,
bool task_queue_controller,
NetworkControllerFactoryInterface* controller_factory) {
if (task_queue_controller) {
RTC_LOG(LS_INFO) << "Using TaskQueue based SSCC";
return absl::make_unique<webrtc::webrtc_cc::SendSideCongestionController>(
clock, task_queue, event_log, pacer, bitrate_config.start_bitrate_bps,
bitrate_config.min_bitrate_bps, bitrate_config.max_bitrate_bps,
controller_factory);
const int64_t PacerQueueUpdateIntervalMs = 25;
TargetRateConstraints ConvertConstraints(int min_bitrate_bps,
int max_bitrate_bps,
int start_bitrate_bps,
const Clock* clock) {
TargetRateConstraints msg;
msg.at_time = Timestamp::ms(clock->TimeInMilliseconds());
msg.min_data_rate =
min_bitrate_bps >= 0 ? DataRate::bps(min_bitrate_bps) : DataRate::Zero();
msg.max_data_rate = max_bitrate_bps > 0 ? DataRate::bps(max_bitrate_bps)
: DataRate::Infinity();
if (start_bitrate_bps > 0)
msg.starting_rate = DataRate::bps(start_bitrate_bps);
return msg;
}
TargetRateConstraints ConvertConstraints(const BitrateConstraints& contraints,
const Clock* clock) {
return ConvertConstraints(contraints.min_bitrate_bps,
contraints.max_bitrate_bps,
contraints.start_bitrate_bps, clock);
}
// The template closure pattern is based on rtc::ClosureTask.
template <class Closure>
class PeriodicTaskImpl final : public RtpTransportControllerSend::PeriodicTask {
public:
PeriodicTaskImpl(rtc::TaskQueue* task_queue,
int64_t period_ms,
Closure&& closure)
: task_queue_(task_queue),
period_ms_(period_ms),
closure_(std::forward<Closure>(closure)) {}
bool Run() override {
if (!running_)
return true;
closure_();
// absl::WrapUnique lets us repost this task on the TaskQueue.
task_queue_->PostDelayedTask(absl::WrapUnique(this), period_ms_);
// Return false to tell TaskQueue to not destruct this object, since we have
// taken ownership with absl::WrapUnique.
return false;
}
RTC_LOG(LS_INFO) << "Using Legacy SSCC";
auto cc = absl::make_unique<webrtc::SendSideCongestionController>(
clock, nullptr /* observer */, event_log, pacer);
cc->SignalNetworkState(kNetworkDown);
cc->SetBweBitrates(bitrate_config.min_bitrate_bps,
bitrate_config.start_bitrate_bps,
bitrate_config.max_bitrate_bps);
return std::move(cc);
void Stop() override {
if (task_queue_->IsCurrent()) {
RTC_DCHECK(running_);
running_ = false;
} else {
task_queue_->PostTask([this] { Stop(); });
}
}
private:
rtc::TaskQueue* const task_queue_;
const int64_t period_ms_;
typename std::remove_const<
typename std::remove_reference<Closure>::type>::type closure_;
bool running_ = true;
};
template <class Closure>
static RtpTransportControllerSend::PeriodicTask* StartPeriodicTask(
rtc::TaskQueue* task_queue,
int64_t period_ms,
Closure&& closure) {
auto periodic_task = absl::make_unique<PeriodicTaskImpl<Closure>>(
task_queue, period_ms, std::forward<Closure>(closure));
RtpTransportControllerSend::PeriodicTask* periodic_task_ptr =
periodic_task.get();
task_queue->PostDelayedTask(std::move(periodic_task), period_ms);
return periodic_task_ptr;
}
} // namespace
@ -69,21 +120,35 @@ RtpTransportControllerSend::RtpTransportControllerSend(
bitrate_configurator_(bitrate_config),
process_thread_(ProcessThread::Create("SendControllerThread")),
observer_(nullptr),
transport_feedback_adapter_(clock_),
controller_factory_override_(controller_factory),
controller_factory_fallback_(
absl::make_unique<GoogCcNetworkControllerFactory>(event_log)),
process_interval_(controller_factory_fallback_->GetProcessInterval()),
last_report_block_time_(Timestamp::ms(clock_->TimeInMilliseconds())),
reset_feedback_on_route_change_(
!field_trial::IsEnabled("WebRTC-Bwe-NoFeedbackReset")),
send_side_bwe_with_overhead_(
webrtc::field_trial::IsEnabled("WebRTC-SendSideBwe-WithOverhead")),
transport_overhead_bytes_per_packet_(0),
network_available_(false),
periodic_tasks_enabled_(true),
packet_feedback_available_(false),
pacer_queue_update_task_(nullptr),
controller_task_(nullptr),
retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs),
task_queue_("rtp_send_controller") {
// Created after task_queue to be able to post to the task queue internally.
send_side_cc_ = CreateController(
clock, &task_queue_, event_log, &pacer_, bitrate_config,
!field_trial::IsDisabled(kTaskQueueExperiment), controller_factory);
initial_config_.constraints = ConvertConstraints(bitrate_config, clock_);
RTC_DCHECK(bitrate_config.start_bitrate_bps > 0);
pacer_.SetEstimatedBitrate(bitrate_config.start_bitrate_bps);
process_thread_->RegisterModule(&pacer_, RTC_FROM_HERE);
process_thread_->RegisterModule(send_side_cc_.get(), RTC_FROM_HERE);
process_thread_->Start();
}
RtpTransportControllerSend::~RtpTransportControllerSend() {
process_thread_->Stop();
process_thread_->DeRegisterModule(send_side_cc_.get());
process_thread_->DeRegisterModule(&pacer_);
}
@ -126,34 +191,21 @@ void RtpTransportControllerSend::OnNetworkChanged(uint32_t bitrate_bps,
uint8_t fraction_loss,
int64_t rtt_ms,
int64_t probing_interval_ms) {
// TODO(srte): Skip this step when old SendSideCongestionController is
// deprecated.
TargetTransferRate msg;
msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
msg.target_rate = DataRate::bps(bitrate_bps);
msg.network_estimate.at_time = msg.at_time;
msg.network_estimate.bwe_period = TimeDelta::ms(probing_interval_ms);
uint32_t bandwidth_bps;
if (send_side_cc_->AvailableBandwidth(&bandwidth_bps))
msg.network_estimate.bandwidth = DataRate::bps(bandwidth_bps);
msg.network_estimate.loss_rate_ratio = fraction_loss / 255.0;
msg.network_estimate.round_trip_time = TimeDelta::ms(rtt_ms);
// TODO(srte): Remove this interface and push information about bandwidth
// estimation to users of this class, thereby reducing synchronous calls.
RTC_DCHECK_RUN_ON(&task_queue_);
RTC_DCHECK(control_handler_->last_transfer_rate().has_value());
msg.network_estimate =
control_handler_->last_transfer_rate()->network_estimate;
retransmission_rate_limiter_.SetMaxRate(bandwidth_bps);
retransmission_rate_limiter_.SetMaxRate(msg.network_estimate.bandwidth.bps());
if (!task_queue_.IsCurrent()) {
task_queue_.PostTask([this, msg] {
rtc::CritScope cs(&observer_crit_);
// We won't register as observer until we have an observers.
RTC_DCHECK(observer_ != nullptr);
observer_->OnTargetTransferRate(msg);
});
} else {
rtc::CritScope cs(&observer_crit_);
// We won't register as observer until we have an observers.
RTC_DCHECK(observer_ != nullptr);
observer_->OnTargetTransferRate(msg);
}
// We won't register as observer until we have an observers.
RTC_DCHECK(observer_ != nullptr);
observer_->OnTargetTransferRate(msg);
}
rtc::TaskQueue* RtpTransportControllerSend::GetWorkerQueue() {
@ -166,7 +218,7 @@ PacketRouter* RtpTransportControllerSend::packet_router() {
TransportFeedbackObserver*
RtpTransportControllerSend::transport_feedback_observer() {
return send_side_cc_.get();
return this;
}
RtpPacketSender* RtpTransportControllerSend::packet_sender() {
@ -181,8 +233,12 @@ void RtpTransportControllerSend::SetAllocatedSendBitrateLimits(
int min_send_bitrate_bps,
int max_padding_bitrate_bps,
int max_total_bitrate_bps) {
send_side_cc_->SetAllocatedSendBitrateLimits(
min_send_bitrate_bps, max_padding_bitrate_bps, max_total_bitrate_bps);
RTC_DCHECK_RUN_ON(&task_queue_);
streams_config_.min_pacing_rate = DataRate::bps(min_send_bitrate_bps);
streams_config_.max_padding_rate = DataRate::bps(max_padding_bitrate_bps);
streams_config_.max_total_allocated_bitrate =
DataRate::bps(max_total_bitrate_bps);
UpdateStreamsConfig();
}
void RtpTransportControllerSend::SetKeepAliveConfig(
@ -190,31 +246,33 @@ void RtpTransportControllerSend::SetKeepAliveConfig(
keepalive_ = config;
}
void RtpTransportControllerSend::SetPacingFactor(float pacing_factor) {
send_side_cc_->SetPacingFactor(pacing_factor);
RTC_DCHECK_RUN_ON(&task_queue_);
streams_config_.pacing_factor = pacing_factor;
UpdateStreamsConfig();
}
void RtpTransportControllerSend::SetQueueTimeLimit(int limit_ms) {
pacer_.SetQueueTimeLimit(limit_ms);
}
CallStatsObserver* RtpTransportControllerSend::GetCallStatsObserver() {
return send_side_cc_.get();
return this;
}
void RtpTransportControllerSend::RegisterPacketFeedbackObserver(
PacketFeedbackObserver* observer) {
send_side_cc_->RegisterPacketFeedbackObserver(observer);
transport_feedback_adapter_.RegisterPacketFeedbackObserver(observer);
}
void RtpTransportControllerSend::DeRegisterPacketFeedbackObserver(
PacketFeedbackObserver* observer) {
send_side_cc_->DeRegisterPacketFeedbackObserver(observer);
transport_feedback_adapter_.DeRegisterPacketFeedbackObserver(observer);
}
void RtpTransportControllerSend::RegisterTargetTransferRateObserver(
TargetTransferRateObserver* observer) {
{
rtc::CritScope cs(&observer_crit_);
task_queue_.PostTask([this, observer] {
RTC_DCHECK_RUN_ON(&task_queue_);
RTC_DCHECK(observer_ == nullptr);
observer_ = observer;
}
send_side_cc_->RegisterNetworkObserver(this);
MaybeCreateControllers();
});
}
void RtpTransportControllerSend::OnNetworkRouteChanged(
const std::string& transport_name,
@ -252,20 +310,49 @@ void RtpTransportControllerSend::OnNetworkRouteChanged(
<< " bps, max: " << bitrate_config.max_bitrate_bps
<< " bps.";
RTC_DCHECK_GT(bitrate_config.start_bitrate_bps, 0);
send_side_cc_->OnNetworkRouteChanged(
network_route, bitrate_config.start_bitrate_bps,
bitrate_config.min_bitrate_bps, bitrate_config.max_bitrate_bps);
if (reset_feedback_on_route_change_)
transport_feedback_adapter_.SetNetworkIds(
network_route.local_network_id, network_route.remote_network_id);
transport_overhead_bytes_per_packet_ = network_route.packet_overhead;
NetworkRouteChange msg;
msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
msg.constraints = ConvertConstraints(bitrate_config, clock_);
task_queue_.PostTask([this, msg] {
RTC_DCHECK_RUN_ON(&task_queue_);
if (controller_) {
control_handler_->PostUpdates(controller_->OnNetworkRouteChange(msg));
} else {
UpdateInitialConstraints(msg.constraints);
}
pacer_.UpdateOutstandingData(0);
});
}
}
void RtpTransportControllerSend::OnNetworkAvailability(bool network_available) {
send_side_cc_->SignalNetworkState(network_available ? kNetworkUp
: kNetworkDown);
RTC_LOG(LS_INFO) << "SignalNetworkState "
<< (network_available ? "Up" : "Down");
NetworkAvailability msg;
msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
msg.network_available = network_available;
task_queue_.PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(&task_queue_);
network_available_ = msg.network_available;
if (controller_) {
control_handler_->PostUpdates(controller_->OnNetworkAvailability(msg));
control_handler_->OnNetworkAvailability(msg);
} else {
MaybeCreateControllers();
}
});
for (auto& rtp_sender : video_rtp_senders_) {
rtp_sender->OnNetworkAvailability(network_available);
}
}
RtcpBandwidthObserver* RtpTransportControllerSend::GetBandwidthObserver() {
return send_side_cc_->GetBandwidthObserver();
return this;
}
int64_t RtpTransportControllerSend::GetPacerQueuingDelayMs() const {
return pacer_.QueueInMs();
@ -274,14 +361,30 @@ int64_t RtpTransportControllerSend::GetFirstPacketTimeMs() const {
return pacer_.FirstSentPacketTimeMs();
}
void RtpTransportControllerSend::SetPerPacketFeedbackAvailable(bool available) {
send_side_cc_->SetPerPacketFeedbackAvailable(available);
RTC_DCHECK_RUN_ON(&task_queue_);
packet_feedback_available_ = available;
if (!controller_)
MaybeCreateControllers();
}
void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) {
send_side_cc_->EnablePeriodicAlrProbing(enable);
task_queue_.PostTask([this, enable]() {
RTC_DCHECK_RUN_ON(&task_queue_);
streams_config_.requests_alr_probing = enable;
UpdateStreamsConfig();
});
}
void RtpTransportControllerSend::OnSentPacket(
const rtc::SentPacket& sent_packet) {
send_side_cc_->OnSentPacket(sent_packet);
absl::optional<SentPacket> packet_msg =
transport_feedback_adapter_.ProcessSentPacket(sent_packet);
if (packet_msg) {
task_queue_.PostTask([this, packet_msg]() {
RTC_DCHECK_RUN_ON(&task_queue_);
if (controller_)
control_handler_->PostUpdates(controller_->OnSentPacket(*packet_msg));
});
}
MaybeUpdateOutstandingData();
}
void RtpTransportControllerSend::SetSdpBitrateParameters(
@ -289,9 +392,16 @@ void RtpTransportControllerSend::SetSdpBitrateParameters(
absl::optional<BitrateConstraints> updated =
bitrate_configurator_.UpdateWithSdpParameters(constraints);
if (updated.has_value()) {
send_side_cc_->SetBweBitrates(updated->min_bitrate_bps,
updated->start_bitrate_bps,
updated->max_bitrate_bps);
TargetRateConstraints msg = ConvertConstraints(*updated, clock_);
task_queue_.PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(&task_queue_);
if (controller_) {
control_handler_->PostUpdates(
controller_->OnTargetRateConstraints(msg));
} else {
UpdateInitialConstraints(msg);
}
});
} else {
RTC_LOG(LS_VERBOSE)
<< "WebRTC.RtpTransportControllerSend.SetSdpBitrateParameters: "
@ -304,9 +414,16 @@ void RtpTransportControllerSend::SetClientBitratePreferences(
absl::optional<BitrateConstraints> updated =
bitrate_configurator_.UpdateWithClientPreferences(preferences);
if (updated.has_value()) {
send_side_cc_->SetBweBitrates(updated->min_bitrate_bps,
updated->start_bitrate_bps,
updated->max_bitrate_bps);
TargetRateConstraints msg = ConvertConstraints(*updated, clock_);
task_queue_.PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(&task_queue_);
if (controller_) {
control_handler_->PostUpdates(
controller_->OnTargetRateConstraints(msg));
} else {
UpdateInitialConstraints(msg);
}
});
} else {
RTC_LOG(LS_VERBOSE)
<< "WebRTC.RtpTransportControllerSend.SetClientBitratePreferences: "
@ -321,7 +438,12 @@ void RtpTransportControllerSend::SetAllocatedBitrateWithoutFeedback(
if (field_trial::IsEnabled("WebRTC-Audio-ABWENoTWCC")) {
// TODO(srte): Make sure it's safe to always report this and remove the
// field trial check.
send_side_cc_->SetAllocatedBitrateWithoutFeedback(bitrate_bps);
task_queue_.PostTask([this, bitrate_bps]() {
RTC_DCHECK_RUN_ON(&task_queue_);
streams_config_.unacknowledged_rate_allocation =
DataRate::bps(bitrate_bps);
UpdateStreamsConfig();
});
}
}
@ -339,4 +461,215 @@ void RtpTransportControllerSend::OnTransportOverheadChanged(
transport_overhead_bytes_per_packet);
}
}
void RtpTransportControllerSend::OnReceivedEstimatedBitrate(uint32_t bitrate) {
RemoteBitrateReport msg;
msg.receive_time = Timestamp::ms(clock_->TimeInMilliseconds());
msg.bandwidth = DataRate::bps(bitrate);
task_queue_.PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(&task_queue_);
if (controller_)
control_handler_->PostUpdates(controller_->OnRemoteBitrateReport(msg));
});
}
void RtpTransportControllerSend::OnReceivedRtcpReceiverReport(
const ReportBlockList& report_blocks,
int64_t rtt_ms,
int64_t now_ms) {
task_queue_.PostTask([this, report_blocks, now_ms]() {
RTC_DCHECK_RUN_ON(&task_queue_);
OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms);
});
task_queue_.PostTask([this, now_ms, rtt_ms]() {
RTC_DCHECK_RUN_ON(&task_queue_);
RoundTripTimeUpdate report;
report.receive_time = Timestamp::ms(now_ms);
report.round_trip_time = TimeDelta::ms(rtt_ms);
report.smoothed = false;
if (controller_)
control_handler_->PostUpdates(controller_->OnRoundTripTimeUpdate(report));
});
}
void RtpTransportControllerSend::AddPacket(uint32_t ssrc,
uint16_t sequence_number,
size_t length,
const PacedPacketInfo& pacing_info) {
if (send_side_bwe_with_overhead_) {
length += transport_overhead_bytes_per_packet_;
}
transport_feedback_adapter_.AddPacket(ssrc, sequence_number, length,
pacing_info);
}
void RtpTransportControllerSend::OnTransportFeedback(
const rtcp::TransportFeedback& feedback) {
RTC_DCHECK_RUNS_SERIALIZED(&worker_race_);
absl::optional<TransportPacketsFeedback> feedback_msg =
transport_feedback_adapter_.ProcessTransportFeedback(feedback);
if (feedback_msg) {
task_queue_.PostTask([this, feedback_msg]() {
RTC_DCHECK_RUN_ON(&task_queue_);
if (controller_)
control_handler_->PostUpdates(
controller_->OnTransportPacketsFeedback(*feedback_msg));
});
}
MaybeUpdateOutstandingData();
}
void RtpTransportControllerSend::OnRttUpdate(int64_t avg_rtt_ms,
int64_t max_rtt_ms) {
int64_t now_ms = clock_->TimeInMilliseconds();
RoundTripTimeUpdate report;
report.receive_time = Timestamp::ms(now_ms);
report.round_trip_time = TimeDelta::ms(avg_rtt_ms);
report.smoothed = true;
task_queue_.PostTask([this, report]() {
RTC_DCHECK_RUN_ON(&task_queue_);
if (controller_)
control_handler_->PostUpdates(controller_->OnRoundTripTimeUpdate(report));
});
}
void RtpTransportControllerSend::MaybeCreateControllers() {
RTC_DCHECK(!controller_);
RTC_DCHECK(!control_handler_);
if (!network_available_ || !observer_)
return;
control_handler_ = absl::make_unique<CongestionControlHandler>(this, &pacer_);
initial_config_.constraints.at_time =
Timestamp::ms(clock_->TimeInMilliseconds());
initial_config_.stream_based_config = streams_config_;
// TODO(srte): Use fallback controller if no feedback is available.
if (controller_factory_override_) {
RTC_LOG(LS_INFO) << "Creating overridden congestion controller";
controller_ = controller_factory_override_->Create(initial_config_);
process_interval_ = controller_factory_override_->GetProcessInterval();
} else {
RTC_LOG(LS_INFO) << "Creating fallback congestion controller";
controller_ = controller_factory_fallback_->Create(initial_config_);
process_interval_ = controller_factory_fallback_->GetProcessInterval();
}
UpdateControllerWithTimeInterval();
StartProcessPeriodicTasks();
}
void RtpTransportControllerSend::UpdateInitialConstraints(
TargetRateConstraints new_contraints) {
if (!new_contraints.starting_rate)
new_contraints.starting_rate = initial_config_.constraints.starting_rate;
RTC_DCHECK(new_contraints.starting_rate);
initial_config_.constraints = new_contraints;
}
void RtpTransportControllerSend::StartProcessPeriodicTasks() {
if (!periodic_tasks_enabled_)
return;
if (!pacer_queue_update_task_) {
pacer_queue_update_task_ =
StartPeriodicTask(&task_queue_, PacerQueueUpdateIntervalMs, [this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
UpdatePacerQueue();
});
}
if (controller_task_) {
// Stop is not synchronous, but is guaranteed to occur before the first
// invocation of the new controller task started below.
controller_task_->Stop();
controller_task_ = nullptr;
}
if (process_interval_.IsFinite()) {
// The controller task is owned by the task queue and lives until the task
// queue is destroyed or some time after Stop() is called, whichever comes
// first.
controller_task_ =
StartPeriodicTask(&task_queue_, process_interval_.ms(), [this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
UpdateControllerWithTimeInterval();
});
}
}
void RtpTransportControllerSend::UpdateControllerWithTimeInterval() {
if (controller_) {
ProcessInterval msg;
msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
control_handler_->PostUpdates(controller_->OnProcessInterval(msg));
}
}
void RtpTransportControllerSend::UpdatePacerQueue() {
if (control_handler_) {
TimeDelta expected_queue_time = TimeDelta::ms(pacer_.ExpectedQueueTimeMs());
control_handler_->OnPacerQueueUpdate(expected_queue_time);
}
}
void RtpTransportControllerSend::MaybeUpdateOutstandingData() {
DataSize in_flight_data = transport_feedback_adapter_.GetOutstandingData();
task_queue_.PostTask([this, in_flight_data]() {
RTC_DCHECK_RUN_ON(&task_queue_);
if (control_handler_)
control_handler_->OnOutstandingData(in_flight_data);
});
}
void RtpTransportControllerSend::UpdateStreamsConfig() {
streams_config_.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
if (controller_)
control_handler_->PostUpdates(
controller_->OnStreamsConfig(streams_config_));
}
void RtpTransportControllerSend::OnReceivedRtcpReceiverReportBlocks(
const ReportBlockList& report_blocks,
int64_t now_ms) {
if (report_blocks.empty())
return;
int total_packets_lost_delta = 0;
int total_packets_delta = 0;
// Compute the packet loss from all report blocks.
for (const RTCPReportBlock& report_block : report_blocks) {
auto it = last_report_blocks_.find(report_block.source_ssrc);
if (it != last_report_blocks_.end()) {
auto number_of_packets = report_block.extended_highest_sequence_number -
it->second.extended_highest_sequence_number;
total_packets_delta += number_of_packets;
auto lost_delta = report_block.packets_lost - it->second.packets_lost;
total_packets_lost_delta += lost_delta;
}
last_report_blocks_[report_block.source_ssrc] = report_block;
}
// Can only compute delta if there has been previous blocks to compare to. If
// not, total_packets_delta will be unchanged and there's nothing more to do.
if (!total_packets_delta)
return;
int packets_received_delta = total_packets_delta - total_packets_lost_delta;
// To detect lost packets, at least one packet has to be received. This check
// is needed to avoid bandwith detection update in
// VideoSendStreamTest.SuspendBelowMinBitrate
if (packets_received_delta < 1)
return;
Timestamp now = Timestamp::ms(now_ms);
TransportLossReport msg;
msg.packets_lost_delta = total_packets_lost_delta;
msg.packets_received_delta = packets_received_delta;
msg.receive_time = now;
msg.start_time = last_report_block_time_;
msg.end_time = now;
if (controller_)
control_handler_->PostUpdates(controller_->OnTransportLossReport(msg));
last_report_block_time_ = now;
}
} // namespace webrtc

View File

@ -11,6 +11,7 @@
#ifndef CALL_RTP_TRANSPORT_CONTROLLER_SEND_H_
#define CALL_RTP_TRANSPORT_CONTROLLER_SEND_H_
#include <atomic>
#include <map>
#include <memory>
#include <string>
@ -20,15 +21,17 @@
#include "call/rtp_bitrate_configurator.h"
#include "call/rtp_transport_controller_send_interface.h"
#include "call/rtp_video_sender.h"
#include "modules/congestion_controller/include/send_side_congestion_controller_interface.h"
#include "modules/congestion_controller/include/network_changed_observer.h"
#include "modules/congestion_controller/rtp/control_handler.h"
#include "modules/congestion_controller/rtp/transport_feedback_adapter.h"
#include "modules/pacing/packet_router.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/constructormagic.h"
#include "rtc_base/networkroute.h"
#include "rtc_base/race_checker.h"
#include "rtc_base/task_queue.h"
namespace webrtc {
class Clock;
class FrameEncryptorInterface;
class RtcEventLog;
@ -38,7 +41,10 @@ class RtcEventLog;
// per transport, sharing the same congestion controller.
class RtpTransportControllerSend final
: public RtpTransportControllerSendInterface,
public NetworkChangedObserver {
public NetworkChangedObserver,
public RtcpBandwidthObserver,
public CallStatsObserver,
public TransportFeedbackObserver {
public:
RtpTransportControllerSend(
Clock* clock,
@ -108,7 +114,39 @@ class RtpTransportControllerSend final
void OnTransportOverheadChanged(
size_t transport_overhead_per_packet) override;
// Implements RtcpBandwidthObserver interface
void OnReceivedEstimatedBitrate(uint32_t bitrate) override;
void OnReceivedRtcpReceiverReport(const ReportBlockList& report_blocks,
int64_t rtt,
int64_t now_ms) override;
// Implements TransportFeedbackObserver interface
void AddPacket(uint32_t ssrc,
uint16_t sequence_number,
size_t length,
const PacedPacketInfo& pacing_info) override;
void OnTransportFeedback(const rtcp::TransportFeedback& feedback) override;
// Implements CallStatsObserver interface
void OnRttUpdate(int64_t avg_rtt_ms, int64_t max_rtt_ms) override;
class PeriodicTask;
private:
void MaybeCreateControllers() RTC_RUN_ON(task_queue_);
void UpdateInitialConstraints(TargetRateConstraints new_contraints)
RTC_RUN_ON(task_queue_);
void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_);
void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_);
void UpdatePacerQueue() RTC_RUN_ON(task_queue_);
void UpdateStreamsConfig() RTC_RUN_ON(task_queue_);
void MaybeUpdateOutstandingData();
void OnReceivedRtcpReceiverReportBlocks(const ReportBlockList& report_blocks,
int64_t now_ms)
RTC_RUN_ON(task_queue_);
const Clock* const clock_;
PacketRouter packet_router_;
std::vector<std::unique_ptr<RtpVideoSenderInterface>> video_rtp_senders_;
@ -117,9 +155,49 @@ class RtpTransportControllerSend final
RtpBitrateConfigurator bitrate_configurator_;
std::map<std::string, rtc::NetworkRoute> network_routes_;
const std::unique_ptr<ProcessThread> process_thread_;
rtc::CriticalSection observer_crit_;
TargetTransferRateObserver* observer_ RTC_GUARDED_BY(observer_crit_);
std::unique_ptr<SendSideCongestionControllerInterface> send_side_cc_;
TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_);
// TODO(srte): Move all access to feedback adapter to task queue.
TransportFeedbackAdapter transport_feedback_adapter_;
NetworkControllerFactoryInterface* const controller_factory_override_
RTC_PT_GUARDED_BY(task_queue_);
const std::unique_ptr<NetworkControllerFactoryInterface>
controller_factory_fallback_ RTC_PT_GUARDED_BY(task_queue_);
std::unique_ptr<CongestionControlHandler> control_handler_
RTC_GUARDED_BY(task_queue_) RTC_PT_GUARDED_BY(task_queue_);
std::unique_ptr<NetworkControllerInterface> controller_
RTC_GUARDED_BY(task_queue_) RTC_PT_GUARDED_BY(task_queue_);
TimeDelta process_interval_ RTC_GUARDED_BY(task_queue_);
std::map<uint32_t, RTCPReportBlock> last_report_blocks_
RTC_GUARDED_BY(task_queue_);
Timestamp last_report_block_time_ RTC_GUARDED_BY(task_queue_);
NetworkControllerConfig initial_config_ RTC_GUARDED_BY(task_queue_);
StreamsConfig streams_config_ RTC_GUARDED_BY(task_queue_);
const bool reset_feedback_on_route_change_;
const bool send_side_bwe_with_overhead_;
// Transport overhead is written by OnNetworkRouteChanged and read by
// AddPacket.
// TODO(srte): Remove atomic when feedback adapter runs on task queue.
std::atomic<size_t> transport_overhead_bytes_per_packet_;
bool network_available_ RTC_GUARDED_BY(task_queue_);
bool periodic_tasks_enabled_ RTC_GUARDED_BY(task_queue_);
bool packet_feedback_available_ RTC_GUARDED_BY(task_queue_);
PeriodicTask* pacer_queue_update_task_ RTC_GUARDED_BY(task_queue_)
RTC_PT_GUARDED_BY(task_queue_);
PeriodicTask* controller_task_ RTC_GUARDED_BY(task_queue_)
RTC_PT_GUARDED_BY(task_queue_);
// Protects access to last_packet_feedback_vector_ in feedback adapter.
// TODO(srte): Remove this checker when feedback adapter runs on task queue.
rtc::RaceChecker worker_race_;
RateLimiter retransmission_rate_limiter_;
// TODO(perkj): |task_queue_| is supposed to replace |process_thread_|.

View File

@ -16,47 +16,6 @@ config("bwe_test_logging") {
}
}
rtc_static_library("congestion_controller") {
visibility = [ "*" ]
configs += [ ":bwe_test_logging" ]
sources = [
"include/send_side_congestion_controller.h",
"send_side_congestion_controller.cc",
]
if (!build_with_chromium && is_clang) {
# Suppress warnings from the Chromium Clang plugin (bugs.webrtc.org/163).
suppressed_configs += [ "//build/config/clang:find_bad_constructs" ]
}
deps = [
":control_handler",
":transport_feedback",
"../:congestion_controller",
"../..:module_api",
"../../..:webrtc_common",
"../../../api/transport:goog_cc",
"../../../api/transport:network_control",
"../../../rtc_base:checks",
"../../../rtc_base:rate_limiter",
"../../../rtc_base:rtc_task_queue_api",
"../../../rtc_base:safe_minmax",
"../../../rtc_base:sequenced_task_checker",
"../../../rtc_base/experiments:congestion_controller_experiment",
"../../../rtc_base/network:sent_packet",
"../../../system_wrappers",
"../../../system_wrappers:field_trial",
"../../pacing",
"../../remote_bitrate_estimator",
"../../rtp_rtcp:rtp_rtcp_format",
"//third_party/abseil-cpp/absl/memory",
]
if (!build_with_mozilla) {
deps += [ "../../../rtc_base:rtc_base" ]
}
}
rtc_source_set("control_handler") {
visibility = [ "*" ]
sources = [
@ -122,12 +81,10 @@ if (rtc_include_tests) {
sources = [
"congestion_controller_unittests_helper.cc",
"congestion_controller_unittests_helper.h",
"send_side_congestion_controller_unittest.cc",
"send_time_history_unittest.cc",
"transport_feedback_adapter_unittest.cc",
]
deps = [
":congestion_controller",
":transport_feedback",
"../:congestion_controller",
"../:mock_congestion_controller",

View File

@ -1,219 +0,0 @@
/*
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_CONGESTION_CONTROLLER_RTP_INCLUDE_SEND_SIDE_CONGESTION_CONTROLLER_H_
#define MODULES_CONGESTION_CONTROLLER_RTP_INCLUDE_SEND_SIDE_CONGESTION_CONTROLLER_H_
#include <atomic>
#include <functional>
#include <map>
#include <memory>
#include <vector>
#include "api/transport/network_control.h"
#include "api/transport/network_types.h"
#include "common_types.h" // NOLINT(build/include)
#include "modules/congestion_controller/include/send_side_congestion_controller_interface.h"
#include "modules/congestion_controller/rtp/control_handler.h"
#include "modules/congestion_controller/rtp/transport_feedback_adapter.h"
#include "modules/include/module.h"
#include "modules/include/module_common_types.h"
#include "modules/pacing/paced_sender.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "rtc_base/constructormagic.h"
#include "rtc_base/criticalsection.h"
#include "rtc_base/networkroute.h"
#include "rtc_base/race_checker.h"
#include "rtc_base/task_queue.h"
namespace rtc {
struct SentPacket;
}
namespace webrtc {
class Clock;
class RtcEventLog;
namespace webrtc_cc {
namespace send_side_cc_internal {
// TODO(srte): Make sure the PeriodicTask implementation is reusable and move it
// to task_queue.h.
class PeriodicTask : public rtc::QueuedTask {
public:
virtual void Stop() = 0;
};
} // namespace send_side_cc_internal
class SendSideCongestionController
: public SendSideCongestionControllerInterface,
public RtcpBandwidthObserver {
public:
SendSideCongestionController(
const Clock* clock,
rtc::TaskQueue* task_queue,
RtcEventLog* event_log,
PacedSender* pacer,
int start_bitrate_bps,
int min_bitrate_bps,
int max_bitrate_bps,
NetworkControllerFactoryInterface* controller_factory);
~SendSideCongestionController() override;
void RegisterPacketFeedbackObserver(
PacketFeedbackObserver* observer) override;
void DeRegisterPacketFeedbackObserver(
PacketFeedbackObserver* observer) override;
// Currently, there can be at most one observer.
// TODO(nisse): The RegisterNetworkObserver method is needed because we first
// construct this object (as part of RtpTransportControllerSend), then pass a
// reference to Call, which then registers itself as the observer. We should
// try to break this circular chain of references, and make the observer a
// construction time constant.
void RegisterNetworkObserver(NetworkChangedObserver* observer) override;
void SetBweBitrates(int min_bitrate_bps,
int start_bitrate_bps,
int max_bitrate_bps) override;
void SetAllocatedSendBitrateLimits(int64_t min_send_bitrate_bps,
int64_t max_padding_bitrate_bps,
int64_t max_total_bitrate_bps) override;
// Resets the BWE state. Note the first argument is the bitrate_bps.
void OnNetworkRouteChanged(const rtc::NetworkRoute& network_route,
int bitrate_bps,
int min_bitrate_bps,
int max_bitrate_bps) override;
void SignalNetworkState(NetworkState state) override;
RtcpBandwidthObserver* GetBandwidthObserver() override;
bool AvailableBandwidth(uint32_t* bandwidth) const override;
TransportFeedbackObserver* GetTransportFeedbackObserver() override;
void SetPerPacketFeedbackAvailable(bool available) override;
void EnablePeriodicAlrProbing(bool enable) override;
void OnSentPacket(const rtc::SentPacket& sent_packet) override;
// Implements RtcpBandwidthObserver
void OnReceivedEstimatedBitrate(uint32_t bitrate) override;
void OnReceivedRtcpReceiverReport(const ReportBlockList& report_blocks,
int64_t rtt,
int64_t now_ms) override;
// Implements CallStatsObserver.
void OnRttUpdate(int64_t avg_rtt_ms, int64_t max_rtt_ms) override;
// Implements Module.
int64_t TimeUntilNextProcess() override;
void Process() override;
// Implements TransportFeedbackObserver.
void AddPacket(uint32_t ssrc,
uint16_t sequence_number,
size_t length,
const PacedPacketInfo& pacing_info) override;
void OnTransportFeedback(const rtcp::TransportFeedback& feedback) override;
std::vector<PacketFeedback> GetTransportFeedbackVector() const;
void SetPacingFactor(float pacing_factor) override;
void SetAllocatedBitrateWithoutFeedback(uint32_t bitrate_bps) override;
protected:
// TODO(srte): The tests should be rewritten to not depend on internals and
// these functions should be removed.
// Since we can't control the timing of the internal task queue, this method
// is used in unit tests to stop the periodic tasks from running unless
// PostPeriodicTasksForTest is called.
void DisablePeriodicTasks();
// Post periodic tasks just once. This allows unit tests to trigger process
// updates immediately.
void PostPeriodicTasksForTest();
// Waits for outstanding tasks to be finished. This allos unit tests to ensure
// that expected callbacks has been called.
void WaitOnTasksForTest();
private:
void MaybeCreateControllers() RTC_RUN_ON(task_queue_);
void MaybeRecreateControllers() RTC_RUN_ON(task_queue_);
void UpdateInitialConstraints(TargetRateConstraints new_contraints)
RTC_RUN_ON(task_queue_);
void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_);
void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_);
void UpdatePacerQueue() RTC_RUN_ON(task_queue_);
void UpdateStreamsConfig() RTC_RUN_ON(task_queue_);
void MaybeUpdateOutstandingData();
void OnReceivedRtcpReceiverReportBlocks(const ReportBlockList& report_blocks,
int64_t now_ms)
RTC_RUN_ON(task_queue_);
const Clock* const clock_;
// PacedSender is thread safe and doesn't need protection here.
PacedSender* const pacer_;
// TODO(srte): Move all access to feedback adapter to task queue.
TransportFeedbackAdapter transport_feedback_adapter_;
NetworkControllerFactoryInterface* const controller_factory_with_feedback_
RTC_GUARDED_BY(task_queue_);
const std::unique_ptr<NetworkControllerFactoryInterface>
controller_factory_fallback_ RTC_GUARDED_BY(task_queue_);
std::unique_ptr<CongestionControlHandler> control_handler_
RTC_GUARDED_BY(task_queue_);
std::unique_ptr<NetworkControllerInterface> controller_
RTC_GUARDED_BY(task_queue_);
TimeDelta process_interval_ RTC_GUARDED_BY(task_queue_);
std::map<uint32_t, RTCPReportBlock> last_report_blocks_
RTC_GUARDED_BY(task_queue_);
Timestamp last_report_block_time_ RTC_GUARDED_BY(task_queue_);
NetworkChangedObserver* observer_ RTC_GUARDED_BY(task_queue_);
NetworkControllerConfig initial_config_ RTC_GUARDED_BY(task_queue_);
StreamsConfig streams_config_ RTC_GUARDED_BY(task_queue_);
const bool reset_feedback_on_route_change_;
const bool send_side_bwe_with_overhead_;
// Transport overhead is written by OnNetworkRouteChanged and read by
// AddPacket.
// TODO(srte): Remove atomic when feedback adapter runs on task queue.
std::atomic<size_t> transport_overhead_bytes_per_packet_;
bool network_available_ RTC_GUARDED_BY(task_queue_);
bool periodic_tasks_enabled_ RTC_GUARDED_BY(task_queue_);
bool packet_feedback_available_ RTC_GUARDED_BY(task_queue_);
send_side_cc_internal::PeriodicTask* pacer_queue_update_task_
RTC_GUARDED_BY(task_queue_);
send_side_cc_internal::PeriodicTask* controller_task_
RTC_GUARDED_BY(task_queue_);
// Protects access to last_packet_feedback_vector_ in feedback adapter.
// TODO(srte): Remove this checker when feedback adapter runs on task queue.
rtc::RaceChecker worker_race_;
rtc::TaskQueue* task_queue_;
RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(SendSideCongestionController);
};
} // namespace webrtc_cc
} // namespace webrtc
#endif // MODULES_CONGESTION_CONTROLLER_RTP_INCLUDE_SEND_SIDE_CONGESTION_CONTROLLER_H_

View File

@ -1,588 +0,0 @@
/*
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/congestion_controller/rtp/include/send_side_congestion_controller.h"
#include <algorithm>
#include <functional>
#include <memory>
#include <vector>
#include "absl/memory/memory.h"
#include "api/transport/goog_cc_factory.h"
#include "api/transport/network_types.h"
#include "modules/remote_bitrate_estimator/include/bwe_defines.h"
#include "rtc_base/bind.h"
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
#include "rtc_base/format_macros.h"
#include "rtc_base/logging.h"
#include "rtc_base/network/sent_packet.h"
#include "rtc_base/numerics/safe_conversions.h"
#include "rtc_base/numerics/safe_minmax.h"
#include "rtc_base/rate_limiter.h"
#include "rtc_base/sequenced_task_checker.h"
#include "rtc_base/timeutils.h"
#include "system_wrappers/include/field_trial.h"
using absl::make_unique;
namespace webrtc {
namespace webrtc_cc {
namespace {
using send_side_cc_internal::PeriodicTask;
const int64_t PacerQueueUpdateIntervalMs = 25;
TargetRateConstraints ConvertConstraints(int min_bitrate_bps,
int max_bitrate_bps,
int start_bitrate_bps,
const Clock* clock) {
TargetRateConstraints msg;
msg.at_time = Timestamp::ms(clock->TimeInMilliseconds());
msg.min_data_rate =
min_bitrate_bps >= 0 ? DataRate::bps(min_bitrate_bps) : DataRate::Zero();
msg.max_data_rate = max_bitrate_bps > 0 ? DataRate::bps(max_bitrate_bps)
: DataRate::Infinity();
if (start_bitrate_bps > 0)
msg.starting_rate = DataRate::bps(start_bitrate_bps);
return msg;
}
// The template closure pattern is based on rtc::ClosureTask.
template <class Closure>
class PeriodicTaskImpl final : public PeriodicTask {
public:
PeriodicTaskImpl(rtc::TaskQueue* task_queue,
int64_t period_ms,
Closure&& closure)
: task_queue_(task_queue),
period_ms_(period_ms),
closure_(std::forward<Closure>(closure)) {}
bool Run() override {
if (!running_)
return true;
closure_();
// absl::WrapUnique lets us repost this task on the TaskQueue.
task_queue_->PostDelayedTask(absl::WrapUnique(this), period_ms_);
// Return false to tell TaskQueue to not destruct this object, since we have
// taken ownership with absl::WrapUnique.
return false;
}
void Stop() override {
if (task_queue_->IsCurrent()) {
RTC_DCHECK(running_);
running_ = false;
} else {
task_queue_->PostTask([this] { Stop(); });
}
}
private:
rtc::TaskQueue* const task_queue_;
const int64_t period_ms_;
typename std::remove_const<
typename std::remove_reference<Closure>::type>::type closure_;
bool running_ = true;
};
template <class Closure>
static PeriodicTask* StartPeriodicTask(rtc::TaskQueue* task_queue,
int64_t period_ms,
Closure&& closure) {
auto periodic_task = absl::make_unique<PeriodicTaskImpl<Closure>>(
task_queue, period_ms, std::forward<Closure>(closure));
PeriodicTask* periodic_task_ptr = periodic_task.get();
task_queue->PostDelayedTask(std::move(periodic_task), period_ms);
return periodic_task_ptr;
}
} // namespace
SendSideCongestionController::SendSideCongestionController(
const Clock* clock,
rtc::TaskQueue* task_queue,
RtcEventLog* event_log,
PacedSender* pacer,
int start_bitrate_bps,
int min_bitrate_bps,
int max_bitrate_bps,
NetworkControllerFactoryInterface* controller_factory)
: clock_(clock),
pacer_(pacer),
transport_feedback_adapter_(clock_),
controller_factory_with_feedback_(controller_factory),
controller_factory_fallback_(
absl::make_unique<GoogCcNetworkControllerFactory>(event_log)),
process_interval_(controller_factory_fallback_->GetProcessInterval()),
last_report_block_time_(Timestamp::ms(clock_->TimeInMilliseconds())),
observer_(nullptr),
reset_feedback_on_route_change_(
!field_trial::IsEnabled("WebRTC-Bwe-NoFeedbackReset")),
send_side_bwe_with_overhead_(
webrtc::field_trial::IsEnabled("WebRTC-SendSideBwe-WithOverhead")),
transport_overhead_bytes_per_packet_(0),
network_available_(false),
periodic_tasks_enabled_(true),
packet_feedback_available_(false),
pacer_queue_update_task_(nullptr),
controller_task_(nullptr),
task_queue_(task_queue) {
initial_config_.constraints = ConvertConstraints(
min_bitrate_bps, max_bitrate_bps, start_bitrate_bps, clock_);
RTC_DCHECK(start_bitrate_bps > 0);
// To be fully compatible with legacy SendSideCongestionController, make sure
// pacer is initialized even if there are no registered streams. This should
// not happen under normal circumstances, but some tests rely on it and there
// are no checks detecting when the legacy SendSideCongestionController is
// used. This way of setting the value has the drawback that it might be wrong
// compared to what the actual value from the congestion controller will be.
// TODO(srte): Remove this when the legacy SendSideCongestionController is
// removed.
pacer_->SetEstimatedBitrate(start_bitrate_bps);
}
// There is no point in having a network controller for a network that is not
// yet available and if we don't have any observer of it's state.
// MaybeCreateControllers is used to trigger creation if those things are
// fulfilled. This is needed since dependent code uses the period until network
// is signalled to be avaliabile to set the expected start bitrate which is sent
// to the initializer for NetworkControllers. The observer is injected later due
// to a circular dependency between RtpTransportControllerSend and Call.
// TODO(srte): Break the circular dependency issue and make sure that starting
// bandwidth is set before this class is initialized so the controllers can be
// created in the constructor.
void SendSideCongestionController::MaybeCreateControllers() {
if (!controller_)
MaybeRecreateControllers();
}
void SendSideCongestionController::MaybeRecreateControllers() {
if (!network_available_ || !observer_)
return;
if (!control_handler_) {
control_handler_ =
absl::make_unique<CongestionControlHandler>(observer_, pacer_);
}
initial_config_.constraints.at_time =
Timestamp::ms(clock_->TimeInMilliseconds());
initial_config_.stream_based_config = streams_config_;
if (!controller_) {
// TODO(srte): Use fallback controller if no feedback is available.
if (controller_factory_with_feedback_) {
RTC_LOG(LS_INFO) << "Creating feedback based only controller";
controller_ = controller_factory_with_feedback_->Create(initial_config_);
process_interval_ =
controller_factory_with_feedback_->GetProcessInterval();
} else {
RTC_LOG(LS_INFO) << "Creating fallback controller";
controller_ = controller_factory_fallback_->Create(initial_config_);
process_interval_ = controller_factory_fallback_->GetProcessInterval();
}
UpdateControllerWithTimeInterval();
StartProcessPeriodicTasks();
}
RTC_DCHECK(controller_);
}
void SendSideCongestionController::UpdateInitialConstraints(
TargetRateConstraints new_contraints) {
if (!new_contraints.starting_rate)
new_contraints.starting_rate = initial_config_.constraints.starting_rate;
RTC_DCHECK(new_contraints.starting_rate);
initial_config_.constraints = new_contraints;
}
SendSideCongestionController::~SendSideCongestionController() = default;
void SendSideCongestionController::RegisterPacketFeedbackObserver(
PacketFeedbackObserver* observer) {
transport_feedback_adapter_.RegisterPacketFeedbackObserver(observer);
}
void SendSideCongestionController::DeRegisterPacketFeedbackObserver(
PacketFeedbackObserver* observer) {
transport_feedback_adapter_.DeRegisterPacketFeedbackObserver(observer);
}
void SendSideCongestionController::RegisterNetworkObserver(
NetworkChangedObserver* observer) {
task_queue_->PostTask([this, observer]() {
RTC_DCHECK_RUN_ON(task_queue_);
RTC_DCHECK(observer_ == nullptr);
observer_ = observer;
MaybeCreateControllers();
});
}
void SendSideCongestionController::SetBweBitrates(int min_bitrate_bps,
int start_bitrate_bps,
int max_bitrate_bps) {
TargetRateConstraints constraints = ConvertConstraints(
min_bitrate_bps, max_bitrate_bps, start_bitrate_bps, clock_);
task_queue_->PostTask([this, constraints]() {
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_) {
control_handler_->PostUpdates(
controller_->OnTargetRateConstraints(constraints));
} else {
UpdateInitialConstraints(constraints);
}
});
}
void SendSideCongestionController::SetAllocatedSendBitrateLimits(
int64_t min_send_bitrate_bps,
int64_t max_padding_bitrate_bps,
int64_t max_total_bitrate_bps) {
RTC_DCHECK_RUN_ON(task_queue_);
streams_config_.min_pacing_rate = DataRate::bps(min_send_bitrate_bps);
streams_config_.max_padding_rate = DataRate::bps(max_padding_bitrate_bps);
streams_config_.max_total_allocated_bitrate =
DataRate::bps(max_total_bitrate_bps);
UpdateStreamsConfig();
}
// TODO(holmer): Split this up and use SetBweBitrates in combination with
// OnNetworkRouteChanged.
void SendSideCongestionController::OnNetworkRouteChanged(
const rtc::NetworkRoute& network_route,
int start_bitrate_bps,
int min_bitrate_bps,
int max_bitrate_bps) {
if (reset_feedback_on_route_change_)
transport_feedback_adapter_.SetNetworkIds(network_route.local_network_id,
network_route.remote_network_id);
transport_overhead_bytes_per_packet_ = network_route.packet_overhead;
NetworkRouteChange msg;
msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
msg.constraints = ConvertConstraints(min_bitrate_bps, max_bitrate_bps,
start_bitrate_bps, clock_);
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_) {
control_handler_->PostUpdates(controller_->OnNetworkRouteChange(msg));
} else {
UpdateInitialConstraints(msg.constraints);
}
pacer_->UpdateOutstandingData(0);
});
}
bool SendSideCongestionController::AvailableBandwidth(
uint32_t* bandwidth) const {
// This is only called in the OnNetworkChanged callback in
// RtpTransportControllerSend which is called from ControlHandler, which is
// running on the task queue.
// TODO(srte): Remove this function when RtpTransportControllerSend stops
// calling it.
RTC_DCHECK_RUN_ON(task_queue_);
if (!control_handler_) {
return false;
}
// TODO(srte): Remove this interface and push information about bandwidth
// estimation to users of this class, thereby reducing synchronous calls.
if (control_handler_->last_transfer_rate().has_value()) {
*bandwidth = control_handler_->last_transfer_rate()
->network_estimate.bandwidth.bps();
return true;
}
return false;
}
RtcpBandwidthObserver* SendSideCongestionController::GetBandwidthObserver() {
return this;
}
void SendSideCongestionController::SetPerPacketFeedbackAvailable(
bool available) {
RTC_DCHECK_RUN_ON(task_queue_);
packet_feedback_available_ = available;
MaybeRecreateControllers();
}
void SendSideCongestionController::EnablePeriodicAlrProbing(bool enable) {
task_queue_->PostTask([this, enable]() {
RTC_DCHECK_RUN_ON(task_queue_);
streams_config_.requests_alr_probing = enable;
UpdateStreamsConfig();
});
}
void SendSideCongestionController::UpdateStreamsConfig() {
streams_config_.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
if (controller_)
control_handler_->PostUpdates(
controller_->OnStreamsConfig(streams_config_));
}
TransportFeedbackObserver*
SendSideCongestionController::GetTransportFeedbackObserver() {
return this;
}
void SendSideCongestionController::SignalNetworkState(NetworkState state) {
RTC_LOG(LS_INFO) << "SignalNetworkState "
<< (state == kNetworkUp ? "Up" : "Down");
NetworkAvailability msg;
msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
msg.network_available = state == kNetworkUp;
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_);
network_available_ = msg.network_available;
if (controller_) {
control_handler_->PostUpdates(controller_->OnNetworkAvailability(msg));
control_handler_->OnNetworkAvailability(msg);
} else {
MaybeCreateControllers();
}
});
}
void SendSideCongestionController::OnSentPacket(
const rtc::SentPacket& sent_packet) {
absl::optional<SentPacket> packet_msg =
transport_feedback_adapter_.ProcessSentPacket(sent_packet);
if (packet_msg) {
task_queue_->PostTask([this, packet_msg]() {
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_)
control_handler_->PostUpdates(controller_->OnSentPacket(*packet_msg));
});
}
MaybeUpdateOutstandingData();
}
void SendSideCongestionController::OnRttUpdate(int64_t avg_rtt_ms,
int64_t max_rtt_ms) {
int64_t now_ms = clock_->TimeInMilliseconds();
RoundTripTimeUpdate report;
report.receive_time = Timestamp::ms(now_ms);
report.round_trip_time = TimeDelta::ms(avg_rtt_ms);
report.smoothed = true;
task_queue_->PostTask([this, report]() {
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_)
control_handler_->PostUpdates(controller_->OnRoundTripTimeUpdate(report));
});
}
int64_t SendSideCongestionController::TimeUntilNextProcess() {
// Using task queue to process, just sleep long to avoid wasting resources.
return 60 * 1000;
}
void SendSideCongestionController::Process() {
// Ignored, using task queue to process.
}
void SendSideCongestionController::StartProcessPeriodicTasks() {
if (!periodic_tasks_enabled_)
return;
if (!pacer_queue_update_task_) {
pacer_queue_update_task_ =
StartPeriodicTask(task_queue_, PacerQueueUpdateIntervalMs, [this]() {
RTC_DCHECK_RUN_ON(task_queue_);
UpdatePacerQueue();
});
}
if (controller_task_) {
// Stop is not synchronous, but is guaranteed to occur before the first
// invocation of the new controller task started below.
controller_task_->Stop();
controller_task_ = nullptr;
}
if (process_interval_.IsFinite()) {
// The controller task is owned by the task queue and lives until the task
// queue is destroyed or some time after Stop() is called, whichever comes
// first.
controller_task_ =
StartPeriodicTask(task_queue_, process_interval_.ms(), [this]() {
RTC_DCHECK_RUN_ON(task_queue_);
UpdateControllerWithTimeInterval();
});
}
}
void SendSideCongestionController::UpdateControllerWithTimeInterval() {
if (controller_) {
ProcessInterval msg;
msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
control_handler_->PostUpdates(controller_->OnProcessInterval(msg));
}
}
void SendSideCongestionController::UpdatePacerQueue() {
if (control_handler_) {
TimeDelta expected_queue_time =
TimeDelta::ms(pacer_->ExpectedQueueTimeMs());
control_handler_->OnPacerQueueUpdate(expected_queue_time);
}
}
void SendSideCongestionController::AddPacket(
uint32_t ssrc,
uint16_t sequence_number,
size_t length,
const PacedPacketInfo& pacing_info) {
if (send_side_bwe_with_overhead_) {
length += transport_overhead_bytes_per_packet_;
}
transport_feedback_adapter_.AddPacket(ssrc, sequence_number, length,
pacing_info);
}
void SendSideCongestionController::OnTransportFeedback(
const rtcp::TransportFeedback& feedback) {
RTC_DCHECK_RUNS_SERIALIZED(&worker_race_);
absl::optional<TransportPacketsFeedback> feedback_msg =
transport_feedback_adapter_.ProcessTransportFeedback(feedback);
if (feedback_msg) {
task_queue_->PostTask([this, feedback_msg]() {
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_)
control_handler_->PostUpdates(
controller_->OnTransportPacketsFeedback(*feedback_msg));
});
}
MaybeUpdateOutstandingData();
}
void SendSideCongestionController::MaybeUpdateOutstandingData() {
DataSize in_flight_data = transport_feedback_adapter_.GetOutstandingData();
task_queue_->PostTask([this, in_flight_data]() {
RTC_DCHECK_RUN_ON(task_queue_);
if (control_handler_)
control_handler_->OnOutstandingData(in_flight_data);
});
}
std::vector<PacketFeedback>
SendSideCongestionController::GetTransportFeedbackVector() const {
RTC_DCHECK_RUNS_SERIALIZED(&worker_race_);
return transport_feedback_adapter_.GetTransportFeedbackVector();
}
void SendSideCongestionController::PostPeriodicTasksForTest() {
task_queue_->PostTask([this]() {
RTC_DCHECK_RUN_ON(task_queue_);
UpdateControllerWithTimeInterval();
UpdatePacerQueue();
});
}
void SendSideCongestionController::WaitOnTasksForTest() {
rtc::Event event;
task_queue_->PostTask([&event]() { event.Set(); });
event.Wait(rtc::Event::kForever);
}
void SendSideCongestionController::SetPacingFactor(float pacing_factor) {
RTC_DCHECK_RUN_ON(task_queue_);
streams_config_.pacing_factor = pacing_factor;
UpdateStreamsConfig();
}
void SendSideCongestionController::SetAllocatedBitrateWithoutFeedback(
uint32_t bitrate_bps) {
task_queue_->PostTask([this, bitrate_bps]() {
RTC_DCHECK_RUN_ON(task_queue_);
streams_config_.unacknowledged_rate_allocation = DataRate::bps(bitrate_bps);
UpdateStreamsConfig();
});
}
void SendSideCongestionController::DisablePeriodicTasks() {
task_queue_->PostTask([this]() {
RTC_DCHECK_RUN_ON(task_queue_);
periodic_tasks_enabled_ = false;
});
}
void SendSideCongestionController::OnReceivedEstimatedBitrate(
uint32_t bitrate) {
RemoteBitrateReport msg;
msg.receive_time = Timestamp::ms(clock_->TimeInMilliseconds());
msg.bandwidth = DataRate::bps(bitrate);
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_)
control_handler_->PostUpdates(controller_->OnRemoteBitrateReport(msg));
});
}
void SendSideCongestionController::OnReceivedRtcpReceiverReport(
const webrtc::ReportBlockList& report_blocks,
int64_t rtt_ms,
int64_t now_ms) {
task_queue_->PostTask([this, report_blocks, now_ms]() {
RTC_DCHECK_RUN_ON(task_queue_);
OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms);
});
task_queue_->PostTask([this, now_ms, rtt_ms]() {
RTC_DCHECK_RUN_ON(task_queue_);
RoundTripTimeUpdate report;
report.receive_time = Timestamp::ms(now_ms);
report.round_trip_time = TimeDelta::ms(rtt_ms);
report.smoothed = false;
if (controller_)
control_handler_->PostUpdates(controller_->OnRoundTripTimeUpdate(report));
});
}
void SendSideCongestionController::OnReceivedRtcpReceiverReportBlocks(
const ReportBlockList& report_blocks,
int64_t now_ms) {
if (report_blocks.empty())
return;
int total_packets_lost_delta = 0;
int total_packets_delta = 0;
// Compute the packet loss from all report blocks.
for (const RTCPReportBlock& report_block : report_blocks) {
auto it = last_report_blocks_.find(report_block.source_ssrc);
if (it != last_report_blocks_.end()) {
auto number_of_packets = report_block.extended_highest_sequence_number -
it->second.extended_highest_sequence_number;
total_packets_delta += number_of_packets;
auto lost_delta = report_block.packets_lost - it->second.packets_lost;
total_packets_lost_delta += lost_delta;
}
last_report_blocks_[report_block.source_ssrc] = report_block;
}
// Can only compute delta if there has been previous blocks to compare to. If
// not, total_packets_delta will be unchanged and there's nothing more to do.
if (!total_packets_delta)
return;
int packets_received_delta = total_packets_delta - total_packets_lost_delta;
// To detect lost packets, at least one packet has to be received. This check
// is needed to avoid bandwith detection update in
// VideoSendStreamTest.SuspendBelowMinBitrate
if (packets_received_delta < 1)
return;
Timestamp now = Timestamp::ms(now_ms);
TransportLossReport msg;
msg.packets_lost_delta = total_packets_lost_delta;
msg.packets_received_delta = packets_received_delta;
msg.receive_time = now;
msg.start_time = last_report_block_time_;
msg.end_time = now;
if (controller_)
control_handler_->PostUpdates(controller_->OnTransportLossReport(msg));
last_report_block_time_ = now;
}
} // namespace webrtc_cc
} // namespace webrtc

View File

@ -1,523 +0,0 @@
/*
* Copyright (c) 2016 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/congestion_controller/rtp/include/send_side_congestion_controller.h"
#include "logging/rtc_event_log/mock/mock_rtc_event_log.h"
#include "modules/congestion_controller/include/mock/mock_congestion_observer.h"
#include "modules/congestion_controller/rtp/congestion_controller_unittests_helper.h"
#include "modules/pacing/mock/mock_paced_sender.h"
#include "modules/pacing/packet_router.h"
#include "modules/remote_bitrate_estimator/include/bwe_defines.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
#include "rtc_base/network/sent_packet.h"
#include "system_wrappers/include/clock.h"
#include "test/field_trial.h"
#include "test/gmock.h"
#include "test/gtest.h"
using testing::_;
using testing::AtLeast;
using testing::Ge;
using testing::NiceMock;
using testing::Return;
using testing::SaveArg;
using testing::StrictMock;
namespace webrtc {
namespace webrtc_cc {
namespace test {
namespace {
using webrtc::test::MockCongestionObserver;
const webrtc::PacedPacketInfo kPacingInfo0(0, 5, 2000);
const webrtc::PacedPacketInfo kPacingInfo1(1, 8, 4000);
const uint32_t kInitialBitrateBps = 60000;
const float kDefaultPacingRate = 2.5f;
class SendSideCongestionControllerForTest
: public SendSideCongestionController {
public:
using SendSideCongestionController::SendSideCongestionController;
~SendSideCongestionControllerForTest() {}
using SendSideCongestionController::DisablePeriodicTasks;
void WaitOnTasks() { SendSideCongestionController::WaitOnTasksForTest(); }
void Process() override {
SendSideCongestionController::PostPeriodicTasksForTest();
SendSideCongestionController::WaitOnTasksForTest();
}
};
} // namespace
class SendSideCongestionControllerTest : public ::testing::Test {
protected:
SendSideCongestionControllerTest()
: clock_(123456),
target_bitrate_observer_(this),
bandwidth_observer_(nullptr) {}
~SendSideCongestionControllerTest() override {}
void SetUp() override {
pacer_.reset(new NiceMock<MockPacedSender>());
// Set the initial bitrate estimate and expect the |observer| and |pacer_|
// to be updated.
EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps, _, _, _));
EXPECT_CALL(*pacer_,
SetPacingRates(kInitialBitrateBps * kDefaultPacingRate, _));
EXPECT_CALL(*pacer_, CreateProbeCluster(kInitialBitrateBps * 3));
EXPECT_CALL(*pacer_, CreateProbeCluster(kInitialBitrateBps * 5));
task_queue_ = absl::make_unique<rtc::TaskQueue>("SSCC Test");
controller_.reset(new SendSideCongestionControllerForTest(
&clock_, task_queue_.get(), &event_log_, pacer_.get(),
kInitialBitrateBps, 0, 5 * kInitialBitrateBps, nullptr));
controller_->DisablePeriodicTasks();
controller_->RegisterNetworkObserver(&observer_);
controller_->SignalNetworkState(NetworkState::kNetworkUp);
bandwidth_observer_ = controller_->GetBandwidthObserver();
controller_->WaitOnTasks();
testing::Mock::VerifyAndClearExpectations(pacer_.get());
testing::Mock::VerifyAndClearExpectations(&observer_);
}
void TearDown() override { controller_->WaitOnTasks(); }
// Custom setup - use an observer that tracks the target bitrate, without
// prescribing on which iterations it must change (like a mock would).
void TargetBitrateTrackingSetup() {
bandwidth_observer_ = nullptr;
pacer_.reset(new NiceMock<MockPacedSender>());
task_queue_ = absl::make_unique<rtc::TaskQueue>("SSCC Test");
controller_.reset(new SendSideCongestionControllerForTest(
&clock_, task_queue_.get(), &event_log_, pacer_.get(),
kInitialBitrateBps, 0, 5 * kInitialBitrateBps, nullptr));
controller_->DisablePeriodicTasks();
controller_->RegisterNetworkObserver(&target_bitrate_observer_);
controller_->SignalNetworkState(NetworkState::kNetworkUp);
}
void OnSentPacket(const PacketFeedback& packet_feedback) {
constexpr uint32_t ssrc = 0;
controller_->AddPacket(ssrc, packet_feedback.sequence_number,
packet_feedback.payload_size,
packet_feedback.pacing_info);
rtc::PacketInfo packet_info;
packet_info.included_in_feedback = true;
controller_->OnSentPacket(rtc::SentPacket(packet_feedback.sequence_number,
packet_feedback.send_time_ms,
packet_info));
}
// Allows us to track the target bitrate, without prescribing the exact
// iterations when this would hapen, like a mock would.
class TargetBitrateObserver : public NetworkChangedObserver {
public:
explicit TargetBitrateObserver(SendSideCongestionControllerTest* owner)
: owner_(owner) {}
~TargetBitrateObserver() override = default;
void OnNetworkChanged(uint32_t bitrate_bps,
uint8_t fraction_loss, // 0 - 255.
int64_t rtt_ms,
int64_t probing_interval_ms) override {
owner_->target_bitrate_bps_ = bitrate_bps;
}
private:
SendSideCongestionControllerTest* owner_;
};
void PacketTransmissionAndFeedbackBlock(uint16_t* seq_num,
int64_t runtime_ms,
int64_t delay) {
int64_t delay_buildup = 0;
int64_t start_time_ms = clock_.TimeInMilliseconds();
while (clock_.TimeInMilliseconds() - start_time_ms < runtime_ms) {
constexpr size_t kPayloadSize = 1000;
PacketFeedback packet(clock_.TimeInMilliseconds() + delay_buildup,
clock_.TimeInMilliseconds(), *seq_num, kPayloadSize,
PacedPacketInfo());
delay_buildup += delay; // Delay has to increase, or it's just RTT.
OnSentPacket(packet);
// Create expected feedback and send into adapter.
std::unique_ptr<rtcp::TransportFeedback> feedback(
new rtcp::TransportFeedback());
feedback->SetBase(packet.sequence_number, packet.arrival_time_ms * 1000);
EXPECT_TRUE(feedback->AddReceivedPacket(packet.sequence_number,
packet.arrival_time_ms * 1000));
rtc::Buffer raw_packet = feedback->Build();
feedback = rtcp::TransportFeedback::ParseFrom(raw_packet.data(),
raw_packet.size());
EXPECT_TRUE(feedback.get() != nullptr);
controller_->OnTransportFeedback(*feedback.get());
clock_.AdvanceTimeMilliseconds(50);
controller_->Process();
++(*seq_num);
}
}
SimulatedClock clock_;
StrictMock<MockCongestionObserver> observer_;
TargetBitrateObserver target_bitrate_observer_;
NiceMock<MockRtcEventLog> event_log_;
RtcpBandwidthObserver* bandwidth_observer_;
PacketRouter packet_router_;
std::unique_ptr<NiceMock<MockPacedSender>> pacer_;
std::unique_ptr<SendSideCongestionControllerForTest> controller_;
absl::optional<uint32_t> target_bitrate_bps_;
std::unique_ptr<rtc::TaskQueue> task_queue_;
};
TEST_F(SendSideCongestionControllerTest, OnNetworkChanged) {
// Test no change.
clock_.AdvanceTimeMilliseconds(25);
controller_->Process();
EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps * 2, _, _, _));
EXPECT_CALL(*pacer_,
SetPacingRates(kInitialBitrateBps * 2 * kDefaultPacingRate, _));
bandwidth_observer_->OnReceivedEstimatedBitrate(kInitialBitrateBps * 2);
clock_.AdvanceTimeMilliseconds(25);
controller_->Process();
EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps, _, _, _));
EXPECT_CALL(*pacer_,
SetPacingRates(kInitialBitrateBps * kDefaultPacingRate, _));
bandwidth_observer_->OnReceivedEstimatedBitrate(kInitialBitrateBps);
clock_.AdvanceTimeMilliseconds(25);
controller_->Process();
}
TEST_F(SendSideCongestionControllerTest, OnSendQueueFull) {
EXPECT_CALL(*pacer_, ExpectedQueueTimeMs())
.WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs + 1));
EXPECT_CALL(observer_, OnNetworkChanged(0, _, _, _));
controller_->Process();
// Let the pacer not be full next time the controller checks.
EXPECT_CALL(*pacer_, ExpectedQueueTimeMs())
.WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs - 1));
EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps, _, _, _));
controller_->Process();
}
TEST_F(SendSideCongestionControllerTest, OnSendQueueFullAndEstimateChange) {
EXPECT_CALL(*pacer_, ExpectedQueueTimeMs())
.WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs + 1));
EXPECT_CALL(observer_, OnNetworkChanged(0, _, _, _));
controller_->Process();
// Receive new estimate but let the queue still be full.
bandwidth_observer_->OnReceivedEstimatedBitrate(kInitialBitrateBps * 2);
EXPECT_CALL(*pacer_, ExpectedQueueTimeMs())
.WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs + 1));
// The send pacer should get the new estimate though.
EXPECT_CALL(*pacer_,
SetPacingRates(kInitialBitrateBps * 2 * kDefaultPacingRate, _));
clock_.AdvanceTimeMilliseconds(25);
controller_->Process();
// Let the pacer not be full next time the controller checks.
// |OnNetworkChanged| should be called with the new estimate.
EXPECT_CALL(*pacer_, ExpectedQueueTimeMs())
.WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs - 1));
EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps * 2, _, _, _));
clock_.AdvanceTimeMilliseconds(25);
controller_->Process();
}
TEST_F(SendSideCongestionControllerTest, SignalNetworkState) {
EXPECT_CALL(observer_, OnNetworkChanged(0, _, _, _));
controller_->SignalNetworkState(kNetworkDown);
controller_->WaitOnTasks();
EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps, _, _, _));
controller_->SignalNetworkState(kNetworkUp);
controller_->WaitOnTasks();
EXPECT_CALL(observer_, OnNetworkChanged(0, _, _, _));
controller_->SignalNetworkState(kNetworkDown);
}
TEST_F(SendSideCongestionControllerTest, OnNetworkRouteChanged) {
int new_bitrate = 200000;
EXPECT_CALL(observer_, OnNetworkChanged(new_bitrate, _, _, _));
EXPECT_CALL(*pacer_, SetPacingRates(new_bitrate * kDefaultPacingRate, _));
rtc::NetworkRoute route;
route.local_network_id = 1;
controller_->OnNetworkRouteChanged(route, new_bitrate, -1, -1);
controller_->WaitOnTasks();
testing::Mock::VerifyAndClearExpectations(pacer_.get());
testing::Mock::VerifyAndClearExpectations(&observer_);
// If the bitrate is reset to -1, the new starting bitrate will be
// the minimum default bitrate kMinBitrateBps.
EXPECT_CALL(
observer_,
OnNetworkChanged(congestion_controller::GetMinBitrateBps(), _, _, _));
EXPECT_CALL(
*pacer_,
SetPacingRates(
congestion_controller::GetMinBitrateBps() * kDefaultPacingRate, _));
route.local_network_id = 2;
controller_->OnNetworkRouteChanged(route, -1, -1, -1);
}
TEST_F(SendSideCongestionControllerTest, OldFeedback) {
int new_bitrate = 200000;
testing::Mock::VerifyAndClearExpectations(pacer_.get());
EXPECT_CALL(observer_, OnNetworkChanged(new_bitrate, _, _, _));
EXPECT_CALL(*pacer_, SetPacingRates(new_bitrate * kDefaultPacingRate, _));
// Send a few packets on the first network route.
std::vector<PacketFeedback> packets;
packets.push_back(PacketFeedback(0, 0, 0, 1500, kPacingInfo0));
packets.push_back(PacketFeedback(10, 10, 1, 1500, kPacingInfo0));
packets.push_back(PacketFeedback(20, 20, 2, 1500, kPacingInfo0));
packets.push_back(PacketFeedback(30, 30, 3, 1500, kPacingInfo1));
packets.push_back(PacketFeedback(40, 40, 4, 1500, kPacingInfo1));
for (const PacketFeedback& packet : packets)
OnSentPacket(packet);
// Change route and then insert a number of feedback packets.
rtc::NetworkRoute route;
route.local_network_id = 1;
controller_->OnNetworkRouteChanged(route, new_bitrate, -1, -1);
for (const PacketFeedback& packet : packets) {
rtcp::TransportFeedback feedback;
feedback.SetBase(packet.sequence_number, packet.arrival_time_ms * 1000);
EXPECT_TRUE(feedback.AddReceivedPacket(packet.sequence_number,
packet.arrival_time_ms * 1000));
feedback.Build();
controller_->OnTransportFeedback(feedback);
}
controller_->WaitOnTasks();
// If the bitrate is reset to -1, the new starting bitrate will be
// the minimum default bitrate kMinBitrateBps.
EXPECT_CALL(
observer_,
OnNetworkChanged(congestion_controller::GetMinBitrateBps(), _, _, _));
EXPECT_CALL(
*pacer_,
SetPacingRates(
congestion_controller::GetMinBitrateBps() * kDefaultPacingRate, _));
route.local_network_id = 2;
controller_->OnNetworkRouteChanged(route, -1, -1, -1);
}
TEST_F(SendSideCongestionControllerTest,
SignalNetworkStateAndQueueIsFullAndEstimateChange) {
// Send queue is full.
EXPECT_CALL(*pacer_, ExpectedQueueTimeMs())
.WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs + 1));
EXPECT_CALL(observer_, OnNetworkChanged(0, _, _, _));
controller_->Process();
// Queue is full and network is down. Expect no bitrate change.
controller_->SignalNetworkState(kNetworkDown);
controller_->Process();
// Queue is full but network is up. Expect no bitrate change.
controller_->SignalNetworkState(kNetworkUp);
controller_->Process();
// Receive new estimate but let the queue still be full.
EXPECT_CALL(*pacer_,
SetPacingRates(kInitialBitrateBps * 2 * kDefaultPacingRate, _));
bandwidth_observer_->OnReceivedEstimatedBitrate(kInitialBitrateBps * 2);
clock_.AdvanceTimeMilliseconds(25);
controller_->Process();
// Let the pacer not be full next time the controller checks.
EXPECT_CALL(*pacer_, ExpectedQueueTimeMs())
.WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs - 1));
EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps * 2, _, _, _));
controller_->Process();
}
TEST_F(SendSideCongestionControllerTest, GetProbingInterval) {
clock_.AdvanceTimeMilliseconds(25);
controller_->Process();
EXPECT_CALL(observer_, OnNetworkChanged(_, _, _, testing::Ne(0)));
EXPECT_CALL(*pacer_, SetPacingRates(_, _));
bandwidth_observer_->OnReceivedEstimatedBitrate(kInitialBitrateBps * 2);
clock_.AdvanceTimeMilliseconds(25);
controller_->Process();
}
TEST_F(SendSideCongestionControllerTest, ProbeOnRouteChange) {
EXPECT_CALL(*pacer_, CreateProbeCluster(kInitialBitrateBps * 6));
EXPECT_CALL(*pacer_, CreateProbeCluster(kInitialBitrateBps * 12));
EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps * 2, _, _, _));
rtc::NetworkRoute route;
route.local_network_id = 1;
controller_->OnNetworkRouteChanged(route, 2 * kInitialBitrateBps, 0,
20 * kInitialBitrateBps);
controller_->Process();
}
// Estimated bitrate reduced when the feedbacks arrive with such a long delay,
// that the send-time-history no longer holds the feedbacked packets.
TEST_F(SendSideCongestionControllerTest, LongFeedbackDelays) {
TargetBitrateTrackingSetup();
const int64_t kFeedbackTimeoutMs = 60001;
const int kMaxConsecutiveFailedLookups = 5;
for (int i = 0; i < kMaxConsecutiveFailedLookups; ++i) {
std::vector<PacketFeedback> packets;
packets.push_back(
PacketFeedback(i * 100, 2 * i * 100, 0, 1500, kPacingInfo0));
packets.push_back(
PacketFeedback(i * 100 + 10, 2 * i * 100 + 10, 1, 1500, kPacingInfo0));
packets.push_back(
PacketFeedback(i * 100 + 20, 2 * i * 100 + 20, 2, 1500, kPacingInfo0));
packets.push_back(
PacketFeedback(i * 100 + 30, 2 * i * 100 + 30, 3, 1500, kPacingInfo1));
packets.push_back(
PacketFeedback(i * 100 + 40, 2 * i * 100 + 40, 4, 1500, kPacingInfo1));
for (const PacketFeedback& packet : packets)
OnSentPacket(packet);
rtcp::TransportFeedback feedback;
feedback.SetBase(packets[0].sequence_number,
packets[0].arrival_time_ms * 1000);
for (const PacketFeedback& packet : packets) {
EXPECT_TRUE(feedback.AddReceivedPacket(packet.sequence_number,
packet.arrival_time_ms * 1000));
}
feedback.Build();
clock_.AdvanceTimeMilliseconds(kFeedbackTimeoutMs);
PacketFeedback later_packet(kFeedbackTimeoutMs + i * 100 + 40,
kFeedbackTimeoutMs + i * 200 + 40, 5, 1500,
kPacingInfo1);
OnSentPacket(later_packet);
controller_->OnTransportFeedback(feedback);
// Check that packets have timed out.
for (PacketFeedback& packet : packets) {
packet.send_time_ms = PacketFeedback::kNoSendTime;
packet.payload_size = 0;
packet.pacing_info = PacedPacketInfo();
}
ComparePacketFeedbackVectors(packets,
controller_->GetTransportFeedbackVector());
}
controller_->Process();
EXPECT_EQ(kInitialBitrateBps / 2, target_bitrate_bps_);
// Test with feedback that isn't late enough to time out.
{
std::vector<PacketFeedback> packets;
packets.push_back(PacketFeedback(100, 200, 0, 1500, kPacingInfo0));
packets.push_back(PacketFeedback(110, 210, 1, 1500, kPacingInfo0));
packets.push_back(PacketFeedback(120, 220, 2, 1500, kPacingInfo0));
packets.push_back(PacketFeedback(130, 230, 3, 1500, kPacingInfo1));
packets.push_back(PacketFeedback(140, 240, 4, 1500, kPacingInfo1));
for (const PacketFeedback& packet : packets)
OnSentPacket(packet);
rtcp::TransportFeedback feedback;
feedback.SetBase(packets[0].sequence_number,
packets[0].arrival_time_ms * 1000);
for (const PacketFeedback& packet : packets) {
EXPECT_TRUE(feedback.AddReceivedPacket(packet.sequence_number,
packet.arrival_time_ms * 1000));
}
feedback.Build();
clock_.AdvanceTimeMilliseconds(kFeedbackTimeoutMs - 1);
PacketFeedback later_packet(kFeedbackTimeoutMs + 140,
kFeedbackTimeoutMs + 240, 5, 1500,
kPacingInfo1);
OnSentPacket(later_packet);
controller_->OnTransportFeedback(feedback);
ComparePacketFeedbackVectors(packets,
controller_->GetTransportFeedbackVector());
}
}
// Bandwidth estimation is updated when feedbacks are received.
// Feedbacks which show an increasing delay cause the estimation to be reduced.
TEST_F(SendSideCongestionControllerTest, UpdatesDelayBasedEstimate) {
TargetBitrateTrackingSetup();
const int64_t kRunTimeMs = 6000;
uint16_t seq_num = 0;
// The test must run and insert packets/feedback long enough that the
// BWE computes a valid estimate. This is first done in an environment which
// simulates no bandwidth limitation, and therefore not built-up delay.
PacketTransmissionAndFeedbackBlock(&seq_num, kRunTimeMs, 0);
ASSERT_TRUE(target_bitrate_bps_);
// Repeat, but this time with a building delay, and make sure that the
// estimation is adjusted downwards.
uint32_t bitrate_before_delay = *target_bitrate_bps_;
PacketTransmissionAndFeedbackBlock(&seq_num, kRunTimeMs, 50);
EXPECT_LT(*target_bitrate_bps_, bitrate_before_delay);
}
TEST_F(SendSideCongestionControllerTest, PacerQueueEncodeRatePushback) {
::webrtc::test::ScopedFieldTrials pushback_field_trial(
"WebRTC-PacerPushbackExperiment/Enabled/");
SetUp();
EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()).WillOnce(Return(0));
controller_->Process();
EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()).WillOnce(Return(100));
EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps * 0.9, _, _, _));
controller_->Process();
EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()).WillOnce(Return(50));
controller_->Process();
EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()).WillOnce(Return(0));
EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps, _, _, _));
controller_->Process();
const uint32_t kMinAdjustedBps = 50000;
int expected_queue_threshold =
1000 - kMinAdjustedBps * 1000.0 / kInitialBitrateBps;
EXPECT_CALL(*pacer_, ExpectedQueueTimeMs())
.WillOnce(Return(expected_queue_threshold));
EXPECT_CALL(observer_, OnNetworkChanged(Ge(kMinAdjustedBps), _, _, _));
controller_->Process();
EXPECT_CALL(*pacer_, ExpectedQueueTimeMs())
.WillOnce(Return(expected_queue_threshold + 1));
EXPECT_CALL(observer_, OnNetworkChanged(0, _, _, _));
controller_->Process();
EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()).WillOnce(Return(0));
EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps, _, _, _));
controller_->Process();
}
} // namespace test
} // namespace webrtc_cc
} // namespace webrtc