From efbcfb13a7a22a7050ad7787524510e38782f3c8 Mon Sep 17 00:00:00 2001 From: Sebastian Jansson Date: Tue, 13 Mar 2018 11:38:04 +0100 Subject: [PATCH] Configuration in constructor of Goog CC. Adding configuration of new GoogCcNetworkController to initializer, this makes sure that it is properly initialized from the start. To achieve this SendSideCongestionController waits until it has received the necessary information to construct the object. This information should be provided in the constructor for SendSideCongestionController in the future. Bug: webrtc:8415 Change-Id: Icc09b8b246bae9f9704b80855fc4caa3450b34fc Reviewed-on: https://webrtc-review.googlesource.com/58099 Reviewed-by: Stefan Holmer Reviewed-by: Niels Moller Commit-Queue: Sebastian Jansson Cr-Commit-Position: refs/heads/master@{#22404} --- .../goog_cc/goog_cc_factory.cc | 5 +- .../goog_cc/goog_cc_network_control.cc | 19 +- .../goog_cc/goog_cc_network_control.h | 5 +- .../goog_cc/include/goog_cc_factory.h | 3 +- .../network_control/include/network_control.h | 18 +- .../network_control/include/network_types.h | 2 +- .../include/send_side_congestion_controller.h | 42 ++- .../rtp/send_side_congestion_controller.cc | 241 +++++++++++------- ...end_side_congestion_controller_unittest.cc | 1 + 9 files changed, 228 insertions(+), 108 deletions(-) diff --git a/modules/congestion_controller/goog_cc/goog_cc_factory.cc b/modules/congestion_controller/goog_cc/goog_cc_factory.cc index ad230bc1e9..88d52ee604 100644 --- a/modules/congestion_controller/goog_cc/goog_cc_factory.cc +++ b/modules/congestion_controller/goog_cc/goog_cc_factory.cc @@ -18,9 +18,10 @@ GoogCcNetworkControllerFactory::GoogCcNetworkControllerFactory( : event_log_(event_log) {} NetworkControllerInterface::uptr GoogCcNetworkControllerFactory::Create( - NetworkControllerObserver* observer) { + NetworkControllerObserver* observer, + NetworkControllerConfig config) { return rtc::MakeUnique(event_log_, - observer); + observer, config); } TimeDelta GoogCcNetworkControllerFactory::GetProcessInterval() const { diff --git a/modules/congestion_controller/goog_cc/goog_cc_network_control.cc b/modules/congestion_controller/goog_cc/goog_cc_network_control.cc index 4e42a5148e..6c06c2cc25 100644 --- a/modules/congestion_controller/goog_cc/goog_cc_network_control.cc +++ b/modules/congestion_controller/goog_cc/goog_cc_network_control.cc @@ -105,7 +105,8 @@ std::vector ReceivedPacketsFeedbackAsRtp( GoogCcNetworkController::GoogCcNetworkController( RtcEventLog* event_log, - NetworkControllerObserver* observer) + NetworkControllerObserver* observer, + NetworkControllerConfig config) : event_log_(event_log), observer_(observer), probe_controller_(new ProbeController(observer_)), @@ -121,6 +122,8 @@ GoogCcNetworkController::GoogCcNetworkController( in_cwnd_experiment_(CwndExperimentEnabled()), accepted_queue_ms_(kDefaultAcceptedQueueMs) { delay_based_bwe_->SetMinBitrate(congestion_controller::GetMinBitrateBps()); + UpdateBitrateConstraints(config.constraints, config.starting_bandwidth); + OnStreamsConfig(config.stream_based_config); if (in_cwnd_experiment_ && !ReadCwndExperimentParameter(&accepted_queue_ms_)) { RTC_LOG(LS_WARNING) << "Failed to parse parameters for CwndExperiment " @@ -136,9 +139,9 @@ void GoogCcNetworkController::OnNetworkAvailability(NetworkAvailability msg) { } void GoogCcNetworkController::OnNetworkRouteChange(NetworkRouteChange msg) { - int64_t min_bitrate_bps = msg.constraints.min_data_rate.bps(); + int64_t min_bitrate_bps = msg.constraints.min_data_rate.bps_or(-1); int64_t max_bitrate_bps = msg.constraints.max_data_rate.bps_or(-1); - int64_t start_bitrate_bps = msg.constraints.starting_rate.bps_or(-1); + int64_t start_bitrate_bps = msg.starting_rate.bps_or(-1); ClampBitrates(&start_bitrate_bps, &min_bitrate_bps, &max_bitrate_bps); @@ -215,9 +218,15 @@ void GoogCcNetworkController::OnStreamsConfig(StreamsConfig msg) { void GoogCcNetworkController::OnTargetRateConstraints( TargetRateConstraints constraints) { - int64_t min_bitrate_bps = constraints.min_data_rate.bps(); + UpdateBitrateConstraints(constraints, DataRate::kNotInitialized); +} + +void GoogCcNetworkController::UpdateBitrateConstraints( + TargetRateConstraints constraints, + DataRate starting_rate) { + int64_t min_bitrate_bps = constraints.min_data_rate.bps_or(0); int64_t max_bitrate_bps = constraints.max_data_rate.bps_or(-1); - int64_t start_bitrate_bps = constraints.starting_rate.bps_or(-1); + int64_t start_bitrate_bps = starting_rate.bps_or(-1); ClampBitrates(&start_bitrate_bps, &min_bitrate_bps, &max_bitrate_bps); diff --git a/modules/congestion_controller/goog_cc/goog_cc_network_control.h b/modules/congestion_controller/goog_cc/goog_cc_network_control.h index c22596a4f4..12e22a815b 100644 --- a/modules/congestion_controller/goog_cc/goog_cc_network_control.h +++ b/modules/congestion_controller/goog_cc/goog_cc_network_control.h @@ -32,7 +32,8 @@ namespace webrtc_cc { class GoogCcNetworkController : public NetworkControllerInterface { public: GoogCcNetworkController(RtcEventLog* event_log, - NetworkControllerObserver* observer); + NetworkControllerObserver* observer, + NetworkControllerConfig config); ~GoogCcNetworkController() override; // NetworkControllerInterface @@ -48,6 +49,8 @@ class GoogCcNetworkController : public NetworkControllerInterface { void OnTransportPacketsFeedback(TransportPacketsFeedback msg) override; private: + void UpdateBitrateConstraints(TargetRateConstraints constraints, + DataRate starting_rate); void MaybeUpdateCongestionWindow(); void MaybeTriggerOnNetworkChanged(Timestamp at_time); bool GetNetworkParameters(int32_t* estimated_bitrate_bps, diff --git a/modules/congestion_controller/goog_cc/include/goog_cc_factory.h b/modules/congestion_controller/goog_cc/include/goog_cc_factory.h index 20717f9f72..3e48462c9b 100644 --- a/modules/congestion_controller/goog_cc/include/goog_cc_factory.h +++ b/modules/congestion_controller/goog_cc/include/goog_cc_factory.h @@ -21,7 +21,8 @@ class GoogCcNetworkControllerFactory public: explicit GoogCcNetworkControllerFactory(RtcEventLog*); NetworkControllerInterface::uptr Create( - NetworkControllerObserver* observer) override; + NetworkControllerObserver* observer, + NetworkControllerConfig config) override; TimeDelta GetProcessInterval() const override; private: diff --git a/modules/congestion_controller/network_control/include/network_control.h b/modules/congestion_controller/network_control/include/network_control.h index fedd794a0b..1bff82a589 100644 --- a/modules/congestion_controller/network_control/include/network_control.h +++ b/modules/congestion_controller/network_control/include/network_control.h @@ -41,6 +41,21 @@ class NetworkControllerObserver : public TargetTransferRateObserver { virtual void OnProbeClusterConfig(ProbeClusterConfig) = 0; }; +// Configuration sent to factory create function. The parameters here are +// optional to use for a network controller implementation. +struct NetworkControllerConfig { + // The initial constraints to start with, these can be changed at any later + // time by calls to OnTargetRateConstraints. + TargetRateConstraints constraints; + // Initial stream specific configuration, these are changed at any later time + // by calls to OnStreamsConfig. + StreamsConfig stream_based_config; + // The initial bandwidth estimate to base target rate on. This should be used + // as the basis for initial OnTargetTransferRate and OnPacerConfig callbacks. + // Note that starting rate is only provided on construction. + DataRate starting_bandwidth; +}; + // NetworkControllerInterface is implemented by network controllers. A network // controller is a class that uses information about network state and traffic // to estimate network parameters such as round trip time and bandwidth. Network @@ -82,7 +97,8 @@ class NetworkControllerFactoryInterface { // Used to create a new network controller, requires an observer to be // provided to handle callbacks. virtual NetworkControllerInterface::uptr Create( - NetworkControllerObserver* observer) = 0; + NetworkControllerObserver* observer, + NetworkControllerConfig config) = 0; // Returns the interval by which the network controller expects // OnProcessInterval calls. virtual TimeDelta GetProcessInterval() const = 0; diff --git a/modules/congestion_controller/network_control/include/network_types.h b/modules/congestion_controller/network_control/include/network_types.h index bb3637fc87..cf90323540 100644 --- a/modules/congestion_controller/network_control/include/network_types.h +++ b/modules/congestion_controller/network_control/include/network_types.h @@ -38,7 +38,6 @@ struct StreamsConfig { struct TargetRateConstraints { Timestamp at_time; - DataRate starting_rate; DataRate min_data_rate; DataRate max_data_rate; }; @@ -55,6 +54,7 @@ struct NetworkRouteChange { // The TargetRateConstraints are set here so they can be changed synchronously // when network route changes. TargetRateConstraints constraints; + DataRate starting_rate; }; struct SentPacket { diff --git a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h b/modules/congestion_controller/rtp/include/send_side_congestion_controller.h index ff3fefe4ac..91f7117f7e 100644 --- a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h +++ b/modules/congestion_controller/rtp/include/send_side_congestion_controller.h @@ -141,32 +141,54 @@ class SendSideCongestionController void WaitOnTasks(); private: - void UpdateStreamsConfig(); - void WaitOnTask(std::function closure); + void MaybeCreateControllers(); + + void UpdateStreamsConfig() RTC_RUN_ON(task_queue_); void MaybeUpdateOutstandingData(); void OnReceivedRtcpReceiverReportBlocks(const ReportBlockList& report_blocks, - int64_t now_ms); + int64_t now_ms) + RTC_RUN_ON(task_queue_); const Clock* const clock_; + // PacedSender is thread safe and doesn't need protection here. PacedSender* const pacer_; + // TODO(srte): Move all access to feedback adapter to task queue. TransportFeedbackAdapter transport_feedback_adapter_; const std::unique_ptr controller_factory_; - const std::unique_ptr pacer_controller_; - const std::unique_ptr control_handler; - const std::unique_ptr controller_; + const std::unique_ptr pacer_controller_ + RTC_GUARDED_BY(task_queue_); + + std::unique_ptr control_handler_ + RTC_GUARDED_BY(task_queue_); + + std::unique_ptr controller_ + RTC_GUARDED_BY(task_queue_); + + // TODO(srte): Review access constraints of these when introducing delayed + // tasks. Only accessed from process threads. TimeDelta process_interval_; + // Only accessed from process threads. int64_t last_process_update_ms_ = 0; - std::map last_report_blocks_; - Timestamp last_report_block_time_; + std::map last_report_blocks_ + RTC_GUARDED_BY(task_queue_); + Timestamp last_report_block_time_ RTC_GUARDED_BY(task_queue_); + + NetworkChangedObserver* observer_ RTC_GUARDED_BY(task_queue_); + NetworkControllerConfig initial_config_ RTC_GUARDED_BY(task_queue_); + StreamsConfig streams_config_ RTC_GUARDED_BY(task_queue_); - StreamsConfig streams_config_; const bool send_side_bwe_with_overhead_; + // Transport overhead is written by OnNetworkRouteChanged and read by + // AddPacket. + // TODO(srte): Remove atomic when feedback adapter runs on task queue. std::atomic transport_overhead_bytes_per_packet_; - std::atomic network_available_; + bool network_available_ RTC_GUARDED_BY(task_queue_); + // Protects access to last_packet_feedback_vector_ in feedback adapter. + // TODO(srte): Remove this checker when feedback adapter runs on task queue. rtc::RaceChecker worker_race_; // Note that moving ownership of the task queue makes it neccessary to make diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller.cc b/modules/congestion_controller/rtp/send_side_congestion_controller.cc index 41f06fba6d..2901321048 100644 --- a/modules/congestion_controller/rtp/send_side_congestion_controller.cc +++ b/modules/congestion_controller/rtp/send_side_congestion_controller.cc @@ -87,14 +87,11 @@ std::vector PacketResultsFromRtpFeedbackVector( TargetRateConstraints ConvertConstraints(int min_bitrate_bps, int max_bitrate_bps, - int start_bitrate_bps, const Clock* clock) { TargetRateConstraints msg; msg.at_time = Timestamp::ms(clock->TimeInMilliseconds()); msg.min_data_rate = min_bitrate_bps >= 0 ? DataRate::bps(min_bitrate_bps) : DataRate::Zero(); - msg.starting_rate = start_bitrate_bps > 0 ? DataRate::bps(start_bitrate_bps) - : DataRate::kNotInitialized; msg.max_data_rate = max_bitrate_bps > 0 ? DataRate::bps(max_bitrate_bps) : DataRate::Infinity(); return msg; @@ -104,7 +101,9 @@ TargetRateConstraints ConvertConstraints(int min_bitrate_bps, namespace send_side_cc_internal { class ControlHandler : public NetworkControllerObserver { public: - ControlHandler(PacerController* pacer_controller, const Clock* clock); + ControlHandler(NetworkChangedObserver* observer, + PacerController* pacer_controller, + const Clock* clock); void OnCongestionWindow(CongestionWindow window) override; void OnPacerConfig(PacerConfig config) override; @@ -114,8 +113,6 @@ class ControlHandler : public NetworkControllerObserver { void OnNetworkAvailability(NetworkAvailability msg); void OnPacerQueueUpdate(PacerQueueUpdate msg); - void RegisterNetworkObserver(NetworkChangedObserver* observer); - rtc::Optional last_transfer_rate(); bool pacer_configured(); @@ -128,14 +125,12 @@ class ControlHandler : public NetworkControllerObserver { bool HasNetworkParametersToReportChanged(int64_t bitrate_bps, uint8_t fraction_loss, int64_t rtt); + NetworkChangedObserver* observer_ = nullptr; PacerController* pacer_controller_; - rtc::CriticalSection state_lock_; - rtc::Optional last_target_rate_ - RTC_GUARDED_BY(state_lock_); - bool pacer_configured_ RTC_GUARDED_BY(state_lock_) = false; + rtc::Optional last_target_rate_; + bool pacer_configured_ = false; - NetworkChangedObserver* observer_ = nullptr; rtc::Optional current_target_rate_msg_; bool network_available_ = true; int64_t last_reported_target_bitrate_bps_ = 0; @@ -149,9 +144,11 @@ class ControlHandler : public NetworkControllerObserver { RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(ControlHandler); }; -ControlHandler::ControlHandler(PacerController* pacer_controller, +ControlHandler::ControlHandler(NetworkChangedObserver* observer, + PacerController* pacer_controller, const Clock* clock) - : pacer_controller_(pacer_controller), + : observer_(observer), + pacer_controller_(pacer_controller), pacer_pushback_experiment_(IsPacerPushbackExperimentEnabled()) { sequenced_checker_.Detach(); } @@ -164,7 +161,6 @@ void ControlHandler::OnCongestionWindow(CongestionWindow window) { void ControlHandler::OnPacerConfig(PacerConfig config) { RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); pacer_controller_->OnPacerConfig(config); - rtc::CritScope cs(&state_lock_); pacer_configured_ = true; } @@ -177,7 +173,6 @@ void ControlHandler::OnTargetTransferRate(TargetTransferRate target_rate) { RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); current_target_rate_msg_ = target_rate; OnNetworkInvalidation(); - rtc::CritScope cs(&state_lock_); last_target_rate_ = target_rate; } @@ -193,12 +188,6 @@ void ControlHandler::OnPacerQueueUpdate(PacerQueueUpdate msg) { OnNetworkInvalidation(); } -void ControlHandler::RegisterNetworkObserver(NetworkChangedObserver* observer) { - RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); - RTC_DCHECK(observer_ == nullptr); - observer_ = observer; -} - void ControlHandler::OnNetworkInvalidation() { if (!current_target_rate_msg_.has_value()) return; @@ -236,10 +225,8 @@ void ControlHandler::OnNetworkInvalidation() { } if (HasNetworkParametersToReportChanged(target_bitrate_bps, fraction_loss, rtt_ms)) { - if (observer_) { - observer_->OnNetworkChanged(target_bitrate_bps, fraction_loss, rtt_ms, - probing_interval_ms); - } + observer_->OnNetworkChanged(target_bitrate_bps, fraction_loss, rtt_ms, + probing_interval_ms); } } bool ControlHandler::HasNetworkParametersToReportChanged( @@ -266,12 +253,12 @@ bool ControlHandler::IsSendQueueFull() const { } rtc::Optional ControlHandler::last_transfer_rate() { - rtc::CritScope cs(&state_lock_); + RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); return last_target_rate_; } bool ControlHandler::pacer_configured() { - rtc::CritScope cs(&state_lock_); + RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); return pacer_configured_; } } // namespace send_side_cc_internal @@ -288,18 +275,45 @@ SendSideCongestionController::SendSideCongestionController( transport_feedback_adapter_(clock_), controller_factory_(ControllerFactory(event_log)), pacer_controller_(MakeUnique(pacer_)), - control_handler(MakeUnique( - pacer_controller_.get(), - clock_)), - controller_(controller_factory_->Create(control_handler.get())), process_interval_(controller_factory_->GetProcessInterval()), + observer_(nullptr), send_side_bwe_with_overhead_( webrtc::field_trial::IsEnabled("WebRTC-SendSideBwe-WithOverhead")), transport_overhead_bytes_per_packet_(0), network_available_(false), task_queue_(MakeUnique("SendSideCCQueue")) { - SignalNetworkState(NetworkState::kNetworkDown); - SetBweBitrates(min_bitrate_bps, start_bitrate_bps, max_bitrate_bps); + initial_config_.constraints = + ConvertConstraints(min_bitrate_bps, max_bitrate_bps, clock_); + initial_config_.stream_based_config = StreamsConfig(); + RTC_DCHECK(start_bitrate_bps > 0); + initial_config_.starting_bandwidth = DataRate::bps(start_bitrate_bps); +} + +// There is no point in having a network controller for a network that is not +// yet available and if we don't have any observer of it's state. +// MaybeCreateControllers is used to trigger creation if those things are +// fulfilled. This is needed since dependent code uses the period until network +// is signalled to be avaliabile to set the expected start bitrate which is sent +// to the initializer for NetworkControllers. The observer is injected later due +// to a circular dependency between RtpTransportControllerSend and Call. +// TODO(srte): Break the circular dependency issue and make sure that starting +// bandwidth is set before this class is initialized so the controllers can be +// created in the constructor. +void SendSideCongestionController::MaybeCreateControllers() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (controller_ || !network_available_ || !observer_) + return; + + initial_config_.constraints.at_time = + Timestamp::ms(clock_->TimeInMilliseconds()); + initial_config_.stream_based_config = streams_config_; + + control_handler_ = MakeUnique( + observer_, pacer_controller_.get(), clock_); + + controller_ = + controller_factory_->Create(control_handler_.get(), initial_config_); + UpdateStreamsConfig(); } SendSideCongestionController::~SendSideCongestionController() { @@ -319,23 +333,35 @@ void SendSideCongestionController::DeRegisterPacketFeedbackObserver( void SendSideCongestionController::RegisterNetworkObserver( NetworkChangedObserver* observer) { - WaitOnTask([this, observer]() { - control_handler->RegisterNetworkObserver(observer); + task_queue_->PostTask([this, observer]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + RTC_DCHECK(observer_ == nullptr); + observer_ = observer; + MaybeCreateControllers(); }); } - void SendSideCongestionController::SetBweBitrates(int min_bitrate_bps, int start_bitrate_bps, int max_bitrate_bps) { - TargetRateConstraints msg = ConvertConstraints( - min_bitrate_bps, max_bitrate_bps, start_bitrate_bps, clock_); - WaitOnTask([this, msg]() { controller_->OnTargetRateConstraints(msg); }); + TargetRateConstraints constraints = + ConvertConstraints(min_bitrate_bps, max_bitrate_bps, clock_); + task_queue_->PostTask([this, constraints, start_bitrate_bps]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (controller_) { + controller_->OnTargetRateConstraints(constraints); + } else { + initial_config_.constraints = constraints; + if (start_bitrate_bps > 0) + initial_config_.starting_bandwidth = DataRate::bps(start_bitrate_bps); + } + }); } void SendSideCongestionController::SetMaxTotalAllocatedBitrate( int max_total_allocated_bitrate) { - WaitOnTask([this, max_total_allocated_bitrate]() { + task_queue_->PostTask([this, max_total_allocated_bitrate]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); streams_config_.max_total_allocated_bitrate = DataRate::bps(max_total_allocated_bitrate); UpdateStreamsConfig(); @@ -351,12 +377,16 @@ void SendSideCongestionController::OnNetworkRouteChanged( int max_bitrate_bps) { transport_feedback_adapter_.SetNetworkIds(network_route.local_network_id, network_route.remote_network_id); + transport_overhead_bytes_per_packet_ = network_route.packet_overhead; NetworkRouteChange msg; msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); - msg.constraints = ConvertConstraints(min_bitrate_bps, max_bitrate_bps, - start_bitrate_bps, clock_); - WaitOnTask([this, msg]() { + msg.constraints = + ConvertConstraints(min_bitrate_bps, max_bitrate_bps, clock_); + msg.starting_rate = start_bitrate_bps > 0 ? DataRate::bps(start_bitrate_bps) + : DataRate::kNotInitialized; + task_queue_->PostTask([this, msg]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); controller_->OnNetworkRouteChange(msg); pacer_controller_->OnNetworkRouteChange(msg); }); @@ -364,11 +394,20 @@ void SendSideCongestionController::OnNetworkRouteChanged( bool SendSideCongestionController::AvailableBandwidth( uint32_t* bandwidth) const { + // This is only called in the OnNetworkChanged callback in + // RtpTransportControllerSend which is called from ControlHandler, which is + // running on the task queue. + // TODO(srte): Remove this function when RtpTransportControllerSend stops + // calling it. + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (!control_handler_) { + return false; + } // TODO(srte): Remove this interface and push information about bandwidth // estimation to users of this class, thereby reducing synchronous calls. - if (control_handler->last_transfer_rate().has_value()) { - *bandwidth = - control_handler->last_transfer_rate()->network_estimate.bandwidth.bps(); + if (control_handler_->last_transfer_rate().has_value()) { + *bandwidth = control_handler_->last_transfer_rate() + ->network_estimate.bandwidth.bps(); return true; } return false; @@ -379,16 +418,17 @@ RtcpBandwidthObserver* SendSideCongestionController::GetBandwidthObserver() { } void SendSideCongestionController::EnablePeriodicAlrProbing(bool enable) { - WaitOnTask([this, enable]() { + task_queue_->PostTask([this, enable]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); streams_config_.requests_alr_probing = enable; UpdateStreamsConfig(); }); } void SendSideCongestionController::UpdateStreamsConfig() { - RTC_DCHECK(task_queue_->IsCurrent()); streams_config_.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); - controller_->OnStreamsConfig(streams_config_); + if (controller_) + controller_->OnStreamsConfig(streams_config_); } TransportFeedbackObserver* @@ -402,17 +442,23 @@ void SendSideCongestionController::SignalNetworkState(NetworkState state) { NetworkAvailability msg; msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); msg.network_available = state == kNetworkUp; - network_available_ = msg.network_available; - WaitOnTask([this, msg]() { - controller_->OnNetworkAvailability(msg); - pacer_controller_->OnNetworkAvailability(msg); - control_handler->OnNetworkAvailability(msg); + task_queue_->PostTask([this, msg]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + network_available_ = msg.network_available; + if (controller_) { + controller_->OnNetworkAvailability(msg); + pacer_controller_->OnNetworkAvailability(msg); + control_handler_->OnNetworkAvailability(msg); + } else { + MaybeCreateControllers(); + } }); } void SendSideCongestionController::SetTransportOverhead( size_t transport_overhead_bytes_per_packet) { - transport_overhead_bytes_per_packet_ = transport_overhead_bytes_per_packet; + // TODO(srte): Remove this method from SendSideCongestionControllerInterface + RTC_NOTREACHED(); } void SendSideCongestionController::OnSentPacket( @@ -429,7 +475,11 @@ void SendSideCongestionController::OnSentPacket( SentPacket msg; msg.size = DataSize::bytes(packet->payload_size); msg.send_time = Timestamp::ms(packet->send_time_ms); - task_queue_->PostTask([this, msg]() { controller_->OnSentPacket(msg); }); + task_queue_->PostTask([this, msg]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (controller_) + controller_->OnSentPacket(msg); + }); } } @@ -440,8 +490,11 @@ void SendSideCongestionController::OnRttUpdate(int64_t avg_rtt_ms, report.receive_time = Timestamp::ms(now_ms); report.round_trip_time = TimeDelta::ms(avg_rtt_ms); report.smoothed = true; - task_queue_->PostTask( - [this, report]() { controller_->OnRoundTripTimeUpdate(report); }); + task_queue_->PostTask([this, report]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (controller_) + controller_->OnRoundTripTimeUpdate(report); + }); } int64_t SendSideCongestionController::TimeUntilNextProcess() { @@ -460,15 +513,18 @@ void SendSideCongestionController::Process() { { ProcessInterval msg; msg.at_time = Timestamp::ms(now_ms); - task_queue_->PostTask( - [this, msg]() { controller_->OnProcessInterval(msg); }); + task_queue_->PostTask([this, msg]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (controller_) + controller_->OnProcessInterval(msg); + }); } - if (control_handler->pacer_configured()) { + task_queue_->PostTask([this]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); PacerQueueUpdate msg; msg.expected_queue_time = TimeDelta::ms(pacer_->ExpectedQueueTimeMs()); - task_queue_->PostTask( - [this, msg]() { control_handler->OnPacerQueueUpdate(msg); }); - } + control_handler_->OnPacerQueueUpdate(msg); + }); } void SendSideCongestionController::AddPacket( @@ -504,8 +560,11 @@ void SendSideCongestionController::OnTransportFeedback( msg.prior_in_flight = prior_in_flight; msg.data_in_flight = DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes()); - task_queue_->PostTask( - [this, msg]() { controller_->OnTransportPacketsFeedback(msg); }); + task_queue_->PostTask([this, msg]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (controller_) + controller_->OnTransportPacketsFeedback(msg); + }); } } @@ -513,8 +572,10 @@ void SendSideCongestionController::MaybeUpdateOutstandingData() { OutstandingData msg; msg.in_flight_data = DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes()); - task_queue_->PostTask( - [this, msg]() { pacer_controller_->OnOutstandingData(msg); }); + task_queue_->PostTask([this, msg]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + pacer_controller_->OnOutstandingData(msg); + }); } std::vector @@ -529,16 +590,12 @@ void SendSideCongestionController::WaitOnTasks() { event.Wait(rtc::Event::kForever); } -void SendSideCongestionController::WaitOnTask(std::function closure) { - rtc::Event done(false, false); - task_queue_->PostTask(rtc::NewClosure(closure, [&done] { done.Set(); })); - done.Wait(rtc::Event::kForever); -} - void SendSideCongestionController::SetSendBitrateLimits( int64_t min_send_bitrate_bps, int64_t max_padding_bitrate_bps) { - WaitOnTask([this, min_send_bitrate_bps, max_padding_bitrate_bps]() { + task_queue_->PostTask([this, min_send_bitrate_bps, + max_padding_bitrate_bps]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); streams_config_.min_pacing_rate = DataRate::bps(min_send_bitrate_bps); streams_config_.max_padding_rate = DataRate::bps(max_padding_bitrate_bps); UpdateStreamsConfig(); @@ -546,7 +603,8 @@ void SendSideCongestionController::SetSendBitrateLimits( } void SendSideCongestionController::SetPacingFactor(float pacing_factor) { - WaitOnTask([this, pacing_factor]() { + task_queue_->PostTask([this, pacing_factor]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); streams_config_.pacing_factor = pacing_factor; UpdateStreamsConfig(); }); @@ -557,22 +615,31 @@ void SendSideCongestionController::OnReceivedEstimatedBitrate( RemoteBitrateReport msg; msg.receive_time = Timestamp::ms(clock_->TimeInMilliseconds()); msg.bandwidth = DataRate::bps(bitrate); - task_queue_->PostTask( - [this, msg]() { controller_->OnRemoteBitrateReport(msg); }); + task_queue_->PostTask([this, msg]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (controller_) + controller_->OnRemoteBitrateReport(msg); + }); } void SendSideCongestionController::OnReceivedRtcpReceiverReport( const webrtc::ReportBlockList& report_blocks, int64_t rtt_ms, int64_t now_ms) { - OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms); + task_queue_->PostTask([this, report_blocks, now_ms]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms); + }); - RoundTripTimeUpdate report; - report.receive_time = Timestamp::ms(now_ms); - report.round_trip_time = TimeDelta::ms(rtt_ms); - report.smoothed = false; - task_queue_->PostTask( - [this, report]() { controller_->OnRoundTripTimeUpdate(report); }); + task_queue_->PostTask([this, now_ms, rtt_ms]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + RoundTripTimeUpdate report; + report.receive_time = Timestamp::ms(now_ms); + report.round_trip_time = TimeDelta::ms(rtt_ms); + report.smoothed = false; + if (controller_) + controller_->OnRoundTripTimeUpdate(report); + }); } void SendSideCongestionController::OnReceivedRtcpReceiverReportBlocks( @@ -614,8 +681,8 @@ void SendSideCongestionController::OnReceivedRtcpReceiverReportBlocks( msg.receive_time = now; msg.start_time = last_report_block_time_; msg.end_time = now; - task_queue_->PostTask( - [this, msg]() { controller_->OnTransportLossReport(msg); }); + if (controller_) + controller_->OnTransportLossReport(msg); last_report_block_time_ = now; } } // namespace webrtc_cc diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc b/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc index 4b4c4439c6..a982c74216 100644 --- a/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc +++ b/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc @@ -80,6 +80,7 @@ class SendSideCongestionControllerTest : public ::testing::Test { controller_->RegisterNetworkObserver(&observer_); controller_->SignalNetworkState(NetworkState::kNetworkUp); bandwidth_observer_ = controller_->GetBandwidthObserver(); + controller_->WaitOnTasks(); testing::Mock::VerifyAndClearExpectations(pacer_.get()); testing::Mock::VerifyAndClearExpectations(&observer_); }