diff --git a/modules/congestion_controller/goog_cc/goog_cc_factory.cc b/modules/congestion_controller/goog_cc/goog_cc_factory.cc index ad230bc1e9..88d52ee604 100644 --- a/modules/congestion_controller/goog_cc/goog_cc_factory.cc +++ b/modules/congestion_controller/goog_cc/goog_cc_factory.cc @@ -18,9 +18,10 @@ GoogCcNetworkControllerFactory::GoogCcNetworkControllerFactory( : event_log_(event_log) {} NetworkControllerInterface::uptr GoogCcNetworkControllerFactory::Create( - NetworkControllerObserver* observer) { + NetworkControllerObserver* observer, + NetworkControllerConfig config) { return rtc::MakeUnique(event_log_, - observer); + observer, config); } TimeDelta GoogCcNetworkControllerFactory::GetProcessInterval() const { diff --git a/modules/congestion_controller/goog_cc/goog_cc_network_control.cc b/modules/congestion_controller/goog_cc/goog_cc_network_control.cc index 4e42a5148e..6c06c2cc25 100644 --- a/modules/congestion_controller/goog_cc/goog_cc_network_control.cc +++ b/modules/congestion_controller/goog_cc/goog_cc_network_control.cc @@ -105,7 +105,8 @@ std::vector ReceivedPacketsFeedbackAsRtp( GoogCcNetworkController::GoogCcNetworkController( RtcEventLog* event_log, - NetworkControllerObserver* observer) + NetworkControllerObserver* observer, + NetworkControllerConfig config) : event_log_(event_log), observer_(observer), probe_controller_(new ProbeController(observer_)), @@ -121,6 +122,8 @@ GoogCcNetworkController::GoogCcNetworkController( in_cwnd_experiment_(CwndExperimentEnabled()), accepted_queue_ms_(kDefaultAcceptedQueueMs) { delay_based_bwe_->SetMinBitrate(congestion_controller::GetMinBitrateBps()); + UpdateBitrateConstraints(config.constraints, config.starting_bandwidth); + OnStreamsConfig(config.stream_based_config); if (in_cwnd_experiment_ && !ReadCwndExperimentParameter(&accepted_queue_ms_)) { RTC_LOG(LS_WARNING) << "Failed to parse parameters for CwndExperiment " @@ -136,9 +139,9 @@ void GoogCcNetworkController::OnNetworkAvailability(NetworkAvailability msg) { } void GoogCcNetworkController::OnNetworkRouteChange(NetworkRouteChange msg) { - int64_t min_bitrate_bps = msg.constraints.min_data_rate.bps(); + int64_t min_bitrate_bps = msg.constraints.min_data_rate.bps_or(-1); int64_t max_bitrate_bps = msg.constraints.max_data_rate.bps_or(-1); - int64_t start_bitrate_bps = msg.constraints.starting_rate.bps_or(-1); + int64_t start_bitrate_bps = msg.starting_rate.bps_or(-1); ClampBitrates(&start_bitrate_bps, &min_bitrate_bps, &max_bitrate_bps); @@ -215,9 +218,15 @@ void GoogCcNetworkController::OnStreamsConfig(StreamsConfig msg) { void GoogCcNetworkController::OnTargetRateConstraints( TargetRateConstraints constraints) { - int64_t min_bitrate_bps = constraints.min_data_rate.bps(); + UpdateBitrateConstraints(constraints, DataRate::kNotInitialized); +} + +void GoogCcNetworkController::UpdateBitrateConstraints( + TargetRateConstraints constraints, + DataRate starting_rate) { + int64_t min_bitrate_bps = constraints.min_data_rate.bps_or(0); int64_t max_bitrate_bps = constraints.max_data_rate.bps_or(-1); - int64_t start_bitrate_bps = constraints.starting_rate.bps_or(-1); + int64_t start_bitrate_bps = starting_rate.bps_or(-1); ClampBitrates(&start_bitrate_bps, &min_bitrate_bps, &max_bitrate_bps); diff --git a/modules/congestion_controller/goog_cc/goog_cc_network_control.h b/modules/congestion_controller/goog_cc/goog_cc_network_control.h index c22596a4f4..12e22a815b 100644 --- a/modules/congestion_controller/goog_cc/goog_cc_network_control.h +++ b/modules/congestion_controller/goog_cc/goog_cc_network_control.h @@ -32,7 +32,8 @@ namespace webrtc_cc { class GoogCcNetworkController : public NetworkControllerInterface { public: GoogCcNetworkController(RtcEventLog* event_log, - NetworkControllerObserver* observer); + NetworkControllerObserver* observer, + NetworkControllerConfig config); ~GoogCcNetworkController() override; // NetworkControllerInterface @@ -48,6 +49,8 @@ class GoogCcNetworkController : public NetworkControllerInterface { void OnTransportPacketsFeedback(TransportPacketsFeedback msg) override; private: + void UpdateBitrateConstraints(TargetRateConstraints constraints, + DataRate starting_rate); void MaybeUpdateCongestionWindow(); void MaybeTriggerOnNetworkChanged(Timestamp at_time); bool GetNetworkParameters(int32_t* estimated_bitrate_bps, diff --git a/modules/congestion_controller/goog_cc/include/goog_cc_factory.h b/modules/congestion_controller/goog_cc/include/goog_cc_factory.h index 20717f9f72..3e48462c9b 100644 --- a/modules/congestion_controller/goog_cc/include/goog_cc_factory.h +++ b/modules/congestion_controller/goog_cc/include/goog_cc_factory.h @@ -21,7 +21,8 @@ class GoogCcNetworkControllerFactory public: explicit GoogCcNetworkControllerFactory(RtcEventLog*); NetworkControllerInterface::uptr Create( - NetworkControllerObserver* observer) override; + NetworkControllerObserver* observer, + NetworkControllerConfig config) override; TimeDelta GetProcessInterval() const override; private: diff --git a/modules/congestion_controller/network_control/include/network_control.h b/modules/congestion_controller/network_control/include/network_control.h index fedd794a0b..1bff82a589 100644 --- a/modules/congestion_controller/network_control/include/network_control.h +++ b/modules/congestion_controller/network_control/include/network_control.h @@ -41,6 +41,21 @@ class NetworkControllerObserver : public TargetTransferRateObserver { virtual void OnProbeClusterConfig(ProbeClusterConfig) = 0; }; +// Configuration sent to factory create function. The parameters here are +// optional to use for a network controller implementation. +struct NetworkControllerConfig { + // The initial constraints to start with, these can be changed at any later + // time by calls to OnTargetRateConstraints. + TargetRateConstraints constraints; + // Initial stream specific configuration, these are changed at any later time + // by calls to OnStreamsConfig. + StreamsConfig stream_based_config; + // The initial bandwidth estimate to base target rate on. This should be used + // as the basis for initial OnTargetTransferRate and OnPacerConfig callbacks. + // Note that starting rate is only provided on construction. + DataRate starting_bandwidth; +}; + // NetworkControllerInterface is implemented by network controllers. A network // controller is a class that uses information about network state and traffic // to estimate network parameters such as round trip time and bandwidth. Network @@ -82,7 +97,8 @@ class NetworkControllerFactoryInterface { // Used to create a new network controller, requires an observer to be // provided to handle callbacks. virtual NetworkControllerInterface::uptr Create( - NetworkControllerObserver* observer) = 0; + NetworkControllerObserver* observer, + NetworkControllerConfig config) = 0; // Returns the interval by which the network controller expects // OnProcessInterval calls. virtual TimeDelta GetProcessInterval() const = 0; diff --git a/modules/congestion_controller/network_control/include/network_types.h b/modules/congestion_controller/network_control/include/network_types.h index bb3637fc87..cf90323540 100644 --- a/modules/congestion_controller/network_control/include/network_types.h +++ b/modules/congestion_controller/network_control/include/network_types.h @@ -38,7 +38,6 @@ struct StreamsConfig { struct TargetRateConstraints { Timestamp at_time; - DataRate starting_rate; DataRate min_data_rate; DataRate max_data_rate; }; @@ -55,6 +54,7 @@ struct NetworkRouteChange { // The TargetRateConstraints are set here so they can be changed synchronously // when network route changes. TargetRateConstraints constraints; + DataRate starting_rate; }; struct SentPacket { diff --git a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h b/modules/congestion_controller/rtp/include/send_side_congestion_controller.h index ff3fefe4ac..91f7117f7e 100644 --- a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h +++ b/modules/congestion_controller/rtp/include/send_side_congestion_controller.h @@ -141,32 +141,54 @@ class SendSideCongestionController void WaitOnTasks(); private: - void UpdateStreamsConfig(); - void WaitOnTask(std::function closure); + void MaybeCreateControllers(); + + void UpdateStreamsConfig() RTC_RUN_ON(task_queue_); void MaybeUpdateOutstandingData(); void OnReceivedRtcpReceiverReportBlocks(const ReportBlockList& report_blocks, - int64_t now_ms); + int64_t now_ms) + RTC_RUN_ON(task_queue_); const Clock* const clock_; + // PacedSender is thread safe and doesn't need protection here. PacedSender* const pacer_; + // TODO(srte): Move all access to feedback adapter to task queue. TransportFeedbackAdapter transport_feedback_adapter_; const std::unique_ptr controller_factory_; - const std::unique_ptr pacer_controller_; - const std::unique_ptr control_handler; - const std::unique_ptr controller_; + const std::unique_ptr pacer_controller_ + RTC_GUARDED_BY(task_queue_); + + std::unique_ptr control_handler_ + RTC_GUARDED_BY(task_queue_); + + std::unique_ptr controller_ + RTC_GUARDED_BY(task_queue_); + + // TODO(srte): Review access constraints of these when introducing delayed + // tasks. Only accessed from process threads. TimeDelta process_interval_; + // Only accessed from process threads. int64_t last_process_update_ms_ = 0; - std::map last_report_blocks_; - Timestamp last_report_block_time_; + std::map last_report_blocks_ + RTC_GUARDED_BY(task_queue_); + Timestamp last_report_block_time_ RTC_GUARDED_BY(task_queue_); + + NetworkChangedObserver* observer_ RTC_GUARDED_BY(task_queue_); + NetworkControllerConfig initial_config_ RTC_GUARDED_BY(task_queue_); + StreamsConfig streams_config_ RTC_GUARDED_BY(task_queue_); - StreamsConfig streams_config_; const bool send_side_bwe_with_overhead_; + // Transport overhead is written by OnNetworkRouteChanged and read by + // AddPacket. + // TODO(srte): Remove atomic when feedback adapter runs on task queue. std::atomic transport_overhead_bytes_per_packet_; - std::atomic network_available_; + bool network_available_ RTC_GUARDED_BY(task_queue_); + // Protects access to last_packet_feedback_vector_ in feedback adapter. + // TODO(srte): Remove this checker when feedback adapter runs on task queue. rtc::RaceChecker worker_race_; // Note that moving ownership of the task queue makes it neccessary to make diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller.cc b/modules/congestion_controller/rtp/send_side_congestion_controller.cc index 41f06fba6d..2901321048 100644 --- a/modules/congestion_controller/rtp/send_side_congestion_controller.cc +++ b/modules/congestion_controller/rtp/send_side_congestion_controller.cc @@ -87,14 +87,11 @@ std::vector PacketResultsFromRtpFeedbackVector( TargetRateConstraints ConvertConstraints(int min_bitrate_bps, int max_bitrate_bps, - int start_bitrate_bps, const Clock* clock) { TargetRateConstraints msg; msg.at_time = Timestamp::ms(clock->TimeInMilliseconds()); msg.min_data_rate = min_bitrate_bps >= 0 ? DataRate::bps(min_bitrate_bps) : DataRate::Zero(); - msg.starting_rate = start_bitrate_bps > 0 ? DataRate::bps(start_bitrate_bps) - : DataRate::kNotInitialized; msg.max_data_rate = max_bitrate_bps > 0 ? DataRate::bps(max_bitrate_bps) : DataRate::Infinity(); return msg; @@ -104,7 +101,9 @@ TargetRateConstraints ConvertConstraints(int min_bitrate_bps, namespace send_side_cc_internal { class ControlHandler : public NetworkControllerObserver { public: - ControlHandler(PacerController* pacer_controller, const Clock* clock); + ControlHandler(NetworkChangedObserver* observer, + PacerController* pacer_controller, + const Clock* clock); void OnCongestionWindow(CongestionWindow window) override; void OnPacerConfig(PacerConfig config) override; @@ -114,8 +113,6 @@ class ControlHandler : public NetworkControllerObserver { void OnNetworkAvailability(NetworkAvailability msg); void OnPacerQueueUpdate(PacerQueueUpdate msg); - void RegisterNetworkObserver(NetworkChangedObserver* observer); - rtc::Optional last_transfer_rate(); bool pacer_configured(); @@ -128,14 +125,12 @@ class ControlHandler : public NetworkControllerObserver { bool HasNetworkParametersToReportChanged(int64_t bitrate_bps, uint8_t fraction_loss, int64_t rtt); + NetworkChangedObserver* observer_ = nullptr; PacerController* pacer_controller_; - rtc::CriticalSection state_lock_; - rtc::Optional last_target_rate_ - RTC_GUARDED_BY(state_lock_); - bool pacer_configured_ RTC_GUARDED_BY(state_lock_) = false; + rtc::Optional last_target_rate_; + bool pacer_configured_ = false; - NetworkChangedObserver* observer_ = nullptr; rtc::Optional current_target_rate_msg_; bool network_available_ = true; int64_t last_reported_target_bitrate_bps_ = 0; @@ -149,9 +144,11 @@ class ControlHandler : public NetworkControllerObserver { RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(ControlHandler); }; -ControlHandler::ControlHandler(PacerController* pacer_controller, +ControlHandler::ControlHandler(NetworkChangedObserver* observer, + PacerController* pacer_controller, const Clock* clock) - : pacer_controller_(pacer_controller), + : observer_(observer), + pacer_controller_(pacer_controller), pacer_pushback_experiment_(IsPacerPushbackExperimentEnabled()) { sequenced_checker_.Detach(); } @@ -164,7 +161,6 @@ void ControlHandler::OnCongestionWindow(CongestionWindow window) { void ControlHandler::OnPacerConfig(PacerConfig config) { RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); pacer_controller_->OnPacerConfig(config); - rtc::CritScope cs(&state_lock_); pacer_configured_ = true; } @@ -177,7 +173,6 @@ void ControlHandler::OnTargetTransferRate(TargetTransferRate target_rate) { RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); current_target_rate_msg_ = target_rate; OnNetworkInvalidation(); - rtc::CritScope cs(&state_lock_); last_target_rate_ = target_rate; } @@ -193,12 +188,6 @@ void ControlHandler::OnPacerQueueUpdate(PacerQueueUpdate msg) { OnNetworkInvalidation(); } -void ControlHandler::RegisterNetworkObserver(NetworkChangedObserver* observer) { - RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); - RTC_DCHECK(observer_ == nullptr); - observer_ = observer; -} - void ControlHandler::OnNetworkInvalidation() { if (!current_target_rate_msg_.has_value()) return; @@ -236,10 +225,8 @@ void ControlHandler::OnNetworkInvalidation() { } if (HasNetworkParametersToReportChanged(target_bitrate_bps, fraction_loss, rtt_ms)) { - if (observer_) { - observer_->OnNetworkChanged(target_bitrate_bps, fraction_loss, rtt_ms, - probing_interval_ms); - } + observer_->OnNetworkChanged(target_bitrate_bps, fraction_loss, rtt_ms, + probing_interval_ms); } } bool ControlHandler::HasNetworkParametersToReportChanged( @@ -266,12 +253,12 @@ bool ControlHandler::IsSendQueueFull() const { } rtc::Optional ControlHandler::last_transfer_rate() { - rtc::CritScope cs(&state_lock_); + RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); return last_target_rate_; } bool ControlHandler::pacer_configured() { - rtc::CritScope cs(&state_lock_); + RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); return pacer_configured_; } } // namespace send_side_cc_internal @@ -288,18 +275,45 @@ SendSideCongestionController::SendSideCongestionController( transport_feedback_adapter_(clock_), controller_factory_(ControllerFactory(event_log)), pacer_controller_(MakeUnique(pacer_)), - control_handler(MakeUnique( - pacer_controller_.get(), - clock_)), - controller_(controller_factory_->Create(control_handler.get())), process_interval_(controller_factory_->GetProcessInterval()), + observer_(nullptr), send_side_bwe_with_overhead_( webrtc::field_trial::IsEnabled("WebRTC-SendSideBwe-WithOverhead")), transport_overhead_bytes_per_packet_(0), network_available_(false), task_queue_(MakeUnique("SendSideCCQueue")) { - SignalNetworkState(NetworkState::kNetworkDown); - SetBweBitrates(min_bitrate_bps, start_bitrate_bps, max_bitrate_bps); + initial_config_.constraints = + ConvertConstraints(min_bitrate_bps, max_bitrate_bps, clock_); + initial_config_.stream_based_config = StreamsConfig(); + RTC_DCHECK(start_bitrate_bps > 0); + initial_config_.starting_bandwidth = DataRate::bps(start_bitrate_bps); +} + +// There is no point in having a network controller for a network that is not +// yet available and if we don't have any observer of it's state. +// MaybeCreateControllers is used to trigger creation if those things are +// fulfilled. This is needed since dependent code uses the period until network +// is signalled to be avaliabile to set the expected start bitrate which is sent +// to the initializer for NetworkControllers. The observer is injected later due +// to a circular dependency between RtpTransportControllerSend and Call. +// TODO(srte): Break the circular dependency issue and make sure that starting +// bandwidth is set before this class is initialized so the controllers can be +// created in the constructor. +void SendSideCongestionController::MaybeCreateControllers() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (controller_ || !network_available_ || !observer_) + return; + + initial_config_.constraints.at_time = + Timestamp::ms(clock_->TimeInMilliseconds()); + initial_config_.stream_based_config = streams_config_; + + control_handler_ = MakeUnique( + observer_, pacer_controller_.get(), clock_); + + controller_ = + controller_factory_->Create(control_handler_.get(), initial_config_); + UpdateStreamsConfig(); } SendSideCongestionController::~SendSideCongestionController() { @@ -319,23 +333,35 @@ void SendSideCongestionController::DeRegisterPacketFeedbackObserver( void SendSideCongestionController::RegisterNetworkObserver( NetworkChangedObserver* observer) { - WaitOnTask([this, observer]() { - control_handler->RegisterNetworkObserver(observer); + task_queue_->PostTask([this, observer]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + RTC_DCHECK(observer_ == nullptr); + observer_ = observer; + MaybeCreateControllers(); }); } - void SendSideCongestionController::SetBweBitrates(int min_bitrate_bps, int start_bitrate_bps, int max_bitrate_bps) { - TargetRateConstraints msg = ConvertConstraints( - min_bitrate_bps, max_bitrate_bps, start_bitrate_bps, clock_); - WaitOnTask([this, msg]() { controller_->OnTargetRateConstraints(msg); }); + TargetRateConstraints constraints = + ConvertConstraints(min_bitrate_bps, max_bitrate_bps, clock_); + task_queue_->PostTask([this, constraints, start_bitrate_bps]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (controller_) { + controller_->OnTargetRateConstraints(constraints); + } else { + initial_config_.constraints = constraints; + if (start_bitrate_bps > 0) + initial_config_.starting_bandwidth = DataRate::bps(start_bitrate_bps); + } + }); } void SendSideCongestionController::SetMaxTotalAllocatedBitrate( int max_total_allocated_bitrate) { - WaitOnTask([this, max_total_allocated_bitrate]() { + task_queue_->PostTask([this, max_total_allocated_bitrate]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); streams_config_.max_total_allocated_bitrate = DataRate::bps(max_total_allocated_bitrate); UpdateStreamsConfig(); @@ -351,12 +377,16 @@ void SendSideCongestionController::OnNetworkRouteChanged( int max_bitrate_bps) { transport_feedback_adapter_.SetNetworkIds(network_route.local_network_id, network_route.remote_network_id); + transport_overhead_bytes_per_packet_ = network_route.packet_overhead; NetworkRouteChange msg; msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); - msg.constraints = ConvertConstraints(min_bitrate_bps, max_bitrate_bps, - start_bitrate_bps, clock_); - WaitOnTask([this, msg]() { + msg.constraints = + ConvertConstraints(min_bitrate_bps, max_bitrate_bps, clock_); + msg.starting_rate = start_bitrate_bps > 0 ? DataRate::bps(start_bitrate_bps) + : DataRate::kNotInitialized; + task_queue_->PostTask([this, msg]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); controller_->OnNetworkRouteChange(msg); pacer_controller_->OnNetworkRouteChange(msg); }); @@ -364,11 +394,20 @@ void SendSideCongestionController::OnNetworkRouteChanged( bool SendSideCongestionController::AvailableBandwidth( uint32_t* bandwidth) const { + // This is only called in the OnNetworkChanged callback in + // RtpTransportControllerSend which is called from ControlHandler, which is + // running on the task queue. + // TODO(srte): Remove this function when RtpTransportControllerSend stops + // calling it. + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (!control_handler_) { + return false; + } // TODO(srte): Remove this interface and push information about bandwidth // estimation to users of this class, thereby reducing synchronous calls. - if (control_handler->last_transfer_rate().has_value()) { - *bandwidth = - control_handler->last_transfer_rate()->network_estimate.bandwidth.bps(); + if (control_handler_->last_transfer_rate().has_value()) { + *bandwidth = control_handler_->last_transfer_rate() + ->network_estimate.bandwidth.bps(); return true; } return false; @@ -379,16 +418,17 @@ RtcpBandwidthObserver* SendSideCongestionController::GetBandwidthObserver() { } void SendSideCongestionController::EnablePeriodicAlrProbing(bool enable) { - WaitOnTask([this, enable]() { + task_queue_->PostTask([this, enable]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); streams_config_.requests_alr_probing = enable; UpdateStreamsConfig(); }); } void SendSideCongestionController::UpdateStreamsConfig() { - RTC_DCHECK(task_queue_->IsCurrent()); streams_config_.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); - controller_->OnStreamsConfig(streams_config_); + if (controller_) + controller_->OnStreamsConfig(streams_config_); } TransportFeedbackObserver* @@ -402,17 +442,23 @@ void SendSideCongestionController::SignalNetworkState(NetworkState state) { NetworkAvailability msg; msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); msg.network_available = state == kNetworkUp; - network_available_ = msg.network_available; - WaitOnTask([this, msg]() { - controller_->OnNetworkAvailability(msg); - pacer_controller_->OnNetworkAvailability(msg); - control_handler->OnNetworkAvailability(msg); + task_queue_->PostTask([this, msg]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + network_available_ = msg.network_available; + if (controller_) { + controller_->OnNetworkAvailability(msg); + pacer_controller_->OnNetworkAvailability(msg); + control_handler_->OnNetworkAvailability(msg); + } else { + MaybeCreateControllers(); + } }); } void SendSideCongestionController::SetTransportOverhead( size_t transport_overhead_bytes_per_packet) { - transport_overhead_bytes_per_packet_ = transport_overhead_bytes_per_packet; + // TODO(srte): Remove this method from SendSideCongestionControllerInterface + RTC_NOTREACHED(); } void SendSideCongestionController::OnSentPacket( @@ -429,7 +475,11 @@ void SendSideCongestionController::OnSentPacket( SentPacket msg; msg.size = DataSize::bytes(packet->payload_size); msg.send_time = Timestamp::ms(packet->send_time_ms); - task_queue_->PostTask([this, msg]() { controller_->OnSentPacket(msg); }); + task_queue_->PostTask([this, msg]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (controller_) + controller_->OnSentPacket(msg); + }); } } @@ -440,8 +490,11 @@ void SendSideCongestionController::OnRttUpdate(int64_t avg_rtt_ms, report.receive_time = Timestamp::ms(now_ms); report.round_trip_time = TimeDelta::ms(avg_rtt_ms); report.smoothed = true; - task_queue_->PostTask( - [this, report]() { controller_->OnRoundTripTimeUpdate(report); }); + task_queue_->PostTask([this, report]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (controller_) + controller_->OnRoundTripTimeUpdate(report); + }); } int64_t SendSideCongestionController::TimeUntilNextProcess() { @@ -460,15 +513,18 @@ void SendSideCongestionController::Process() { { ProcessInterval msg; msg.at_time = Timestamp::ms(now_ms); - task_queue_->PostTask( - [this, msg]() { controller_->OnProcessInterval(msg); }); + task_queue_->PostTask([this, msg]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (controller_) + controller_->OnProcessInterval(msg); + }); } - if (control_handler->pacer_configured()) { + task_queue_->PostTask([this]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); PacerQueueUpdate msg; msg.expected_queue_time = TimeDelta::ms(pacer_->ExpectedQueueTimeMs()); - task_queue_->PostTask( - [this, msg]() { control_handler->OnPacerQueueUpdate(msg); }); - } + control_handler_->OnPacerQueueUpdate(msg); + }); } void SendSideCongestionController::AddPacket( @@ -504,8 +560,11 @@ void SendSideCongestionController::OnTransportFeedback( msg.prior_in_flight = prior_in_flight; msg.data_in_flight = DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes()); - task_queue_->PostTask( - [this, msg]() { controller_->OnTransportPacketsFeedback(msg); }); + task_queue_->PostTask([this, msg]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (controller_) + controller_->OnTransportPacketsFeedback(msg); + }); } } @@ -513,8 +572,10 @@ void SendSideCongestionController::MaybeUpdateOutstandingData() { OutstandingData msg; msg.in_flight_data = DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes()); - task_queue_->PostTask( - [this, msg]() { pacer_controller_->OnOutstandingData(msg); }); + task_queue_->PostTask([this, msg]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + pacer_controller_->OnOutstandingData(msg); + }); } std::vector @@ -529,16 +590,12 @@ void SendSideCongestionController::WaitOnTasks() { event.Wait(rtc::Event::kForever); } -void SendSideCongestionController::WaitOnTask(std::function closure) { - rtc::Event done(false, false); - task_queue_->PostTask(rtc::NewClosure(closure, [&done] { done.Set(); })); - done.Wait(rtc::Event::kForever); -} - void SendSideCongestionController::SetSendBitrateLimits( int64_t min_send_bitrate_bps, int64_t max_padding_bitrate_bps) { - WaitOnTask([this, min_send_bitrate_bps, max_padding_bitrate_bps]() { + task_queue_->PostTask([this, min_send_bitrate_bps, + max_padding_bitrate_bps]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); streams_config_.min_pacing_rate = DataRate::bps(min_send_bitrate_bps); streams_config_.max_padding_rate = DataRate::bps(max_padding_bitrate_bps); UpdateStreamsConfig(); @@ -546,7 +603,8 @@ void SendSideCongestionController::SetSendBitrateLimits( } void SendSideCongestionController::SetPacingFactor(float pacing_factor) { - WaitOnTask([this, pacing_factor]() { + task_queue_->PostTask([this, pacing_factor]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); streams_config_.pacing_factor = pacing_factor; UpdateStreamsConfig(); }); @@ -557,22 +615,31 @@ void SendSideCongestionController::OnReceivedEstimatedBitrate( RemoteBitrateReport msg; msg.receive_time = Timestamp::ms(clock_->TimeInMilliseconds()); msg.bandwidth = DataRate::bps(bitrate); - task_queue_->PostTask( - [this, msg]() { controller_->OnRemoteBitrateReport(msg); }); + task_queue_->PostTask([this, msg]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (controller_) + controller_->OnRemoteBitrateReport(msg); + }); } void SendSideCongestionController::OnReceivedRtcpReceiverReport( const webrtc::ReportBlockList& report_blocks, int64_t rtt_ms, int64_t now_ms) { - OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms); + task_queue_->PostTask([this, report_blocks, now_ms]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms); + }); - RoundTripTimeUpdate report; - report.receive_time = Timestamp::ms(now_ms); - report.round_trip_time = TimeDelta::ms(rtt_ms); - report.smoothed = false; - task_queue_->PostTask( - [this, report]() { controller_->OnRoundTripTimeUpdate(report); }); + task_queue_->PostTask([this, now_ms, rtt_ms]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + RoundTripTimeUpdate report; + report.receive_time = Timestamp::ms(now_ms); + report.round_trip_time = TimeDelta::ms(rtt_ms); + report.smoothed = false; + if (controller_) + controller_->OnRoundTripTimeUpdate(report); + }); } void SendSideCongestionController::OnReceivedRtcpReceiverReportBlocks( @@ -614,8 +681,8 @@ void SendSideCongestionController::OnReceivedRtcpReceiverReportBlocks( msg.receive_time = now; msg.start_time = last_report_block_time_; msg.end_time = now; - task_queue_->PostTask( - [this, msg]() { controller_->OnTransportLossReport(msg); }); + if (controller_) + controller_->OnTransportLossReport(msg); last_report_block_time_ = now; } } // namespace webrtc_cc diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc b/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc index 4b4c4439c6..a982c74216 100644 --- a/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc +++ b/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc @@ -80,6 +80,7 @@ class SendSideCongestionControllerTest : public ::testing::Test { controller_->RegisterNetworkObserver(&observer_); controller_->SignalNetworkState(NetworkState::kNetworkUp); bandwidth_observer_ = controller_->GetBandwidthObserver(); + controller_->WaitOnTasks(); testing::Mock::VerifyAndClearExpectations(pacer_.get()); testing::Mock::VerifyAndClearExpectations(&observer_); }