From c82f2e1613e394f92c56d14c38b3f9069075039e Mon Sep 17 00:00:00 2001 From: hbos Date: Mon, 5 Sep 2016 01:36:50 -0700 Subject: [PATCH] RTCStatsCollector collecting stats on multiple threads. Changes GetStatsReport to a callback-based function. Stats collection is dispatched to three different stats collecting methods, being invoked asynchronously on the signaling, worker and network threads. The three resulting stats reports are merged into one before returned. The only current stats being collected is on the signaling thread, but a FakeRTCStatsCollector is able to test the multi-threaded and stats-merging behaviors. Future CLs simply have to put their stats collecting code in the appropriate ProducePartialResultsOnFooThread method. BUG=chromium:627816 Review-Url: https://codereview.webrtc.org/2270033004 Cr-Commit-Position: refs/heads/master@{#14064} --- webrtc/stats/rtcstatscollector.cc | 140 +++++++++-- webrtc/stats/rtcstatscollector.h | 53 ++++- webrtc/stats/rtcstatscollector_unittest.cc | 262 +++++++++++++++++++-- 3 files changed, 405 insertions(+), 50 deletions(-) diff --git a/webrtc/stats/rtcstatscollector.cc b/webrtc/stats/rtcstatscollector.cc index 4bedad62ee..1cdafd6c91 100644 --- a/webrtc/stats/rtcstatscollector.cc +++ b/webrtc/stats/rtcstatscollector.cc @@ -20,50 +20,144 @@ namespace webrtc { -RTCStatsCollector::RTCStatsCollector( - PeerConnection* pc, - int64_t cache_lifetime_us) +rtc::scoped_refptr RTCStatsCollector::Create( + PeerConnection* pc, int64_t cache_lifetime_us) { + return rtc::scoped_refptr( + new rtc::RefCountedObject(pc, cache_lifetime_us)); +} + +RTCStatsCollector::RTCStatsCollector(PeerConnection* pc, + int64_t cache_lifetime_us) : pc_(pc), + signaling_thread_(pc->session()->signaling_thread()), + worker_thread_(pc->session()->worker_thread()), + network_thread_(pc->session()->network_thread()), + num_pending_partial_reports_(0), + partial_report_timestamp_us_(0), cache_timestamp_us_(0), cache_lifetime_us_(cache_lifetime_us) { RTC_DCHECK(pc_); - RTC_DCHECK(IsOnSignalingThread()); + RTC_DCHECK(signaling_thread_); + RTC_DCHECK(worker_thread_); + RTC_DCHECK(network_thread_); RTC_DCHECK_GE(cache_lifetime_us_, 0); } -rtc::scoped_refptr RTCStatsCollector::GetStatsReport() { - RTC_DCHECK(IsOnSignalingThread()); +void RTCStatsCollector::GetStatsReport( + rtc::scoped_refptr callback) { + RTC_DCHECK(signaling_thread_->IsCurrent()); + RTC_DCHECK(callback); + callbacks_.push_back(callback); + // "Now" using a monotonically increasing timer. int64_t cache_now_us = rtc::TimeMicros(); if (cached_report_ && cache_now_us - cache_timestamp_us_ <= cache_lifetime_us_) { - return cached_report_; + // We have a fresh cached report to deliver. + DeliverCachedReport(); + } else if (!num_pending_partial_reports_) { + // Only start gathering stats if we're not already gathering stats. In the + // case of already gathering stats, |callback_| will be invoked when there + // are no more pending partial reports. + + // "Now" using a system clock, relative to the UNIX epoch (Jan 1, 1970, + // UTC), in microseconds. The system clock could be modified and is not + // necessarily monotonically increasing. + int64_t timestamp_us = static_cast( + rtc::Timing::WallTimeNow() * rtc::kNumMicrosecsPerSec); + + num_pending_partial_reports_ = 3; + partial_report_timestamp_us_ = cache_now_us; + invoker_.AsyncInvoke(RTC_FROM_HERE, signaling_thread_, + rtc::Bind(&RTCStatsCollector::ProducePartialResultsOnSignalingThread, + rtc::scoped_refptr(this), timestamp_us)); + invoker_.AsyncInvoke(RTC_FROM_HERE, worker_thread_, + rtc::Bind(&RTCStatsCollector::ProducePartialResultsOnWorkerThread, + rtc::scoped_refptr(this), timestamp_us)); + invoker_.AsyncInvoke(RTC_FROM_HERE, network_thread_, + rtc::Bind(&RTCStatsCollector::ProducePartialResultsOnNetworkThread, + rtc::scoped_refptr(this), timestamp_us)); } - cache_timestamp_us_ = cache_now_us; - // "Now" using a system clock, relative to the UNIX epoch (Jan 1, 1970, UTC), - // in microseconds. The system clock could be modified and is not necessarily - // monotonically increasing. - int64_t timestamp_us = static_cast( - rtc::Timing::WallTimeNow() * rtc::kNumMicrosecsPerSec); - - rtc::scoped_refptr report = RTCStatsReport::Create(); - report->AddStats(ProducePeerConnectionStats(timestamp_us)); - - cached_report_ = report; - return cached_report_; } void RTCStatsCollector::ClearCachedStatsReport() { - RTC_DCHECK(IsOnSignalingThread()); + RTC_DCHECK(signaling_thread_->IsCurrent()); cached_report_ = nullptr; } -bool RTCStatsCollector::IsOnSignalingThread() const { - return pc_->session()->signaling_thread()->IsCurrent(); +void RTCStatsCollector::ProducePartialResultsOnSignalingThread( + int64_t timestamp_us) { + RTC_DCHECK(signaling_thread_->IsCurrent()); + rtc::scoped_refptr report = RTCStatsReport::Create(); + + report->AddStats(ProducePeerConnectionStats_s(timestamp_us)); + + AddPartialResults(report); +} + +void RTCStatsCollector::ProducePartialResultsOnWorkerThread( + int64_t timestamp_us) { + RTC_DCHECK(worker_thread_->IsCurrent()); + rtc::scoped_refptr report = RTCStatsReport::Create(); + + // TODO(hbos): Gather stats on worker thread. + + AddPartialResults(report); +} + +void RTCStatsCollector::ProducePartialResultsOnNetworkThread( + int64_t timestamp_us) { + RTC_DCHECK(network_thread_->IsCurrent()); + rtc::scoped_refptr report = RTCStatsReport::Create(); + + // TODO(hbos): Gather stats on network thread. + + AddPartialResults(report); +} + +void RTCStatsCollector::AddPartialResults( + const rtc::scoped_refptr& partial_report) { + if (!signaling_thread_->IsCurrent()) { + invoker_.AsyncInvoke(RTC_FROM_HERE, signaling_thread_, + rtc::Bind(&RTCStatsCollector::AddPartialResults_s, + rtc::scoped_refptr(this), + partial_report)); + return; + } + AddPartialResults_s(partial_report); +} + +void RTCStatsCollector::AddPartialResults_s( + rtc::scoped_refptr partial_report) { + RTC_DCHECK(signaling_thread_->IsCurrent()); + RTC_DCHECK_GT(num_pending_partial_reports_, 0); + if (!partial_report_) + partial_report_ = partial_report; + else + partial_report_->TakeMembersFrom(partial_report); + --num_pending_partial_reports_; + if (!num_pending_partial_reports_) { + cache_timestamp_us_ = partial_report_timestamp_us_; + cached_report_ = partial_report_; + partial_report_ = nullptr; + DeliverCachedReport(); + } +} + +void RTCStatsCollector::DeliverCachedReport() { + RTC_DCHECK(signaling_thread_->IsCurrent()); + RTC_DCHECK(!callbacks_.empty()); + RTC_DCHECK(cached_report_); + for (const rtc::scoped_refptr& callback : + callbacks_) { + callback->OnStatsDelivered(cached_report_); + } + callbacks_.clear(); } std::unique_ptr -RTCStatsCollector::ProducePeerConnectionStats(int64_t timestamp_us) const { +RTCStatsCollector::ProducePeerConnectionStats_s(int64_t timestamp_us) const { + RTC_DCHECK(signaling_thread_->IsCurrent()); // TODO(hbos): If data channels are removed from the peer connection this will // yield incorrect counts. Address before closing crbug.com/636818. See // https://w3c.github.io/webrtc-stats/webrtc-stats.html#pcstats-dict*. diff --git a/webrtc/stats/rtcstatscollector.h b/webrtc/stats/rtcstatscollector.h index 735421d541..ee59a10022 100644 --- a/webrtc/stats/rtcstatscollector.h +++ b/webrtc/stats/rtcstatscollector.h @@ -12,9 +12,12 @@ #define WEBRTC_STATS_RTCSTATSCOLLECTOR_H_ #include +#include #include "webrtc/api/rtcstats_objects.h" #include "webrtc/api/rtcstatsreport.h" +#include "webrtc/base/asyncinvoker.h" +#include "webrtc/base/refcount.h" #include "webrtc/base/scoped_ref_ptr.h" #include "webrtc/base/timeutils.h" @@ -22,11 +25,21 @@ namespace webrtc { class PeerConnection; -// All calls to the collector and gathering of stats is performed on the -// signaling thread. A stats report is cached for |cache_lifetime_| ms. -class RTCStatsCollector { +class RTCStatsCollectorCallback : public virtual rtc::RefCountInterface { public: - explicit RTCStatsCollector( + virtual ~RTCStatsCollectorCallback() {} + + virtual void OnStatsDelivered( + const rtc::scoped_refptr& report) = 0; +}; + +// All public methods of the collector are to be called on the signaling thread. +// Stats are gathered on the signaling, worker and network threads +// asynchronously. The callback is invoked on the signaling thread. Resulting +// reports are cached for |cache_lifetime_| ms. +class RTCStatsCollector : public virtual rtc::RefCountInterface { + public: + static rtc::scoped_refptr Create( PeerConnection* pc, int64_t cache_lifetime_us = 50 * rtc::kNumMicrosecsPerMillisec); @@ -34,18 +47,42 @@ class RTCStatsCollector { // it is returned, otherwise new stats are gathered and returned. A report is // considered fresh for |cache_lifetime_| ms. const RTCStatsReports are safe // to use across multiple threads and may be destructed on any thread. - rtc::scoped_refptr GetStatsReport(); + void GetStatsReport(rtc::scoped_refptr callback); // Clears the cache's reference to the most recent stats report. Subsequently // calling |GetStatsReport| guarantees fresh stats. void ClearCachedStatsReport(); - private: - bool IsOnSignalingThread() const; + protected: + RTCStatsCollector(PeerConnection* pc, int64_t cache_lifetime_us); - std::unique_ptr ProducePeerConnectionStats( + // Stats gathering on a particular thread. Calls |AddPartialResults| before + // returning. Virtual for the sake of testing. + virtual void ProducePartialResultsOnSignalingThread(int64_t timestamp_us); + virtual void ProducePartialResultsOnWorkerThread(int64_t timestamp_us); + virtual void ProducePartialResultsOnNetworkThread(int64_t timestamp_us); + + // Can be called on any thread. + void AddPartialResults( + const rtc::scoped_refptr& partial_report); + + private: + void AddPartialResults_s(rtc::scoped_refptr partial_report); + void DeliverCachedReport(); + + std::unique_ptr ProducePeerConnectionStats_s( int64_t timestamp_us) const; PeerConnection* const pc_; + rtc::Thread* const signaling_thread_; + rtc::Thread* const worker_thread_; + rtc::Thread* const network_thread_; + rtc::AsyncInvoker invoker_; + + int num_pending_partial_reports_; + int64_t partial_report_timestamp_us_; + rtc::scoped_refptr partial_report_; + std::vector> callbacks_; + // A timestamp, in microseconds, that is based on a timer that is // monotonically increasing. That is, even if the system clock is modified the // difference between the timer and this timestamp is how fresh the cached diff --git a/webrtc/stats/rtcstatscollector_unittest.cc b/webrtc/stats/rtcstatscollector_unittest.cc index f917a75809..1ead77b28d 100644 --- a/webrtc/stats/rtcstatscollector_unittest.cc +++ b/webrtc/stats/rtcstatscollector_unittest.cc @@ -24,6 +24,7 @@ #include "webrtc/base/fakeclock.h" #include "webrtc/base/gunit.h" #include "webrtc/base/logging.h" +#include "webrtc/base/thread_checker.h" #include "webrtc/base/timedelta.h" #include "webrtc/base/timeutils.h" #include "webrtc/base/timing.h" @@ -34,9 +35,13 @@ using testing::ReturnRef; namespace webrtc { -class RTCStatsCollectorTester : public SetSessionDescriptionObserver { +namespace { + +const int64_t kGetStatsReportTimeoutMs = 1000; + +class RTCStatsCollectorTestHelper : public SetSessionDescriptionObserver { public: - RTCStatsCollectorTester() + RTCStatsCollectorTestHelper() : worker_thread_(rtc::Thread::Current()), network_thread_(rtc::Thread::Current()), channel_manager_(new cricket::ChannelManager( @@ -77,41 +82,241 @@ class RTCStatsCollectorTester : public SetSessionDescriptionObserver { std::vector> data_channels_; }; -class RTCStatsCollectorTest : public testing::Test { +class RTCTestStats : public RTCStats { public: - RTCStatsCollectorTest() - : test_(new rtc::RefCountedObject()), - collector_(&test_->pc(), 50 * rtc::kNumMicrosecsPerMillisec) { + RTCTestStats(const std::string& id, int64_t timestamp_us) + : RTCStats(id, timestamp_us), + dummy_stat("dummyStat") {} + + WEBRTC_RTCSTATS_IMPL(RTCStats, RTCTestStats, + &dummy_stat); + + RTCStatsMember dummy_stat; +}; + +const char RTCTestStats::kType[] = "test-stats"; + +// Overrides the stats collection to verify thread usage and that the resulting +// partial reports are merged. +class FakeRTCStatsCollector : public RTCStatsCollector, + public RTCStatsCollectorCallback { + public: + static rtc::scoped_refptr Create( + PeerConnection* pc, + int64_t cache_lifetime_us) { + return rtc::scoped_refptr( + new rtc::RefCountedObject( + pc, cache_lifetime_us)); + } + + // RTCStatsCollectorCallback implementation. + void OnStatsDelivered( + const rtc::scoped_refptr& report) override { + EXPECT_TRUE(signaling_thread_->IsCurrent()); + rtc::CritScope cs(&lock_); + delivered_report_ = report; + } + + void VerifyThreadUsageAndResultsMerging() { + GetStatsReport(rtc::scoped_refptr(this)); + EXPECT_TRUE_WAIT(HasVerifiedResults(), kGetStatsReportTimeoutMs); + } + + bool HasVerifiedResults() { + EXPECT_TRUE(signaling_thread_->IsCurrent()); + rtc::CritScope cs(&lock_); + if (!delivered_report_) + return false; + EXPECT_EQ(produced_on_signaling_thread_, 1); + EXPECT_EQ(produced_on_worker_thread_, 1); + EXPECT_EQ(produced_on_network_thread_, 1); + + EXPECT_TRUE(delivered_report_->Get("SignalingThreadStats")); + EXPECT_TRUE(delivered_report_->Get("WorkerThreadStats")); + EXPECT_TRUE(delivered_report_->Get("NetworkThreadStats")); + + produced_on_signaling_thread_ = 0; + produced_on_worker_thread_ = 0; + produced_on_network_thread_ = 0; + delivered_report_ = nullptr; + return true; } protected: - rtc::scoped_refptr test_; - RTCStatsCollector collector_; + FakeRTCStatsCollector( + PeerConnection* pc, + int64_t cache_lifetime) + : RTCStatsCollector(pc, cache_lifetime), + signaling_thread_(pc->session()->signaling_thread()), + worker_thread_(pc->session()->worker_thread()), + network_thread_(pc->session()->network_thread()) { + } + + void ProducePartialResultsOnSignalingThread(int64_t timestamp_us) override { + EXPECT_TRUE(signaling_thread_->IsCurrent()); + { + rtc::CritScope cs(&lock_); + EXPECT_FALSE(delivered_report_); + ++produced_on_signaling_thread_; + } + + rtc::scoped_refptr signaling_report = + RTCStatsReport::Create(); + signaling_report->AddStats(std::unique_ptr( + new RTCTestStats("SignalingThreadStats", timestamp_us))); + AddPartialResults(signaling_report); + } + void ProducePartialResultsOnWorkerThread(int64_t timestamp_us) override { + EXPECT_TRUE(worker_thread_->IsCurrent()); + { + rtc::CritScope cs(&lock_); + EXPECT_FALSE(delivered_report_); + ++produced_on_worker_thread_; + } + + rtc::scoped_refptr worker_report = RTCStatsReport::Create(); + worker_report->AddStats(std::unique_ptr( + new RTCTestStats("WorkerThreadStats", timestamp_us))); + AddPartialResults(worker_report); + } + void ProducePartialResultsOnNetworkThread(int64_t timestamp_us) override { + EXPECT_TRUE(network_thread_->IsCurrent()); + { + rtc::CritScope cs(&lock_); + EXPECT_FALSE(delivered_report_); + ++produced_on_network_thread_; + } + + rtc::scoped_refptr network_report = + RTCStatsReport::Create(); + network_report->AddStats(std::unique_ptr( + new RTCTestStats("NetworkThreadStats", timestamp_us))); + AddPartialResults(network_report); + } + + private: + rtc::Thread* const signaling_thread_; + rtc::Thread* const worker_thread_; + rtc::Thread* const network_thread_; + + rtc::CriticalSection lock_; + rtc::scoped_refptr delivered_report_; + int produced_on_signaling_thread_ = 0; + int produced_on_worker_thread_ = 0; + int produced_on_network_thread_ = 0; }; -TEST_F(RTCStatsCollectorTest, CachedStatsReport) { +class StatsCallback : public RTCStatsCollectorCallback { + public: + static rtc::scoped_refptr Create( + rtc::scoped_refptr* report_ptr = nullptr) { + return rtc::scoped_refptr( + new rtc::RefCountedObject(report_ptr)); + } + + void OnStatsDelivered( + const rtc::scoped_refptr& report) override { + EXPECT_TRUE(thread_checker_.CalledOnValidThread()); + report_ = report; + if (report_ptr_) + *report_ptr_ = report_; + } + + rtc::scoped_refptr report() const { + EXPECT_TRUE(thread_checker_.CalledOnValidThread()); + return report_; + } + + protected: + explicit StatsCallback(rtc::scoped_refptr* report_ptr) + : report_ptr_(report_ptr) {} + + private: + rtc::ThreadChecker thread_checker_; + rtc::scoped_refptr report_; + rtc::scoped_refptr* report_ptr_; +}; + +class RTCStatsCollectorTest : public testing::Test { + public: + RTCStatsCollectorTest() + : test_(new rtc::RefCountedObject()), + collector_(RTCStatsCollector::Create( + &test_->pc(), 50 * rtc::kNumMicrosecsPerMillisec)) { + } + + rtc::scoped_refptr GetStatsReport() { + rtc::scoped_refptr callback = StatsCallback::Create(); + collector_->GetStatsReport(callback); + EXPECT_TRUE_WAIT(callback->report(), kGetStatsReportTimeoutMs); + return callback->report(); + } + + protected: + rtc::scoped_refptr test_; + rtc::scoped_refptr collector_; +}; + +TEST_F(RTCStatsCollectorTest, SingleCallback) { + rtc::scoped_refptr result; + collector_->GetStatsReport(StatsCallback::Create(&result)); + EXPECT_TRUE_WAIT(result, kGetStatsReportTimeoutMs); +} + +TEST_F(RTCStatsCollectorTest, MultipleCallbacks) { + rtc::scoped_refptr a; + rtc::scoped_refptr b; + rtc::scoped_refptr c; + collector_->GetStatsReport(StatsCallback::Create(&a)); + collector_->GetStatsReport(StatsCallback::Create(&b)); + collector_->GetStatsReport(StatsCallback::Create(&c)); + EXPECT_TRUE_WAIT(a, kGetStatsReportTimeoutMs); + EXPECT_TRUE_WAIT(b, kGetStatsReportTimeoutMs); + EXPECT_TRUE_WAIT(c, kGetStatsReportTimeoutMs); + EXPECT_EQ(a.get(), b.get()); + EXPECT_EQ(b.get(), c.get()); +} + +TEST_F(RTCStatsCollectorTest, CachedStatsReports) { rtc::ScopedFakeClock fake_clock; // Caching should ensure |a| and |b| are the same report. - rtc::scoped_refptr a = collector_.GetStatsReport(); - rtc::scoped_refptr b = collector_.GetStatsReport(); - EXPECT_TRUE(a); + rtc::scoped_refptr a = GetStatsReport(); + rtc::scoped_refptr b = GetStatsReport(); EXPECT_EQ(a.get(), b.get()); // Invalidate cache by clearing it. - collector_.ClearCachedStatsReport(); - rtc::scoped_refptr c = collector_.GetStatsReport(); - EXPECT_TRUE(c); + collector_->ClearCachedStatsReport(); + rtc::scoped_refptr c = GetStatsReport(); EXPECT_NE(b.get(), c.get()); // Invalidate cache by advancing time. fake_clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(51)); - rtc::scoped_refptr d = collector_.GetStatsReport(); + rtc::scoped_refptr d = GetStatsReport(); EXPECT_TRUE(d); EXPECT_NE(c.get(), d.get()); } +TEST_F(RTCStatsCollectorTest, MultipleCallbacksWithInvalidatedCacheInBetween) { + rtc::ScopedFakeClock fake_clock; + rtc::scoped_refptr a; + rtc::scoped_refptr b; + rtc::scoped_refptr c; + collector_->GetStatsReport(StatsCallback::Create(&a)); + collector_->GetStatsReport(StatsCallback::Create(&b)); + // Cache is invalidated after 50 ms. + fake_clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(51)); + collector_->GetStatsReport(StatsCallback::Create(&c)); + EXPECT_TRUE_WAIT(a, kGetStatsReportTimeoutMs); + EXPECT_TRUE_WAIT(b, kGetStatsReportTimeoutMs); + EXPECT_TRUE_WAIT(c, kGetStatsReportTimeoutMs); + EXPECT_EQ(a.get(), b.get()); + // The act of doing |AdvanceTime| processes all messages. If this was not the + // case we might not require |c| to be fresher than |b|. + EXPECT_NE(c.get(), b.get()); +} + TEST_F(RTCStatsCollectorTest, CollectRTCPeerConnectionStats) { int64_t before = static_cast( rtc::Timing::WallTimeNow() * rtc::kNumMicrosecsPerSec); - rtc::scoped_refptr report = collector_.GetStatsReport(); + rtc::scoped_refptr report = GetStatsReport(); int64_t after = static_cast( rtc::Timing::WallTimeNow() * rtc::kNumMicrosecsPerSec); EXPECT_EQ(report->GetStatsOfType().size(), @@ -137,8 +342,8 @@ TEST_F(RTCStatsCollectorTest, CollectRTCPeerConnectionStats) { test_->data_channels().push_back( new MockDataChannel(DataChannelInterface::kClosed)); - collector_.ClearCachedStatsReport(); - report = collector_.GetStatsReport(); + collector_->ClearCachedStatsReport(); + report = GetStatsReport(); EXPECT_EQ(report->GetStatsOfType().size(), static_cast(1)) << "Expecting 1 RTCPeerConnectionStats."; stats = report->Get("RTCPeerConnection"); @@ -156,4 +361,23 @@ TEST_F(RTCStatsCollectorTest, CollectRTCPeerConnectionStats) { } } +class RTCStatsCollectorTestWithFakeCollector : public testing::Test { + public: + RTCStatsCollectorTestWithFakeCollector() + : test_(new rtc::RefCountedObject()), + collector_(FakeRTCStatsCollector::Create( + &test_->pc(), 50 * rtc::kNumMicrosecsPerMillisec)) { + } + + protected: + rtc::scoped_refptr test_; + rtc::scoped_refptr collector_; +}; + +TEST_F(RTCStatsCollectorTestWithFakeCollector, ThreadUsageAndResultsMerging) { + collector_->VerifyThreadUsageAndResultsMerging(); +} + +} // namespace + } // namespace webrtc