diff --git a/call/BUILD.gn b/call/BUILD.gn index fea81b540e..4d7cccaaad 100644 --- a/call/BUILD.gn +++ b/call/BUILD.gn @@ -123,6 +123,7 @@ rtc_source_set("rtp_sender") { "..:webrtc_common", "../api:fec_controller_api", "../api:transport_api", + "../api/transport:goog_cc", "../api/transport:network_control", "../api/units:data_rate", "../api/units:time_delta", @@ -131,7 +132,8 @@ rtc_source_set("rtp_sender") { "../api/video_codecs:video_codecs_api", "../logging:rtc_event_log_api", "../modules/congestion_controller", - "../modules/congestion_controller/rtp:congestion_controller", + "../modules/congestion_controller/rtp:control_handler", + "../modules/congestion_controller/rtp:transport_feedback", "../modules/pacing", "../modules/rtp_rtcp:rtp_rtcp", "../modules/rtp_rtcp:rtp_rtcp_format", diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index ac0f8de807..cb28ecaebf 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -12,14 +12,13 @@ #include "absl/memory/memory.h" #include "absl/types/optional.h" +#include "api/transport/goog_cc_factory.h" #include "api/transport/network_types.h" #include "api/units/data_rate.h" #include "api/units/time_delta.h" #include "api/units/timestamp.h" #include "call/rtp_transport_controller_send.h" #include "call/rtp_video_sender.h" -#include "modules/congestion_controller/include/send_side_congestion_controller.h" -#include "modules/congestion_controller/rtp/include/send_side_congestion_controller.h" #include "rtc_base/checks.h" #include "rtc_base/location.h" #include "rtc_base/logging.h" @@ -27,35 +26,87 @@ #include "system_wrappers/include/field_trial.h" namespace webrtc { +class RtpTransportControllerSend::PeriodicTask : public rtc::QueuedTask { + public: + virtual void Stop() = 0; +}; + namespace { static const int64_t kRetransmitWindowSizeMs = 500; static const size_t kMaxOverheadBytes = 500; -const char kTaskQueueExperiment[] = "WebRTC-TaskQueueCongestionControl"; -using TaskQueueController = webrtc::webrtc_cc::SendSideCongestionController; -std::unique_ptr CreateController( - Clock* clock, - rtc::TaskQueue* task_queue, - webrtc::RtcEventLog* event_log, - PacedSender* pacer, - const BitrateConstraints& bitrate_config, - bool task_queue_controller, - NetworkControllerFactoryInterface* controller_factory) { - if (task_queue_controller) { - RTC_LOG(LS_INFO) << "Using TaskQueue based SSCC"; - return absl::make_unique( - clock, task_queue, event_log, pacer, bitrate_config.start_bitrate_bps, - bitrate_config.min_bitrate_bps, bitrate_config.max_bitrate_bps, - controller_factory); +const int64_t PacerQueueUpdateIntervalMs = 25; + +TargetRateConstraints ConvertConstraints(int min_bitrate_bps, + int max_bitrate_bps, + int start_bitrate_bps, + const Clock* clock) { + TargetRateConstraints msg; + msg.at_time = Timestamp::ms(clock->TimeInMilliseconds()); + msg.min_data_rate = + min_bitrate_bps >= 0 ? DataRate::bps(min_bitrate_bps) : DataRate::Zero(); + msg.max_data_rate = max_bitrate_bps > 0 ? DataRate::bps(max_bitrate_bps) + : DataRate::Infinity(); + if (start_bitrate_bps > 0) + msg.starting_rate = DataRate::bps(start_bitrate_bps); + return msg; +} + +TargetRateConstraints ConvertConstraints(const BitrateConstraints& contraints, + const Clock* clock) { + return ConvertConstraints(contraints.min_bitrate_bps, + contraints.max_bitrate_bps, + contraints.start_bitrate_bps, clock); +} + +// The template closure pattern is based on rtc::ClosureTask. +template +class PeriodicTaskImpl final : public RtpTransportControllerSend::PeriodicTask { + public: + PeriodicTaskImpl(rtc::TaskQueue* task_queue, + int64_t period_ms, + Closure&& closure) + : task_queue_(task_queue), + period_ms_(period_ms), + closure_(std::forward(closure)) {} + bool Run() override { + if (!running_) + return true; + closure_(); + // absl::WrapUnique lets us repost this task on the TaskQueue. + task_queue_->PostDelayedTask(absl::WrapUnique(this), period_ms_); + // Return false to tell TaskQueue to not destruct this object, since we have + // taken ownership with absl::WrapUnique. + return false; } - RTC_LOG(LS_INFO) << "Using Legacy SSCC"; - auto cc = absl::make_unique( - clock, nullptr /* observer */, event_log, pacer); - cc->SignalNetworkState(kNetworkDown); - cc->SetBweBitrates(bitrate_config.min_bitrate_bps, - bitrate_config.start_bitrate_bps, - bitrate_config.max_bitrate_bps); - return std::move(cc); + void Stop() override { + if (task_queue_->IsCurrent()) { + RTC_DCHECK(running_); + running_ = false; + } else { + task_queue_->PostTask([this] { Stop(); }); + } + } + + private: + rtc::TaskQueue* const task_queue_; + const int64_t period_ms_; + typename std::remove_const< + typename std::remove_reference::type>::type closure_; + bool running_ = true; +}; + +template +static RtpTransportControllerSend::PeriodicTask* StartPeriodicTask( + rtc::TaskQueue* task_queue, + int64_t period_ms, + Closure&& closure) { + auto periodic_task = absl::make_unique>( + task_queue, period_ms, std::forward(closure)); + RtpTransportControllerSend::PeriodicTask* periodic_task_ptr = + periodic_task.get(); + task_queue->PostDelayedTask(std::move(periodic_task), period_ms); + return periodic_task_ptr; } } // namespace @@ -69,21 +120,35 @@ RtpTransportControllerSend::RtpTransportControllerSend( bitrate_configurator_(bitrate_config), process_thread_(ProcessThread::Create("SendControllerThread")), observer_(nullptr), + transport_feedback_adapter_(clock_), + controller_factory_override_(controller_factory), + controller_factory_fallback_( + absl::make_unique(event_log)), + process_interval_(controller_factory_fallback_->GetProcessInterval()), + last_report_block_time_(Timestamp::ms(clock_->TimeInMilliseconds())), + reset_feedback_on_route_change_( + !field_trial::IsEnabled("WebRTC-Bwe-NoFeedbackReset")), + send_side_bwe_with_overhead_( + webrtc::field_trial::IsEnabled("WebRTC-SendSideBwe-WithOverhead")), + transport_overhead_bytes_per_packet_(0), + network_available_(false), + periodic_tasks_enabled_(true), + packet_feedback_available_(false), + pacer_queue_update_task_(nullptr), + controller_task_(nullptr), retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs), task_queue_("rtp_send_controller") { - // Created after task_queue to be able to post to the task queue internally. - send_side_cc_ = CreateController( - clock, &task_queue_, event_log, &pacer_, bitrate_config, - !field_trial::IsDisabled(kTaskQueueExperiment), controller_factory); + initial_config_.constraints = ConvertConstraints(bitrate_config, clock_); + RTC_DCHECK(bitrate_config.start_bitrate_bps > 0); + + pacer_.SetEstimatedBitrate(bitrate_config.start_bitrate_bps); process_thread_->RegisterModule(&pacer_, RTC_FROM_HERE); - process_thread_->RegisterModule(send_side_cc_.get(), RTC_FROM_HERE); process_thread_->Start(); } RtpTransportControllerSend::~RtpTransportControllerSend() { process_thread_->Stop(); - process_thread_->DeRegisterModule(send_side_cc_.get()); process_thread_->DeRegisterModule(&pacer_); } @@ -126,34 +191,21 @@ void RtpTransportControllerSend::OnNetworkChanged(uint32_t bitrate_bps, uint8_t fraction_loss, int64_t rtt_ms, int64_t probing_interval_ms) { - // TODO(srte): Skip this step when old SendSideCongestionController is - // deprecated. TargetTransferRate msg; msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); msg.target_rate = DataRate::bps(bitrate_bps); - msg.network_estimate.at_time = msg.at_time; - msg.network_estimate.bwe_period = TimeDelta::ms(probing_interval_ms); - uint32_t bandwidth_bps; - if (send_side_cc_->AvailableBandwidth(&bandwidth_bps)) - msg.network_estimate.bandwidth = DataRate::bps(bandwidth_bps); - msg.network_estimate.loss_rate_ratio = fraction_loss / 255.0; - msg.network_estimate.round_trip_time = TimeDelta::ms(rtt_ms); + // TODO(srte): Remove this interface and push information about bandwidth + // estimation to users of this class, thereby reducing synchronous calls. + RTC_DCHECK_RUN_ON(&task_queue_); + RTC_DCHECK(control_handler_->last_transfer_rate().has_value()); + msg.network_estimate = + control_handler_->last_transfer_rate()->network_estimate; - retransmission_rate_limiter_.SetMaxRate(bandwidth_bps); + retransmission_rate_limiter_.SetMaxRate(msg.network_estimate.bandwidth.bps()); - if (!task_queue_.IsCurrent()) { - task_queue_.PostTask([this, msg] { - rtc::CritScope cs(&observer_crit_); - // We won't register as observer until we have an observers. - RTC_DCHECK(observer_ != nullptr); - observer_->OnTargetTransferRate(msg); - }); - } else { - rtc::CritScope cs(&observer_crit_); - // We won't register as observer until we have an observers. - RTC_DCHECK(observer_ != nullptr); - observer_->OnTargetTransferRate(msg); - } + // We won't register as observer until we have an observers. + RTC_DCHECK(observer_ != nullptr); + observer_->OnTargetTransferRate(msg); } rtc::TaskQueue* RtpTransportControllerSend::GetWorkerQueue() { @@ -166,7 +218,7 @@ PacketRouter* RtpTransportControllerSend::packet_router() { TransportFeedbackObserver* RtpTransportControllerSend::transport_feedback_observer() { - return send_side_cc_.get(); + return this; } RtpPacketSender* RtpTransportControllerSend::packet_sender() { @@ -181,8 +233,12 @@ void RtpTransportControllerSend::SetAllocatedSendBitrateLimits( int min_send_bitrate_bps, int max_padding_bitrate_bps, int max_total_bitrate_bps) { - send_side_cc_->SetAllocatedSendBitrateLimits( - min_send_bitrate_bps, max_padding_bitrate_bps, max_total_bitrate_bps); + RTC_DCHECK_RUN_ON(&task_queue_); + streams_config_.min_pacing_rate = DataRate::bps(min_send_bitrate_bps); + streams_config_.max_padding_rate = DataRate::bps(max_padding_bitrate_bps); + streams_config_.max_total_allocated_bitrate = + DataRate::bps(max_total_bitrate_bps); + UpdateStreamsConfig(); } void RtpTransportControllerSend::SetKeepAliveConfig( @@ -190,31 +246,33 @@ void RtpTransportControllerSend::SetKeepAliveConfig( keepalive_ = config; } void RtpTransportControllerSend::SetPacingFactor(float pacing_factor) { - send_side_cc_->SetPacingFactor(pacing_factor); + RTC_DCHECK_RUN_ON(&task_queue_); + streams_config_.pacing_factor = pacing_factor; + UpdateStreamsConfig(); } void RtpTransportControllerSend::SetQueueTimeLimit(int limit_ms) { pacer_.SetQueueTimeLimit(limit_ms); } CallStatsObserver* RtpTransportControllerSend::GetCallStatsObserver() { - return send_side_cc_.get(); + return this; } void RtpTransportControllerSend::RegisterPacketFeedbackObserver( PacketFeedbackObserver* observer) { - send_side_cc_->RegisterPacketFeedbackObserver(observer); + transport_feedback_adapter_.RegisterPacketFeedbackObserver(observer); } void RtpTransportControllerSend::DeRegisterPacketFeedbackObserver( PacketFeedbackObserver* observer) { - send_side_cc_->DeRegisterPacketFeedbackObserver(observer); + transport_feedback_adapter_.DeRegisterPacketFeedbackObserver(observer); } void RtpTransportControllerSend::RegisterTargetTransferRateObserver( TargetTransferRateObserver* observer) { - { - rtc::CritScope cs(&observer_crit_); + task_queue_.PostTask([this, observer] { + RTC_DCHECK_RUN_ON(&task_queue_); RTC_DCHECK(observer_ == nullptr); observer_ = observer; - } - send_side_cc_->RegisterNetworkObserver(this); + MaybeCreateControllers(); + }); } void RtpTransportControllerSend::OnNetworkRouteChanged( const std::string& transport_name, @@ -252,20 +310,49 @@ void RtpTransportControllerSend::OnNetworkRouteChanged( << " bps, max: " << bitrate_config.max_bitrate_bps << " bps."; RTC_DCHECK_GT(bitrate_config.start_bitrate_bps, 0); - send_side_cc_->OnNetworkRouteChanged( - network_route, bitrate_config.start_bitrate_bps, - bitrate_config.min_bitrate_bps, bitrate_config.max_bitrate_bps); + + if (reset_feedback_on_route_change_) + transport_feedback_adapter_.SetNetworkIds( + network_route.local_network_id, network_route.remote_network_id); + transport_overhead_bytes_per_packet_ = network_route.packet_overhead; + + NetworkRouteChange msg; + msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); + msg.constraints = ConvertConstraints(bitrate_config, clock_); + task_queue_.PostTask([this, msg] { + RTC_DCHECK_RUN_ON(&task_queue_); + if (controller_) { + control_handler_->PostUpdates(controller_->OnNetworkRouteChange(msg)); + } else { + UpdateInitialConstraints(msg.constraints); + } + pacer_.UpdateOutstandingData(0); + }); } } void RtpTransportControllerSend::OnNetworkAvailability(bool network_available) { - send_side_cc_->SignalNetworkState(network_available ? kNetworkUp - : kNetworkDown); + RTC_LOG(LS_INFO) << "SignalNetworkState " + << (network_available ? "Up" : "Down"); + NetworkAvailability msg; + msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); + msg.network_available = network_available; + task_queue_.PostTask([this, msg]() { + RTC_DCHECK_RUN_ON(&task_queue_); + network_available_ = msg.network_available; + if (controller_) { + control_handler_->PostUpdates(controller_->OnNetworkAvailability(msg)); + control_handler_->OnNetworkAvailability(msg); + } else { + MaybeCreateControllers(); + } + }); + for (auto& rtp_sender : video_rtp_senders_) { rtp_sender->OnNetworkAvailability(network_available); } } RtcpBandwidthObserver* RtpTransportControllerSend::GetBandwidthObserver() { - return send_side_cc_->GetBandwidthObserver(); + return this; } int64_t RtpTransportControllerSend::GetPacerQueuingDelayMs() const { return pacer_.QueueInMs(); @@ -274,14 +361,30 @@ int64_t RtpTransportControllerSend::GetFirstPacketTimeMs() const { return pacer_.FirstSentPacketTimeMs(); } void RtpTransportControllerSend::SetPerPacketFeedbackAvailable(bool available) { - send_side_cc_->SetPerPacketFeedbackAvailable(available); + RTC_DCHECK_RUN_ON(&task_queue_); + packet_feedback_available_ = available; + if (!controller_) + MaybeCreateControllers(); } void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) { - send_side_cc_->EnablePeriodicAlrProbing(enable); + task_queue_.PostTask([this, enable]() { + RTC_DCHECK_RUN_ON(&task_queue_); + streams_config_.requests_alr_probing = enable; + UpdateStreamsConfig(); + }); } void RtpTransportControllerSend::OnSentPacket( const rtc::SentPacket& sent_packet) { - send_side_cc_->OnSentPacket(sent_packet); + absl::optional packet_msg = + transport_feedback_adapter_.ProcessSentPacket(sent_packet); + if (packet_msg) { + task_queue_.PostTask([this, packet_msg]() { + RTC_DCHECK_RUN_ON(&task_queue_); + if (controller_) + control_handler_->PostUpdates(controller_->OnSentPacket(*packet_msg)); + }); + } + MaybeUpdateOutstandingData(); } void RtpTransportControllerSend::SetSdpBitrateParameters( @@ -289,9 +392,16 @@ void RtpTransportControllerSend::SetSdpBitrateParameters( absl::optional updated = bitrate_configurator_.UpdateWithSdpParameters(constraints); if (updated.has_value()) { - send_side_cc_->SetBweBitrates(updated->min_bitrate_bps, - updated->start_bitrate_bps, - updated->max_bitrate_bps); + TargetRateConstraints msg = ConvertConstraints(*updated, clock_); + task_queue_.PostTask([this, msg]() { + RTC_DCHECK_RUN_ON(&task_queue_); + if (controller_) { + control_handler_->PostUpdates( + controller_->OnTargetRateConstraints(msg)); + } else { + UpdateInitialConstraints(msg); + } + }); } else { RTC_LOG(LS_VERBOSE) << "WebRTC.RtpTransportControllerSend.SetSdpBitrateParameters: " @@ -304,9 +414,16 @@ void RtpTransportControllerSend::SetClientBitratePreferences( absl::optional updated = bitrate_configurator_.UpdateWithClientPreferences(preferences); if (updated.has_value()) { - send_side_cc_->SetBweBitrates(updated->min_bitrate_bps, - updated->start_bitrate_bps, - updated->max_bitrate_bps); + TargetRateConstraints msg = ConvertConstraints(*updated, clock_); + task_queue_.PostTask([this, msg]() { + RTC_DCHECK_RUN_ON(&task_queue_); + if (controller_) { + control_handler_->PostUpdates( + controller_->OnTargetRateConstraints(msg)); + } else { + UpdateInitialConstraints(msg); + } + }); } else { RTC_LOG(LS_VERBOSE) << "WebRTC.RtpTransportControllerSend.SetClientBitratePreferences: " @@ -321,7 +438,12 @@ void RtpTransportControllerSend::SetAllocatedBitrateWithoutFeedback( if (field_trial::IsEnabled("WebRTC-Audio-ABWENoTWCC")) { // TODO(srte): Make sure it's safe to always report this and remove the // field trial check. - send_side_cc_->SetAllocatedBitrateWithoutFeedback(bitrate_bps); + task_queue_.PostTask([this, bitrate_bps]() { + RTC_DCHECK_RUN_ON(&task_queue_); + streams_config_.unacknowledged_rate_allocation = + DataRate::bps(bitrate_bps); + UpdateStreamsConfig(); + }); } } @@ -339,4 +461,215 @@ void RtpTransportControllerSend::OnTransportOverheadChanged( transport_overhead_bytes_per_packet); } } + +void RtpTransportControllerSend::OnReceivedEstimatedBitrate(uint32_t bitrate) { + RemoteBitrateReport msg; + msg.receive_time = Timestamp::ms(clock_->TimeInMilliseconds()); + msg.bandwidth = DataRate::bps(bitrate); + task_queue_.PostTask([this, msg]() { + RTC_DCHECK_RUN_ON(&task_queue_); + if (controller_) + control_handler_->PostUpdates(controller_->OnRemoteBitrateReport(msg)); + }); +} + +void RtpTransportControllerSend::OnReceivedRtcpReceiverReport( + const ReportBlockList& report_blocks, + int64_t rtt_ms, + int64_t now_ms) { + task_queue_.PostTask([this, report_blocks, now_ms]() { + RTC_DCHECK_RUN_ON(&task_queue_); + OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms); + }); + + task_queue_.PostTask([this, now_ms, rtt_ms]() { + RTC_DCHECK_RUN_ON(&task_queue_); + RoundTripTimeUpdate report; + report.receive_time = Timestamp::ms(now_ms); + report.round_trip_time = TimeDelta::ms(rtt_ms); + report.smoothed = false; + if (controller_) + control_handler_->PostUpdates(controller_->OnRoundTripTimeUpdate(report)); + }); +} + +void RtpTransportControllerSend::AddPacket(uint32_t ssrc, + uint16_t sequence_number, + size_t length, + const PacedPacketInfo& pacing_info) { + if (send_side_bwe_with_overhead_) { + length += transport_overhead_bytes_per_packet_; + } + transport_feedback_adapter_.AddPacket(ssrc, sequence_number, length, + pacing_info); +} + +void RtpTransportControllerSend::OnTransportFeedback( + const rtcp::TransportFeedback& feedback) { + RTC_DCHECK_RUNS_SERIALIZED(&worker_race_); + + absl::optional feedback_msg = + transport_feedback_adapter_.ProcessTransportFeedback(feedback); + if (feedback_msg) { + task_queue_.PostTask([this, feedback_msg]() { + RTC_DCHECK_RUN_ON(&task_queue_); + if (controller_) + control_handler_->PostUpdates( + controller_->OnTransportPacketsFeedback(*feedback_msg)); + }); + } + MaybeUpdateOutstandingData(); +} + +void RtpTransportControllerSend::OnRttUpdate(int64_t avg_rtt_ms, + int64_t max_rtt_ms) { + int64_t now_ms = clock_->TimeInMilliseconds(); + RoundTripTimeUpdate report; + report.receive_time = Timestamp::ms(now_ms); + report.round_trip_time = TimeDelta::ms(avg_rtt_ms); + report.smoothed = true; + task_queue_.PostTask([this, report]() { + RTC_DCHECK_RUN_ON(&task_queue_); + if (controller_) + control_handler_->PostUpdates(controller_->OnRoundTripTimeUpdate(report)); + }); +} + +void RtpTransportControllerSend::MaybeCreateControllers() { + RTC_DCHECK(!controller_); + RTC_DCHECK(!control_handler_); + + if (!network_available_ || !observer_) + return; + control_handler_ = absl::make_unique(this, &pacer_); + + initial_config_.constraints.at_time = + Timestamp::ms(clock_->TimeInMilliseconds()); + initial_config_.stream_based_config = streams_config_; + + // TODO(srte): Use fallback controller if no feedback is available. + if (controller_factory_override_) { + RTC_LOG(LS_INFO) << "Creating overridden congestion controller"; + controller_ = controller_factory_override_->Create(initial_config_); + process_interval_ = controller_factory_override_->GetProcessInterval(); + } else { + RTC_LOG(LS_INFO) << "Creating fallback congestion controller"; + controller_ = controller_factory_fallback_->Create(initial_config_); + process_interval_ = controller_factory_fallback_->GetProcessInterval(); + } + UpdateControllerWithTimeInterval(); + StartProcessPeriodicTasks(); +} + +void RtpTransportControllerSend::UpdateInitialConstraints( + TargetRateConstraints new_contraints) { + if (!new_contraints.starting_rate) + new_contraints.starting_rate = initial_config_.constraints.starting_rate; + RTC_DCHECK(new_contraints.starting_rate); + initial_config_.constraints = new_contraints; +} + +void RtpTransportControllerSend::StartProcessPeriodicTasks() { + if (!periodic_tasks_enabled_) + return; + if (!pacer_queue_update_task_) { + pacer_queue_update_task_ = + StartPeriodicTask(&task_queue_, PacerQueueUpdateIntervalMs, [this]() { + RTC_DCHECK_RUN_ON(&task_queue_); + UpdatePacerQueue(); + }); + } + if (controller_task_) { + // Stop is not synchronous, but is guaranteed to occur before the first + // invocation of the new controller task started below. + controller_task_->Stop(); + controller_task_ = nullptr; + } + if (process_interval_.IsFinite()) { + // The controller task is owned by the task queue and lives until the task + // queue is destroyed or some time after Stop() is called, whichever comes + // first. + controller_task_ = + StartPeriodicTask(&task_queue_, process_interval_.ms(), [this]() { + RTC_DCHECK_RUN_ON(&task_queue_); + UpdateControllerWithTimeInterval(); + }); + } +} + +void RtpTransportControllerSend::UpdateControllerWithTimeInterval() { + if (controller_) { + ProcessInterval msg; + msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); + control_handler_->PostUpdates(controller_->OnProcessInterval(msg)); + } +} + +void RtpTransportControllerSend::UpdatePacerQueue() { + if (control_handler_) { + TimeDelta expected_queue_time = TimeDelta::ms(pacer_.ExpectedQueueTimeMs()); + control_handler_->OnPacerQueueUpdate(expected_queue_time); + } +} + +void RtpTransportControllerSend::MaybeUpdateOutstandingData() { + DataSize in_flight_data = transport_feedback_adapter_.GetOutstandingData(); + task_queue_.PostTask([this, in_flight_data]() { + RTC_DCHECK_RUN_ON(&task_queue_); + if (control_handler_) + control_handler_->OnOutstandingData(in_flight_data); + }); +} + +void RtpTransportControllerSend::UpdateStreamsConfig() { + streams_config_.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); + if (controller_) + control_handler_->PostUpdates( + controller_->OnStreamsConfig(streams_config_)); +} + +void RtpTransportControllerSend::OnReceivedRtcpReceiverReportBlocks( + const ReportBlockList& report_blocks, + int64_t now_ms) { + if (report_blocks.empty()) + return; + + int total_packets_lost_delta = 0; + int total_packets_delta = 0; + + // Compute the packet loss from all report blocks. + for (const RTCPReportBlock& report_block : report_blocks) { + auto it = last_report_blocks_.find(report_block.source_ssrc); + if (it != last_report_blocks_.end()) { + auto number_of_packets = report_block.extended_highest_sequence_number - + it->second.extended_highest_sequence_number; + total_packets_delta += number_of_packets; + auto lost_delta = report_block.packets_lost - it->second.packets_lost; + total_packets_lost_delta += lost_delta; + } + last_report_blocks_[report_block.source_ssrc] = report_block; + } + // Can only compute delta if there has been previous blocks to compare to. If + // not, total_packets_delta will be unchanged and there's nothing more to do. + if (!total_packets_delta) + return; + int packets_received_delta = total_packets_delta - total_packets_lost_delta; + // To detect lost packets, at least one packet has to be received. This check + // is needed to avoid bandwith detection update in + // VideoSendStreamTest.SuspendBelowMinBitrate + + if (packets_received_delta < 1) + return; + Timestamp now = Timestamp::ms(now_ms); + TransportLossReport msg; + msg.packets_lost_delta = total_packets_lost_delta; + msg.packets_received_delta = packets_received_delta; + msg.receive_time = now; + msg.start_time = last_report_block_time_; + msg.end_time = now; + if (controller_) + control_handler_->PostUpdates(controller_->OnTransportLossReport(msg)); + last_report_block_time_ = now; +} + } // namespace webrtc diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index f1c46cc078..8406ca4b4c 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -11,6 +11,7 @@ #ifndef CALL_RTP_TRANSPORT_CONTROLLER_SEND_H_ #define CALL_RTP_TRANSPORT_CONTROLLER_SEND_H_ +#include #include #include #include @@ -20,15 +21,17 @@ #include "call/rtp_bitrate_configurator.h" #include "call/rtp_transport_controller_send_interface.h" #include "call/rtp_video_sender.h" -#include "modules/congestion_controller/include/send_side_congestion_controller_interface.h" +#include "modules/congestion_controller/include/network_changed_observer.h" +#include "modules/congestion_controller/rtp/control_handler.h" +#include "modules/congestion_controller/rtp/transport_feedback_adapter.h" #include "modules/pacing/packet_router.h" #include "modules/utility/include/process_thread.h" #include "rtc_base/constructormagic.h" #include "rtc_base/networkroute.h" +#include "rtc_base/race_checker.h" #include "rtc_base/task_queue.h" namespace webrtc { - class Clock; class FrameEncryptorInterface; class RtcEventLog; @@ -38,7 +41,10 @@ class RtcEventLog; // per transport, sharing the same congestion controller. class RtpTransportControllerSend final : public RtpTransportControllerSendInterface, - public NetworkChangedObserver { + public NetworkChangedObserver, + public RtcpBandwidthObserver, + public CallStatsObserver, + public TransportFeedbackObserver { public: RtpTransportControllerSend( Clock* clock, @@ -108,7 +114,39 @@ class RtpTransportControllerSend final void OnTransportOverheadChanged( size_t transport_overhead_per_packet) override; + // Implements RtcpBandwidthObserver interface + void OnReceivedEstimatedBitrate(uint32_t bitrate) override; + void OnReceivedRtcpReceiverReport(const ReportBlockList& report_blocks, + int64_t rtt, + int64_t now_ms) override; + + // Implements TransportFeedbackObserver interface + void AddPacket(uint32_t ssrc, + uint16_t sequence_number, + size_t length, + const PacedPacketInfo& pacing_info) override; + void OnTransportFeedback(const rtcp::TransportFeedback& feedback) override; + + // Implements CallStatsObserver interface + void OnRttUpdate(int64_t avg_rtt_ms, int64_t max_rtt_ms) override; + + class PeriodicTask; + private: + void MaybeCreateControllers() RTC_RUN_ON(task_queue_); + void UpdateInitialConstraints(TargetRateConstraints new_contraints) + RTC_RUN_ON(task_queue_); + + void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_); + void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_); + void UpdatePacerQueue() RTC_RUN_ON(task_queue_); + + void UpdateStreamsConfig() RTC_RUN_ON(task_queue_); + void MaybeUpdateOutstandingData(); + void OnReceivedRtcpReceiverReportBlocks(const ReportBlockList& report_blocks, + int64_t now_ms) + RTC_RUN_ON(task_queue_); + const Clock* const clock_; PacketRouter packet_router_; std::vector> video_rtp_senders_; @@ -117,9 +155,49 @@ class RtpTransportControllerSend final RtpBitrateConfigurator bitrate_configurator_; std::map network_routes_; const std::unique_ptr process_thread_; - rtc::CriticalSection observer_crit_; - TargetTransferRateObserver* observer_ RTC_GUARDED_BY(observer_crit_); - std::unique_ptr send_side_cc_; + + TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_); + + // TODO(srte): Move all access to feedback adapter to task queue. + TransportFeedbackAdapter transport_feedback_adapter_; + + NetworkControllerFactoryInterface* const controller_factory_override_ + RTC_PT_GUARDED_BY(task_queue_); + const std::unique_ptr + controller_factory_fallback_ RTC_PT_GUARDED_BY(task_queue_); + + std::unique_ptr control_handler_ + RTC_GUARDED_BY(task_queue_) RTC_PT_GUARDED_BY(task_queue_); + + std::unique_ptr controller_ + RTC_GUARDED_BY(task_queue_) RTC_PT_GUARDED_BY(task_queue_); + + TimeDelta process_interval_ RTC_GUARDED_BY(task_queue_); + + std::map last_report_blocks_ + RTC_GUARDED_BY(task_queue_); + Timestamp last_report_block_time_ RTC_GUARDED_BY(task_queue_); + + NetworkControllerConfig initial_config_ RTC_GUARDED_BY(task_queue_); + StreamsConfig streams_config_ RTC_GUARDED_BY(task_queue_); + + const bool reset_feedback_on_route_change_; + const bool send_side_bwe_with_overhead_; + // Transport overhead is written by OnNetworkRouteChanged and read by + // AddPacket. + // TODO(srte): Remove atomic when feedback adapter runs on task queue. + std::atomic transport_overhead_bytes_per_packet_; + bool network_available_ RTC_GUARDED_BY(task_queue_); + bool periodic_tasks_enabled_ RTC_GUARDED_BY(task_queue_); + bool packet_feedback_available_ RTC_GUARDED_BY(task_queue_); + PeriodicTask* pacer_queue_update_task_ RTC_GUARDED_BY(task_queue_) + RTC_PT_GUARDED_BY(task_queue_); + PeriodicTask* controller_task_ RTC_GUARDED_BY(task_queue_) + RTC_PT_GUARDED_BY(task_queue_); + // Protects access to last_packet_feedback_vector_ in feedback adapter. + // TODO(srte): Remove this checker when feedback adapter runs on task queue. + rtc::RaceChecker worker_race_; + RateLimiter retransmission_rate_limiter_; // TODO(perkj): |task_queue_| is supposed to replace |process_thread_|. diff --git a/modules/congestion_controller/rtp/BUILD.gn b/modules/congestion_controller/rtp/BUILD.gn index bd4e1282a2..9bf78bcbe6 100644 --- a/modules/congestion_controller/rtp/BUILD.gn +++ b/modules/congestion_controller/rtp/BUILD.gn @@ -16,47 +16,6 @@ config("bwe_test_logging") { } } -rtc_static_library("congestion_controller") { - visibility = [ "*" ] - configs += [ ":bwe_test_logging" ] - sources = [ - "include/send_side_congestion_controller.h", - "send_side_congestion_controller.cc", - ] - - if (!build_with_chromium && is_clang) { - # Suppress warnings from the Chromium Clang plugin (bugs.webrtc.org/163). - suppressed_configs += [ "//build/config/clang:find_bad_constructs" ] - } - - deps = [ - ":control_handler", - ":transport_feedback", - "../:congestion_controller", - "../..:module_api", - "../../..:webrtc_common", - "../../../api/transport:goog_cc", - "../../../api/transport:network_control", - "../../../rtc_base:checks", - "../../../rtc_base:rate_limiter", - "../../../rtc_base:rtc_task_queue_api", - "../../../rtc_base:safe_minmax", - "../../../rtc_base:sequenced_task_checker", - "../../../rtc_base/experiments:congestion_controller_experiment", - "../../../rtc_base/network:sent_packet", - "../../../system_wrappers", - "../../../system_wrappers:field_trial", - "../../pacing", - "../../remote_bitrate_estimator", - "../../rtp_rtcp:rtp_rtcp_format", - "//third_party/abseil-cpp/absl/memory", - ] - - if (!build_with_mozilla) { - deps += [ "../../../rtc_base:rtc_base" ] - } -} - rtc_source_set("control_handler") { visibility = [ "*" ] sources = [ @@ -122,12 +81,10 @@ if (rtc_include_tests) { sources = [ "congestion_controller_unittests_helper.cc", "congestion_controller_unittests_helper.h", - "send_side_congestion_controller_unittest.cc", "send_time_history_unittest.cc", "transport_feedback_adapter_unittest.cc", ] deps = [ - ":congestion_controller", ":transport_feedback", "../:congestion_controller", "../:mock_congestion_controller", diff --git a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h b/modules/congestion_controller/rtp/include/send_side_congestion_controller.h deleted file mode 100644 index 55465cb75f..0000000000 --- a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef MODULES_CONGESTION_CONTROLLER_RTP_INCLUDE_SEND_SIDE_CONGESTION_CONTROLLER_H_ -#define MODULES_CONGESTION_CONTROLLER_RTP_INCLUDE_SEND_SIDE_CONGESTION_CONTROLLER_H_ - -#include -#include -#include -#include -#include - -#include "api/transport/network_control.h" -#include "api/transport/network_types.h" -#include "common_types.h" // NOLINT(build/include) -#include "modules/congestion_controller/include/send_side_congestion_controller_interface.h" -#include "modules/congestion_controller/rtp/control_handler.h" -#include "modules/congestion_controller/rtp/transport_feedback_adapter.h" -#include "modules/include/module.h" -#include "modules/include/module_common_types.h" -#include "modules/pacing/paced_sender.h" -#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" -#include "rtc_base/constructormagic.h" -#include "rtc_base/criticalsection.h" -#include "rtc_base/networkroute.h" -#include "rtc_base/race_checker.h" -#include "rtc_base/task_queue.h" - -namespace rtc { -struct SentPacket; -} - -namespace webrtc { - -class Clock; -class RtcEventLog; - -namespace webrtc_cc { - -namespace send_side_cc_internal { - -// TODO(srte): Make sure the PeriodicTask implementation is reusable and move it -// to task_queue.h. -class PeriodicTask : public rtc::QueuedTask { - public: - virtual void Stop() = 0; -}; -} // namespace send_side_cc_internal - -class SendSideCongestionController - : public SendSideCongestionControllerInterface, - public RtcpBandwidthObserver { - public: - SendSideCongestionController( - const Clock* clock, - rtc::TaskQueue* task_queue, - RtcEventLog* event_log, - PacedSender* pacer, - int start_bitrate_bps, - int min_bitrate_bps, - int max_bitrate_bps, - NetworkControllerFactoryInterface* controller_factory); - - ~SendSideCongestionController() override; - - void RegisterPacketFeedbackObserver( - PacketFeedbackObserver* observer) override; - void DeRegisterPacketFeedbackObserver( - PacketFeedbackObserver* observer) override; - - // Currently, there can be at most one observer. - // TODO(nisse): The RegisterNetworkObserver method is needed because we first - // construct this object (as part of RtpTransportControllerSend), then pass a - // reference to Call, which then registers itself as the observer. We should - // try to break this circular chain of references, and make the observer a - // construction time constant. - void RegisterNetworkObserver(NetworkChangedObserver* observer) override; - - void SetBweBitrates(int min_bitrate_bps, - int start_bitrate_bps, - int max_bitrate_bps) override; - - void SetAllocatedSendBitrateLimits(int64_t min_send_bitrate_bps, - int64_t max_padding_bitrate_bps, - int64_t max_total_bitrate_bps) override; - - // Resets the BWE state. Note the first argument is the bitrate_bps. - void OnNetworkRouteChanged(const rtc::NetworkRoute& network_route, - int bitrate_bps, - int min_bitrate_bps, - int max_bitrate_bps) override; - void SignalNetworkState(NetworkState state) override; - - RtcpBandwidthObserver* GetBandwidthObserver() override; - - bool AvailableBandwidth(uint32_t* bandwidth) const override; - - TransportFeedbackObserver* GetTransportFeedbackObserver() override; - - void SetPerPacketFeedbackAvailable(bool available) override; - void EnablePeriodicAlrProbing(bool enable) override; - - void OnSentPacket(const rtc::SentPacket& sent_packet) override; - - // Implements RtcpBandwidthObserver - void OnReceivedEstimatedBitrate(uint32_t bitrate) override; - void OnReceivedRtcpReceiverReport(const ReportBlockList& report_blocks, - int64_t rtt, - int64_t now_ms) override; - // Implements CallStatsObserver. - void OnRttUpdate(int64_t avg_rtt_ms, int64_t max_rtt_ms) override; - - // Implements Module. - int64_t TimeUntilNextProcess() override; - void Process() override; - - // Implements TransportFeedbackObserver. - void AddPacket(uint32_t ssrc, - uint16_t sequence_number, - size_t length, - const PacedPacketInfo& pacing_info) override; - void OnTransportFeedback(const rtcp::TransportFeedback& feedback) override; - - std::vector GetTransportFeedbackVector() const; - - void SetPacingFactor(float pacing_factor) override; - - void SetAllocatedBitrateWithoutFeedback(uint32_t bitrate_bps) override; - - protected: - // TODO(srte): The tests should be rewritten to not depend on internals and - // these functions should be removed. - // Since we can't control the timing of the internal task queue, this method - // is used in unit tests to stop the periodic tasks from running unless - // PostPeriodicTasksForTest is called. - void DisablePeriodicTasks(); - // Post periodic tasks just once. This allows unit tests to trigger process - // updates immediately. - void PostPeriodicTasksForTest(); - // Waits for outstanding tasks to be finished. This allos unit tests to ensure - // that expected callbacks has been called. - void WaitOnTasksForTest(); - - private: - void MaybeCreateControllers() RTC_RUN_ON(task_queue_); - void MaybeRecreateControllers() RTC_RUN_ON(task_queue_); - void UpdateInitialConstraints(TargetRateConstraints new_contraints) - RTC_RUN_ON(task_queue_); - - void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_); - void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_); - void UpdatePacerQueue() RTC_RUN_ON(task_queue_); - - void UpdateStreamsConfig() RTC_RUN_ON(task_queue_); - void MaybeUpdateOutstandingData(); - void OnReceivedRtcpReceiverReportBlocks(const ReportBlockList& report_blocks, - int64_t now_ms) - RTC_RUN_ON(task_queue_); - - const Clock* const clock_; - // PacedSender is thread safe and doesn't need protection here. - PacedSender* const pacer_; - // TODO(srte): Move all access to feedback adapter to task queue. - TransportFeedbackAdapter transport_feedback_adapter_; - - NetworkControllerFactoryInterface* const controller_factory_with_feedback_ - RTC_GUARDED_BY(task_queue_); - const std::unique_ptr - controller_factory_fallback_ RTC_GUARDED_BY(task_queue_); - - std::unique_ptr control_handler_ - RTC_GUARDED_BY(task_queue_); - - std::unique_ptr controller_ - RTC_GUARDED_BY(task_queue_); - - TimeDelta process_interval_ RTC_GUARDED_BY(task_queue_); - - std::map last_report_blocks_ - RTC_GUARDED_BY(task_queue_); - Timestamp last_report_block_time_ RTC_GUARDED_BY(task_queue_); - - NetworkChangedObserver* observer_ RTC_GUARDED_BY(task_queue_); - NetworkControllerConfig initial_config_ RTC_GUARDED_BY(task_queue_); - StreamsConfig streams_config_ RTC_GUARDED_BY(task_queue_); - - const bool reset_feedback_on_route_change_; - const bool send_side_bwe_with_overhead_; - // Transport overhead is written by OnNetworkRouteChanged and read by - // AddPacket. - // TODO(srte): Remove atomic when feedback adapter runs on task queue. - std::atomic transport_overhead_bytes_per_packet_; - bool network_available_ RTC_GUARDED_BY(task_queue_); - bool periodic_tasks_enabled_ RTC_GUARDED_BY(task_queue_); - bool packet_feedback_available_ RTC_GUARDED_BY(task_queue_); - send_side_cc_internal::PeriodicTask* pacer_queue_update_task_ - RTC_GUARDED_BY(task_queue_); - send_side_cc_internal::PeriodicTask* controller_task_ - RTC_GUARDED_BY(task_queue_); - - // Protects access to last_packet_feedback_vector_ in feedback adapter. - // TODO(srte): Remove this checker when feedback adapter runs on task queue. - rtc::RaceChecker worker_race_; - - rtc::TaskQueue* task_queue_; - - RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(SendSideCongestionController); -}; -} // namespace webrtc_cc -} // namespace webrtc - -#endif // MODULES_CONGESTION_CONTROLLER_RTP_INCLUDE_SEND_SIDE_CONGESTION_CONTROLLER_H_ diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller.cc b/modules/congestion_controller/rtp/send_side_congestion_controller.cc deleted file mode 100644 index 982380f457..0000000000 --- a/modules/congestion_controller/rtp/send_side_congestion_controller.cc +++ /dev/null @@ -1,588 +0,0 @@ -/* - * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "modules/congestion_controller/rtp/include/send_side_congestion_controller.h" - -#include -#include -#include -#include -#include "absl/memory/memory.h" -#include "api/transport/goog_cc_factory.h" -#include "api/transport/network_types.h" -#include "modules/remote_bitrate_estimator/include/bwe_defines.h" -#include "rtc_base/bind.h" -#include "rtc_base/checks.h" -#include "rtc_base/event.h" -#include "rtc_base/format_macros.h" -#include "rtc_base/logging.h" -#include "rtc_base/network/sent_packet.h" -#include "rtc_base/numerics/safe_conversions.h" -#include "rtc_base/numerics/safe_minmax.h" -#include "rtc_base/rate_limiter.h" -#include "rtc_base/sequenced_task_checker.h" -#include "rtc_base/timeutils.h" -#include "system_wrappers/include/field_trial.h" - -using absl::make_unique; - -namespace webrtc { -namespace webrtc_cc { -namespace { -using send_side_cc_internal::PeriodicTask; -const int64_t PacerQueueUpdateIntervalMs = 25; - -TargetRateConstraints ConvertConstraints(int min_bitrate_bps, - int max_bitrate_bps, - int start_bitrate_bps, - const Clock* clock) { - TargetRateConstraints msg; - msg.at_time = Timestamp::ms(clock->TimeInMilliseconds()); - msg.min_data_rate = - min_bitrate_bps >= 0 ? DataRate::bps(min_bitrate_bps) : DataRate::Zero(); - msg.max_data_rate = max_bitrate_bps > 0 ? DataRate::bps(max_bitrate_bps) - : DataRate::Infinity(); - if (start_bitrate_bps > 0) - msg.starting_rate = DataRate::bps(start_bitrate_bps); - return msg; -} - -// The template closure pattern is based on rtc::ClosureTask. -template -class PeriodicTaskImpl final : public PeriodicTask { - public: - PeriodicTaskImpl(rtc::TaskQueue* task_queue, - int64_t period_ms, - Closure&& closure) - : task_queue_(task_queue), - period_ms_(period_ms), - closure_(std::forward(closure)) {} - bool Run() override { - if (!running_) - return true; - closure_(); - // absl::WrapUnique lets us repost this task on the TaskQueue. - task_queue_->PostDelayedTask(absl::WrapUnique(this), period_ms_); - // Return false to tell TaskQueue to not destruct this object, since we have - // taken ownership with absl::WrapUnique. - return false; - } - void Stop() override { - if (task_queue_->IsCurrent()) { - RTC_DCHECK(running_); - running_ = false; - } else { - task_queue_->PostTask([this] { Stop(); }); - } - } - - private: - rtc::TaskQueue* const task_queue_; - const int64_t period_ms_; - typename std::remove_const< - typename std::remove_reference::type>::type closure_; - bool running_ = true; -}; - -template -static PeriodicTask* StartPeriodicTask(rtc::TaskQueue* task_queue, - int64_t period_ms, - Closure&& closure) { - auto periodic_task = absl::make_unique>( - task_queue, period_ms, std::forward(closure)); - PeriodicTask* periodic_task_ptr = periodic_task.get(); - task_queue->PostDelayedTask(std::move(periodic_task), period_ms); - return periodic_task_ptr; -} - -} // namespace - -SendSideCongestionController::SendSideCongestionController( - const Clock* clock, - rtc::TaskQueue* task_queue, - RtcEventLog* event_log, - PacedSender* pacer, - int start_bitrate_bps, - int min_bitrate_bps, - int max_bitrate_bps, - NetworkControllerFactoryInterface* controller_factory) - : clock_(clock), - pacer_(pacer), - transport_feedback_adapter_(clock_), - controller_factory_with_feedback_(controller_factory), - controller_factory_fallback_( - absl::make_unique(event_log)), - process_interval_(controller_factory_fallback_->GetProcessInterval()), - last_report_block_time_(Timestamp::ms(clock_->TimeInMilliseconds())), - observer_(nullptr), - reset_feedback_on_route_change_( - !field_trial::IsEnabled("WebRTC-Bwe-NoFeedbackReset")), - send_side_bwe_with_overhead_( - webrtc::field_trial::IsEnabled("WebRTC-SendSideBwe-WithOverhead")), - transport_overhead_bytes_per_packet_(0), - network_available_(false), - periodic_tasks_enabled_(true), - packet_feedback_available_(false), - pacer_queue_update_task_(nullptr), - controller_task_(nullptr), - task_queue_(task_queue) { - initial_config_.constraints = ConvertConstraints( - min_bitrate_bps, max_bitrate_bps, start_bitrate_bps, clock_); - RTC_DCHECK(start_bitrate_bps > 0); - // To be fully compatible with legacy SendSideCongestionController, make sure - // pacer is initialized even if there are no registered streams. This should - // not happen under normal circumstances, but some tests rely on it and there - // are no checks detecting when the legacy SendSideCongestionController is - // used. This way of setting the value has the drawback that it might be wrong - // compared to what the actual value from the congestion controller will be. - // TODO(srte): Remove this when the legacy SendSideCongestionController is - // removed. - pacer_->SetEstimatedBitrate(start_bitrate_bps); -} - -// There is no point in having a network controller for a network that is not -// yet available and if we don't have any observer of it's state. -// MaybeCreateControllers is used to trigger creation if those things are -// fulfilled. This is needed since dependent code uses the period until network -// is signalled to be avaliabile to set the expected start bitrate which is sent -// to the initializer for NetworkControllers. The observer is injected later due -// to a circular dependency between RtpTransportControllerSend and Call. -// TODO(srte): Break the circular dependency issue and make sure that starting -// bandwidth is set before this class is initialized so the controllers can be -// created in the constructor. -void SendSideCongestionController::MaybeCreateControllers() { - if (!controller_) - MaybeRecreateControllers(); -} - -void SendSideCongestionController::MaybeRecreateControllers() { - if (!network_available_ || !observer_) - return; - if (!control_handler_) { - control_handler_ = - absl::make_unique(observer_, pacer_); - } - - initial_config_.constraints.at_time = - Timestamp::ms(clock_->TimeInMilliseconds()); - initial_config_.stream_based_config = streams_config_; - - if (!controller_) { - // TODO(srte): Use fallback controller if no feedback is available. - if (controller_factory_with_feedback_) { - RTC_LOG(LS_INFO) << "Creating feedback based only controller"; - controller_ = controller_factory_with_feedback_->Create(initial_config_); - process_interval_ = - controller_factory_with_feedback_->GetProcessInterval(); - } else { - RTC_LOG(LS_INFO) << "Creating fallback controller"; - controller_ = controller_factory_fallback_->Create(initial_config_); - process_interval_ = controller_factory_fallback_->GetProcessInterval(); - } - UpdateControllerWithTimeInterval(); - StartProcessPeriodicTasks(); - } - RTC_DCHECK(controller_); -} - -void SendSideCongestionController::UpdateInitialConstraints( - TargetRateConstraints new_contraints) { - if (!new_contraints.starting_rate) - new_contraints.starting_rate = initial_config_.constraints.starting_rate; - RTC_DCHECK(new_contraints.starting_rate); - initial_config_.constraints = new_contraints; -} - -SendSideCongestionController::~SendSideCongestionController() = default; - -void SendSideCongestionController::RegisterPacketFeedbackObserver( - PacketFeedbackObserver* observer) { - transport_feedback_adapter_.RegisterPacketFeedbackObserver(observer); -} - -void SendSideCongestionController::DeRegisterPacketFeedbackObserver( - PacketFeedbackObserver* observer) { - transport_feedback_adapter_.DeRegisterPacketFeedbackObserver(observer); -} - -void SendSideCongestionController::RegisterNetworkObserver( - NetworkChangedObserver* observer) { - task_queue_->PostTask([this, observer]() { - RTC_DCHECK_RUN_ON(task_queue_); - RTC_DCHECK(observer_ == nullptr); - observer_ = observer; - MaybeCreateControllers(); - }); -} - -void SendSideCongestionController::SetBweBitrates(int min_bitrate_bps, - int start_bitrate_bps, - int max_bitrate_bps) { - TargetRateConstraints constraints = ConvertConstraints( - min_bitrate_bps, max_bitrate_bps, start_bitrate_bps, clock_); - task_queue_->PostTask([this, constraints]() { - RTC_DCHECK_RUN_ON(task_queue_); - if (controller_) { - control_handler_->PostUpdates( - controller_->OnTargetRateConstraints(constraints)); - } else { - UpdateInitialConstraints(constraints); - } - }); -} - -void SendSideCongestionController::SetAllocatedSendBitrateLimits( - int64_t min_send_bitrate_bps, - int64_t max_padding_bitrate_bps, - int64_t max_total_bitrate_bps) { - RTC_DCHECK_RUN_ON(task_queue_); - streams_config_.min_pacing_rate = DataRate::bps(min_send_bitrate_bps); - streams_config_.max_padding_rate = DataRate::bps(max_padding_bitrate_bps); - streams_config_.max_total_allocated_bitrate = - DataRate::bps(max_total_bitrate_bps); - UpdateStreamsConfig(); -} - -// TODO(holmer): Split this up and use SetBweBitrates in combination with -// OnNetworkRouteChanged. -void SendSideCongestionController::OnNetworkRouteChanged( - const rtc::NetworkRoute& network_route, - int start_bitrate_bps, - int min_bitrate_bps, - int max_bitrate_bps) { - if (reset_feedback_on_route_change_) - transport_feedback_adapter_.SetNetworkIds(network_route.local_network_id, - network_route.remote_network_id); - transport_overhead_bytes_per_packet_ = network_route.packet_overhead; - - NetworkRouteChange msg; - msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); - msg.constraints = ConvertConstraints(min_bitrate_bps, max_bitrate_bps, - start_bitrate_bps, clock_); - - task_queue_->PostTask([this, msg]() { - RTC_DCHECK_RUN_ON(task_queue_); - if (controller_) { - control_handler_->PostUpdates(controller_->OnNetworkRouteChange(msg)); - } else { - UpdateInitialConstraints(msg.constraints); - } - pacer_->UpdateOutstandingData(0); - }); -} - -bool SendSideCongestionController::AvailableBandwidth( - uint32_t* bandwidth) const { - // This is only called in the OnNetworkChanged callback in - // RtpTransportControllerSend which is called from ControlHandler, which is - // running on the task queue. - // TODO(srte): Remove this function when RtpTransportControllerSend stops - // calling it. - RTC_DCHECK_RUN_ON(task_queue_); - if (!control_handler_) { - return false; - } - // TODO(srte): Remove this interface and push information about bandwidth - // estimation to users of this class, thereby reducing synchronous calls. - if (control_handler_->last_transfer_rate().has_value()) { - *bandwidth = control_handler_->last_transfer_rate() - ->network_estimate.bandwidth.bps(); - return true; - } - return false; -} - -RtcpBandwidthObserver* SendSideCongestionController::GetBandwidthObserver() { - return this; -} - -void SendSideCongestionController::SetPerPacketFeedbackAvailable( - bool available) { - RTC_DCHECK_RUN_ON(task_queue_); - packet_feedback_available_ = available; - MaybeRecreateControllers(); -} - -void SendSideCongestionController::EnablePeriodicAlrProbing(bool enable) { - task_queue_->PostTask([this, enable]() { - RTC_DCHECK_RUN_ON(task_queue_); - streams_config_.requests_alr_probing = enable; - UpdateStreamsConfig(); - }); -} - -void SendSideCongestionController::UpdateStreamsConfig() { - streams_config_.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); - if (controller_) - control_handler_->PostUpdates( - controller_->OnStreamsConfig(streams_config_)); -} - -TransportFeedbackObserver* -SendSideCongestionController::GetTransportFeedbackObserver() { - return this; -} - -void SendSideCongestionController::SignalNetworkState(NetworkState state) { - RTC_LOG(LS_INFO) << "SignalNetworkState " - << (state == kNetworkUp ? "Up" : "Down"); - NetworkAvailability msg; - msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); - msg.network_available = state == kNetworkUp; - task_queue_->PostTask([this, msg]() { - RTC_DCHECK_RUN_ON(task_queue_); - network_available_ = msg.network_available; - if (controller_) { - control_handler_->PostUpdates(controller_->OnNetworkAvailability(msg)); - control_handler_->OnNetworkAvailability(msg); - } else { - MaybeCreateControllers(); - } - }); -} - -void SendSideCongestionController::OnSentPacket( - const rtc::SentPacket& sent_packet) { - absl::optional packet_msg = - transport_feedback_adapter_.ProcessSentPacket(sent_packet); - if (packet_msg) { - task_queue_->PostTask([this, packet_msg]() { - RTC_DCHECK_RUN_ON(task_queue_); - if (controller_) - control_handler_->PostUpdates(controller_->OnSentPacket(*packet_msg)); - }); - } - MaybeUpdateOutstandingData(); -} - -void SendSideCongestionController::OnRttUpdate(int64_t avg_rtt_ms, - int64_t max_rtt_ms) { - int64_t now_ms = clock_->TimeInMilliseconds(); - RoundTripTimeUpdate report; - report.receive_time = Timestamp::ms(now_ms); - report.round_trip_time = TimeDelta::ms(avg_rtt_ms); - report.smoothed = true; - task_queue_->PostTask([this, report]() { - RTC_DCHECK_RUN_ON(task_queue_); - if (controller_) - control_handler_->PostUpdates(controller_->OnRoundTripTimeUpdate(report)); - }); -} - -int64_t SendSideCongestionController::TimeUntilNextProcess() { - // Using task queue to process, just sleep long to avoid wasting resources. - return 60 * 1000; -} - -void SendSideCongestionController::Process() { - // Ignored, using task queue to process. -} - -void SendSideCongestionController::StartProcessPeriodicTasks() { - if (!periodic_tasks_enabled_) - return; - if (!pacer_queue_update_task_) { - pacer_queue_update_task_ = - StartPeriodicTask(task_queue_, PacerQueueUpdateIntervalMs, [this]() { - RTC_DCHECK_RUN_ON(task_queue_); - UpdatePacerQueue(); - }); - } - if (controller_task_) { - // Stop is not synchronous, but is guaranteed to occur before the first - // invocation of the new controller task started below. - controller_task_->Stop(); - controller_task_ = nullptr; - } - if (process_interval_.IsFinite()) { - // The controller task is owned by the task queue and lives until the task - // queue is destroyed or some time after Stop() is called, whichever comes - // first. - controller_task_ = - StartPeriodicTask(task_queue_, process_interval_.ms(), [this]() { - RTC_DCHECK_RUN_ON(task_queue_); - UpdateControllerWithTimeInterval(); - }); - } -} - -void SendSideCongestionController::UpdateControllerWithTimeInterval() { - if (controller_) { - ProcessInterval msg; - msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); - control_handler_->PostUpdates(controller_->OnProcessInterval(msg)); - } -} - -void SendSideCongestionController::UpdatePacerQueue() { - if (control_handler_) { - TimeDelta expected_queue_time = - TimeDelta::ms(pacer_->ExpectedQueueTimeMs()); - control_handler_->OnPacerQueueUpdate(expected_queue_time); - } -} - -void SendSideCongestionController::AddPacket( - uint32_t ssrc, - uint16_t sequence_number, - size_t length, - const PacedPacketInfo& pacing_info) { - if (send_side_bwe_with_overhead_) { - length += transport_overhead_bytes_per_packet_; - } - transport_feedback_adapter_.AddPacket(ssrc, sequence_number, length, - pacing_info); -} - -void SendSideCongestionController::OnTransportFeedback( - const rtcp::TransportFeedback& feedback) { - RTC_DCHECK_RUNS_SERIALIZED(&worker_race_); - - absl::optional feedback_msg = - transport_feedback_adapter_.ProcessTransportFeedback(feedback); - if (feedback_msg) { - task_queue_->PostTask([this, feedback_msg]() { - RTC_DCHECK_RUN_ON(task_queue_); - if (controller_) - control_handler_->PostUpdates( - controller_->OnTransportPacketsFeedback(*feedback_msg)); - }); - } - MaybeUpdateOutstandingData(); -} - -void SendSideCongestionController::MaybeUpdateOutstandingData() { - DataSize in_flight_data = transport_feedback_adapter_.GetOutstandingData(); - task_queue_->PostTask([this, in_flight_data]() { - RTC_DCHECK_RUN_ON(task_queue_); - if (control_handler_) - control_handler_->OnOutstandingData(in_flight_data); - }); -} - -std::vector -SendSideCongestionController::GetTransportFeedbackVector() const { - RTC_DCHECK_RUNS_SERIALIZED(&worker_race_); - return transport_feedback_adapter_.GetTransportFeedbackVector(); -} - -void SendSideCongestionController::PostPeriodicTasksForTest() { - task_queue_->PostTask([this]() { - RTC_DCHECK_RUN_ON(task_queue_); - UpdateControllerWithTimeInterval(); - UpdatePacerQueue(); - }); -} - -void SendSideCongestionController::WaitOnTasksForTest() { - rtc::Event event; - task_queue_->PostTask([&event]() { event.Set(); }); - event.Wait(rtc::Event::kForever); -} - -void SendSideCongestionController::SetPacingFactor(float pacing_factor) { - RTC_DCHECK_RUN_ON(task_queue_); - streams_config_.pacing_factor = pacing_factor; - UpdateStreamsConfig(); -} - -void SendSideCongestionController::SetAllocatedBitrateWithoutFeedback( - uint32_t bitrate_bps) { - task_queue_->PostTask([this, bitrate_bps]() { - RTC_DCHECK_RUN_ON(task_queue_); - streams_config_.unacknowledged_rate_allocation = DataRate::bps(bitrate_bps); - UpdateStreamsConfig(); - }); -} - -void SendSideCongestionController::DisablePeriodicTasks() { - task_queue_->PostTask([this]() { - RTC_DCHECK_RUN_ON(task_queue_); - periodic_tasks_enabled_ = false; - }); -} - -void SendSideCongestionController::OnReceivedEstimatedBitrate( - uint32_t bitrate) { - RemoteBitrateReport msg; - msg.receive_time = Timestamp::ms(clock_->TimeInMilliseconds()); - msg.bandwidth = DataRate::bps(bitrate); - task_queue_->PostTask([this, msg]() { - RTC_DCHECK_RUN_ON(task_queue_); - if (controller_) - control_handler_->PostUpdates(controller_->OnRemoteBitrateReport(msg)); - }); -} - -void SendSideCongestionController::OnReceivedRtcpReceiverReport( - const webrtc::ReportBlockList& report_blocks, - int64_t rtt_ms, - int64_t now_ms) { - task_queue_->PostTask([this, report_blocks, now_ms]() { - RTC_DCHECK_RUN_ON(task_queue_); - OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms); - }); - - task_queue_->PostTask([this, now_ms, rtt_ms]() { - RTC_DCHECK_RUN_ON(task_queue_); - RoundTripTimeUpdate report; - report.receive_time = Timestamp::ms(now_ms); - report.round_trip_time = TimeDelta::ms(rtt_ms); - report.smoothed = false; - if (controller_) - control_handler_->PostUpdates(controller_->OnRoundTripTimeUpdate(report)); - }); -} - -void SendSideCongestionController::OnReceivedRtcpReceiverReportBlocks( - const ReportBlockList& report_blocks, - int64_t now_ms) { - if (report_blocks.empty()) - return; - - int total_packets_lost_delta = 0; - int total_packets_delta = 0; - - // Compute the packet loss from all report blocks. - for (const RTCPReportBlock& report_block : report_blocks) { - auto it = last_report_blocks_.find(report_block.source_ssrc); - if (it != last_report_blocks_.end()) { - auto number_of_packets = report_block.extended_highest_sequence_number - - it->second.extended_highest_sequence_number; - total_packets_delta += number_of_packets; - auto lost_delta = report_block.packets_lost - it->second.packets_lost; - total_packets_lost_delta += lost_delta; - } - last_report_blocks_[report_block.source_ssrc] = report_block; - } - // Can only compute delta if there has been previous blocks to compare to. If - // not, total_packets_delta will be unchanged and there's nothing more to do. - if (!total_packets_delta) - return; - int packets_received_delta = total_packets_delta - total_packets_lost_delta; - // To detect lost packets, at least one packet has to be received. This check - // is needed to avoid bandwith detection update in - // VideoSendStreamTest.SuspendBelowMinBitrate - - if (packets_received_delta < 1) - return; - Timestamp now = Timestamp::ms(now_ms); - TransportLossReport msg; - msg.packets_lost_delta = total_packets_lost_delta; - msg.packets_received_delta = packets_received_delta; - msg.receive_time = now; - msg.start_time = last_report_block_time_; - msg.end_time = now; - if (controller_) - control_handler_->PostUpdates(controller_->OnTransportLossReport(msg)); - last_report_block_time_ = now; -} -} // namespace webrtc_cc -} // namespace webrtc diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc b/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc deleted file mode 100644 index 64a5234873..0000000000 --- a/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc +++ /dev/null @@ -1,523 +0,0 @@ -/* - * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "modules/congestion_controller/rtp/include/send_side_congestion_controller.h" -#include "logging/rtc_event_log/mock/mock_rtc_event_log.h" -#include "modules/congestion_controller/include/mock/mock_congestion_observer.h" -#include "modules/congestion_controller/rtp/congestion_controller_unittests_helper.h" -#include "modules/pacing/mock/mock_paced_sender.h" -#include "modules/pacing/packet_router.h" -#include "modules/remote_bitrate_estimator/include/bwe_defines.h" -#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" -#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h" -#include "rtc_base/network/sent_packet.h" -#include "system_wrappers/include/clock.h" -#include "test/field_trial.h" -#include "test/gmock.h" -#include "test/gtest.h" - -using testing::_; -using testing::AtLeast; -using testing::Ge; -using testing::NiceMock; -using testing::Return; -using testing::SaveArg; -using testing::StrictMock; - -namespace webrtc { -namespace webrtc_cc { -namespace test { - -namespace { -using webrtc::test::MockCongestionObserver; -const webrtc::PacedPacketInfo kPacingInfo0(0, 5, 2000); -const webrtc::PacedPacketInfo kPacingInfo1(1, 8, 4000); - -const uint32_t kInitialBitrateBps = 60000; -const float kDefaultPacingRate = 2.5f; - -class SendSideCongestionControllerForTest - : public SendSideCongestionController { - public: - using SendSideCongestionController::SendSideCongestionController; - ~SendSideCongestionControllerForTest() {} - using SendSideCongestionController::DisablePeriodicTasks; - void WaitOnTasks() { SendSideCongestionController::WaitOnTasksForTest(); } - void Process() override { - SendSideCongestionController::PostPeriodicTasksForTest(); - SendSideCongestionController::WaitOnTasksForTest(); - } -}; -} // namespace - -class SendSideCongestionControllerTest : public ::testing::Test { - protected: - SendSideCongestionControllerTest() - : clock_(123456), - target_bitrate_observer_(this), - bandwidth_observer_(nullptr) {} - ~SendSideCongestionControllerTest() override {} - - void SetUp() override { - pacer_.reset(new NiceMock()); - // Set the initial bitrate estimate and expect the |observer| and |pacer_| - // to be updated. - EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps, _, _, _)); - EXPECT_CALL(*pacer_, - SetPacingRates(kInitialBitrateBps * kDefaultPacingRate, _)); - EXPECT_CALL(*pacer_, CreateProbeCluster(kInitialBitrateBps * 3)); - EXPECT_CALL(*pacer_, CreateProbeCluster(kInitialBitrateBps * 5)); - task_queue_ = absl::make_unique("SSCC Test"); - controller_.reset(new SendSideCongestionControllerForTest( - &clock_, task_queue_.get(), &event_log_, pacer_.get(), - kInitialBitrateBps, 0, 5 * kInitialBitrateBps, nullptr)); - controller_->DisablePeriodicTasks(); - controller_->RegisterNetworkObserver(&observer_); - controller_->SignalNetworkState(NetworkState::kNetworkUp); - bandwidth_observer_ = controller_->GetBandwidthObserver(); - controller_->WaitOnTasks(); - testing::Mock::VerifyAndClearExpectations(pacer_.get()); - testing::Mock::VerifyAndClearExpectations(&observer_); - } - - void TearDown() override { controller_->WaitOnTasks(); } - - // Custom setup - use an observer that tracks the target bitrate, without - // prescribing on which iterations it must change (like a mock would). - void TargetBitrateTrackingSetup() { - bandwidth_observer_ = nullptr; - pacer_.reset(new NiceMock()); - task_queue_ = absl::make_unique("SSCC Test"); - controller_.reset(new SendSideCongestionControllerForTest( - &clock_, task_queue_.get(), &event_log_, pacer_.get(), - kInitialBitrateBps, 0, 5 * kInitialBitrateBps, nullptr)); - controller_->DisablePeriodicTasks(); - controller_->RegisterNetworkObserver(&target_bitrate_observer_); - controller_->SignalNetworkState(NetworkState::kNetworkUp); - } - - void OnSentPacket(const PacketFeedback& packet_feedback) { - constexpr uint32_t ssrc = 0; - controller_->AddPacket(ssrc, packet_feedback.sequence_number, - packet_feedback.payload_size, - packet_feedback.pacing_info); - rtc::PacketInfo packet_info; - packet_info.included_in_feedback = true; - controller_->OnSentPacket(rtc::SentPacket(packet_feedback.sequence_number, - packet_feedback.send_time_ms, - packet_info)); - } - - // Allows us to track the target bitrate, without prescribing the exact - // iterations when this would hapen, like a mock would. - class TargetBitrateObserver : public NetworkChangedObserver { - public: - explicit TargetBitrateObserver(SendSideCongestionControllerTest* owner) - : owner_(owner) {} - ~TargetBitrateObserver() override = default; - void OnNetworkChanged(uint32_t bitrate_bps, - uint8_t fraction_loss, // 0 - 255. - int64_t rtt_ms, - int64_t probing_interval_ms) override { - owner_->target_bitrate_bps_ = bitrate_bps; - } - - private: - SendSideCongestionControllerTest* owner_; - }; - - void PacketTransmissionAndFeedbackBlock(uint16_t* seq_num, - int64_t runtime_ms, - int64_t delay) { - int64_t delay_buildup = 0; - int64_t start_time_ms = clock_.TimeInMilliseconds(); - while (clock_.TimeInMilliseconds() - start_time_ms < runtime_ms) { - constexpr size_t kPayloadSize = 1000; - PacketFeedback packet(clock_.TimeInMilliseconds() + delay_buildup, - clock_.TimeInMilliseconds(), *seq_num, kPayloadSize, - PacedPacketInfo()); - delay_buildup += delay; // Delay has to increase, or it's just RTT. - OnSentPacket(packet); - // Create expected feedback and send into adapter. - std::unique_ptr feedback( - new rtcp::TransportFeedback()); - feedback->SetBase(packet.sequence_number, packet.arrival_time_ms * 1000); - EXPECT_TRUE(feedback->AddReceivedPacket(packet.sequence_number, - packet.arrival_time_ms * 1000)); - rtc::Buffer raw_packet = feedback->Build(); - feedback = rtcp::TransportFeedback::ParseFrom(raw_packet.data(), - raw_packet.size()); - EXPECT_TRUE(feedback.get() != nullptr); - controller_->OnTransportFeedback(*feedback.get()); - clock_.AdvanceTimeMilliseconds(50); - controller_->Process(); - ++(*seq_num); - } - } - - SimulatedClock clock_; - StrictMock observer_; - TargetBitrateObserver target_bitrate_observer_; - NiceMock event_log_; - RtcpBandwidthObserver* bandwidth_observer_; - PacketRouter packet_router_; - std::unique_ptr> pacer_; - std::unique_ptr controller_; - absl::optional target_bitrate_bps_; - std::unique_ptr task_queue_; -}; - -TEST_F(SendSideCongestionControllerTest, OnNetworkChanged) { - // Test no change. - clock_.AdvanceTimeMilliseconds(25); - controller_->Process(); - - EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps * 2, _, _, _)); - EXPECT_CALL(*pacer_, - SetPacingRates(kInitialBitrateBps * 2 * kDefaultPacingRate, _)); - bandwidth_observer_->OnReceivedEstimatedBitrate(kInitialBitrateBps * 2); - clock_.AdvanceTimeMilliseconds(25); - controller_->Process(); - - EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps, _, _, _)); - EXPECT_CALL(*pacer_, - SetPacingRates(kInitialBitrateBps * kDefaultPacingRate, _)); - bandwidth_observer_->OnReceivedEstimatedBitrate(kInitialBitrateBps); - clock_.AdvanceTimeMilliseconds(25); - controller_->Process(); -} - -TEST_F(SendSideCongestionControllerTest, OnSendQueueFull) { - EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()) - .WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs + 1)); - - EXPECT_CALL(observer_, OnNetworkChanged(0, _, _, _)); - controller_->Process(); - - // Let the pacer not be full next time the controller checks. - EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()) - .WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs - 1)); - - EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps, _, _, _)); - controller_->Process(); -} - -TEST_F(SendSideCongestionControllerTest, OnSendQueueFullAndEstimateChange) { - EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()) - .WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs + 1)); - EXPECT_CALL(observer_, OnNetworkChanged(0, _, _, _)); - controller_->Process(); - - // Receive new estimate but let the queue still be full. - bandwidth_observer_->OnReceivedEstimatedBitrate(kInitialBitrateBps * 2); - EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()) - .WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs + 1)); - // The send pacer should get the new estimate though. - EXPECT_CALL(*pacer_, - SetPacingRates(kInitialBitrateBps * 2 * kDefaultPacingRate, _)); - clock_.AdvanceTimeMilliseconds(25); - controller_->Process(); - - // Let the pacer not be full next time the controller checks. - // |OnNetworkChanged| should be called with the new estimate. - EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()) - .WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs - 1)); - EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps * 2, _, _, _)); - clock_.AdvanceTimeMilliseconds(25); - controller_->Process(); -} - -TEST_F(SendSideCongestionControllerTest, SignalNetworkState) { - EXPECT_CALL(observer_, OnNetworkChanged(0, _, _, _)); - controller_->SignalNetworkState(kNetworkDown); - controller_->WaitOnTasks(); - - EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps, _, _, _)); - controller_->SignalNetworkState(kNetworkUp); - controller_->WaitOnTasks(); - - EXPECT_CALL(observer_, OnNetworkChanged(0, _, _, _)); - controller_->SignalNetworkState(kNetworkDown); -} - -TEST_F(SendSideCongestionControllerTest, OnNetworkRouteChanged) { - int new_bitrate = 200000; - EXPECT_CALL(observer_, OnNetworkChanged(new_bitrate, _, _, _)); - EXPECT_CALL(*pacer_, SetPacingRates(new_bitrate * kDefaultPacingRate, _)); - rtc::NetworkRoute route; - route.local_network_id = 1; - controller_->OnNetworkRouteChanged(route, new_bitrate, -1, -1); - controller_->WaitOnTasks(); - - testing::Mock::VerifyAndClearExpectations(pacer_.get()); - testing::Mock::VerifyAndClearExpectations(&observer_); - // If the bitrate is reset to -1, the new starting bitrate will be - // the minimum default bitrate kMinBitrateBps. - EXPECT_CALL( - observer_, - OnNetworkChanged(congestion_controller::GetMinBitrateBps(), _, _, _)); - EXPECT_CALL( - *pacer_, - SetPacingRates( - congestion_controller::GetMinBitrateBps() * kDefaultPacingRate, _)); - route.local_network_id = 2; - controller_->OnNetworkRouteChanged(route, -1, -1, -1); -} - -TEST_F(SendSideCongestionControllerTest, OldFeedback) { - int new_bitrate = 200000; - testing::Mock::VerifyAndClearExpectations(pacer_.get()); - EXPECT_CALL(observer_, OnNetworkChanged(new_bitrate, _, _, _)); - EXPECT_CALL(*pacer_, SetPacingRates(new_bitrate * kDefaultPacingRate, _)); - - // Send a few packets on the first network route. - std::vector packets; - packets.push_back(PacketFeedback(0, 0, 0, 1500, kPacingInfo0)); - packets.push_back(PacketFeedback(10, 10, 1, 1500, kPacingInfo0)); - packets.push_back(PacketFeedback(20, 20, 2, 1500, kPacingInfo0)); - packets.push_back(PacketFeedback(30, 30, 3, 1500, kPacingInfo1)); - packets.push_back(PacketFeedback(40, 40, 4, 1500, kPacingInfo1)); - - for (const PacketFeedback& packet : packets) - OnSentPacket(packet); - - // Change route and then insert a number of feedback packets. - rtc::NetworkRoute route; - route.local_network_id = 1; - controller_->OnNetworkRouteChanged(route, new_bitrate, -1, -1); - - for (const PacketFeedback& packet : packets) { - rtcp::TransportFeedback feedback; - feedback.SetBase(packet.sequence_number, packet.arrival_time_ms * 1000); - - EXPECT_TRUE(feedback.AddReceivedPacket(packet.sequence_number, - packet.arrival_time_ms * 1000)); - feedback.Build(); - controller_->OnTransportFeedback(feedback); - } - controller_->WaitOnTasks(); - // If the bitrate is reset to -1, the new starting bitrate will be - // the minimum default bitrate kMinBitrateBps. - EXPECT_CALL( - observer_, - OnNetworkChanged(congestion_controller::GetMinBitrateBps(), _, _, _)); - EXPECT_CALL( - *pacer_, - SetPacingRates( - congestion_controller::GetMinBitrateBps() * kDefaultPacingRate, _)); - route.local_network_id = 2; - controller_->OnNetworkRouteChanged(route, -1, -1, -1); -} - -TEST_F(SendSideCongestionControllerTest, - SignalNetworkStateAndQueueIsFullAndEstimateChange) { - // Send queue is full. - EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()) - .WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs + 1)); - EXPECT_CALL(observer_, OnNetworkChanged(0, _, _, _)); - controller_->Process(); - - // Queue is full and network is down. Expect no bitrate change. - controller_->SignalNetworkState(kNetworkDown); - controller_->Process(); - - // Queue is full but network is up. Expect no bitrate change. - controller_->SignalNetworkState(kNetworkUp); - controller_->Process(); - - // Receive new estimate but let the queue still be full. - EXPECT_CALL(*pacer_, - SetPacingRates(kInitialBitrateBps * 2 * kDefaultPacingRate, _)); - bandwidth_observer_->OnReceivedEstimatedBitrate(kInitialBitrateBps * 2); - clock_.AdvanceTimeMilliseconds(25); - controller_->Process(); - - // Let the pacer not be full next time the controller checks. - EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()) - .WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs - 1)); - EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps * 2, _, _, _)); - controller_->Process(); -} - -TEST_F(SendSideCongestionControllerTest, GetProbingInterval) { - clock_.AdvanceTimeMilliseconds(25); - controller_->Process(); - - EXPECT_CALL(observer_, OnNetworkChanged(_, _, _, testing::Ne(0))); - EXPECT_CALL(*pacer_, SetPacingRates(_, _)); - bandwidth_observer_->OnReceivedEstimatedBitrate(kInitialBitrateBps * 2); - clock_.AdvanceTimeMilliseconds(25); - controller_->Process(); -} - -TEST_F(SendSideCongestionControllerTest, ProbeOnRouteChange) { - EXPECT_CALL(*pacer_, CreateProbeCluster(kInitialBitrateBps * 6)); - EXPECT_CALL(*pacer_, CreateProbeCluster(kInitialBitrateBps * 12)); - EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps * 2, _, _, _)); - rtc::NetworkRoute route; - route.local_network_id = 1; - controller_->OnNetworkRouteChanged(route, 2 * kInitialBitrateBps, 0, - 20 * kInitialBitrateBps); - controller_->Process(); -} - -// Estimated bitrate reduced when the feedbacks arrive with such a long delay, -// that the send-time-history no longer holds the feedbacked packets. -TEST_F(SendSideCongestionControllerTest, LongFeedbackDelays) { - TargetBitrateTrackingSetup(); - - const int64_t kFeedbackTimeoutMs = 60001; - const int kMaxConsecutiveFailedLookups = 5; - for (int i = 0; i < kMaxConsecutiveFailedLookups; ++i) { - std::vector packets; - packets.push_back( - PacketFeedback(i * 100, 2 * i * 100, 0, 1500, kPacingInfo0)); - packets.push_back( - PacketFeedback(i * 100 + 10, 2 * i * 100 + 10, 1, 1500, kPacingInfo0)); - packets.push_back( - PacketFeedback(i * 100 + 20, 2 * i * 100 + 20, 2, 1500, kPacingInfo0)); - packets.push_back( - PacketFeedback(i * 100 + 30, 2 * i * 100 + 30, 3, 1500, kPacingInfo1)); - packets.push_back( - PacketFeedback(i * 100 + 40, 2 * i * 100 + 40, 4, 1500, kPacingInfo1)); - - for (const PacketFeedback& packet : packets) - OnSentPacket(packet); - - rtcp::TransportFeedback feedback; - feedback.SetBase(packets[0].sequence_number, - packets[0].arrival_time_ms * 1000); - - for (const PacketFeedback& packet : packets) { - EXPECT_TRUE(feedback.AddReceivedPacket(packet.sequence_number, - packet.arrival_time_ms * 1000)); - } - - feedback.Build(); - - clock_.AdvanceTimeMilliseconds(kFeedbackTimeoutMs); - PacketFeedback later_packet(kFeedbackTimeoutMs + i * 100 + 40, - kFeedbackTimeoutMs + i * 200 + 40, 5, 1500, - kPacingInfo1); - OnSentPacket(later_packet); - - controller_->OnTransportFeedback(feedback); - - // Check that packets have timed out. - for (PacketFeedback& packet : packets) { - packet.send_time_ms = PacketFeedback::kNoSendTime; - packet.payload_size = 0; - packet.pacing_info = PacedPacketInfo(); - } - ComparePacketFeedbackVectors(packets, - controller_->GetTransportFeedbackVector()); - } - - controller_->Process(); - - EXPECT_EQ(kInitialBitrateBps / 2, target_bitrate_bps_); - - // Test with feedback that isn't late enough to time out. - { - std::vector packets; - packets.push_back(PacketFeedback(100, 200, 0, 1500, kPacingInfo0)); - packets.push_back(PacketFeedback(110, 210, 1, 1500, kPacingInfo0)); - packets.push_back(PacketFeedback(120, 220, 2, 1500, kPacingInfo0)); - packets.push_back(PacketFeedback(130, 230, 3, 1500, kPacingInfo1)); - packets.push_back(PacketFeedback(140, 240, 4, 1500, kPacingInfo1)); - - for (const PacketFeedback& packet : packets) - OnSentPacket(packet); - - rtcp::TransportFeedback feedback; - feedback.SetBase(packets[0].sequence_number, - packets[0].arrival_time_ms * 1000); - - for (const PacketFeedback& packet : packets) { - EXPECT_TRUE(feedback.AddReceivedPacket(packet.sequence_number, - packet.arrival_time_ms * 1000)); - } - - feedback.Build(); - - clock_.AdvanceTimeMilliseconds(kFeedbackTimeoutMs - 1); - PacketFeedback later_packet(kFeedbackTimeoutMs + 140, - kFeedbackTimeoutMs + 240, 5, 1500, - kPacingInfo1); - OnSentPacket(later_packet); - - controller_->OnTransportFeedback(feedback); - ComparePacketFeedbackVectors(packets, - controller_->GetTransportFeedbackVector()); - } -} - -// Bandwidth estimation is updated when feedbacks are received. -// Feedbacks which show an increasing delay cause the estimation to be reduced. -TEST_F(SendSideCongestionControllerTest, UpdatesDelayBasedEstimate) { - TargetBitrateTrackingSetup(); - - const int64_t kRunTimeMs = 6000; - uint16_t seq_num = 0; - - // The test must run and insert packets/feedback long enough that the - // BWE computes a valid estimate. This is first done in an environment which - // simulates no bandwidth limitation, and therefore not built-up delay. - PacketTransmissionAndFeedbackBlock(&seq_num, kRunTimeMs, 0); - ASSERT_TRUE(target_bitrate_bps_); - - // Repeat, but this time with a building delay, and make sure that the - // estimation is adjusted downwards. - uint32_t bitrate_before_delay = *target_bitrate_bps_; - PacketTransmissionAndFeedbackBlock(&seq_num, kRunTimeMs, 50); - EXPECT_LT(*target_bitrate_bps_, bitrate_before_delay); -} - -TEST_F(SendSideCongestionControllerTest, PacerQueueEncodeRatePushback) { - ::webrtc::test::ScopedFieldTrials pushback_field_trial( - "WebRTC-PacerPushbackExperiment/Enabled/"); - SetUp(); - - EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()).WillOnce(Return(0)); - controller_->Process(); - - EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()).WillOnce(Return(100)); - EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps * 0.9, _, _, _)); - controller_->Process(); - - EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()).WillOnce(Return(50)); - controller_->Process(); - - EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()).WillOnce(Return(0)); - EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps, _, _, _)); - controller_->Process(); - - const uint32_t kMinAdjustedBps = 50000; - int expected_queue_threshold = - 1000 - kMinAdjustedBps * 1000.0 / kInitialBitrateBps; - - EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()) - .WillOnce(Return(expected_queue_threshold)); - EXPECT_CALL(observer_, OnNetworkChanged(Ge(kMinAdjustedBps), _, _, _)); - controller_->Process(); - - EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()) - .WillOnce(Return(expected_queue_threshold + 1)); - EXPECT_CALL(observer_, OnNetworkChanged(0, _, _, _)); - controller_->Process(); - - EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()).WillOnce(Return(0)); - EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps, _, _, _)); - controller_->Process(); -} - -} // namespace test -} // namespace webrtc_cc -} // namespace webrtc