diff --git a/pc/jsep_transport_controller.cc b/pc/jsep_transport_controller.cc index 61288b2b1f..312b1280b1 100644 --- a/pc/jsep_transport_controller.cc +++ b/pc/jsep_transport_controller.cc @@ -153,11 +153,7 @@ rtc::scoped_refptr JsepTransportController::GetSctpTransport( } void JsepTransportController::SetIceConfig(const cricket::IceConfig& config) { - if (!network_thread_->IsCurrent()) { - network_thread_->Invoke(RTC_FROM_HERE, [&] { SetIceConfig(config); }); - return; - } - + RTC_DCHECK_RUN_ON(network_thread_); ice_config_ = config; for (auto& dtls : GetDtlsTransports()) { dtls->ice_transport()->SetIceConfig(ice_config_); @@ -233,11 +229,6 @@ bool JsepTransportController::SetLocalCertificate( rtc::scoped_refptr JsepTransportController::GetLocalCertificate( const std::string& transport_name) const { - if (!network_thread_->IsCurrent()) { - return network_thread_->Invoke>( - RTC_FROM_HERE, [&] { return GetLocalCertificate(transport_name); }); - } - RTC_DCHECK_RUN_ON(network_thread_); const cricket::JsepTransport* t = GetJsepTransportByName(transport_name); @@ -250,10 +241,6 @@ JsepTransportController::GetLocalCertificate( std::unique_ptr JsepTransportController::GetRemoteSSLCertChain( const std::string& transport_name) const { - if (!network_thread_->IsCurrent()) { - return network_thread_->Invoke>( - RTC_FROM_HERE, [&] { return GetRemoteSSLCertChain(transport_name); }); - } RTC_DCHECK_RUN_ON(network_thread_); // Get the certificate from the RTP transport's DTLS handshake. Should be diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index 97dca34f75..657b6a3c61 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -2190,17 +2190,22 @@ absl::optional PeerConnection::sctp_transport_name() const { } cricket::CandidateStatsList PeerConnection::GetPooledCandidateStats() const { + RTC_DCHECK_RUN_ON(network_thread()); + if (!network_thread_safety_->alive()) + return {}; cricket::CandidateStatsList candidate_states_list; - network_thread()->Invoke(RTC_FROM_HERE, [this, &candidate_states_list] { - port_allocator_->GetCandidateStatsFromPooledSessions( - &candidate_states_list); - }); + port_allocator_->GetCandidateStatsFromPooledSessions(&candidate_states_list); return candidate_states_list; } std::map PeerConnection::GetTransportNamesByMid() const { - RTC_DCHECK_RUN_ON(signaling_thread()); + RTC_DCHECK_RUN_ON(network_thread()); + rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls; + + if (!network_thread_safety_->alive()) + return {}; + std::map transport_names_by_mid; for (const auto& transceiver : rtp_manager()->transceivers()->List()) { cricket::ChannelInterface* channel = transceiver->internal()->channel(); @@ -2214,10 +2219,10 @@ std::map PeerConnection::GetTransportNamesByMid() ->content_name()] = data_channel_controller_.rtp_data_channel()->transport_name(); } - if (data_channel_controller_.data_channel_transport()) { - absl::optional transport_name = sctp_transport_name(); - RTC_DCHECK(transport_name); - transport_names_by_mid[*sctp_mid_s_] = *transport_name; + if (sctp_mid_n_) { + cricket::DtlsTransportInternal* dtls_transport = + transport_controller_->GetDtlsTransport(*sctp_mid_n_); + transport_names_by_mid[*sctp_mid_n_] = dtls_transport->transport_name(); } return transport_names_by_mid; } @@ -2225,13 +2230,11 @@ std::map PeerConnection::GetTransportNamesByMid() std::map PeerConnection::GetTransportStatsByNames( const std::set& transport_names) { - if (!network_thread()->IsCurrent()) { - return network_thread() - ->Invoke>( - RTC_FROM_HERE, - [&] { return GetTransportStatsByNames(transport_names); }); - } RTC_DCHECK_RUN_ON(network_thread()); + if (!network_thread_safety_->alive()) + return {}; + + rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls; std::map transport_stats_by_name; for (const std::string& transport_name : transport_names) { cricket::TransportStats transport_stats; @@ -2250,7 +2253,8 @@ PeerConnection::GetTransportStatsByNames( bool PeerConnection::GetLocalCertificate( const std::string& transport_name, rtc::scoped_refptr* certificate) { - if (!certificate) { + RTC_DCHECK_RUN_ON(network_thread()); + if (!network_thread_safety_->alive() || !certificate) { return false; } *certificate = transport_controller_->GetLocalCertificate(transport_name); @@ -2259,6 +2263,7 @@ bool PeerConnection::GetLocalCertificate( std::unique_ptr PeerConnection::GetRemoteSSLCertChain( const std::string& transport_name) { + RTC_DCHECK_RUN_ON(network_thread()); return transport_controller_->GetRemoteSSLCertChain(transport_name); } @@ -2427,7 +2432,8 @@ bool PeerConnection::SetupDataChannelTransport_n(const std::string& mid) { data_channel_controller_.set_data_channel_transport(transport); data_channel_controller_.SetupDataChannelTransport_n(); sctp_mid_n_ = mid; - auto dtls_transport = transport_controller_->GetDtlsTransport(mid); + cricket::DtlsTransportInternal* dtls_transport = + transport_controller_->GetDtlsTransport(mid); if (dtls_transport) { signaling_thread()->PostTask( ToQueuedTask(signaling_thread_safety_.flag(), @@ -2710,7 +2716,8 @@ void PeerConnection::ReportTransportStats() { } if (sctp_mid_n_) { - auto dtls_transport = transport_controller_->GetDtlsTransport(*sctp_mid_n_); + cricket::DtlsTransportInternal* dtls_transport = + transport_controller_->GetDtlsTransport(*sctp_mid_n_); if (dtls_transport) { media_types_by_transport_name[dtls_transport->transport_name()].insert( cricket::MEDIA_TYPE_DATA); diff --git a/pc/stats_collector.cc b/pc/stats_collector.cc index 5641061240..fdcbb8aac6 100644 --- a/pc/stats_collector.cc +++ b/pc/stats_collector.cc @@ -540,7 +540,7 @@ StatsCollector::StatsCollector(PeerConnectionInternal* pc) } StatsCollector::~StatsCollector() { - RTC_DCHECK(pc_->signaling_thread()->IsCurrent()); + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); } // Wallclock time in ms. @@ -551,7 +551,7 @@ double StatsCollector::GetTimeNow() { // Adds a MediaStream with tracks that can be used as a |selector| in a call // to GetStats. void StatsCollector::AddStream(MediaStreamInterface* stream) { - RTC_DCHECK(pc_->signaling_thread()->IsCurrent()); + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); RTC_DCHECK(stream != NULL); CreateTrackReports(stream->GetAudioTracks(), &reports_, @@ -574,7 +574,7 @@ void StatsCollector::AddTrack(MediaStreamTrackInterface* track) { void StatsCollector::AddLocalAudioTrack(AudioTrackInterface* audio_track, uint32_t ssrc) { - RTC_DCHECK(pc_->signaling_thread()->IsCurrent()); + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); RTC_DCHECK(audio_track != NULL); #if RTC_DCHECK_IS_ON for (const auto& track : local_audio_tracks_) @@ -608,7 +608,7 @@ void StatsCollector::RemoveLocalAudioTrack(AudioTrackInterface* audio_track, void StatsCollector::GetStats(MediaStreamTrackInterface* track, StatsReports* reports) { - RTC_DCHECK(pc_->signaling_thread()->IsCurrent()); + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); RTC_DCHECK(reports != NULL); RTC_DCHECK(reports->empty()); @@ -648,7 +648,7 @@ void StatsCollector::GetStats(MediaStreamTrackInterface* track, void StatsCollector::UpdateStats( PeerConnectionInterface::StatsOutputLevel level) { - RTC_DCHECK(pc_->signaling_thread()->IsCurrent()); + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); // Calls to UpdateStats() that occur less than kMinGatherStatsPeriodMs apart // will be ignored. Using a monotonic clock specifically for this, while using // a UTC clock for the reports themselves. @@ -661,15 +661,20 @@ void StatsCollector::UpdateStats( cache_timestamp_ms_ = cache_now_ms; stats_gathering_started_ = GetTimeNow(); + // TODO(tommi): ExtractSessionInfo now has a single hop to the network thread + // to fetch stats, then applies them on the signaling thread. See if we need + // to do this synchronously or if updating the stats without blocking is safe. + std::map transport_names_by_mid = + ExtractSessionInfo(); + // TODO(tommi): All of these hop over to the worker thread to fetch // information. We could use an AsyncInvoker to run all of these and post // the information back to the signaling thread where we can create and // update stats reports. That would also clean up the threading story a bit // since we'd be creating/updating the stats report objects consistently on // the same thread (this class has no locks right now). - ExtractSessionInfo(); ExtractBweInfo(); - ExtractMediaInfo(); + ExtractMediaInfo(transport_names_by_mid); ExtractSenderInfo(); ExtractDataInfo(); UpdateTrackReports(); @@ -680,7 +685,7 @@ StatsReport* StatsCollector::PrepareReport(bool local, const std::string& track_id, const StatsReport::Id& transport_id, StatsReport::Direction direction) { - RTC_DCHECK(pc_->signaling_thread()->IsCurrent()); + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); StatsReport::Id id(StatsReport::NewIdWithDirection( local ? StatsReport::kStatsReportTypeSsrc : StatsReport::kStatsReportTypeRemoteSsrc, @@ -703,7 +708,7 @@ StatsReport* StatsCollector::PrepareReport(bool local, } StatsReport* StatsCollector::PrepareADMReport() { - RTC_DCHECK(pc_->signaling_thread()->IsCurrent()); + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); StatsReport::Id id(StatsReport::NewTypedId( StatsReport::kStatsReportTypeSession, pc_->session_id())); StatsReport* report = reports_.FindOrAddNew(id); @@ -717,7 +722,7 @@ bool StatsCollector::IsValidTrack(const std::string& track_id) { StatsReport* StatsCollector::AddCertificateReports( std::unique_ptr cert_stats) { - RTC_DCHECK(pc_->signaling_thread()->IsCurrent()); + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); StatsReport* first_report = nullptr; StatsReport* prev_report = nullptr; @@ -843,35 +848,36 @@ StatsReport* StatsCollector::AddCandidateReport( return report; } -void StatsCollector::ExtractSessionInfo() { - RTC_DCHECK(pc_->signaling_thread()->IsCurrent()); +std::map StatsCollector::ExtractSessionInfo() { + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); - // Extract information from the base session. - StatsReport::Id id(StatsReport::NewTypedId( - StatsReport::kStatsReportTypeSession, pc_->session_id())); - StatsReport* report = reports_.ReplaceOrAddNew(id); - report->set_timestamp(stats_gathering_started_); - report->AddBoolean(StatsReport::kStatsValueNameInitiator, - pc_->initial_offerer()); + SessionStats stats; + pc_->network_thread()->Invoke( + RTC_FROM_HERE, [this, &stats] { stats = ExtractSessionInfo_n(); }); - cricket::CandidateStatsList pooled_candidate_stats_list = - pc_->GetPooledCandidateStats(); + ExtractSessionInfo_s(stats); - for (const cricket::CandidateStats& stats : pooled_candidate_stats_list) { - AddCandidateReport(stats, true); - } + return std::move(stats.transport_names_by_mid); +} + +StatsCollector::SessionStats StatsCollector::ExtractSessionInfo_n() { + RTC_DCHECK_RUN_ON(pc_->network_thread()); + rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls; + SessionStats stats; + stats.candidate_stats = pc_->GetPooledCandidateStats(); + stats.transport_names_by_mid = pc_->GetTransportNamesByMid(); std::set transport_names; - for (const auto& entry : pc_->GetTransportNamesByMid()) { + for (const auto& entry : stats.transport_names_by_mid) { transport_names.insert(entry.second); } std::map transport_stats_by_name = pc_->GetTransportStatsByNames(transport_names); - for (const auto& entry : transport_stats_by_name) { - const std::string& transport_name = entry.first; - const cricket::TransportStats& transport_stats = entry.second; + for (auto& entry : transport_stats_by_name) { + stats.transport_stats.emplace_back(entry.first, std::move(entry.second)); + TransportStats& transport = stats.transport_stats.back(); // Attempt to get a copy of the certificates from the transport and // expose them in stats reports. All channels in a transport share the @@ -879,24 +885,59 @@ void StatsCollector::ExtractSessionInfo() { // StatsReport::Id local_cert_report_id, remote_cert_report_id; rtc::scoped_refptr certificate; - if (pc_->GetLocalCertificate(transport_name, &certificate)) { - StatsReport* r = AddCertificateReports( - certificate->GetSSLCertificateChain().GetStats()); + if (pc_->GetLocalCertificate(transport.name, &certificate)) { + transport.local_cert_stats = + certificate->GetSSLCertificateChain().GetStats(); + } + + std::unique_ptr remote_cert_chain = + pc_->GetRemoteSSLCertChain(transport.name); + if (remote_cert_chain) { + transport.remote_cert_stats = remote_cert_chain->GetStats(); + } + } + + return stats; +} + +void StatsCollector::ExtractSessionInfo_s(SessionStats& session_stats) { + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); + rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls; + + StatsReport::Id id(StatsReport::NewTypedId( + StatsReport::kStatsReportTypeSession, pc_->session_id())); + StatsReport* report = reports_.ReplaceOrAddNew(id); + report->set_timestamp(stats_gathering_started_); + report->AddBoolean(StatsReport::kStatsValueNameInitiator, + pc_->initial_offerer()); + + for (const cricket::CandidateStats& stats : session_stats.candidate_stats) { + AddCandidateReport(stats, true); + } + + for (auto& transport : session_stats.transport_stats) { + // Attempt to get a copy of the certificates from the transport and + // expose them in stats reports. All channels in a transport share the + // same local and remote certificates. + // + StatsReport::Id local_cert_report_id, remote_cert_report_id; + if (transport.local_cert_stats) { + StatsReport* r = + AddCertificateReports(std::move(transport.local_cert_stats)); if (r) local_cert_report_id = r->id(); } - std::unique_ptr remote_cert_chain = - pc_->GetRemoteSSLCertChain(transport_name); - if (remote_cert_chain) { - StatsReport* r = AddCertificateReports(remote_cert_chain->GetStats()); + if (transport.remote_cert_stats) { + StatsReport* r = + AddCertificateReports(std::move(transport.remote_cert_stats)); if (r) remote_cert_report_id = r->id(); } - for (const auto& channel_iter : transport_stats.channel_stats) { + for (const auto& channel_iter : transport.stats.channel_stats) { StatsReport::Id id( - StatsReport::NewComponentId(transport_name, channel_iter.component)); + StatsReport::NewComponentId(transport.name, channel_iter.component)); StatsReport* channel_report = reports_.ReplaceOrAddNew(id); channel_report->set_timestamp(stats_gathering_started_); channel_report->AddInt(StatsReport::kStatsValueNameComponent, @@ -939,7 +980,7 @@ void StatsCollector::ExtractSessionInfo() { for (const cricket::ConnectionInfo& info : channel_iter.ice_transport_stats.connection_infos) { StatsReport* connection_report = AddConnectionInfoReport( - transport_name, channel_iter.component, connection_id++, + transport.name, channel_iter.component, connection_id++, channel_report->id(), info); if (info.best_connection) { channel_report->AddId( @@ -952,7 +993,7 @@ void StatsCollector::ExtractSessionInfo() { } void StatsCollector::ExtractBweInfo() { - RTC_DCHECK(pc_->signaling_thread()->IsCurrent()); + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); if (pc_->signaling_state() == PeerConnectionInterface::kClosed) return; @@ -1087,7 +1128,8 @@ std::unique_ptr CreateMediaChannelStatsGatherer( } // namespace -void StatsCollector::ExtractMediaInfo() { +void StatsCollector::ExtractMediaInfo( + const std::map& transport_names_by_mid) { RTC_DCHECK_RUN_ON(pc_->signaling_thread()); std::vector> gatherers; @@ -1102,7 +1144,8 @@ void StatsCollector::ExtractMediaInfo() { std::unique_ptr gatherer = CreateMediaChannelStatsGatherer(channel->media_channel()); gatherer->mid = channel->content_name(); - gatherer->transport_name = channel->transport_name(); + gatherer->transport_name = transport_names_by_mid.at(gatherer->mid); + for (const auto& sender : transceiver->internal()->senders()) { std::string track_id = (sender->track() ? sender->track()->id() : ""); gatherer->sender_track_id_by_ssrc.insert( @@ -1143,7 +1186,7 @@ void StatsCollector::ExtractMediaInfo() { } void StatsCollector::ExtractSenderInfo() { - RTC_DCHECK(pc_->signaling_thread()->IsCurrent()); + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); for (const auto& sender : pc_->GetSenders()) { // TODO(nisse): SSRC == 0 currently means none. Delete check when @@ -1176,7 +1219,7 @@ void StatsCollector::ExtractSenderInfo() { } void StatsCollector::ExtractDataInfo() { - RTC_DCHECK(pc_->signaling_thread()->IsCurrent()); + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls; @@ -1200,7 +1243,7 @@ void StatsCollector::ExtractDataInfo() { StatsReport* StatsCollector::GetReport(const StatsReport::StatsType& type, const std::string& id, StatsReport::Direction direction) { - RTC_DCHECK(pc_->signaling_thread()->IsCurrent()); + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); RTC_DCHECK(type == StatsReport::kStatsReportTypeSsrc || type == StatsReport::kStatsReportTypeRemoteSsrc); return reports_.Find(StatsReport::NewIdWithDirection(type, id, direction)); @@ -1208,7 +1251,7 @@ StatsReport* StatsCollector::GetReport(const StatsReport::StatsType& type, void StatsCollector::UpdateStatsFromExistingLocalAudioTracks( bool has_remote_tracks) { - RTC_DCHECK(pc_->signaling_thread()->IsCurrent()); + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); // Loop through the existing local audio tracks. for (const auto& it : local_audio_tracks_) { AudioTrackInterface* track = it.first; @@ -1236,7 +1279,7 @@ void StatsCollector::UpdateStatsFromExistingLocalAudioTracks( void StatsCollector::UpdateReportFromAudioTrack(AudioTrackInterface* track, StatsReport* report, bool has_remote_tracks) { - RTC_DCHECK(pc_->signaling_thread()->IsCurrent()); + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); RTC_DCHECK(track != NULL); // Don't overwrite report values if they're not available. @@ -1258,7 +1301,7 @@ void StatsCollector::UpdateReportFromAudioTrack(AudioTrackInterface* track, } void StatsCollector::UpdateTrackReports() { - RTC_DCHECK(pc_->signaling_thread()->IsCurrent()); + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls; diff --git a/pc/stats_collector.h b/pc/stats_collector.h index 02a1ced116..eaefc438f2 100644 --- a/pc/stats_collector.h +++ b/pc/stats_collector.h @@ -55,7 +55,7 @@ class StatsCollector : public StatsCollectorInterface { explicit StatsCollector(PeerConnectionInternal* pc); virtual ~StatsCollector(); - // Adds a MediaStream with tracks that can be used as a |selector| in a call + // Adds a MediaStream with tracks that can be used as a `selector` in a call // to GetStats. void AddStream(MediaStreamInterface* stream); void AddTrack(MediaStreamTrackInterface* track); @@ -73,12 +73,12 @@ class StatsCollector : public StatsCollectorInterface { void UpdateStats(PeerConnectionInterface::StatsOutputLevel level); // Gets a StatsReports of the last collected stats. Note that UpdateStats must - // be called before this function to get the most recent stats. |selector| is + // be called before this function to get the most recent stats. `selector` is // a track label or empty string. The most recent reports are stored in - // |reports|. + // `reports`. // TODO(tommi): Change this contract to accept a callback object instead - // of filling in |reports|. As is, there's a requirement that the caller - // uses |reports| immediately without allowing any async activity on + // of filling in `reports`. As is, there's a requirement that the caller + // uses `reports` immediately without allowing any async activity on // the thread (message handling etc) and then discard the results. void GetStats(MediaStreamTrackInterface* track, StatsReports* reports) override; @@ -106,19 +106,48 @@ class StatsCollector : public StatsCollectorInterface { private: friend class StatsCollectorTest; + // Struct that's populated on the network thread and carries the values to + // the signaling thread where the stats are added to the stats reports. + struct TransportStats { + TransportStats() = default; + TransportStats(std::string transport_name, + cricket::TransportStats transport_stats) + : name(std::move(transport_name)), stats(std::move(transport_stats)) {} + TransportStats(TransportStats&&) = default; + TransportStats(const TransportStats&) = delete; + + std::string name; + cricket::TransportStats stats; + std::unique_ptr local_cert_stats; + std::unique_ptr remote_cert_stats; + }; + + struct SessionStats { + SessionStats() = default; + SessionStats(SessionStats&&) = default; + SessionStats(const SessionStats&) = delete; + + SessionStats& operator=(SessionStats&&) = default; + SessionStats& operator=(SessionStats&) = delete; + + cricket::CandidateStatsList candidate_stats; + std::vector transport_stats; + std::map transport_names_by_mid; + }; + // Overridden in unit tests to fake timing. virtual double GetTimeNow(); bool CopySelectedReports(const std::string& selector, StatsReports* reports); - // Helper method for creating IceCandidate report. |is_local| indicates + // Helper method for creating IceCandidate report. `is_local` indicates // whether this candidate is local or remote. StatsReport* AddCandidateReport( const cricket::CandidateStats& candidate_stats, bool local); // Adds a report for this certificate and every certificate in its chain, and - // returns the leaf certificate's report (|cert_stats|'s report). + // returns the leaf certificate's report (`cert_stats`'s report). StatsReport* AddCertificateReports( std::unique_ptr cert_stats); @@ -129,9 +158,14 @@ class StatsCollector : public StatsCollectorInterface { const cricket::ConnectionInfo& info); void ExtractDataInfo(); - void ExtractSessionInfo(); + + // Returns the `transport_names_by_mid` member from the SessionStats as + // gathered and used to populate the stats. + std::map ExtractSessionInfo(); + void ExtractBweInfo(); - void ExtractMediaInfo(); + void ExtractMediaInfo( + const std::map& transport_names_by_mid); void ExtractSenderInfo(); webrtc::StatsReport* GetReport(const StatsReport::StatsType& type, const std::string& id, @@ -146,6 +180,9 @@ class StatsCollector : public StatsCollectorInterface { // Helper method to update the timestamp of track records. void UpdateTrackReports(); + SessionStats ExtractSessionInfo_n(); + void ExtractSessionInfo_s(SessionStats& session_stats); + // A collection for all of our stats reports. StatsCollection reports_; TrackIdMap track_ids_;