diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 986fa09243..bd8e2d0303 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -22,7 +22,6 @@ #include "call/rtp_video_sender.h" #include "logging/rtc_event_log/events/rtc_event_remote_estimate.h" #include "logging/rtc_event_log/events/rtc_event_route_change.h" -#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" #include "rtc_base/rate_limiter.h" @@ -229,7 +228,6 @@ void RtpTransportControllerSend::SetQueueTimeLimit(int limit_ms) { } StreamFeedbackProvider* RtpTransportControllerSend::GetStreamFeedbackProvider() { - RTC_DCHECK_RUN_ON(&task_queue_); return &transport_feedback_adapter_; } @@ -280,6 +278,11 @@ void RtpTransportControllerSend::OnNetworkRouteChanged( << " bps."; RTC_DCHECK_GT(bitrate_config.start_bitrate_bps, 0); + if (reset_feedback_on_route_change_) + transport_feedback_adapter_.SetNetworkIds( + network_route.local_network_id, network_route.remote_network_id); + transport_overhead_bytes_per_packet_ = network_route.packet_overhead; + if (event_log_) { event_log_->Log(std::make_unique( network_route.connected, network_route.packet_overhead)); @@ -287,13 +290,8 @@ void RtpTransportControllerSend::OnNetworkRouteChanged( NetworkRouteChange msg; msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); msg.constraints = ConvertConstraints(bitrate_config, clock_); - task_queue_.PostTask([this, msg, network_route] { + task_queue_.PostTask([this, msg] { RTC_DCHECK_RUN_ON(&task_queue_); - transport_overhead_bytes_per_packet_ = network_route.packet_overhead; - if (reset_feedback_on_route_change_) { - transport_feedback_adapter_.SetNetworkIds( - network_route.local_network_id, network_route.remote_network_id); - } if (controller_) { PostUpdates(controller_->OnNetworkRouteChange(msg)); } else { @@ -353,15 +351,17 @@ void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) { } void RtpTransportControllerSend::OnSentPacket( const rtc::SentPacket& sent_packet) { - task_queue_.PostTask([this, sent_packet]() { - RTC_DCHECK_RUN_ON(&task_queue_); - absl::optional packet_msg = - transport_feedback_adapter_.ProcessSentPacket(sent_packet); - pacer()->UpdateOutstandingData( - transport_feedback_adapter_.GetOutstandingData()); - if (packet_msg && controller_) - PostUpdates(controller_->OnSentPacket(*packet_msg)); - }); + absl::optional packet_msg = + transport_feedback_adapter_.ProcessSentPacket(sent_packet); + if (packet_msg) { + task_queue_.PostTask([this, packet_msg]() { + RTC_DCHECK_RUN_ON(&task_queue_); + if (controller_) + PostUpdates(controller_->OnSentPacket(*packet_msg)); + }); + } + pacer()->UpdateOutstandingData( + transport_feedback_adapter_.GetOutstandingData()); } void RtpTransportControllerSend::OnReceivedPacket( @@ -468,30 +468,29 @@ void RtpTransportControllerSend::OnReceivedRtcpReceiverReport( void RtpTransportControllerSend::OnAddPacket( const RtpPacketSendInfo& packet_info) { - auto creation_time = Timestamp::ms(clock_->TimeInMilliseconds()); - task_queue_.PostTask([this, packet_info, creation_time]() { - RTC_DCHECK_RUN_ON(&task_queue_); - transport_feedback_adapter_.AddPacket( - packet_info, - send_side_bwe_with_overhead_ ? transport_overhead_bytes_per_packet_ : 0, - creation_time); - }); + transport_feedback_adapter_.AddPacket( + packet_info, + send_side_bwe_with_overhead_ ? transport_overhead_bytes_per_packet_.load() + : 0, + Timestamp::ms(clock_->TimeInMilliseconds())); } void RtpTransportControllerSend::OnTransportFeedback( const rtcp::TransportFeedback& feedback) { - auto feedback_time = Timestamp::ms(clock_->TimeInMilliseconds()); - task_queue_.PostTask([this, feedback, feedback_time]() { - RTC_DCHECK_RUN_ON(&task_queue_); - absl::optional feedback_msg = - transport_feedback_adapter_.ProcessTransportFeedback(feedback, - feedback_time); - if (feedback_msg && controller_) { - PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg)); - } - pacer()->UpdateOutstandingData( - transport_feedback_adapter_.GetOutstandingData()); - }); + RTC_DCHECK_RUNS_SERIALIZED(&worker_race_); + + absl::optional feedback_msg = + transport_feedback_adapter_.ProcessTransportFeedback( + feedback, Timestamp::ms(clock_->TimeInMilliseconds())); + if (feedback_msg) { + task_queue_.PostTask([this, feedback_msg]() { + RTC_DCHECK_RUN_ON(&task_queue_); + if (controller_) + PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg)); + }); + } + pacer()->UpdateOutstandingData( + transport_feedback_adapter_.GetOutstandingData()); } void RtpTransportControllerSend::OnRemoteNetworkEstimate( diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index 00b4c63be6..b5a53d7fe7 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -151,8 +151,7 @@ class RtpTransportControllerSend final TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_); // TODO(srte): Move all access to feedback adapter to task queue. - TransportFeedbackAdapter transport_feedback_adapter_ - RTC_GUARDED_BY(task_queue_); + TransportFeedbackAdapter transport_feedback_adapter_; NetworkControllerFactoryInterface* const controller_factory_override_ RTC_PT_GUARDED_BY(task_queue_); @@ -177,13 +176,16 @@ class RtpTransportControllerSend final const bool reset_feedback_on_route_change_; const bool send_side_bwe_with_overhead_; const bool add_pacing_to_cwin_; - - size_t transport_overhead_bytes_per_packet_ RTC_GUARDED_BY(task_queue_); + // Transport overhead is written by OnNetworkRouteChanged and read by + // AddPacket. + // TODO(srte): Remove atomic when feedback adapter runs on task queue. + std::atomic transport_overhead_bytes_per_packet_; bool network_available_ RTC_GUARDED_BY(task_queue_); RepeatingTaskHandle pacer_queue_update_task_ RTC_GUARDED_BY(task_queue_); RepeatingTaskHandle controller_task_ RTC_GUARDED_BY(task_queue_); + // TODO(srte): Remove this checker when feedback adapter runs on task queue. + rtc::RaceChecker worker_race_; - // Protected by internal locks. RateLimiter retransmission_rate_limiter_; // TODO(perkj): |task_queue_| is supposed to replace |process_thread_|. diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc index 94d0931314..8190eea5f3 100644 --- a/call/rtp_video_sender_unittest.cc +++ b/call/rtp_video_sender_unittest.cc @@ -135,24 +135,17 @@ class RtpVideoSenderTestFixture { VideoEncoderConfig::ContentType::kRealtimeVideo), retransmission_rate_limiter_(time_controller_.GetClock(), kRetransmitWindowSizeMs) { - rtc::Event done; - transport_controller_.GetWorkerQueue()->PostTask([&]() { - std::map suspended_ssrcs; - - router_ = std::make_unique( - time_controller_.GetClock(), suspended_ssrcs, - suspended_payload_states, config_.rtp, - config_.rtcp_report_interval_ms, &transport_, - CreateObservers(&call_stats_, &encoder_feedback_, &stats_proxy_, - &stats_proxy_, &stats_proxy_, &stats_proxy_, - frame_count_observer, &stats_proxy_, &stats_proxy_, - &send_delay_stats_), - &transport_controller_, &event_log_, &retransmission_rate_limiter_, - std::make_unique(time_controller_.GetClock()), - nullptr, CryptoOptions{}); - done.Set(); - }); - done.Wait(rtc::Event::kForever); + std::map suspended_ssrcs; + router_ = std::make_unique( + time_controller_.GetClock(), suspended_ssrcs, suspended_payload_states, + config_.rtp, config_.rtcp_report_interval_ms, &transport_, + CreateObservers(&call_stats_, &encoder_feedback_, &stats_proxy_, + &stats_proxy_, &stats_proxy_, &stats_proxy_, + frame_count_observer, &stats_proxy_, &stats_proxy_, + &send_delay_stats_), + &transport_controller_, &event_log_, &retransmission_rate_limiter_, + std::make_unique(time_controller_.GetClock()), + nullptr, CryptoOptions{}); } RtpVideoSenderTestFixture( const std::vector& ssrcs, @@ -164,14 +157,7 @@ class RtpVideoSenderTestFixture { payload_type, suspended_payload_states, /*frame_count_observer=*/nullptr) {} - ~RtpVideoSenderTestFixture() { - rtc::Event done; - transport_controller_.GetWorkerQueue()->PostTask([&]() { - router_.reset(); - done.Set(); - }); - done.Wait(rtc::Event::kForever); - } + RtpVideoSender* router() { return router_.get(); } MockTransport& transport() { return transport_; } void AdvanceTime(TimeDelta delta) { time_controller_.AdvanceTime(delta); } diff --git a/modules/congestion_controller/bbr/bbr_network_controller_unittest.cc b/modules/congestion_controller/bbr/bbr_network_controller_unittest.cc index 8cf4d17a9f..2a8a224a81 100644 --- a/modules/congestion_controller/bbr/bbr_network_controller_unittest.cc +++ b/modules/congestion_controller/bbr/bbr_network_controller_unittest.cc @@ -155,8 +155,8 @@ TEST_F(BbrNetworkControllerTest, UpdatesTargetSendRate) { ret_net->UpdateConfig( [](NetworkSimulationConfig* c) { c->delay = TimeDelta::ms(200); }); - s.RunFor(TimeDelta::seconds(35)); - EXPECT_NEAR(client->send_bandwidth().kbps(), 180, 50); + s.RunFor(TimeDelta::seconds(40)); + EXPECT_NEAR(client->send_bandwidth().kbps(), 200, 40); } } // namespace test diff --git a/modules/congestion_controller/rtp/transport_feedback_adapter.cc b/modules/congestion_controller/rtp/transport_feedback_adapter.cc index df52ef1b2a..b070b0e23a 100644 --- a/modules/congestion_controller/rtp/transport_feedback_adapter.cc +++ b/modules/congestion_controller/rtp/transport_feedback_adapter.cc @@ -73,6 +73,7 @@ TransportFeedbackAdapter::~TransportFeedbackAdapter() { void TransportFeedbackAdapter::RegisterStreamFeedbackObserver( std::vector ssrcs, StreamFeedbackObserver* observer) { + rtc::CritScope cs(&observers_lock_); RTC_DCHECK(observer); RTC_DCHECK(absl::c_find_if(observers_, [=](const auto& pair) { return pair.second == observer; @@ -82,6 +83,7 @@ void TransportFeedbackAdapter::RegisterStreamFeedbackObserver( void TransportFeedbackAdapter::DeRegisterStreamFeedbackObserver( StreamFeedbackObserver* observer) { + rtc::CritScope cs(&observers_lock_); RTC_DCHECK(observer); const auto it = absl::c_find_if( observers_, [=](const auto& pair) { return pair.second == observer; }); @@ -92,31 +94,35 @@ void TransportFeedbackAdapter::DeRegisterStreamFeedbackObserver( void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info, size_t overhead_bytes, Timestamp creation_time) { - PacketFeedback packet; - packet.creation_time = creation_time; - packet.sent.sequence_number = - seq_num_unwrapper_.Unwrap(packet_info.transport_sequence_number); - packet.sent.size = DataSize::bytes(packet_info.length + overhead_bytes); - packet.local_net_id = local_net_id_; - packet.remote_net_id = remote_net_id_; - packet.sent.pacing_info = packet_info.pacing_info; - if (packet_info.has_rtp_sequence_number) { - packet.ssrc = packet_info.ssrc; - packet.rtp_sequence_number = packet_info.rtp_sequence_number; - } + { + rtc::CritScope cs(&lock_); + PacketFeedback packet; + packet.creation_time = creation_time; + packet.sent.sequence_number = + seq_num_unwrapper_.Unwrap(packet_info.transport_sequence_number); + packet.sent.size = DataSize::bytes(packet_info.length + overhead_bytes); + packet.local_net_id = local_net_id_; + packet.remote_net_id = remote_net_id_; + packet.sent.pacing_info = packet_info.pacing_info; + if (packet_info.has_rtp_sequence_number) { + packet.ssrc = packet_info.ssrc; + packet.rtp_sequence_number = packet_info.rtp_sequence_number; + } - while (!history_.empty() && - creation_time - history_.begin()->second.creation_time > - kSendTimeHistoryWindow) { - // TODO(sprang): Warn if erasing (too many) old items? - if (history_.begin()->second.sent.sequence_number > last_ack_seq_num_) - in_flight_.RemoveInFlightPacketBytes(history_.begin()->second); - history_.erase(history_.begin()); + while (!history_.empty() && + creation_time - history_.begin()->second.creation_time > + kSendTimeHistoryWindow) { + // TODO(sprang): Warn if erasing (too many) old items? + if (history_.begin()->second.sent.sequence_number > last_ack_seq_num_) + in_flight_.RemoveInFlightPacketBytes(history_.begin()->second); + history_.erase(history_.begin()); + } + history_.insert(std::make_pair(packet.sent.sequence_number, packet)); } - history_.insert(std::make_pair(packet.sent.sequence_number, packet)); } absl::optional TransportFeedbackAdapter::ProcessSentPacket( const rtc::SentPacket& sent_packet) { + rtc::CritScope cs(&lock_); auto send_time = Timestamp::ms(sent_packet.send_time_ms); // TODO(srte): Only use one way to indicate that packet feedback is used. if (sent_packet.info.included_in_feedback || sent_packet.packet_id != -1) { @@ -165,37 +171,41 @@ TransportFeedbackAdapter::ProcessTransportFeedback( std::vector feedback_vector; TransportPacketsFeedback msg; msg.feedback_time = feedback_receive_time; - msg.prior_in_flight = - in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); - feedback_vector = - ProcessTransportFeedbackInner(feedback, feedback_receive_time); - if (feedback_vector.empty()) - return absl::nullopt; + { + rtc::CritScope cs(&lock_); + msg.prior_in_flight = + in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); + feedback_vector = + ProcessTransportFeedbackInner(feedback, feedback_receive_time); + if (feedback_vector.empty()) + return absl::nullopt; - for (const PacketFeedback& fb : feedback_vector) { - PacketResult res; - res.sent_packet = fb.sent; - res.receive_time = fb.receive_time; - msg.packet_feedbacks.push_back(res); + for (const PacketFeedback& fb : feedback_vector) { + PacketResult res; + res.sent_packet = fb.sent; + res.receive_time = fb.receive_time; + msg.packet_feedbacks.push_back(res); + } + auto it = history_.find(last_ack_seq_num_); + if (it != history_.end()) { + msg.first_unacked_send_time = it->second.sent.send_time; + } + msg.data_in_flight = + in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); } - auto it = history_.find(last_ack_seq_num_); - if (it != history_.end()) { - msg.first_unacked_send_time = it->second.sent.send_time; - } - msg.data_in_flight = - in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); - SignalObservers(feedback_vector); return msg; } void TransportFeedbackAdapter::SetNetworkIds(uint16_t local_id, uint16_t remote_id) { + rtc::CritScope cs(&lock_); local_net_id_ = local_id; remote_net_id_ = remote_id; } DataSize TransportFeedbackAdapter::GetOutstandingData() const { + rtc::CritScope cs(&lock_); return in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); } @@ -280,6 +290,7 @@ TransportFeedbackAdapter::ProcessTransportFeedbackInner( void TransportFeedbackAdapter::SignalObservers( const std::vector& feedback_vector) { + rtc::CritScope cs(&observers_lock_); for (auto& observer : observers_) { std::vector selected_feedback; for (const auto& packet : feedback_vector) { diff --git a/modules/congestion_controller/rtp/transport_feedback_adapter.h b/modules/congestion_controller/rtp/transport_feedback_adapter.h index 392e15c8fa..699c6ed489 100644 --- a/modules/congestion_controller/rtp/transport_feedback_adapter.h +++ b/modules/congestion_controller/rtp/transport_feedback_adapter.h @@ -87,33 +87,36 @@ class TransportFeedbackAdapter : public StreamFeedbackProvider { std::vector ProcessTransportFeedbackInner( const rtcp::TransportFeedback& feedback, - Timestamp feedback_time); + Timestamp feedback_time) RTC_RUN_ON(&lock_); void SignalObservers( const std::vector& packet_feedback_vector); - DataSize pending_untracked_size_ = DataSize::Zero(); - Timestamp last_send_time_ = Timestamp::MinusInfinity(); - Timestamp last_untracked_send_time_ = Timestamp::MinusInfinity(); - SequenceNumberUnwrapper seq_num_unwrapper_; - std::map history_; + rtc::CriticalSection lock_; + DataSize pending_untracked_size_ RTC_GUARDED_BY(&lock_) = DataSize::Zero(); + Timestamp last_send_time_ RTC_GUARDED_BY(&lock_) = Timestamp::MinusInfinity(); + Timestamp last_untracked_send_time_ RTC_GUARDED_BY(&lock_) = + Timestamp::MinusInfinity(); + SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&lock_); + std::map history_ RTC_GUARDED_BY(&lock_); // Sequence numbers are never negative, using -1 as it always < a real // sequence number. - int64_t last_ack_seq_num_ = -1; - InFlightBytesTracker in_flight_; + int64_t last_ack_seq_num_ RTC_GUARDED_BY(&lock_) = -1; + InFlightBytesTracker in_flight_ RTC_GUARDED_BY(&lock_); - Timestamp current_offset_ = Timestamp::MinusInfinity(); - TimeDelta last_timestamp_ = TimeDelta::MinusInfinity(); + Timestamp current_offset_ RTC_GUARDED_BY(&lock_) = Timestamp::MinusInfinity(); + TimeDelta last_timestamp_ RTC_GUARDED_BY(&lock_) = TimeDelta::MinusInfinity(); - uint16_t local_net_id_ = 0; - uint16_t remote_net_id_ = 0; + uint16_t local_net_id_ RTC_GUARDED_BY(&lock_) = 0; + uint16_t remote_net_id_ RTC_GUARDED_BY(&lock_) = 0; + rtc::CriticalSection observers_lock_; // Maps a set of ssrcs to corresponding observer. Vectors are used rather than // set/map to ensure that the processing order is consistent independently of // the randomized ssrcs. std::vector, StreamFeedbackObserver*>> - observers_; + observers_ RTC_GUARDED_BY(&observers_lock_); }; } // namespace webrtc