From 9e117c5e1b279aa2920786351eaf72b4f07ce295 Mon Sep 17 00:00:00 2001 From: stefan Date: Wed, 16 Aug 2017 08:16:25 -0700 Subject: [PATCH] Reland of Add functionality which limits the number of bytes on the network. (patchset #1 id:1 of https://codereview.webrtc.org/3001653002/ ) Reason for revert: Reland Original issue's description: > Revert of Add functionality which limits the number of bytes on the network. (patchset #26 id:500001 of https://codereview.webrtc.org/2918323002/ ) > > Reason for revert: > Speculative revert to see if this caused regressions in android perf tests. > > Original issue's description: > > Add functionality which limits the number of bytes on the network. > > > > The limit is based on the bandwidth delay product, but also adds some additional slack to compensate for the sawtooth-like BWE pattern and the slowness of the encoder rate control. The delay is estimated based on the time from sending a packet until an ack is received. Since acks are received in bursts (feedback is only sent periodically), a min filter is used to estimate the rtt. > > > > Whenever the in flight bytes reaches the congestion window, the pacer is paused, which in turn will result in send-side queues growing. Eventually the encoders will be paused as the pacer queue grows large (currently 2 seconds). > > > > BUG=webrtc:7926 > > > > Review-Url: https://codereview.webrtc.org/2918323002 > > Cr-Commit-Position: refs/heads/master@{#19289} > > Committed: https://chromium.googlesource.com/external/webrtc/+/8497fdde43d920ab1f0cc90362534e5493d23abe > > TBR=terelius@webrtc.org,philipel@webrtc.org,tschumim@webrtc.org,gnish@webrtc.org > # Not skipping CQ checks because original CL landed more than 1 days ago. > BUG=webrtc:7926 > > Review-Url: https://codereview.webrtc.org/3001653002 > Cr-Commit-Position: refs/heads/master@{#19339} > Committed: https://chromium.googlesource.com/external/webrtc/+/64136af364d1fecada49e35b1bfa39ef2641d5d0 TBR=terelius@webrtc.org,philipel@webrtc.org,tschumim@webrtc.org,gnish@webrtc.org # Not skipping CQ checks because original CL landed more than 1 days ago. BUG=webrtc:7926 Review-Url: https://codereview.webrtc.org/2994343002 Cr-Commit-Position: refs/heads/master@{#19373} --- webrtc/call/call.cc | 21 +++-- .../include/send_side_congestion_controller.h | 7 ++ .../send_side_congestion_controller.cc | 63 ++++++++++++- .../transport_feedback_adapter.cc | 31 ++++++- .../transport_feedback_adapter.h | 6 ++ webrtc/modules/pacing/paced_sender.cc | 40 +++++--- .../modules/pacing/paced_sender_unittest.cc | 20 ++-- .../include/send_time_history.h | 4 + .../send_time_history.cc | 20 ++++ webrtc/video/end_to_end_tests.cc | 93 +++++++++++++++++-- 10 files changed, 268 insertions(+), 37 deletions(-) diff --git a/webrtc/call/call.cc b/webrtc/call/call.cc index 61ed66cf43..ff5ebca0e2 100644 --- a/webrtc/call/call.cc +++ b/webrtc/call/call.cc @@ -435,17 +435,20 @@ Call::Call(const Call::Config& config, call_stats_->RegisterStatsObserver(&receive_side_cc_); call_stats_->RegisterStatsObserver(transport_send_->send_side_cc()); - module_process_thread_->Start(); - module_process_thread_->RegisterModule(call_stats_.get(), RTC_FROM_HERE); - module_process_thread_->RegisterModule(&receive_side_cc_, RTC_FROM_HERE); - module_process_thread_->RegisterModule(transport_send_->send_side_cc(), - RTC_FROM_HERE); + // We have to attach the pacer to the pacer thread before starting the + // module process thread to avoid a race accessing the process thread + // both from the process thread and the pacer thread. pacer_thread_->RegisterModule(transport_send_->send_side_cc()->pacer(), RTC_FROM_HERE); pacer_thread_->RegisterModule( receive_side_cc_.GetRemoteBitrateEstimator(true), RTC_FROM_HERE); - pacer_thread_->Start(); + + module_process_thread_->RegisterModule(call_stats_.get(), RTC_FROM_HERE); + module_process_thread_->RegisterModule(&receive_side_cc_, RTC_FROM_HERE); + module_process_thread_->RegisterModule(transport_send_->send_side_cc(), + RTC_FROM_HERE); + module_process_thread_->Start(); } Call::~Call() { @@ -457,11 +460,15 @@ Call::~Call() { RTC_CHECK(audio_receive_streams_.empty()); RTC_CHECK(video_receive_streams_.empty()); + // The send-side congestion controller must be de-registered prior to + // the pacer thread being stopped to avoid a race when accessing the + // pacer thread object on the module process thread at the same time as + // the pacer thread is stopped. + module_process_thread_->DeRegisterModule(transport_send_->send_side_cc()); pacer_thread_->Stop(); pacer_thread_->DeRegisterModule(transport_send_->send_side_cc()->pacer()); pacer_thread_->DeRegisterModule( receive_side_cc_.GetRemoteBitrateEstimator(true)); - module_process_thread_->DeRegisterModule(transport_send_->send_side_cc()); module_process_thread_->DeRegisterModule(&receive_side_cc_); module_process_thread_->DeRegisterModule(call_stats_.get()); module_process_thread_->Stop(); diff --git a/webrtc/modules/congestion_controller/include/send_side_congestion_controller.h b/webrtc/modules/congestion_controller/include/send_side_congestion_controller.h index 6bebf238ff..68213507a6 100644 --- a/webrtc/modules/congestion_controller/include/send_side_congestion_controller.h +++ b/webrtc/modules/congestion_controller/include/send_side_congestion_controller.h @@ -136,6 +136,7 @@ class SendSideCongestionController : public CallStatsObserver, bool HasNetworkParametersToReportChanged(uint32_t bitrate_bps, uint8_t fraction_loss, int64_t rtt); + void LimitOutstandingBytes(size_t num_outstanding_bytes); const Clock* const clock_; rtc::CriticalSection observer_lock_; Observer* observer_ GUARDED_BY(observer_lock_); @@ -151,9 +152,15 @@ class SendSideCongestionController : public CallStatsObserver, uint8_t last_reported_fraction_loss_ GUARDED_BY(network_state_lock_); int64_t last_reported_rtt_ GUARDED_BY(network_state_lock_); NetworkState network_state_ GUARDED_BY(network_state_lock_); + bool pause_pacer_ GUARDED_BY(network_state_lock_); + // Duplicate the pacer paused state to avoid grabbing a lock when + // pausing the pacer. This can be removed when we move this class + // over to the task queue. + bool pacer_paused_; rtc::CriticalSection bwe_lock_; int min_bitrate_bps_ GUARDED_BY(bwe_lock_); std::unique_ptr delay_based_bwe_ GUARDED_BY(bwe_lock_); + const bool in_cwnd_experiment_; bool was_in_alr_; rtc::RaceChecker worker_race_; diff --git a/webrtc/modules/congestion_controller/send_side_congestion_controller.cc b/webrtc/modules/congestion_controller/send_side_congestion_controller.cc index df9d762aa8..751b541517 100644 --- a/webrtc/modules/congestion_controller/send_side_congestion_controller.cc +++ b/webrtc/modules/congestion_controller/send_side_congestion_controller.cc @@ -25,10 +25,20 @@ #include "webrtc/rtc_base/rate_limiter.h" #include "webrtc/rtc_base/socket.h" #include "webrtc/rtc_base/timeutils.h" +#include "webrtc/system_wrappers/include/field_trial.h" namespace webrtc { namespace { +const char kCwndExperiment[] = "WebRTC-CwndExperiment"; + +bool CwndExperimentEnabled() { + std::string experiment_string = + webrtc::field_trial::FindFullName(kCwndExperiment); + // The experiment is enabled iff the field trial string begins with "Enabled". + return experiment_string.find("Enabled") == 0; +} + static const int64_t kRetransmitWindowSizeMs = 500; // Makes sure that the bitrate and the min, max values are in valid range. @@ -100,8 +110,11 @@ SendSideCongestionController::SendSideCongestionController( last_reported_fraction_loss_(0), last_reported_rtt_(0), network_state_(kNetworkUp), + pause_pacer_(false), + pacer_paused_(false), min_bitrate_bps_(congestion_controller::GetMinBitrateBps()), delay_based_bwe_(new DelayBasedBwe(event_log_, clock_)), + in_cwnd_experiment_(CwndExperimentEnabled()), was_in_alr_(0) { delay_based_bwe_->SetMinBitrate(min_bitrate_bps_); } @@ -219,13 +232,9 @@ SendSideCongestionController::GetTransportFeedbackObserver() { void SendSideCongestionController::SignalNetworkState(NetworkState state) { LOG(LS_INFO) << "SignalNetworkState " << (state == kNetworkUp ? "Up" : "Down"); - if (state == kNetworkUp) { - pacer_->Resume(); - } else { - pacer_->Pause(); - } { rtc::CritScope cs(&network_state_lock_); + pause_pacer_ = state == kNetworkDown; network_state_ = state; } probe_controller_->OnNetworkStateChanged(state); @@ -246,6 +255,7 @@ void SendSideCongestionController::OnSentPacket( return; transport_feedback_adapter_.OnSentPacket(sent_packet.packet_id, sent_packet.send_time_ms); + LimitOutstandingBytes(transport_feedback_adapter_.GetOutstandingBytes()); } void SendSideCongestionController::OnRttUpdate(int64_t avg_rtt_ms, @@ -259,6 +269,20 @@ int64_t SendSideCongestionController::TimeUntilNextProcess() { } void SendSideCongestionController::Process() { + bool pause_pacer; + // TODO(holmer): Once this class is running on a task queue we should + // replace this with a task instead. + { + rtc::CritScope lock(&network_state_lock_); + pause_pacer = pause_pacer_; + } + if (pause_pacer && !pacer_paused_) { + pacer_->Pause(); + pacer_paused_ = true; + } else if (!pause_pacer && pacer_paused_) { + pacer_->Resume(); + pacer_paused_ = false; + } bitrate_controller_->Process(); probe_controller_->Process(); MaybeTriggerOnNetworkChanged(); @@ -305,6 +329,35 @@ void SendSideCongestionController::OnTransportFeedback( } if (result.recovered_from_overuse) probe_controller_->RequestProbe(); + LimitOutstandingBytes(transport_feedback_adapter_.GetOutstandingBytes()); +} + +void SendSideCongestionController::LimitOutstandingBytes( + size_t num_outstanding_bytes) { + if (!in_cwnd_experiment_) + return; + { + rtc::CritScope lock(&network_state_lock_); + rtc::Optional min_rtt_ms = + transport_feedback_adapter_.GetMinFeedbackLoopRtt(); + // No valid RTT. Could be because send-side BWE isn't used, in which case + // we don't try to limit the outstanding packets. + if (!min_rtt_ms) + return; + const int64_t kAcceptedQueueMs = 250; + const size_t kMinCwndBytes = 2 * 1500; + size_t max_outstanding_bytes = + std::max((*min_rtt_ms + kAcceptedQueueMs) * + last_reported_bitrate_bps_ / 1000 / 8, + kMinCwndBytes); + LOG(LS_INFO) << clock_->TimeInMilliseconds() + << " Outstanding bytes: " << num_outstanding_bytes + << " pacer queue: " << pacer_->QueueInMs() + << " max outstanding: " << max_outstanding_bytes; + LOG(LS_INFO) << "Feedback rtt: " << *min_rtt_ms + << " Bitrate: " << last_reported_bitrate_bps_; + pause_pacer_ = num_outstanding_bytes > max_outstanding_bytes; + } } std::vector diff --git a/webrtc/modules/congestion_controller/transport_feedback_adapter.cc b/webrtc/modules/congestion_controller/transport_feedback_adapter.cc index 7f3f744ecf..918b9c5b0e 100644 --- a/webrtc/modules/congestion_controller/transport_feedback_adapter.cc +++ b/webrtc/modules/congestion_controller/transport_feedback_adapter.cc @@ -10,6 +10,8 @@ #include "webrtc/modules/congestion_controller/transport_feedback_adapter.h" +#include + #include "webrtc/modules/congestion_controller/delay_based_bwe.h" #include "webrtc/modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h" #include "webrtc/rtc_base/checks.h" @@ -103,11 +105,12 @@ void TransportFeedbackAdapter::SetNetworkIds(uint16_t local_id, std::vector TransportFeedbackAdapter::GetPacketFeedbackVector( const rtcp::TransportFeedback& feedback) { int64_t timestamp_us = feedback.GetBaseTimeUs(); + int64_t now_ms = clock_->TimeInMilliseconds(); // Add timestamp deltas to a local time base selected on first packet arrival. // This won't be the true time base, but makes it easier to manually inspect // time stamps. if (last_timestamp_us_ == kNoTimestamp) { - current_offset_ms_ = clock_->TimeInMilliseconds(); + current_offset_ms_ = now_ms; } else { int64_t delta = timestamp_us - last_timestamp_us_; @@ -128,7 +131,7 @@ std::vector TransportFeedbackAdapter::GetPacketFeedbackVector( return packet_feedback_vector; } packet_feedback_vector.reserve(feedback.GetPacketStatusCount()); - + int64_t feedback_rtt = -1; { rtc::CritScope cs(&lock_); size_t failed_lookups = 0; @@ -158,6 +161,12 @@ std::vector TransportFeedbackAdapter::GetPacketFeedbackVector( ++failed_lookups; if (packet_feedback.local_net_id == local_net_id_ && packet_feedback.remote_net_id == remote_net_id_) { + if (packet_feedback.send_time_ms >= 0) { + int64_t rtt = now_ms - packet_feedback.send_time_ms; + // max() is used to account for feedback being delayed by the + // receiver. + feedback_rtt = std::max(rtt, feedback_rtt); + } packet_feedback_vector.push_back(packet_feedback); } @@ -169,6 +178,14 @@ std::vector TransportFeedbackAdapter::GetPacketFeedbackVector( << " packet" << (failed_lookups > 1 ? "s" : "") << ". Send time history too small?"; } + if (feedback_rtt > -1) { + feedback_rtts_.push_back(feedback_rtt); + const size_t kFeedbackRttWindow = 32; + if (feedback_rtts_.size() > kFeedbackRttWindow) + feedback_rtts_.pop_front(); + min_feedback_rtt_.emplace( + *std::min_element(feedback_rtts_.begin(), feedback_rtts_.end())); + } } return packet_feedback_vector; } @@ -188,4 +205,14 @@ std::vector TransportFeedbackAdapter::GetTransportFeedbackVector() const { return last_packet_feedback_vector_; } + +rtc::Optional TransportFeedbackAdapter::GetMinFeedbackLoopRtt() const { + rtc::CritScope cs(&lock_); + return min_feedback_rtt_; +} + +size_t TransportFeedbackAdapter::GetOutstandingBytes() const { + rtc::CritScope cs(&lock_); + return send_time_history_.GetOutstandingBytes(local_net_id_, remote_net_id_); +} } // namespace webrtc diff --git a/webrtc/modules/congestion_controller/transport_feedback_adapter.h b/webrtc/modules/congestion_controller/transport_feedback_adapter.h index 063b34dcfb..4fe70c5dcc 100644 --- a/webrtc/modules/congestion_controller/transport_feedback_adapter.h +++ b/webrtc/modules/congestion_controller/transport_feedback_adapter.h @@ -11,6 +11,7 @@ #ifndef WEBRTC_MODULES_CONGESTION_CONTROLLER_TRANSPORT_FEEDBACK_ADAPTER_H_ #define WEBRTC_MODULES_CONGESTION_CONTROLLER_TRANSPORT_FEEDBACK_ADAPTER_H_ +#include #include #include "webrtc/modules/remote_bitrate_estimator/include/send_time_history.h" @@ -46,11 +47,14 @@ class TransportFeedbackAdapter { // to the CongestionController interface. void OnTransportFeedback(const rtcp::TransportFeedback& feedback); std::vector GetTransportFeedbackVector() const; + rtc::Optional GetMinFeedbackLoopRtt() const; void SetTransportOverhead(int transport_overhead_bytes_per_packet); void SetNetworkIds(uint16_t local_id, uint16_t remote_id); + size_t GetOutstandingBytes() const; + private: std::vector GetPacketFeedbackVector( const rtcp::TransportFeedback& feedback); @@ -65,6 +69,8 @@ class TransportFeedbackAdapter { std::vector last_packet_feedback_vector_; uint16_t local_net_id_ GUARDED_BY(&lock_); uint16_t remote_net_id_ GUARDED_BY(&lock_); + std::deque feedback_rtts_ GUARDED_BY(&lock_); + rtc::Optional min_feedback_rtt_ GUARDED_BY(&lock_); rtc::CriticalSection observers_lock_; std::vector observers_ GUARDED_BY(&observers_lock_); diff --git a/webrtc/modules/pacing/paced_sender.cc b/webrtc/modules/pacing/paced_sender.cc index 2442bf0723..acfcd1fbff 100644 --- a/webrtc/modules/pacing/paced_sender.cc +++ b/webrtc/modules/pacing/paced_sender.cc @@ -29,6 +29,7 @@ namespace { // Time limit in milliseconds between packet bursts. const int64_t kMinPacketLimitMs = 5; +const int64_t kPausedPacketIntervalMs = 500; // Upper cap on process interval, in case process has not been called in a long // time. @@ -267,9 +268,10 @@ void PacedSender::CreateProbeCluster(int bitrate_bps) { } void PacedSender::Pause() { - LOG(LS_INFO) << "PacedSender paused."; { rtc::CritScope cs(&critsect_); + if (!paused_) + LOG(LS_INFO) << "PacedSender paused."; paused_ = true; packets_->SetPauseState(true, clock_->TimeInMilliseconds()); } @@ -280,9 +282,10 @@ void PacedSender::Pause() { } void PacedSender::Resume() { - LOG(LS_INFO) << "PacedSender resumed."; { rtc::CritScope cs(&critsect_); + if (paused_) + LOG(LS_INFO) << "PacedSender resumed."; paused_ = false; packets_->SetPauseState(false, clock_->TimeInMilliseconds()); } @@ -385,16 +388,18 @@ int64_t PacedSender::AverageQueueTimeMs() { int64_t PacedSender::TimeUntilNextProcess() { rtc::CritScope cs(&critsect_); + int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_; + int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000; + // When paused we wake up every 500 ms to send a padding packet to ensure + // we won't get stuck in the paused state due to no feedback being received. if (paused_) - return 1000 * 60 * 60; + return std::max(kPausedPacketIntervalMs - elapsed_time_ms, 0); if (prober_->IsProbing()) { int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds()); if (ret > 0 || (ret == 0 && !probing_send_failure_)) return ret; } - int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_; - int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000; return std::max(kMinPacketLimitMs - elapsed_time_ms, 0); } @@ -402,9 +407,21 @@ void PacedSender::Process() { int64_t now_us = clock_->TimeInMicroseconds(); rtc::CritScope cs(&critsect_); int64_t elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000; - time_last_update_us_ = now_us; int target_bitrate_kbps = pacing_bitrate_kbps_; - if (!paused_ && elapsed_time_ms > 0) { + + if (paused_) { + PacedPacketInfo pacing_info; + time_last_update_us_ = now_us; + // We can not send padding unless a normal packet has first been sent. If we + // do, timestamps get messed up. + if (packet_counter_ == 0) + return; + size_t bytes_sent = SendPadding(1, pacing_info); + alr_detector_->OnBytesSent(bytes_sent, now_us / 1000); + return; + } + + if (elapsed_time_ms > 0) { size_t queue_size_bytes = packets_->SizeInBytes(); if (queue_size_bytes > 0) { // Assuming equal size packets and input/output rate, the average packet @@ -425,6 +442,8 @@ void PacedSender::Process() { UpdateBudgetWithElapsedTime(elapsed_time_ms); } + time_last_update_us_ = now_us; + bool is_probing = prober_->IsProbing(); PacedPacketInfo pacing_info; size_t bytes_sent = 0; @@ -454,14 +473,13 @@ void PacedSender::Process() { } } - if (packets_->Empty() && !paused_) { + if (packets_->Empty()) { // We can not send padding unless a normal packet has first been sent. If we // do, timestamps get messed up. if (packet_counter_ > 0) { int padding_needed = static_cast(is_probing ? (recommended_probe_size - bytes_sent) : padding_budget_->bytes_remaining()); - if (padding_needed > 0) bytes_sent += SendPadding(padding_needed, pacing_info); } @@ -481,8 +499,7 @@ void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) { bool PacedSender::SendPacket(const paced_sender::Packet& packet, const PacedPacketInfo& pacing_info) { - if (paused_) - return false; + RTC_DCHECK(!paused_); if (media_budget_->bytes_remaining() == 0 && pacing_info.probe_cluster_id == PacedPacketInfo::kNotAProbe) { return false; @@ -512,6 +529,7 @@ bool PacedSender::SendPacket(const paced_sender::Packet& packet, size_t PacedSender::SendPadding(size_t padding_needed, const PacedPacketInfo& pacing_info) { + RTC_DCHECK_GT(packet_counter_, 0); critsect_.Leave(); size_t bytes_sent = packet_sender_->TimeToSendPadding(padding_needed, pacing_info); diff --git a/webrtc/modules/pacing/paced_sender_unittest.cc b/webrtc/modules/pacing/paced_sender_unittest.cc index 0b2ac1c987..8a3cee0991 100644 --- a/webrtc/modules/pacing/paced_sender_unittest.cc +++ b/webrtc/modules/pacing/paced_sender_unittest.cc @@ -651,12 +651,20 @@ TEST_F(PacedSenderTest, Pause) { EXPECT_EQ(second_capture_time_ms - capture_time_ms, send_bucket_->QueueInMs()); - for (int i = 0; i < 10; ++i) { - clock_.AdvanceTimeMilliseconds(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_CALL(callback_, TimeToSendPadding(1, _)).Times(1); + send_bucket_->Process(); + + int64_t expected_time_until_send = 500; + EXPECT_CALL(callback_, TimeToSendPadding(1, _)).Times(1); + while (expected_time_until_send >= 0) { // TimeUntilNextProcess must not return 0 when paused. If it does, // we risk running a busy loop, so ideally it should return a large value. - EXPECT_GE(send_bucket_->TimeUntilNextProcess(), 1000); - send_bucket_->Process(); + EXPECT_EQ(expected_time_until_send, send_bucket_->TimeUntilNextProcess()); + if (expected_time_until_send == 0) + send_bucket_->Process(); + clock_.AdvanceTimeMilliseconds(5); + expected_time_until_send -= 5; } // Expect high prio packets to come out first followed by normal @@ -699,10 +707,10 @@ TEST_F(PacedSenderTest, Pause) { send_bucket_->Resume(); for (size_t i = 0; i < 4; i++) { - EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); - clock_.AdvanceTimeMilliseconds(5); EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); send_bucket_->Process(); + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); } EXPECT_EQ(0, send_bucket_->QueueInMs()); diff --git a/webrtc/modules/remote_bitrate_estimator/include/send_time_history.h b/webrtc/modules/remote_bitrate_estimator/include/send_time_history.h index 026670acbf..bc3b8ccac0 100644 --- a/webrtc/modules/remote_bitrate_estimator/include/send_time_history.h +++ b/webrtc/modules/remote_bitrate_estimator/include/send_time_history.h @@ -38,11 +38,15 @@ class SendTimeHistory { // thus be non-null and have the sequence_number field set. bool GetFeedback(PacketFeedback* packet_feedback, bool remove); + size_t GetOutstandingBytes(uint16_t local_net_id, + uint16_t remote_net_id) const; + private: const Clock* const clock_; const int64_t packet_age_limit_ms_; SequenceNumberUnwrapper seq_num_unwrapper_; std::map history_; + rtc::Optional latest_acked_seq_num_; RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(SendTimeHistory); }; diff --git a/webrtc/modules/remote_bitrate_estimator/send_time_history.cc b/webrtc/modules/remote_bitrate_estimator/send_time_history.cc index 734a920713..4cc7a7c1c2 100644 --- a/webrtc/modules/remote_bitrate_estimator/send_time_history.cc +++ b/webrtc/modules/remote_bitrate_estimator/send_time_history.cc @@ -52,6 +52,9 @@ bool SendTimeHistory::GetFeedback(PacketFeedback* packet_feedback, RTC_DCHECK(packet_feedback); int64_t unwrapped_seq_num = seq_num_unwrapper_.Unwrap(packet_feedback->sequence_number); + latest_acked_seq_num_.emplace( + std::max(unwrapped_seq_num, latest_acked_seq_num_.value_or(0))); + RTC_DCHECK_GE(*latest_acked_seq_num_, 0); auto it = history_.find(unwrapped_seq_num); if (it == history_.end()) return false; @@ -66,4 +69,21 @@ bool SendTimeHistory::GetFeedback(PacketFeedback* packet_feedback, return true; } +size_t SendTimeHistory::GetOutstandingBytes(uint16_t local_net_id, + uint16_t remote_net_id) const { + size_t outstanding_bytes = 0; + auto unacked_it = history_.begin(); + if (latest_acked_seq_num_) { + unacked_it = history_.lower_bound(*latest_acked_seq_num_); + } + for (; unacked_it != history_.end(); ++unacked_it) { + if (unacked_it->second.local_net_id == local_net_id && + unacked_it->second.remote_net_id == remote_net_id && + unacked_it->second.send_time_ms >= 0) { + outstanding_bytes += unacked_it->second.payload_size; + } + } + return outstanding_bytes; +} + } // namespace webrtc diff --git a/webrtc/video/end_to_end_tests.cc b/webrtc/video/end_to_end_tests.cc index 69efd1b37b..15db046b80 100644 --- a/webrtc/video/end_to_end_tests.cc +++ b/webrtc/video/end_to_end_tests.cc @@ -1831,9 +1831,9 @@ TEST_F(EndToEndTest, AssignsTransportSequenceNumbers) { class TransportFeedbackTester : public test::EndToEndTest { public: - explicit TransportFeedbackTester(bool feedback_enabled, - size_t num_video_streams, - size_t num_audio_streams) + TransportFeedbackTester(bool feedback_enabled, + size_t num_video_streams, + size_t num_audio_streams) : EndToEndTest(::webrtc::EndToEndTest::kDefaultTimeoutMs), feedback_enabled_(feedback_enabled), num_video_streams_(num_video_streams), @@ -1928,6 +1928,80 @@ TEST_F(EndToEndTest, AudioVideoReceivesTransportFeedback) { RunBaseTest(&test); } +TEST_F(EndToEndTest, StopsSendingMediaWithoutFeedback) { + test::ScopedFieldTrials override_field_trials( + "WebRTC-CwndExperiment/Enabled/"); + + class TransportFeedbackTester : public test::EndToEndTest { + public: + TransportFeedbackTester(size_t num_video_streams, size_t num_audio_streams) + : EndToEndTest(::webrtc::EndToEndTest::kDefaultTimeoutMs), + num_video_streams_(num_video_streams), + num_audio_streams_(num_audio_streams), + media_sent_(0), + padding_sent_(0) { + // Only one stream of each supported for now. + EXPECT_LE(num_video_streams, 1u); + EXPECT_LE(num_audio_streams, 1u); + } + + protected: + Action OnSendRtp(const uint8_t* packet, size_t length) override { + RTPHeader header; + EXPECT_TRUE(parser_->Parse(packet, length, &header)); + const bool only_padding = + header.headerLength + header.paddingLength == length; + rtc::CritScope lock(&crit_); + if (only_padding) { + ++padding_sent_; + } else { + ++media_sent_; + EXPECT_LT(media_sent_, 40) << "Media sent without feedback."; + } + + return SEND_PACKET; + } + + Action OnReceiveRtcp(const uint8_t* data, size_t length) override { + rtc::CritScope lock(&crit_); + if (media_sent_ > 20 && HasTransportFeedback(data, length)) { + return DROP_PACKET; + } + return SEND_PACKET; + } + + bool HasTransportFeedback(const uint8_t* data, size_t length) const { + test::RtcpPacketParser parser; + EXPECT_TRUE(parser.Parse(data, length)); + return parser.transport_feedback()->num_packets() > 0; + } + + Call::Config GetSenderCallConfig() override { + Call::Config config = EndToEndTest::GetSenderCallConfig(); + config.bitrate_config.max_bitrate_bps = 300000; + return config; + } + + void PerformTest() override { + const int64_t kDisabledFeedbackTimeoutMs = 10000; + observation_complete_.Wait(kDisabledFeedbackTimeoutMs); + rtc::CritScope lock(&crit_); + EXPECT_GT(padding_sent_, 0); + } + + size_t GetNumVideoStreams() const override { return num_video_streams_; } + size_t GetNumAudioStreams() const override { return num_audio_streams_; } + + private: + const size_t num_video_streams_; + const size_t num_audio_streams_; + rtc::CriticalSection crit_; + int media_sent_ GUARDED_BY(crit_); + int padding_sent_ GUARDED_BY(crit_); + } test(1, 0); + RunBaseTest(&test); +} + TEST_F(EndToEndTest, ObserversEncodedFrames) { class EncodedFrameTestObserver : public EncodedFrameObserver { public: @@ -2410,8 +2484,8 @@ TEST_F(EndToEndTest, TriggerMidCallProbing) { if (success) return; } - RTC_DCHECK(success) << "Failed to perform mid call probing (" << kMaxAttempts - << " attempts)."; + EXPECT_TRUE(success) << "Failed to perform mid call probing (" << kMaxAttempts + << " attempts)."; } TEST_F(EndToEndTest, VerifyNackStats) { @@ -4196,12 +4270,17 @@ TEST_F(EndToEndTest, RespectsNetworkState) { receiver_call_(nullptr), sender_state_(kNetworkUp), sender_rtp_(0), + sender_padding_(0), sender_rtcp_(0), receiver_rtcp_(0), down_frames_(0) {} Action OnSendRtp(const uint8_t* packet, size_t length) override { rtc::CritScope lock(&test_crit_); + RTPHeader header; + EXPECT_TRUE(parser_->Parse(packet, length, &header)); + if (length == header.headerLength + header.paddingLength) + ++sender_padding_; ++sender_rtp_; packet_event_.Set(); return SEND_PACKET; @@ -4326,7 +4405,8 @@ TEST_F(EndToEndTest, RespectsNetworkState) { int64_t time_now_ms = clock_->TimeInMilliseconds(); rtc::CritScope lock(&test_crit_); if (sender_down) { - ASSERT_LE(sender_rtp_ - initial_sender_rtp, kNumAcceptedDowntimeRtp) + ASSERT_LE(sender_rtp_ - initial_sender_rtp - sender_padding_, + kNumAcceptedDowntimeRtp) << "RTP sent during sender-side downtime."; ASSERT_LE(sender_rtcp_ - initial_sender_rtcp, kNumAcceptedDowntimeRtcp) @@ -4361,6 +4441,7 @@ TEST_F(EndToEndTest, RespectsNetworkState) { Call* receiver_call_; NetworkState sender_state_ GUARDED_BY(test_crit_); int sender_rtp_ GUARDED_BY(test_crit_); + int sender_padding_ GUARDED_BY(test_crit_); int sender_rtcp_ GUARDED_BY(test_crit_); int receiver_rtcp_ GUARDED_BY(test_crit_); int down_frames_ GUARDED_BY(test_crit_);