diff --git a/call/call.cc b/call/call.cc index 2afabf33ac..a1d1ec0965 100644 --- a/call/call.cc +++ b/call/call.cc @@ -420,7 +420,7 @@ Call::Call(const Call::Config& config, : clock_(Clock::GetRealTimeClock()), num_cpu_cores_(CpuInfo::DetectNumberOfCores()), module_process_thread_(ProcessThread::Create("ModuleProcessThread")), - call_stats_(new CallStats(clock_)), + call_stats_(new CallStats(clock_, module_process_thread_.get())), bitrate_allocator_(new BitrateAllocator(this)), config_(config), audio_network_state_(kNetworkDown), @@ -592,8 +592,7 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream( AudioSendStream* send_stream = new AudioSendStream( config, config_.audio_state, &worker_queue_, module_process_thread_.get(), transport_send_.get(), bitrate_allocator_.get(), event_log_, - call_stats_->rtcp_rtt_stats(), suspended_rtp_state, - &sent_rtp_audio_timer_ms_); + call_stats_.get(), suspended_rtp_state, &sent_rtp_audio_timer_ms_); { WriteLockScoped write_lock(*send_crit_); RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) == @@ -872,7 +871,7 @@ FlexfecReceiveStream* Call::CreateFlexfecReceiveStream( // this locked scope. receive_stream = new FlexfecReceiveStreamImpl( &video_receiver_controller_, config, recovered_packet_receiver, - call_stats_->rtcp_rtt_stats(), module_process_thread_.get()); + call_stats_.get(), module_process_thread_.get()); RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) == receive_rtp_config_.end()); @@ -933,7 +932,7 @@ Call::Stats Call::GetStats() const { aggregate_network_up_ ? transport_send_->GetPacerQueuingDelayMs() : 0; } - stats.rtt_ms = call_stats_->rtcp_rtt_stats()->LastProcessedRtt(); + stats.rtt_ms = call_stats_->LastProcessedRtt(); { rtc::CritScope cs(&bitrate_crit_); stats.max_padding_bitrate_bps = configured_max_padding_bitrate_bps_; diff --git a/video/BUILD.gn b/video/BUILD.gn index f3d2c6d80f..49e1865051 100644 --- a/video/BUILD.gn +++ b/video/BUILD.gn @@ -372,6 +372,7 @@ if (rtc_include_tests) { "../rtc_base:rtc_base_approved", "../rtc_base:rtc_base_tests_utils", "../rtc_base:rtc_numerics", + "../rtc_base:rtc_task_queue", "../rtc_base/experiments:alr_experiment", "../system_wrappers", "../system_wrappers:field_trial_default", diff --git a/video/call_stats.cc b/video/call_stats.cc index c9f019929f..8948e68e5a 100644 --- a/video/call_stats.cc +++ b/video/call_stats.cc @@ -12,178 +12,210 @@ #include -#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" +#include "modules/utility/include/process_thread.h" #include "rtc_base/checks.h" #include "rtc_base/constructormagic.h" +#include "rtc_base/location.h" +#include "rtc_base/logging.h" +#include "rtc_base/task_queue.h" #include "system_wrappers/include/metrics.h" namespace webrtc { namespace { -// Time interval for updating the observers. -const int64_t kUpdateIntervalMs = 1000; -// Weight factor to apply to the average rtt. -const float kWeightFactor = 0.3f; void RemoveOldReports(int64_t now, std::list* reports) { - // A rtt report is considered valid for this long. - const int64_t kRttTimeoutMs = 1500; - while (!reports->empty() && - (now - reports->front().time) > kRttTimeoutMs) { - reports->pop_front(); - } + static constexpr const int64_t kRttTimeoutMs = 1500; + reports->remove_if( + [&now](CallStats::RttTime& r) { return now - r.time > kRttTimeoutMs; }); } -int64_t GetMaxRttMs(std::list* reports) { - if (reports->empty()) - return -1; - int64_t max_rtt_ms = 0; - for (const CallStats::RttTime& rtt_time : *reports) +int64_t GetMaxRttMs(const std::list& reports) { + int64_t max_rtt_ms = -1; + for (const CallStats::RttTime& rtt_time : reports) max_rtt_ms = std::max(rtt_time.rtt, max_rtt_ms); return max_rtt_ms; } -int64_t GetAvgRttMs(std::list* reports) { - if (reports->empty()) { - return -1; - } +int64_t GetAvgRttMs(const std::list& reports) { + RTC_DCHECK(!reports.empty()); int64_t sum = 0; - for (std::list::const_iterator it = reports->begin(); - it != reports->end(); ++it) { + for (std::list::const_iterator it = reports.begin(); + it != reports.end(); ++it) { sum += it->rtt; } - return sum / reports->size(); + return sum / reports.size(); } -void UpdateAvgRttMs(std::list* reports, int64_t* avg_rtt) { +int64_t GetNewAvgRttMs(const std::list& reports, + int64_t prev_avg_rtt) { + if (reports.empty()) + return -1; // Reset (invalid average). + int64_t cur_rtt_ms = GetAvgRttMs(reports); - if (cur_rtt_ms == -1) { - // Reset. - *avg_rtt = -1; - return; - } - if (*avg_rtt == -1) { - // Initialize. - *avg_rtt = cur_rtt_ms; - return; - } - *avg_rtt = *avg_rtt * (1.0f - kWeightFactor) + cur_rtt_ms * kWeightFactor; + if (prev_avg_rtt == -1) + return cur_rtt_ms; // New initial average value. + + // Weight factor to apply to the average rtt. + // We weigh the old average at 70% against the new average (30%). + constexpr const float kWeightFactor = 0.3f; + return prev_avg_rtt * (1.0f - kWeightFactor) + cur_rtt_ms * kWeightFactor; } -} // namespace -class RtcpObserver : public RtcpRttStats { +// This class is used to de-register a Module from a ProcessThread to satisfy +// threading requirements of the Module (CallStats). +// The guarantee offered by TemporaryDeregistration is that while its in scope, +// no calls to |TimeUntilNextProcess| or |Process()| will occur and therefore +// synchronization with those methods, is not necessary. +class TemporaryDeregistration { public: - explicit RtcpObserver(CallStats* owner) : owner_(owner) {} - virtual ~RtcpObserver() {} - - virtual void OnRttUpdate(int64_t rtt) { - owner_->OnRttUpdate(rtt); + TemporaryDeregistration(Module* module, + ProcessThread* process_thread, + bool thread_running) + : module_(module), + process_thread_(process_thread), + deregistered_(thread_running) { + if (thread_running) + process_thread_->DeRegisterModule(module_); } - - // Returns the average RTT. - virtual int64_t LastProcessedRtt() const { - return owner_->avg_rtt_ms(); + ~TemporaryDeregistration() { + if (deregistered_) + process_thread_->RegisterModule(module_, RTC_FROM_HERE); } private: - CallStats* owner_; - - RTC_DISALLOW_COPY_AND_ASSIGN(RtcpObserver); + Module* const module_; + ProcessThread* const process_thread_; + const bool deregistered_; }; -CallStats::CallStats(Clock* clock) +} // namespace + +CallStats::CallStats(Clock* clock, ProcessThread* process_thread) : clock_(clock), - rtcp_rtt_stats_(new RtcpObserver(this)), last_process_time_(clock_->TimeInMilliseconds()), max_rtt_ms_(-1), avg_rtt_ms_(-1), sum_avg_rtt_ms_(0), num_avg_rtt_(0), - time_of_first_rtt_ms_(-1) {} + time_of_first_rtt_ms_(-1), + process_thread_(process_thread), + process_thread_running_(false) { + RTC_DCHECK(process_thread_); + process_thread_checker_.DetachFromThread(); +} CallStats::~CallStats() { + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + RTC_DCHECK(!process_thread_running_); RTC_DCHECK(observers_.empty()); + UpdateHistograms(); } int64_t CallStats::TimeUntilNextProcess() { + RTC_DCHECK_RUN_ON(&process_thread_checker_); return last_process_time_ + kUpdateIntervalMs - clock_->TimeInMilliseconds(); } void CallStats::Process() { - rtc::CritScope cs(&crit_); + RTC_DCHECK_RUN_ON(&process_thread_checker_); int64_t now = clock_->TimeInMilliseconds(); - if (now < last_process_time_ + kUpdateIntervalMs) - return; - last_process_time_ = now; + int64_t avg_rtt_ms = avg_rtt_ms_; RemoveOldReports(now, &reports_); - max_rtt_ms_ = GetMaxRttMs(&reports_); - UpdateAvgRttMs(&reports_, &avg_rtt_ms_); + max_rtt_ms_ = GetMaxRttMs(reports_); + avg_rtt_ms = GetNewAvgRttMs(reports_, avg_rtt_ms); + { + rtc::CritScope lock(&avg_rtt_ms_lock_); + avg_rtt_ms_ = avg_rtt_ms; + } // If there is a valid rtt, update all observers with the max rtt. if (max_rtt_ms_ >= 0) { - RTC_DCHECK_GE(avg_rtt_ms_, 0); - for (std::list::iterator it = observers_.begin(); - it != observers_.end(); ++it) { - (*it)->OnRttUpdate(avg_rtt_ms_, max_rtt_ms_); - } + RTC_DCHECK_GE(avg_rtt_ms, 0); + for (CallStatsObserver* observer : observers_) + observer->OnRttUpdate(avg_rtt_ms, max_rtt_ms_); // Sum for Histogram of average RTT reported over the entire call. - sum_avg_rtt_ms_ += avg_rtt_ms_; + sum_avg_rtt_ms_ += avg_rtt_ms; ++num_avg_rtt_; } } -int64_t CallStats::avg_rtt_ms() const { - rtc::CritScope cs(&crit_); - return avg_rtt_ms_; -} +void CallStats::ProcessThreadAttached(ProcessThread* process_thread) { + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + RTC_DCHECK(!process_thread || process_thread_ == process_thread); + process_thread_running_ = process_thread != nullptr; -RtcpRttStats* CallStats::rtcp_rtt_stats() const { - return rtcp_rtt_stats_.get(); + // Whether we just got attached or detached, we clear the + // |process_thread_checker_| so that it can be used to protect variables + // in either the process thread when it starts again, or UpdateHistograms() + // (mutually exclusive). + process_thread_checker_.DetachFromThread(); } void CallStats::RegisterStatsObserver(CallStatsObserver* observer) { - rtc::CritScope cs(&crit_); - for (std::list::iterator it = observers_.begin(); - it != observers_.end(); ++it) { - if (*it == observer) - return; - } - observers_.push_back(observer); + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + TemporaryDeregistration deregister(this, process_thread_, + process_thread_running_); + + auto it = std::find(observers_.begin(), observers_.end(), observer); + if (it == observers_.end()) + observers_.push_back(observer); } void CallStats::DeregisterStatsObserver(CallStatsObserver* observer) { - rtc::CritScope cs(&crit_); - for (std::list::iterator it = observers_.begin(); - it != observers_.end(); ++it) { - if (*it == observer) { - observers_.erase(it); + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + TemporaryDeregistration deregister(this, process_thread_, + process_thread_running_); + observers_.remove(observer); +} + +int64_t CallStats::LastProcessedRtt() const { + rtc::CritScope cs(&avg_rtt_ms_lock_); + return avg_rtt_ms_; +} + +void CallStats::OnRttUpdate(int64_t rtt) { + int64_t now_ms = clock_->TimeInMilliseconds(); + process_thread_->PostTask(rtc::NewClosure([rtt, now_ms, this]() { + RTC_DCHECK_RUN_ON(&process_thread_checker_); + reports_.push_back(RttTime(rtt, now_ms)); + if (time_of_first_rtt_ms_ == -1) + time_of_first_rtt_ms_ = now_ms; + + process_thread_->WakeUp(this); + })); +} + +void CallStats::UpdateHistograms() { + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + RTC_DCHECK(!process_thread_running_); + + // The extra scope is because we have two 'dcheck run on' thread checkers. + // This is a special case since it's safe to access variables on the current + // thread that normally are only touched on the process thread. + // Since we're not attached to the process thread and/or the process thread + // isn't running, it's OK to touch these variables here. + { + // This method is called on the ctor thread (usually from the dtor, unless + // a test calls it). It's a requirement that the function be called when + // the process thread is not running (a condition that's met at destruction + // time), and thanks to that, we don't need a lock to synchronize against + // it. + RTC_DCHECK_RUN_ON(&process_thread_checker_); + + if (time_of_first_rtt_ms_ == -1 || num_avg_rtt_ < 1) return; + + int64_t elapsed_sec = + (clock_->TimeInMilliseconds() - time_of_first_rtt_ms_) / 1000; + if (elapsed_sec >= metrics::kMinRunTimeInSeconds) { + int64_t avg_rtt_ms = (sum_avg_rtt_ms_ + num_avg_rtt_ / 2) / num_avg_rtt_; + RTC_HISTOGRAM_COUNTS_10000( + "WebRTC.Video.AverageRoundTripTimeInMilliseconds", avg_rtt_ms); } } } -void CallStats::OnRttUpdate(int64_t rtt) { - rtc::CritScope cs(&crit_); - int64_t now_ms = clock_->TimeInMilliseconds(); - reports_.push_back(RttTime(rtt, now_ms)); - if (time_of_first_rtt_ms_ == -1) - time_of_first_rtt_ms_ = now_ms; -} - -void CallStats::UpdateHistograms() { - rtc::CritScope cs(&crit_); - if (time_of_first_rtt_ms_ == -1 || num_avg_rtt_ < 1) - return; - - int64_t elapsed_sec = - (clock_->TimeInMilliseconds() - time_of_first_rtt_ms_) / 1000; - if (elapsed_sec >= metrics::kMinRunTimeInSeconds) { - int64_t avg_rtt_ms = (sum_avg_rtt_ms_ + num_avg_rtt_ / 2) / num_avg_rtt_; - RTC_HISTOGRAM_COUNTS_10000( - "WebRTC.Video.AverageRoundTripTimeInMilliseconds", avg_rtt_ms); - } -} - } // namespace webrtc diff --git a/video/call_stats.h b/video/call_stats.h index af5c45c996..5ca44faba5 100644 --- a/video/call_stats.h +++ b/video/call_stats.h @@ -15,35 +15,42 @@ #include #include "modules/include/module.h" +#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "rtc_base/constructormagic.h" #include "rtc_base/criticalsection.h" +#include "rtc_base/thread_checker.h" #include "system_wrappers/include/clock.h" namespace webrtc { class CallStatsObserver; -class RtcpRttStats; // CallStats keeps track of statistics for a call. -class CallStats : public Module { +class CallStats : public Module, public RtcpRttStats { public: - friend class RtcpObserver; + // Time interval for updating the observers. + static constexpr int64_t kUpdateIntervalMs = 1000; - explicit CallStats(Clock* clock); + CallStats(Clock* clock, ProcessThread* process_thread); ~CallStats(); - // Implements Module, to use the process thread. - int64_t TimeUntilNextProcess() override; - void Process() override; - - // Returns a RtcpRttStats to register at a statistics provider. The object - // has the same lifetime as the CallStats instance. - RtcpRttStats* rtcp_rtt_stats() const; - // Registers/deregisters a new observer to receive statistics updates. + // Must be called from the construction thread. void RegisterStatsObserver(CallStatsObserver* observer); void DeregisterStatsObserver(CallStatsObserver* observer); + // Expose |LastProcessedRtt()| from RtcpRttStats to the public interface, as + // it is the part of the API that is needed by direct users of CallStats. + // TODO(tommi): Threading or lifetime guarantees are not explicit in how + // CallStats is used as RtcpRttStats or how pointers are cached in a + // few different places (distributed via Call). It would be good to clarify + // from what thread/TQ calls to OnRttUpdate and LastProcessedRtt need to be + // allowed. + int64_t LastProcessedRtt() const override; + + // Exposed for tests to test histogram support. + void UpdateHistogramsForTest() { UpdateHistograms(); } + // Helper struct keeping track of the time a rtt value is reported. struct RttTime { RttTime(int64_t new_rtt, int64_t rtt_time) @@ -52,34 +59,62 @@ class CallStats : public Module { const int64_t time; }; - protected: - void OnRttUpdate(int64_t rtt); - - int64_t avg_rtt_ms() const; - private: + // RtcpRttStats implementation. + void OnRttUpdate(int64_t rtt) override; + + // Implements Module, to use the process thread. + int64_t TimeUntilNextProcess() override; + void Process() override; + + // TODO(tommi): Use this to know when we're attached to the process thread? + // Alternatively, inject that pointer via the ctor since the call_stats + // test code, isn't using a processthread atm. + void ProcessThreadAttached(ProcessThread* process_thread) override; + + // This method must only be called when the process thread is not + // running, and from the construction thread. void UpdateHistograms(); Clock* const clock_; - // Protecting all members. - rtc::CriticalSection crit_; - // Observer receiving statistics updates. - std::unique_ptr rtcp_rtt_stats_; + // The last time 'Process' resulted in statistic update. - int64_t last_process_time_; + int64_t last_process_time_ RTC_GUARDED_BY(process_thread_checker_); // The last RTT in the statistics update (zero if there is no valid estimate). - int64_t max_rtt_ms_; + int64_t max_rtt_ms_ RTC_GUARDED_BY(process_thread_checker_); + + // Accessed from random threads (seemingly). Consider atomic. + // |avg_rtt_ms_| is allowed to be read on the process thread without a lock. + // |avg_rtt_ms_lock_| must be held elsewhere for reading. + // |avg_rtt_ms_lock_| must be held on the process thread for writing. int64_t avg_rtt_ms_; - int64_t sum_avg_rtt_ms_ RTC_GUARDED_BY(crit_); - int64_t num_avg_rtt_ RTC_GUARDED_BY(crit_); - int64_t time_of_first_rtt_ms_ RTC_GUARDED_BY(crit_); + + // Protects |avg_rtt_ms_|. + rtc::CriticalSection avg_rtt_ms_lock_; + + // |sum_avg_rtt_ms_|, |num_avg_rtt_| and |time_of_first_rtt_ms_| are only used + // on the ProcessThread when running. When the Process Thread is not running, + // (and only then) they can be used in UpdateHistograms(), usually called from + // the dtor. + int64_t sum_avg_rtt_ms_ RTC_GUARDED_BY(process_thread_checker_); + int64_t num_avg_rtt_ RTC_GUARDED_BY(process_thread_checker_); + int64_t time_of_first_rtt_ms_ RTC_GUARDED_BY(process_thread_checker_); // All Rtt reports within valid time interval, oldest first. - std::list reports_; + std::list reports_ RTC_GUARDED_BY(process_thread_checker_); // Observers getting stats reports. + // When attached to ProcessThread, this is read-only. In order to allow + // modification, we detach from the process thread while the observer + // list is updated, to avoid races. This allows us to not require a lock + // for the observers_ list, which makes the most common case lock free. std::list observers_; + rtc::ThreadChecker construction_thread_checker_; + rtc::ThreadChecker process_thread_checker_; + ProcessThread* const process_thread_; + bool process_thread_running_ RTC_GUARDED_BY(construction_thread_checker_); + RTC_DISALLOW_COPY_AND_ASSIGN(CallStats); }; diff --git a/video/call_stats_unittest.cc b/video/call_stats_unittest.cc index 989722d29f..33076c518c 100644 --- a/video/call_stats_unittest.cc +++ b/video/call_stats_unittest.cc @@ -8,17 +8,23 @@ * be found in the AUTHORS file in the root of the source tree. */ +#include "video/call_stats.h" + #include #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" +#include "modules/utility/include/process_thread.h" +#include "rtc_base/event.h" +#include "rtc_base/location.h" +#include "rtc_base/task_queue.h" #include "system_wrappers/include/metrics.h" #include "system_wrappers/include/metrics_default.h" #include "test/gmock.h" #include "test/gtest.h" -#include "video/call_stats.h" using ::testing::_; using ::testing::AnyNumber; +using ::testing::InvokeWithoutArgs; using ::testing::Return; namespace webrtc { @@ -33,184 +39,277 @@ class MockStatsObserver : public CallStatsObserver { class CallStatsTest : public ::testing::Test { public: - CallStatsTest() : fake_clock_(12345) {} + CallStatsTest() { + process_thread_->RegisterModule(&call_stats_, RTC_FROM_HERE); + process_thread_->Start(); + } + ~CallStatsTest() override { + process_thread_->Stop(); + process_thread_->DeRegisterModule(&call_stats_); + } protected: - virtual void SetUp() { call_stats_.reset(new CallStats(&fake_clock_)); } - SimulatedClock fake_clock_; - std::unique_ptr call_stats_; + std::unique_ptr process_thread_{ + ProcessThread::Create("CallStats")}; + SimulatedClock fake_clock_{12345}; + CallStats call_stats_{&fake_clock_, process_thread_.get()}; }; TEST_F(CallStatsTest, AddAndTriggerCallback) { + rtc::Event event(false, false); + + static constexpr const int64_t kRtt = 25; + MockStatsObserver stats_observer; - RtcpRttStats* rtcp_rtt_stats = call_stats_->rtcp_rtt_stats(); - call_stats_->RegisterStatsObserver(&stats_observer); - fake_clock_.AdvanceTimeMilliseconds(1000); + EXPECT_CALL(stats_observer, OnRttUpdate(kRtt, kRtt)) + .Times(1) + .WillOnce(InvokeWithoutArgs([&event] { event.Set(); })); + + RtcpRttStats* rtcp_rtt_stats = &call_stats_; + call_stats_.RegisterStatsObserver(&stats_observer); EXPECT_EQ(-1, rtcp_rtt_stats->LastProcessedRtt()); - const int64_t kRtt = 25; rtcp_rtt_stats->OnRttUpdate(kRtt); - EXPECT_CALL(stats_observer, OnRttUpdate(kRtt, kRtt)).Times(1); - call_stats_->Process(); + + EXPECT_TRUE(event.Wait(1000)); EXPECT_EQ(kRtt, rtcp_rtt_stats->LastProcessedRtt()); - const int64_t kRttTimeOutMs = 1500 + 10; - fake_clock_.AdvanceTimeMilliseconds(kRttTimeOutMs); - EXPECT_CALL(stats_observer, OnRttUpdate(_, _)).Times(0); - call_stats_->Process(); - EXPECT_EQ(-1, rtcp_rtt_stats->LastProcessedRtt()); - - call_stats_->DeregisterStatsObserver(&stats_observer); + call_stats_.DeregisterStatsObserver(&stats_observer); } TEST_F(CallStatsTest, ProcessTime) { + rtc::Event event(false, false); + + static constexpr const int64_t kRtt = 100; + static constexpr const int64_t kRtt2 = 80; + + RtcpRttStats* rtcp_rtt_stats = &call_stats_; + MockStatsObserver stats_observer; - call_stats_->RegisterStatsObserver(&stats_observer); - RtcpRttStats* rtcp_rtt_stats = call_stats_->rtcp_rtt_stats(); - rtcp_rtt_stats->OnRttUpdate(100); - // Time isn't updated yet. - EXPECT_CALL(stats_observer, OnRttUpdate(_, _)).Times(0); - call_stats_->Process(); + EXPECT_CALL(stats_observer, OnRttUpdate(kRtt, kRtt)) + .Times(2) + .WillOnce(InvokeWithoutArgs([this] { + // Advance clock and verify we get an update. + fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs); + })) + .WillRepeatedly(InvokeWithoutArgs([this, rtcp_rtt_stats] { + rtcp_rtt_stats->OnRttUpdate(kRtt2); + // Advance clock just too little to get an update. + fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs - 1); + })); - // Advance clock and verify we get an update. - fake_clock_.AdvanceTimeMilliseconds(1000); - EXPECT_CALL(stats_observer, OnRttUpdate(_, _)).Times(1); - call_stats_->Process(); + // In case you're reading this and wondering how this number is arrived at, + // please see comments in the ChangeRtt test that go into some detail. + static constexpr const int64_t kLastAvg = 94; + EXPECT_CALL(stats_observer, OnRttUpdate(kLastAvg, kRtt2)) + .Times(1) + .WillOnce(InvokeWithoutArgs([&event] { event.Set(); })); - // Advance clock just too little to get an update. - fake_clock_.AdvanceTimeMilliseconds(999); - rtcp_rtt_stats->OnRttUpdate(100); - EXPECT_CALL(stats_observer, OnRttUpdate(_, _)).Times(0); - call_stats_->Process(); + call_stats_.RegisterStatsObserver(&stats_observer); - // Advance enough to trigger a new update. - fake_clock_.AdvanceTimeMilliseconds(1); - EXPECT_CALL(stats_observer, OnRttUpdate(_, _)).Times(1); - call_stats_->Process(); + rtcp_rtt_stats->OnRttUpdate(kRtt); + EXPECT_TRUE(event.Wait(1000)); - call_stats_->DeregisterStatsObserver(&stats_observer); + call_stats_.DeregisterStatsObserver(&stats_observer); } // Verify all observers get correct estimates and observers can be added and // removed. TEST_F(CallStatsTest, MultipleObservers) { MockStatsObserver stats_observer_1; - call_stats_->RegisterStatsObserver(&stats_observer_1); + call_stats_.RegisterStatsObserver(&stats_observer_1); // Add the second observer twice, there should still be only one report to the // observer. MockStatsObserver stats_observer_2; - call_stats_->RegisterStatsObserver(&stats_observer_2); - call_stats_->RegisterStatsObserver(&stats_observer_2); + call_stats_.RegisterStatsObserver(&stats_observer_2); + call_stats_.RegisterStatsObserver(&stats_observer_2); - RtcpRttStats* rtcp_rtt_stats = call_stats_->rtcp_rtt_stats(); - const int64_t kRtt = 100; - rtcp_rtt_stats->OnRttUpdate(kRtt); + RtcpRttStats* rtcp_rtt_stats = &call_stats_; + static constexpr const int64_t kRtt = 100; // Verify both observers are updated. - fake_clock_.AdvanceTimeMilliseconds(1000); - EXPECT_CALL(stats_observer_1, OnRttUpdate(kRtt, kRtt)).Times(1); - EXPECT_CALL(stats_observer_2, OnRttUpdate(kRtt, kRtt)).Times(1); - call_stats_->Process(); + rtc::Event ev1(false, false); + rtc::Event ev2(false, false); + EXPECT_CALL(stats_observer_1, OnRttUpdate(kRtt, kRtt)) + .Times(AnyNumber()) + .WillOnce(InvokeWithoutArgs([&ev1] { ev1.Set(); })) + .WillRepeatedly(Return()); + EXPECT_CALL(stats_observer_2, OnRttUpdate(kRtt, kRtt)) + .Times(AnyNumber()) + .WillOnce(InvokeWithoutArgs([&ev2] { ev2.Set(); })) + .WillRepeatedly(Return()); + rtcp_rtt_stats->OnRttUpdate(kRtt); + ASSERT_TRUE(ev1.Wait(100)); + ASSERT_TRUE(ev2.Wait(100)); // Deregister the second observer and verify update is only sent to the first // observer. - call_stats_->DeregisterStatsObserver(&stats_observer_2); - rtcp_rtt_stats->OnRttUpdate(kRtt); - fake_clock_.AdvanceTimeMilliseconds(1000); - EXPECT_CALL(stats_observer_1, OnRttUpdate(kRtt, kRtt)).Times(1); + call_stats_.DeregisterStatsObserver(&stats_observer_2); + + EXPECT_CALL(stats_observer_1, OnRttUpdate(kRtt, kRtt)) + .Times(AnyNumber()) + .WillOnce(InvokeWithoutArgs([&ev1] { ev1.Set(); })) + .WillRepeatedly(Return()); EXPECT_CALL(stats_observer_2, OnRttUpdate(kRtt, kRtt)).Times(0); - call_stats_->Process(); + rtcp_rtt_stats->OnRttUpdate(kRtt); + ASSERT_TRUE(ev1.Wait(100)); // Deregister the first observer. - call_stats_->DeregisterStatsObserver(&stats_observer_1); - rtcp_rtt_stats->OnRttUpdate(kRtt); - fake_clock_.AdvanceTimeMilliseconds(1000); + call_stats_.DeregisterStatsObserver(&stats_observer_1); + + // Now make sure we don't get any callbacks. EXPECT_CALL(stats_observer_1, OnRttUpdate(kRtt, kRtt)).Times(0); EXPECT_CALL(stats_observer_2, OnRttUpdate(kRtt, kRtt)).Times(0); - call_stats_->Process(); + rtcp_rtt_stats->OnRttUpdate(kRtt); + + // Force a call to Process(). + process_thread_->WakeUp(&call_stats_); + + // Flush the queue on the process thread to make sure we return after + // Process() has been called. + rtc::Event event(false, false); + process_thread_->PostTask(rtc::NewClosure([&event]() { event.Set(); })); + event.Wait(rtc::Event::kForever); } // Verify increasing and decreasing rtt triggers callbacks with correct values. TEST_F(CallStatsTest, ChangeRtt) { + // TODO(tommi): This test assumes things about how old reports are removed + // inside of call_stats.cc. The threshold ms value is 1500ms, but it's not + // clear here that how the clock is advanced, affects that algorithm and + // subsequently the average reported rtt. + MockStatsObserver stats_observer; - call_stats_->RegisterStatsObserver(&stats_observer); - RtcpRttStats* rtcp_rtt_stats = call_stats_->rtcp_rtt_stats(); + call_stats_.RegisterStatsObserver(&stats_observer); + RtcpRttStats* rtcp_rtt_stats = &call_stats_; - // Advance clock to be ready for an update. - fake_clock_.AdvanceTimeMilliseconds(1000); + rtc::Event event(false, false); - // Set a first value and verify the callback is triggered. - const int64_t kFirstRtt = 100; - rtcp_rtt_stats->OnRttUpdate(kFirstRtt); - EXPECT_CALL(stats_observer, OnRttUpdate(kFirstRtt, kFirstRtt)).Times(1); - call_stats_->Process(); + static constexpr const int64_t kFirstRtt = 100; + static constexpr const int64_t kLowRtt = kFirstRtt - 20; + static constexpr const int64_t kHighRtt = kFirstRtt + 20; - // Increase rtt and verify the new value is reported. - fake_clock_.AdvanceTimeMilliseconds(1000); - const int64_t kHighRtt = kFirstRtt + 20; - const int64_t kAvgRtt1 = 103; - rtcp_rtt_stats->OnRttUpdate(kHighRtt); - EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt1, kHighRtt)).Times(1); - call_stats_->Process(); + EXPECT_CALL(stats_observer, OnRttUpdate(kFirstRtt, kFirstRtt)) + .Times(1) + .WillOnce(InvokeWithoutArgs([&rtcp_rtt_stats, this] { + fake_clock_.AdvanceTimeMilliseconds(1000); + rtcp_rtt_stats->OnRttUpdate(kHighRtt); // Reported at T1 (1000ms). + })); + + // TODO(tommi): This relies on the internal algorithms of call_stats.cc. + // There's a weight factor there (0.3), that weighs the previous average to + // the new one by 70%, so the number 103 in this case is arrived at like so: + // (100) / 1 * 0.7 + (100+120)/2 * 0.3 = 103 + static constexpr const int64_t kAvgRtt1 = 103; + EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt1, kHighRtt)) + .Times(1) + .WillOnce(InvokeWithoutArgs([&rtcp_rtt_stats, this] { + // This interacts with an internal implementation detail in call_stats + // that decays the oldest rtt value. See more below. + fake_clock_.AdvanceTimeMilliseconds(1000); + rtcp_rtt_stats->OnRttUpdate(kLowRtt); // Reported at T2 (2000ms). + })); // Increase time enough for a new update, but not too much to make the // rtt invalid. Report a lower rtt and verify the old/high value still is sent // in the callback. - fake_clock_.AdvanceTimeMilliseconds(1000); - const int64_t kLowRtt = kFirstRtt - 20; - const int64_t kAvgRtt2 = 102; - rtcp_rtt_stats->OnRttUpdate(kLowRtt); - EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt2, kHighRtt)).Times(1); - call_stats_->Process(); - // Advance time to make the high report invalid, the lower rtt should now be - // in the callback. - fake_clock_.AdvanceTimeMilliseconds(1000); - const int64_t kAvgRtt3 = 95; - EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt3, kLowRtt)).Times(1); - call_stats_->Process(); + // Here, enough time must have passed in order to remove exactly the first + // report and nothing else (>1500ms has passed since the first rtt). + // So, this value is arrived by doing: + // (kAvgRtt1)/1 * 0.7 + (kHighRtt+kLowRtt)/2 * 0.3 = 102.1 + static constexpr const int64_t kAvgRtt2 = 102; + EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt2, kHighRtt)) + .Times(1) + .WillOnce(InvokeWithoutArgs([this] { + // Advance time to make the high report invalid, the lower rtt should + // now be in the callback. + fake_clock_.AdvanceTimeMilliseconds(1000); + })); - call_stats_->DeregisterStatsObserver(&stats_observer); + static constexpr const int64_t kAvgRtt3 = 95; + EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt3, kLowRtt)) + .Times(1) + .WillOnce(InvokeWithoutArgs([&event] { event.Set(); })); + + // Trigger the first rtt value and set off the chain of callbacks. + rtcp_rtt_stats->OnRttUpdate(kFirstRtt); // Reported at T0 (0ms). + EXPECT_TRUE(event.Wait(1000)); + + call_stats_.DeregisterStatsObserver(&stats_observer); } TEST_F(CallStatsTest, LastProcessedRtt) { + rtc::Event event(false, false); MockStatsObserver stats_observer; - call_stats_->RegisterStatsObserver(&stats_observer); - RtcpRttStats* rtcp_rtt_stats = call_stats_->rtcp_rtt_stats(); - fake_clock_.AdvanceTimeMilliseconds(1000); + call_stats_.RegisterStatsObserver(&stats_observer); + RtcpRttStats* rtcp_rtt_stats = &call_stats_; + + static constexpr const int64_t kRttLow = 10; + static constexpr const int64_t kRttHigh = 30; + // The following two average numbers dependend on average + weight + // calculations in call_stats.cc. + static constexpr const int64_t kAvgRtt1 = 13; + static constexpr const int64_t kAvgRtt2 = 15; + + EXPECT_CALL(stats_observer, OnRttUpdate(kRttLow, kRttLow)) + .Times(1) + .WillOnce(InvokeWithoutArgs([rtcp_rtt_stats] { + EXPECT_EQ(kRttLow, rtcp_rtt_stats->LastProcessedRtt()); + // Don't advance the clock to make sure that low and high rtt values + // are associated with the same time stamp. + rtcp_rtt_stats->OnRttUpdate(kRttHigh); + })); + + EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt1, kRttHigh)) + .Times(1) + .WillOnce(InvokeWithoutArgs([rtcp_rtt_stats, this] { + EXPECT_EQ(kAvgRtt1, rtcp_rtt_stats->LastProcessedRtt()); + fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs); + rtcp_rtt_stats->OnRttUpdate(kRttLow); + rtcp_rtt_stats->OnRttUpdate(kRttHigh); + })); + + EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt2, kRttHigh)) + .Times(1) + .WillOnce(InvokeWithoutArgs([rtcp_rtt_stats, &event] { + EXPECT_EQ(kAvgRtt2, rtcp_rtt_stats->LastProcessedRtt()); + event.Set(); + })); // Set a first values and verify that LastProcessedRtt initially returns the // average rtt. - const int64_t kRttLow = 10; - const int64_t kRttHigh = 30; - const int64_t kAvgRtt = 20; + fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs); rtcp_rtt_stats->OnRttUpdate(kRttLow); - rtcp_rtt_stats->OnRttUpdate(kRttHigh); - EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt, kRttHigh)).Times(1); - call_stats_->Process(); - EXPECT_EQ(kAvgRtt, rtcp_rtt_stats->LastProcessedRtt()); + EXPECT_TRUE(event.Wait(1000)); + EXPECT_EQ(kAvgRtt2, rtcp_rtt_stats->LastProcessedRtt()); - // Update values and verify LastProcessedRtt. - fake_clock_.AdvanceTimeMilliseconds(1000); - rtcp_rtt_stats->OnRttUpdate(kRttLow); - rtcp_rtt_stats->OnRttUpdate(kRttHigh); - EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt, kRttHigh)).Times(1); - call_stats_->Process(); - EXPECT_EQ(kAvgRtt, rtcp_rtt_stats->LastProcessedRtt()); - - call_stats_->DeregisterStatsObserver(&stats_observer); + call_stats_.DeregisterStatsObserver(&stats_observer); } TEST_F(CallStatsTest, ProducesHistogramMetrics) { metrics::Reset(); - const int64_t kRtt = 123; - RtcpRttStats* rtcp_rtt_stats = call_stats_->rtcp_rtt_stats(); + rtc::Event event(false, false); + static constexpr const int64_t kRtt = 123; + RtcpRttStats* rtcp_rtt_stats = &call_stats_; + MockStatsObserver stats_observer; + call_stats_.RegisterStatsObserver(&stats_observer); + EXPECT_CALL(stats_observer, OnRttUpdate(kRtt, kRtt)) + .Times(AnyNumber()) + .WillOnce(InvokeWithoutArgs([&event] { event.Set(); })) + .WillRepeatedly(Return()); + rtcp_rtt_stats->OnRttUpdate(kRtt); - fake_clock_.AdvanceTimeMilliseconds(metrics::kMinRunTimeInSeconds * 1000); + fake_clock_.AdvanceTimeMilliseconds(metrics::kMinRunTimeInSeconds * + CallStats::kUpdateIntervalMs); rtcp_rtt_stats->OnRttUpdate(kRtt); - call_stats_->Process(); - call_stats_.reset(); + EXPECT_TRUE(event.Wait(1000)); + + call_stats_.DeregisterStatsObserver(&stats_observer); + + process_thread_->Stop(); + call_stats_.UpdateHistogramsForTest(); EXPECT_EQ(1, metrics::NumSamples( "WebRTC.Video.AverageRoundTripTimeInMilliseconds")); diff --git a/video/video_receive_stream.cc b/video/video_receive_stream.cc index 4b9bdf6393..a632167203 100644 --- a/video/video_receive_stream.cc +++ b/video/video_receive_stream.cc @@ -101,7 +101,7 @@ VideoReceiveStream::VideoReceiveStream( video_receiver_(clock_, nullptr, this, timing_.get(), this, this), stats_proxy_(&config_, clock_), rtp_video_stream_receiver_(&transport_adapter_, - call_stats_->rtcp_rtt_stats(), + call_stats, packet_router, &config_, rtp_receive_statistics_.get(), diff --git a/video/video_receive_stream_unittest.cc b/video/video_receive_stream_unittest.cc index 97a447ba77..e76c3a9b40 100644 --- a/video/video_receive_stream_unittest.cc +++ b/video/video_receive_stream_unittest.cc @@ -67,10 +67,10 @@ class MockVideoDecoder : public VideoDecoder { class VideoReceiveStreamTest : public testing::Test { public: VideoReceiveStreamTest() - : override_field_trials_(kNewJitterBufferFieldTrialEnabled), + : process_thread_(ProcessThread::Create("TestThread")), + override_field_trials_(kNewJitterBufferFieldTrialEnabled), config_(&mock_transport_), - call_stats_(Clock::GetRealTimeClock()), - process_thread_(ProcessThread::Create("TestThread")) {} + call_stats_(Clock::GetRealTimeClock(), process_thread_.get()) {} void SetUp() { constexpr int kDefaultNumCpuCores = 2; @@ -96,6 +96,7 @@ class VideoReceiveStreamTest : public testing::Test { } protected: + std::unique_ptr process_thread_; webrtc::test::ScopedFieldTrials override_field_trials_; VideoReceiveStream::Config config_; CallStats call_stats_; @@ -104,7 +105,6 @@ class VideoReceiveStreamTest : public testing::Test { cricket::FakeVideoRenderer fake_renderer_; MockTransport mock_transport_; PacketRouter packet_router_; - std::unique_ptr process_thread_; RtpStreamReceiverController rtp_stream_receiver_controller_; std::unique_ptr video_receive_stream_; }; diff --git a/video/video_send_stream.cc b/video/video_send_stream.cc index 7ca89612b4..a32917a651 100644 --- a/video/video_send_stream.cc +++ b/video/video_send_stream.cc @@ -721,7 +721,7 @@ VideoSendStreamImpl::VideoSendStreamImpl( &encoder_feedback_, bandwidth_observer_, transport, - call_stats_->rtcp_rtt_stats(), + call_stats, flexfec_sender_.get(), stats_proxy_, send_delay_stats,