diff --git a/webrtc/stats/rtcstatscollector.cc b/webrtc/stats/rtcstatscollector.cc index 4bedad62ee..1cdafd6c91 100644 --- a/webrtc/stats/rtcstatscollector.cc +++ b/webrtc/stats/rtcstatscollector.cc @@ -20,50 +20,144 @@ namespace webrtc { -RTCStatsCollector::RTCStatsCollector( - PeerConnection* pc, - int64_t cache_lifetime_us) +rtc::scoped_refptr RTCStatsCollector::Create( + PeerConnection* pc, int64_t cache_lifetime_us) { + return rtc::scoped_refptr( + new rtc::RefCountedObject(pc, cache_lifetime_us)); +} + +RTCStatsCollector::RTCStatsCollector(PeerConnection* pc, + int64_t cache_lifetime_us) : pc_(pc), + signaling_thread_(pc->session()->signaling_thread()), + worker_thread_(pc->session()->worker_thread()), + network_thread_(pc->session()->network_thread()), + num_pending_partial_reports_(0), + partial_report_timestamp_us_(0), cache_timestamp_us_(0), cache_lifetime_us_(cache_lifetime_us) { RTC_DCHECK(pc_); - RTC_DCHECK(IsOnSignalingThread()); + RTC_DCHECK(signaling_thread_); + RTC_DCHECK(worker_thread_); + RTC_DCHECK(network_thread_); RTC_DCHECK_GE(cache_lifetime_us_, 0); } -rtc::scoped_refptr RTCStatsCollector::GetStatsReport() { - RTC_DCHECK(IsOnSignalingThread()); +void RTCStatsCollector::GetStatsReport( + rtc::scoped_refptr callback) { + RTC_DCHECK(signaling_thread_->IsCurrent()); + RTC_DCHECK(callback); + callbacks_.push_back(callback); + // "Now" using a monotonically increasing timer. int64_t cache_now_us = rtc::TimeMicros(); if (cached_report_ && cache_now_us - cache_timestamp_us_ <= cache_lifetime_us_) { - return cached_report_; + // We have a fresh cached report to deliver. + DeliverCachedReport(); + } else if (!num_pending_partial_reports_) { + // Only start gathering stats if we're not already gathering stats. In the + // case of already gathering stats, |callback_| will be invoked when there + // are no more pending partial reports. + + // "Now" using a system clock, relative to the UNIX epoch (Jan 1, 1970, + // UTC), in microseconds. The system clock could be modified and is not + // necessarily monotonically increasing. + int64_t timestamp_us = static_cast( + rtc::Timing::WallTimeNow() * rtc::kNumMicrosecsPerSec); + + num_pending_partial_reports_ = 3; + partial_report_timestamp_us_ = cache_now_us; + invoker_.AsyncInvoke(RTC_FROM_HERE, signaling_thread_, + rtc::Bind(&RTCStatsCollector::ProducePartialResultsOnSignalingThread, + rtc::scoped_refptr(this), timestamp_us)); + invoker_.AsyncInvoke(RTC_FROM_HERE, worker_thread_, + rtc::Bind(&RTCStatsCollector::ProducePartialResultsOnWorkerThread, + rtc::scoped_refptr(this), timestamp_us)); + invoker_.AsyncInvoke(RTC_FROM_HERE, network_thread_, + rtc::Bind(&RTCStatsCollector::ProducePartialResultsOnNetworkThread, + rtc::scoped_refptr(this), timestamp_us)); } - cache_timestamp_us_ = cache_now_us; - // "Now" using a system clock, relative to the UNIX epoch (Jan 1, 1970, UTC), - // in microseconds. The system clock could be modified and is not necessarily - // monotonically increasing. - int64_t timestamp_us = static_cast( - rtc::Timing::WallTimeNow() * rtc::kNumMicrosecsPerSec); - - rtc::scoped_refptr report = RTCStatsReport::Create(); - report->AddStats(ProducePeerConnectionStats(timestamp_us)); - - cached_report_ = report; - return cached_report_; } void RTCStatsCollector::ClearCachedStatsReport() { - RTC_DCHECK(IsOnSignalingThread()); + RTC_DCHECK(signaling_thread_->IsCurrent()); cached_report_ = nullptr; } -bool RTCStatsCollector::IsOnSignalingThread() const { - return pc_->session()->signaling_thread()->IsCurrent(); +void RTCStatsCollector::ProducePartialResultsOnSignalingThread( + int64_t timestamp_us) { + RTC_DCHECK(signaling_thread_->IsCurrent()); + rtc::scoped_refptr report = RTCStatsReport::Create(); + + report->AddStats(ProducePeerConnectionStats_s(timestamp_us)); + + AddPartialResults(report); +} + +void RTCStatsCollector::ProducePartialResultsOnWorkerThread( + int64_t timestamp_us) { + RTC_DCHECK(worker_thread_->IsCurrent()); + rtc::scoped_refptr report = RTCStatsReport::Create(); + + // TODO(hbos): Gather stats on worker thread. + + AddPartialResults(report); +} + +void RTCStatsCollector::ProducePartialResultsOnNetworkThread( + int64_t timestamp_us) { + RTC_DCHECK(network_thread_->IsCurrent()); + rtc::scoped_refptr report = RTCStatsReport::Create(); + + // TODO(hbos): Gather stats on network thread. + + AddPartialResults(report); +} + +void RTCStatsCollector::AddPartialResults( + const rtc::scoped_refptr& partial_report) { + if (!signaling_thread_->IsCurrent()) { + invoker_.AsyncInvoke(RTC_FROM_HERE, signaling_thread_, + rtc::Bind(&RTCStatsCollector::AddPartialResults_s, + rtc::scoped_refptr(this), + partial_report)); + return; + } + AddPartialResults_s(partial_report); +} + +void RTCStatsCollector::AddPartialResults_s( + rtc::scoped_refptr partial_report) { + RTC_DCHECK(signaling_thread_->IsCurrent()); + RTC_DCHECK_GT(num_pending_partial_reports_, 0); + if (!partial_report_) + partial_report_ = partial_report; + else + partial_report_->TakeMembersFrom(partial_report); + --num_pending_partial_reports_; + if (!num_pending_partial_reports_) { + cache_timestamp_us_ = partial_report_timestamp_us_; + cached_report_ = partial_report_; + partial_report_ = nullptr; + DeliverCachedReport(); + } +} + +void RTCStatsCollector::DeliverCachedReport() { + RTC_DCHECK(signaling_thread_->IsCurrent()); + RTC_DCHECK(!callbacks_.empty()); + RTC_DCHECK(cached_report_); + for (const rtc::scoped_refptr& callback : + callbacks_) { + callback->OnStatsDelivered(cached_report_); + } + callbacks_.clear(); } std::unique_ptr -RTCStatsCollector::ProducePeerConnectionStats(int64_t timestamp_us) const { +RTCStatsCollector::ProducePeerConnectionStats_s(int64_t timestamp_us) const { + RTC_DCHECK(signaling_thread_->IsCurrent()); // TODO(hbos): If data channels are removed from the peer connection this will // yield incorrect counts. Address before closing crbug.com/636818. See // https://w3c.github.io/webrtc-stats/webrtc-stats.html#pcstats-dict*. diff --git a/webrtc/stats/rtcstatscollector.h b/webrtc/stats/rtcstatscollector.h index 735421d541..ee59a10022 100644 --- a/webrtc/stats/rtcstatscollector.h +++ b/webrtc/stats/rtcstatscollector.h @@ -12,9 +12,12 @@ #define WEBRTC_STATS_RTCSTATSCOLLECTOR_H_ #include +#include #include "webrtc/api/rtcstats_objects.h" #include "webrtc/api/rtcstatsreport.h" +#include "webrtc/base/asyncinvoker.h" +#include "webrtc/base/refcount.h" #include "webrtc/base/scoped_ref_ptr.h" #include "webrtc/base/timeutils.h" @@ -22,11 +25,21 @@ namespace webrtc { class PeerConnection; -// All calls to the collector and gathering of stats is performed on the -// signaling thread. A stats report is cached for |cache_lifetime_| ms. -class RTCStatsCollector { +class RTCStatsCollectorCallback : public virtual rtc::RefCountInterface { public: - explicit RTCStatsCollector( + virtual ~RTCStatsCollectorCallback() {} + + virtual void OnStatsDelivered( + const rtc::scoped_refptr& report) = 0; +}; + +// All public methods of the collector are to be called on the signaling thread. +// Stats are gathered on the signaling, worker and network threads +// asynchronously. The callback is invoked on the signaling thread. Resulting +// reports are cached for |cache_lifetime_| ms. +class RTCStatsCollector : public virtual rtc::RefCountInterface { + public: + static rtc::scoped_refptr Create( PeerConnection* pc, int64_t cache_lifetime_us = 50 * rtc::kNumMicrosecsPerMillisec); @@ -34,18 +47,42 @@ class RTCStatsCollector { // it is returned, otherwise new stats are gathered and returned. A report is // considered fresh for |cache_lifetime_| ms. const RTCStatsReports are safe // to use across multiple threads and may be destructed on any thread. - rtc::scoped_refptr GetStatsReport(); + void GetStatsReport(rtc::scoped_refptr callback); // Clears the cache's reference to the most recent stats report. Subsequently // calling |GetStatsReport| guarantees fresh stats. void ClearCachedStatsReport(); - private: - bool IsOnSignalingThread() const; + protected: + RTCStatsCollector(PeerConnection* pc, int64_t cache_lifetime_us); - std::unique_ptr ProducePeerConnectionStats( + // Stats gathering on a particular thread. Calls |AddPartialResults| before + // returning. Virtual for the sake of testing. + virtual void ProducePartialResultsOnSignalingThread(int64_t timestamp_us); + virtual void ProducePartialResultsOnWorkerThread(int64_t timestamp_us); + virtual void ProducePartialResultsOnNetworkThread(int64_t timestamp_us); + + // Can be called on any thread. + void AddPartialResults( + const rtc::scoped_refptr& partial_report); + + private: + void AddPartialResults_s(rtc::scoped_refptr partial_report); + void DeliverCachedReport(); + + std::unique_ptr ProducePeerConnectionStats_s( int64_t timestamp_us) const; PeerConnection* const pc_; + rtc::Thread* const signaling_thread_; + rtc::Thread* const worker_thread_; + rtc::Thread* const network_thread_; + rtc::AsyncInvoker invoker_; + + int num_pending_partial_reports_; + int64_t partial_report_timestamp_us_; + rtc::scoped_refptr partial_report_; + std::vector> callbacks_; + // A timestamp, in microseconds, that is based on a timer that is // monotonically increasing. That is, even if the system clock is modified the // difference between the timer and this timestamp is how fresh the cached diff --git a/webrtc/stats/rtcstatscollector_unittest.cc b/webrtc/stats/rtcstatscollector_unittest.cc index f917a75809..1ead77b28d 100644 --- a/webrtc/stats/rtcstatscollector_unittest.cc +++ b/webrtc/stats/rtcstatscollector_unittest.cc @@ -24,6 +24,7 @@ #include "webrtc/base/fakeclock.h" #include "webrtc/base/gunit.h" #include "webrtc/base/logging.h" +#include "webrtc/base/thread_checker.h" #include "webrtc/base/timedelta.h" #include "webrtc/base/timeutils.h" #include "webrtc/base/timing.h" @@ -34,9 +35,13 @@ using testing::ReturnRef; namespace webrtc { -class RTCStatsCollectorTester : public SetSessionDescriptionObserver { +namespace { + +const int64_t kGetStatsReportTimeoutMs = 1000; + +class RTCStatsCollectorTestHelper : public SetSessionDescriptionObserver { public: - RTCStatsCollectorTester() + RTCStatsCollectorTestHelper() : worker_thread_(rtc::Thread::Current()), network_thread_(rtc::Thread::Current()), channel_manager_(new cricket::ChannelManager( @@ -77,41 +82,241 @@ class RTCStatsCollectorTester : public SetSessionDescriptionObserver { std::vector> data_channels_; }; -class RTCStatsCollectorTest : public testing::Test { +class RTCTestStats : public RTCStats { public: - RTCStatsCollectorTest() - : test_(new rtc::RefCountedObject()), - collector_(&test_->pc(), 50 * rtc::kNumMicrosecsPerMillisec) { + RTCTestStats(const std::string& id, int64_t timestamp_us) + : RTCStats(id, timestamp_us), + dummy_stat("dummyStat") {} + + WEBRTC_RTCSTATS_IMPL(RTCStats, RTCTestStats, + &dummy_stat); + + RTCStatsMember dummy_stat; +}; + +const char RTCTestStats::kType[] = "test-stats"; + +// Overrides the stats collection to verify thread usage and that the resulting +// partial reports are merged. +class FakeRTCStatsCollector : public RTCStatsCollector, + public RTCStatsCollectorCallback { + public: + static rtc::scoped_refptr Create( + PeerConnection* pc, + int64_t cache_lifetime_us) { + return rtc::scoped_refptr( + new rtc::RefCountedObject( + pc, cache_lifetime_us)); + } + + // RTCStatsCollectorCallback implementation. + void OnStatsDelivered( + const rtc::scoped_refptr& report) override { + EXPECT_TRUE(signaling_thread_->IsCurrent()); + rtc::CritScope cs(&lock_); + delivered_report_ = report; + } + + void VerifyThreadUsageAndResultsMerging() { + GetStatsReport(rtc::scoped_refptr(this)); + EXPECT_TRUE_WAIT(HasVerifiedResults(), kGetStatsReportTimeoutMs); + } + + bool HasVerifiedResults() { + EXPECT_TRUE(signaling_thread_->IsCurrent()); + rtc::CritScope cs(&lock_); + if (!delivered_report_) + return false; + EXPECT_EQ(produced_on_signaling_thread_, 1); + EXPECT_EQ(produced_on_worker_thread_, 1); + EXPECT_EQ(produced_on_network_thread_, 1); + + EXPECT_TRUE(delivered_report_->Get("SignalingThreadStats")); + EXPECT_TRUE(delivered_report_->Get("WorkerThreadStats")); + EXPECT_TRUE(delivered_report_->Get("NetworkThreadStats")); + + produced_on_signaling_thread_ = 0; + produced_on_worker_thread_ = 0; + produced_on_network_thread_ = 0; + delivered_report_ = nullptr; + return true; } protected: - rtc::scoped_refptr test_; - RTCStatsCollector collector_; + FakeRTCStatsCollector( + PeerConnection* pc, + int64_t cache_lifetime) + : RTCStatsCollector(pc, cache_lifetime), + signaling_thread_(pc->session()->signaling_thread()), + worker_thread_(pc->session()->worker_thread()), + network_thread_(pc->session()->network_thread()) { + } + + void ProducePartialResultsOnSignalingThread(int64_t timestamp_us) override { + EXPECT_TRUE(signaling_thread_->IsCurrent()); + { + rtc::CritScope cs(&lock_); + EXPECT_FALSE(delivered_report_); + ++produced_on_signaling_thread_; + } + + rtc::scoped_refptr signaling_report = + RTCStatsReport::Create(); + signaling_report->AddStats(std::unique_ptr( + new RTCTestStats("SignalingThreadStats", timestamp_us))); + AddPartialResults(signaling_report); + } + void ProducePartialResultsOnWorkerThread(int64_t timestamp_us) override { + EXPECT_TRUE(worker_thread_->IsCurrent()); + { + rtc::CritScope cs(&lock_); + EXPECT_FALSE(delivered_report_); + ++produced_on_worker_thread_; + } + + rtc::scoped_refptr worker_report = RTCStatsReport::Create(); + worker_report->AddStats(std::unique_ptr( + new RTCTestStats("WorkerThreadStats", timestamp_us))); + AddPartialResults(worker_report); + } + void ProducePartialResultsOnNetworkThread(int64_t timestamp_us) override { + EXPECT_TRUE(network_thread_->IsCurrent()); + { + rtc::CritScope cs(&lock_); + EXPECT_FALSE(delivered_report_); + ++produced_on_network_thread_; + } + + rtc::scoped_refptr network_report = + RTCStatsReport::Create(); + network_report->AddStats(std::unique_ptr( + new RTCTestStats("NetworkThreadStats", timestamp_us))); + AddPartialResults(network_report); + } + + private: + rtc::Thread* const signaling_thread_; + rtc::Thread* const worker_thread_; + rtc::Thread* const network_thread_; + + rtc::CriticalSection lock_; + rtc::scoped_refptr delivered_report_; + int produced_on_signaling_thread_ = 0; + int produced_on_worker_thread_ = 0; + int produced_on_network_thread_ = 0; }; -TEST_F(RTCStatsCollectorTest, CachedStatsReport) { +class StatsCallback : public RTCStatsCollectorCallback { + public: + static rtc::scoped_refptr Create( + rtc::scoped_refptr* report_ptr = nullptr) { + return rtc::scoped_refptr( + new rtc::RefCountedObject(report_ptr)); + } + + void OnStatsDelivered( + const rtc::scoped_refptr& report) override { + EXPECT_TRUE(thread_checker_.CalledOnValidThread()); + report_ = report; + if (report_ptr_) + *report_ptr_ = report_; + } + + rtc::scoped_refptr report() const { + EXPECT_TRUE(thread_checker_.CalledOnValidThread()); + return report_; + } + + protected: + explicit StatsCallback(rtc::scoped_refptr* report_ptr) + : report_ptr_(report_ptr) {} + + private: + rtc::ThreadChecker thread_checker_; + rtc::scoped_refptr report_; + rtc::scoped_refptr* report_ptr_; +}; + +class RTCStatsCollectorTest : public testing::Test { + public: + RTCStatsCollectorTest() + : test_(new rtc::RefCountedObject()), + collector_(RTCStatsCollector::Create( + &test_->pc(), 50 * rtc::kNumMicrosecsPerMillisec)) { + } + + rtc::scoped_refptr GetStatsReport() { + rtc::scoped_refptr callback = StatsCallback::Create(); + collector_->GetStatsReport(callback); + EXPECT_TRUE_WAIT(callback->report(), kGetStatsReportTimeoutMs); + return callback->report(); + } + + protected: + rtc::scoped_refptr test_; + rtc::scoped_refptr collector_; +}; + +TEST_F(RTCStatsCollectorTest, SingleCallback) { + rtc::scoped_refptr result; + collector_->GetStatsReport(StatsCallback::Create(&result)); + EXPECT_TRUE_WAIT(result, kGetStatsReportTimeoutMs); +} + +TEST_F(RTCStatsCollectorTest, MultipleCallbacks) { + rtc::scoped_refptr a; + rtc::scoped_refptr b; + rtc::scoped_refptr c; + collector_->GetStatsReport(StatsCallback::Create(&a)); + collector_->GetStatsReport(StatsCallback::Create(&b)); + collector_->GetStatsReport(StatsCallback::Create(&c)); + EXPECT_TRUE_WAIT(a, kGetStatsReportTimeoutMs); + EXPECT_TRUE_WAIT(b, kGetStatsReportTimeoutMs); + EXPECT_TRUE_WAIT(c, kGetStatsReportTimeoutMs); + EXPECT_EQ(a.get(), b.get()); + EXPECT_EQ(b.get(), c.get()); +} + +TEST_F(RTCStatsCollectorTest, CachedStatsReports) { rtc::ScopedFakeClock fake_clock; // Caching should ensure |a| and |b| are the same report. - rtc::scoped_refptr a = collector_.GetStatsReport(); - rtc::scoped_refptr b = collector_.GetStatsReport(); - EXPECT_TRUE(a); + rtc::scoped_refptr a = GetStatsReport(); + rtc::scoped_refptr b = GetStatsReport(); EXPECT_EQ(a.get(), b.get()); // Invalidate cache by clearing it. - collector_.ClearCachedStatsReport(); - rtc::scoped_refptr c = collector_.GetStatsReport(); - EXPECT_TRUE(c); + collector_->ClearCachedStatsReport(); + rtc::scoped_refptr c = GetStatsReport(); EXPECT_NE(b.get(), c.get()); // Invalidate cache by advancing time. fake_clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(51)); - rtc::scoped_refptr d = collector_.GetStatsReport(); + rtc::scoped_refptr d = GetStatsReport(); EXPECT_TRUE(d); EXPECT_NE(c.get(), d.get()); } +TEST_F(RTCStatsCollectorTest, MultipleCallbacksWithInvalidatedCacheInBetween) { + rtc::ScopedFakeClock fake_clock; + rtc::scoped_refptr a; + rtc::scoped_refptr b; + rtc::scoped_refptr c; + collector_->GetStatsReport(StatsCallback::Create(&a)); + collector_->GetStatsReport(StatsCallback::Create(&b)); + // Cache is invalidated after 50 ms. + fake_clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(51)); + collector_->GetStatsReport(StatsCallback::Create(&c)); + EXPECT_TRUE_WAIT(a, kGetStatsReportTimeoutMs); + EXPECT_TRUE_WAIT(b, kGetStatsReportTimeoutMs); + EXPECT_TRUE_WAIT(c, kGetStatsReportTimeoutMs); + EXPECT_EQ(a.get(), b.get()); + // The act of doing |AdvanceTime| processes all messages. If this was not the + // case we might not require |c| to be fresher than |b|. + EXPECT_NE(c.get(), b.get()); +} + TEST_F(RTCStatsCollectorTest, CollectRTCPeerConnectionStats) { int64_t before = static_cast( rtc::Timing::WallTimeNow() * rtc::kNumMicrosecsPerSec); - rtc::scoped_refptr report = collector_.GetStatsReport(); + rtc::scoped_refptr report = GetStatsReport(); int64_t after = static_cast( rtc::Timing::WallTimeNow() * rtc::kNumMicrosecsPerSec); EXPECT_EQ(report->GetStatsOfType().size(), @@ -137,8 +342,8 @@ TEST_F(RTCStatsCollectorTest, CollectRTCPeerConnectionStats) { test_->data_channels().push_back( new MockDataChannel(DataChannelInterface::kClosed)); - collector_.ClearCachedStatsReport(); - report = collector_.GetStatsReport(); + collector_->ClearCachedStatsReport(); + report = GetStatsReport(); EXPECT_EQ(report->GetStatsOfType().size(), static_cast(1)) << "Expecting 1 RTCPeerConnectionStats."; stats = report->Get("RTCPeerConnection"); @@ -156,4 +361,23 @@ TEST_F(RTCStatsCollectorTest, CollectRTCPeerConnectionStats) { } } +class RTCStatsCollectorTestWithFakeCollector : public testing::Test { + public: + RTCStatsCollectorTestWithFakeCollector() + : test_(new rtc::RefCountedObject()), + collector_(FakeRTCStatsCollector::Create( + &test_->pc(), 50 * rtc::kNumMicrosecsPerMillisec)) { + } + + protected: + rtc::scoped_refptr test_; + rtc::scoped_refptr collector_; +}; + +TEST_F(RTCStatsCollectorTestWithFakeCollector, ThreadUsageAndResultsMerging) { + collector_->VerifyThreadUsageAndResultsMerging(); +} + +} // namespace + } // namespace webrtc