diff --git a/pc/BUILD.gn b/pc/BUILD.gn index fab96326cf..75ee47fedb 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -217,6 +217,7 @@ rtc_static_library("peerconnection") { "../rtc_base:checks", "../rtc_base:rtc_base", "../rtc_base:rtc_base_approved", + "../rtc_base:rtc_post_message_with_functor", "../rtc_base/system:rtc_export", "../rtc_base/third_party/base64", "../rtc_base/third_party/sigslot", diff --git a/pc/rtc_stats_collector.cc b/pc/rtc_stats_collector.cc index a25a785e21..3287b92762 100644 --- a/pc/rtc_stats_collector.cc +++ b/pc/rtc_stats_collector.cc @@ -25,6 +25,7 @@ #include "pc/peer_connection.h" #include "pc/rtc_stats_traversal.h" #include "rtc_base/checks.h" +#include "rtc_base/post_message_with_functor.h" #include "rtc_base/strings/string_builder.h" #include "rtc_base/time_utils.h" #include "rtc_base/trace_event.h" @@ -753,6 +754,8 @@ RTCStatsCollector::RTCStatsCollector(PeerConnectionInternal* pc, network_thread_(pc->network_thread()), num_pending_partial_reports_(0), partial_report_timestamp_us_(0), + network_report_event_(true /* manual_reset */, + true /* initially_signaled */), cache_timestamp_us_(0), cache_lifetime_us_(cache_lifetime_us) { RTC_DCHECK(pc_); @@ -799,7 +802,7 @@ void RTCStatsCollector::GetStatsReportInternal( // reentrancy problems. std::vector requests; requests.swap(requests_); - invoker_.AsyncInvoke( + rtc::PostMessageWithFunctor( RTC_FROM_HERE, signaling_thread_, rtc::Bind(&RTCStatsCollector::DeliverCachedReport, this, cached_report_, std::move(requests))); @@ -830,10 +833,14 @@ void RTCStatsCollector::GetStatsReportInternal( // network thread, where it more naturally belongs. call_stats_ = pc_->GetCallStats(); - invoker_.AsyncInvoke( + // Don't touch |network_report_| on the signaling thread until + // ProducePartialResultsOnNetworkThread() has signaled the + // |network_report_event_|. + network_report_event_.Reset(); + rtc::PostMessageWithFunctor( RTC_FROM_HERE, network_thread_, rtc::Bind(&RTCStatsCollector::ProducePartialResultsOnNetworkThread, - rtc::scoped_refptr(this), timestamp_us)); + this, timestamp_us)); ProducePartialResultsOnSignalingThread(timestamp_us); } } @@ -845,89 +852,117 @@ void RTCStatsCollector::ClearCachedStatsReport() { void RTCStatsCollector::WaitForPendingRequest() { RTC_DCHECK(signaling_thread_->IsCurrent()); - if (num_pending_partial_reports_) { - rtc::Thread::Current()->ProcessMessages(0); - while (num_pending_partial_reports_) { - rtc::Thread::Current()->SleepMs(1); - rtc::Thread::Current()->ProcessMessages(0); - } - } + // If a request is pending, blocks until the |network_report_event_| is + // signaled and then delivers the result. Otherwise this is a NO-OP. + MergeNetworkReport_s(); } void RTCStatsCollector::ProducePartialResultsOnSignalingThread( int64_t timestamp_us) { RTC_DCHECK(signaling_thread_->IsCurrent()); - rtc::scoped_refptr report = RTCStatsReport::Create( - timestamp_us); + partial_report_ = RTCStatsReport::Create(timestamp_us); - ProduceDataChannelStats_s(timestamp_us, report.get()); - ProduceMediaStreamStats_s(timestamp_us, report.get()); - ProduceMediaStreamTrackStats_s(timestamp_us, report.get()); - ProducePeerConnectionStats_s(timestamp_us, report.get()); + ProducePartialResultsOnSignalingThreadImpl(timestamp_us, + partial_report_.get()); - AddPartialResults(report); + // ProducePartialResultsOnSignalingThread() is running synchronously on the + // signaling thread, so it is always the first partial result delivered on the + // signaling thread. The request is not complete until MergeNetworkReport_s() + // happens; we don't have to do anything here. + RTC_DCHECK_GT(num_pending_partial_reports_, 1); + --num_pending_partial_reports_; +} + +void RTCStatsCollector::ProducePartialResultsOnSignalingThreadImpl( + int64_t timestamp_us, + RTCStatsReport* partial_report) { + RTC_DCHECK(signaling_thread_->IsCurrent()); + ProduceDataChannelStats_s(timestamp_us, partial_report); + ProduceMediaStreamStats_s(timestamp_us, partial_report); + ProduceMediaStreamTrackStats_s(timestamp_us, partial_report); + ProducePeerConnectionStats_s(timestamp_us, partial_report); } void RTCStatsCollector::ProducePartialResultsOnNetworkThread( int64_t timestamp_us) { RTC_DCHECK(network_thread_->IsCurrent()); - rtc::scoped_refptr report = RTCStatsReport::Create( - timestamp_us); + // Touching |network_report_| on this thread is safe by this method because + // |network_report_event_| is reset before this method is invoked. + network_report_ = RTCStatsReport::Create(timestamp_us); std::map transport_stats_by_name = pc_->GetTransportStatsByNames(transport_names_); - std::map transport_cert_stats = PrepareTransportCertificateStats_n(transport_stats_by_name); - ProduceCertificateStats_n(timestamp_us, transport_cert_stats, report.get()); - ProduceCodecStats_n(timestamp_us, transceiver_stats_infos_, report.get()); - ProduceIceCandidateAndPairStats_n(timestamp_us, transport_stats_by_name, - call_stats_, report.get()); - ProduceRTPStreamStats_n(timestamp_us, transceiver_stats_infos_, report.get()); - ProduceTransportStats_n(timestamp_us, transport_stats_by_name, - transport_cert_stats, report.get()); + ProducePartialResultsOnNetworkThreadImpl( + timestamp_us, transport_stats_by_name, transport_cert_stats, + network_report_.get()); - AddPartialResults(report); + // Signal that it is now safe to touch |network_report_| on the signaling + // thread, and post a task to merge it into the final results. + network_report_event_.Set(); + rtc::PostMessageWithFunctor( + RTC_FROM_HERE, signaling_thread_, + rtc::Bind(&RTCStatsCollector::MergeNetworkReport_s, this)); } -void RTCStatsCollector::AddPartialResults( - const rtc::scoped_refptr& partial_report) { - if (!signaling_thread_->IsCurrent()) { - invoker_.AsyncInvoke(RTC_FROM_HERE, signaling_thread_, - rtc::Bind(&RTCStatsCollector::AddPartialResults_s, - rtc::scoped_refptr(this), - partial_report)); +void RTCStatsCollector::ProducePartialResultsOnNetworkThreadImpl( + int64_t timestamp_us, + const std::map& + transport_stats_by_name, + const std::map& transport_cert_stats, + RTCStatsReport* partial_report) { + RTC_DCHECK(network_thread_->IsCurrent()); + ProduceCertificateStats_n(timestamp_us, transport_cert_stats, partial_report); + ProduceCodecStats_n(timestamp_us, transceiver_stats_infos_, partial_report); + ProduceIceCandidateAndPairStats_n(timestamp_us, transport_stats_by_name, + call_stats_, partial_report); + ProduceRTPStreamStats_n(timestamp_us, transceiver_stats_infos_, + partial_report); + ProduceTransportStats_n(timestamp_us, transport_stats_by_name, + transport_cert_stats, partial_report); +} + +void RTCStatsCollector::MergeNetworkReport_s() { + RTC_DCHECK(signaling_thread_->IsCurrent()); + // The |network_report_event_| must be signaled for it to be safe to touch + // |network_report_|. This is normally not blocking, but if + // WaitForPendingRequest() is called while a request is pending, we might have + // to wait until the network thread is done touching |network_report_|. + network_report_event_.Wait(rtc::Event::kForever); + if (!network_report_) { + // Normally, MergeNetworkReport_s() is executed because it is posted from + // the network thread. But if WaitForPendingRequest() is called while a + // request is pending, an early call to MergeNetworkReport_s() is made, + // merging the report and setting |network_report_| to null. If so, when the + // previously posted MergeNetworkReport_s() is later executed, the report is + // already null and nothing needs to be done here. return; } - AddPartialResults_s(partial_report); -} - -void RTCStatsCollector::AddPartialResults_s( - rtc::scoped_refptr partial_report) { - RTC_DCHECK(signaling_thread_->IsCurrent()); RTC_DCHECK_GT(num_pending_partial_reports_, 0); - if (!partial_report_) - partial_report_ = partial_report; - else - partial_report_->TakeMembersFrom(partial_report); + RTC_DCHECK(partial_report_); + partial_report_->TakeMembersFrom(network_report_); + network_report_ = nullptr; --num_pending_partial_reports_; - if (!num_pending_partial_reports_) { - cache_timestamp_us_ = partial_report_timestamp_us_; - cached_report_ = partial_report_; - partial_report_ = nullptr; - transceiver_stats_infos_.clear(); - // Trace WebRTC Stats when getStats is called on Javascript. - // This allows access to WebRTC stats from trace logs. To enable them, - // select the "webrtc_stats" category when recording traces. - TRACE_EVENT_INSTANT1("webrtc_stats", "webrtc_stats", "report", - cached_report_->ToJson()); + // |network_report_| is currently the only partial report collected + // asynchronously, so |num_pending_partial_reports_| must now be 0 and we are + // ready to deliver the result. + RTC_DCHECK_EQ(num_pending_partial_reports_, 0); + cache_timestamp_us_ = partial_report_timestamp_us_; + cached_report_ = partial_report_; + partial_report_ = nullptr; + transceiver_stats_infos_.clear(); + // Trace WebRTC Stats when getStats is called on Javascript. + // This allows access to WebRTC stats from trace logs. To enable them, + // select the "webrtc_stats" category when recording traces. + TRACE_EVENT_INSTANT1("webrtc_stats", "webrtc_stats", "report", + cached_report_->ToJson()); - // Deliver report and clear |requests_|. - std::vector requests; - requests.swap(requests_); - DeliverCachedReport(cached_report_, std::move(requests)); - } + // Deliver report and clear |requests_|. + std::vector requests; + requests.swap(requests_); + DeliverCachedReport(cached_report_, std::move(requests)); } void RTCStatsCollector::DeliverCachedReport( diff --git a/pc/rtc_stats_collector.h b/pc/rtc_stats_collector.h index a3f159323f..4837fc0abe 100644 --- a/pc/rtc_stats_collector.h +++ b/pc/rtc_stats_collector.h @@ -27,7 +27,7 @@ #include "pc/data_channel.h" #include "pc/peer_connection_internal.h" #include "pc/track_media_info_map.h" -#include "rtc_base/async_invoker.h" +#include "rtc_base/event.h" #include "rtc_base/ref_count.h" #include "rtc_base/ssl_identity.h" #include "rtc_base/third_party/sigslot/sigslot.h" @@ -77,14 +77,21 @@ class RTCStatsCollector : public virtual rtc::RefCountInterface, RTCStatsCollector(PeerConnectionInternal* pc, int64_t cache_lifetime_us); ~RTCStatsCollector(); - // Stats gathering on a particular thread. Calls |AddPartialResults| before - // returning. Virtual for the sake of testing. - virtual void ProducePartialResultsOnSignalingThread(int64_t timestamp_us); - virtual void ProducePartialResultsOnNetworkThread(int64_t timestamp_us); + struct CertificateStatsPair { + std::unique_ptr local; + std::unique_ptr remote; + }; - // Can be called on any thread. - void AddPartialResults( - const rtc::scoped_refptr& partial_report); + // Stats gathering on a particular thread. Virtual for the sake of testing. + virtual void ProducePartialResultsOnSignalingThreadImpl( + int64_t timestamp_us, + RTCStatsReport* partial_report); + virtual void ProducePartialResultsOnNetworkThreadImpl( + int64_t timestamp_us, + const std::map& + transport_stats_by_name, + const std::map& transport_cert_stats, + RTCStatsReport* partial_report); private: class RequestInfo { @@ -130,11 +137,6 @@ class RTCStatsCollector : public virtual rtc::RefCountInterface, void GetStatsReportInternal(RequestInfo request); - struct CertificateStatsPair { - std::unique_ptr local; - std::unique_ptr remote; - }; - // Structure for tracking stats about each RtpTransceiver managed by the // PeerConnection. This can either by a Plan B style or Unified Plan style // transceiver (i.e., can have 0 or many senders and receivers). @@ -150,7 +152,6 @@ class RTCStatsCollector : public virtual rtc::RefCountInterface, std::unique_ptr track_media_info_map; }; - void AddPartialResults_s(rtc::scoped_refptr partial_report); void DeliverCachedReport( rtc::scoped_refptr cached_report, std::vector requests); @@ -211,6 +212,13 @@ class RTCStatsCollector : public virtual rtc::RefCountInterface, std::vector PrepareTransceiverStatsInfos_s() const; std::set PrepareTransportNames_s() const; + // Stats gathering on a particular thread. + void ProducePartialResultsOnSignalingThread(int64_t timestamp_us); + void ProducePartialResultsOnNetworkThread(int64_t timestamp_us); + // Merges |network_report_| into |partial_report_| and completes the request. + // This is a NO-OP if |network_report_| is null. + void MergeNetworkReport_s(); + // Slots for signals (sigslot) that are wired up to |pc_|. void OnDataChannelCreated(DataChannel* channel); // Slots for signals (sigslot) that are wired up to |channel|. @@ -221,12 +229,24 @@ class RTCStatsCollector : public virtual rtc::RefCountInterface, rtc::Thread* const signaling_thread_; rtc::Thread* const worker_thread_; rtc::Thread* const network_thread_; - rtc::AsyncInvoker invoker_; int num_pending_partial_reports_; int64_t partial_report_timestamp_us_; + // Reports that are produced on the signaling thread or the network thread are + // merged into this report. It is only touched on the signaling thread. Once + // all partial reports are merged this is the result of a request. rtc::scoped_refptr partial_report_; std::vector requests_; + // Holds the result of ProducePartialResultsOnNetworkThread(). It is merged + // into |partial_report_| on the signaling thread and then nulled by + // MergeNetworkReport_s(). Thread-safety is ensured by using + // |network_report_event_|. + rtc::scoped_refptr network_report_; + // If set, it is safe to touch the |network_report_| on the signaling thread. + // This is reset before async-invoking ProducePartialResultsOnNetworkThread() + // and set when ProducePartialResultsOnNetworkThread() is complete, after it + // has updated the value of |network_report_|. + rtc::Event network_report_event_; // Set in |GetStatsReport|, read in |ProducePartialResultsOnNetworkThread| and // |ProducePartialResultsOnSignalingThread|, reset after work is complete. Not diff --git a/pc/rtc_stats_collector_unittest.cc b/pc/rtc_stats_collector_unittest.cc index f628f82454..3be4ae7071 100644 --- a/pc/rtc_stats_collector_unittest.cc +++ b/pc/rtc_stats_collector_unittest.cc @@ -2249,7 +2249,9 @@ class FakeRTCStatsCollector : public RTCStatsCollector, worker_thread_(pc->worker_thread()), network_thread_(pc->network_thread()) {} - void ProducePartialResultsOnSignalingThread(int64_t timestamp_us) override { + void ProducePartialResultsOnSignalingThreadImpl( + int64_t timestamp_us, + RTCStatsReport* partial_report) override { EXPECT_TRUE(signaling_thread_->IsCurrent()); { rtc::CritScope cs(&lock_); @@ -2257,13 +2259,15 @@ class FakeRTCStatsCollector : public RTCStatsCollector, ++produced_on_signaling_thread_; } - rtc::scoped_refptr signaling_report = - RTCStatsReport::Create(0); - signaling_report->AddStats(std::unique_ptr( + partial_report->AddStats(std::unique_ptr( new RTCTestStats("SignalingThreadStats", timestamp_us))); - AddPartialResults(signaling_report); } - void ProducePartialResultsOnNetworkThread(int64_t timestamp_us) override { + void ProducePartialResultsOnNetworkThreadImpl( + int64_t timestamp_us, + const std::map& + transport_stats_by_name, + const std::map& transport_cert_stats, + RTCStatsReport* partial_report) override { EXPECT_TRUE(network_thread_->IsCurrent()); { rtc::CritScope cs(&lock_); @@ -2271,11 +2275,8 @@ class FakeRTCStatsCollector : public RTCStatsCollector, ++produced_on_network_thread_; } - rtc::scoped_refptr network_report = - RTCStatsReport::Create(0); - network_report->AddStats(std::unique_ptr( + partial_report->AddStats(std::unique_ptr( new RTCTestStats("NetworkThreadStats", timestamp_us))); - AddPartialResults(network_report); } private: