From 20f7456da9775605f4c62d18b0b2aa9f680cad8b Mon Sep 17 00:00:00 2001 From: Tomas Gunnarsson Date: Thu, 4 Feb 2021 10:22:50 +0100 Subject: [PATCH] Fix unsynchronized access to jsep_transports_by_name_. Also removing need for lock for ice restart flag, fix call paths and add information about how JsepTransportController's events could live fully on the network thread and complexity around signaling thread should be handled by PeerConnection (more details in webrtc:12427). Bug: webrtc:12426, webrtc:12427 Change-Id: I9b1fae8acf16d90d9716054fc3c390700877a82a Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/205221 Reviewed-by: Niels Moller Commit-Queue: Tommi Cr-Commit-Position: refs/heads/master@{#33159} --- pc/jsep_transport.cc | 14 +++---- pc/jsep_transport.h | 14 +++---- pc/jsep_transport_controller.cc | 47 +++++++++++++---------- pc/jsep_transport_controller.h | 9 +++-- pc/peer_connection.cc | 48 +++++++++++++++--------- pc/webrtc_session_description_factory.cc | 7 ++-- 6 files changed, 81 insertions(+), 58 deletions(-) diff --git a/pc/jsep_transport.cc b/pc/jsep_transport.cc index 787e9b68df..2d7347b12b 100644 --- a/pc/jsep_transport.cc +++ b/pc/jsep_transport.cc @@ -232,13 +232,11 @@ webrtc::RTCError JsepTransport::SetLocalJsepTransportDescription( local_description_.reset(); return error; } - { - webrtc::MutexLock lock(&accessor_lock_); - if (needs_ice_restart_ && ice_restarting) { - needs_ice_restart_ = false; - RTC_LOG(LS_VERBOSE) << "needs-ice-restart flag cleared for transport " - << mid(); - } + + if (needs_ice_restart_ && ice_restarting) { + needs_ice_restart_ = false; + RTC_LOG(LS_VERBOSE) << "needs-ice-restart flag cleared for transport " + << mid(); } return webrtc::RTCError::OK(); @@ -341,7 +339,7 @@ webrtc::RTCError JsepTransport::AddRemoteCandidates( } void JsepTransport::SetNeedsIceRestartFlag() { - webrtc::MutexLock lock(&accessor_lock_); + RTC_DCHECK_RUN_ON(network_thread_); if (!needs_ice_restart_) { needs_ice_restart_ = true; RTC_LOG(LS_VERBOSE) << "needs-ice-restart flag set for transport " << mid(); diff --git a/pc/jsep_transport.h b/pc/jsep_transport.h index 0260b9374b..2199f5ecc5 100644 --- a/pc/jsep_transport.h +++ b/pc/jsep_transport.h @@ -141,16 +141,14 @@ class JsepTransport : public sigslot::has_slots<> { // set, offers should generate new ufrags/passwords until an ICE restart // occurs. // - // This and the below method can be called safely from any thread as long as - // SetXTransportDescription is not in progress. - // TODO(tommi): Investigate on which threads (network or signal?) we really - // need to access the needs_ice_restart flag. - void SetNeedsIceRestartFlag() RTC_LOCKS_EXCLUDED(accessor_lock_); + // This and |needs_ice_restart()| must be called on the network thread. + void SetNeedsIceRestartFlag(); + // Returns true if the ICE restart flag above was set, and no ICE restart has // occurred yet for this transport (by applying a local description with // changed ufrag/password). - bool needs_ice_restart() const RTC_LOCKS_EXCLUDED(accessor_lock_) { - webrtc::MutexLock lock(&accessor_lock_); + bool needs_ice_restart() const { + RTC_DCHECK_RUN_ON(network_thread_); return needs_ice_restart_; } @@ -335,7 +333,7 @@ class JsepTransport : public sigslot::has_slots<> { mutable webrtc::Mutex accessor_lock_; const std::string mid_; // needs-ice-restart bit as described in JSEP. - bool needs_ice_restart_ RTC_GUARDED_BY(accessor_lock_) = false; + bool needs_ice_restart_ RTC_GUARDED_BY(network_thread_) = false; rtc::scoped_refptr local_certificate_ RTC_GUARDED_BY(network_thread_); std::unique_ptr local_description_ diff --git a/pc/jsep_transport_controller.cc b/pc/jsep_transport_controller.cc index 045c991eea..be049476eb 100644 --- a/pc/jsep_transport_controller.cc +++ b/pc/jsep_transport_controller.cc @@ -210,6 +210,7 @@ void JsepTransportController::SetIceConfig(const cricket::IceConfig& config) { } void JsepTransportController::SetNeedsIceRestartFlag() { + RTC_DCHECK_RUN_ON(network_thread_); for (auto& kv : jsep_transports_by_name_) { kv.second->SetNeedsIceRestartFlag(); } @@ -217,6 +218,14 @@ void JsepTransportController::SetNeedsIceRestartFlag() { bool JsepTransportController::NeedsIceRestart( const std::string& transport_name) const { + if (!network_thread_->IsCurrent()) { + RTC_DCHECK_RUN_ON(signaling_thread_); + return network_thread_->Invoke( + RTC_FROM_HERE, [&] { return NeedsIceRestart(transport_name); }); + } + + RTC_DCHECK_RUN_ON(network_thread_); + const cricket::JsepTransport* transport = GetJsepTransportByName(transport_name); if (!transport) { @@ -246,6 +255,8 @@ bool JsepTransportController::SetLocalCertificate( RTC_FROM_HERE, [&] { return SetLocalCertificate(certificate); }); } + RTC_DCHECK_RUN_ON(network_thread_); + // Can't change a certificate, or set a null certificate. if (certificate_ || !certificate) { return false; @@ -273,6 +284,8 @@ JsepTransportController::GetLocalCertificate( RTC_FROM_HERE, [&] { return GetLocalCertificate(transport_name); }); } + RTC_DCHECK_RUN_ON(network_thread_); + const cricket::JsepTransport* t = GetJsepTransportByName(transport_name); if (!t) { return nullptr; @@ -287,6 +300,7 @@ JsepTransportController::GetRemoteSSLCertChain( return network_thread_->Invoke>( RTC_FROM_HERE, [&] { return GetRemoteSSLCertChain(transport_name); }); } + RTC_DCHECK_RUN_ON(network_thread_); // Get the certificate from the RTP transport's DTLS handshake. Should be // identical to the RTCP transport's, since they were given the same remote @@ -324,6 +338,8 @@ RTCError JsepTransportController::AddRemoteCandidates( }); } + RTC_DCHECK_RUN_ON(network_thread_); + // Verify each candidate before passing down to the transport layer. RTCError error = VerifyCandidates(candidates); if (!error.ok()) { @@ -345,6 +361,8 @@ RTCError JsepTransportController::RemoveRemoteCandidates( RTC_FROM_HERE, [&] { return RemoveRemoteCandidates(candidates); }); } + RTC_DCHECK_RUN_ON(network_thread_); + // Verify each candidate before passing down to the transport layer. RTCError error = VerifyCandidates(candidates); if (!error.ok()) { @@ -392,6 +410,8 @@ bool JsepTransportController::GetStats(const std::string& transport_name, RTC_FROM_HERE, [=] { return GetStats(transport_name, stats); }); } + RTC_DCHECK_RUN_ON(network_thread_); + cricket::JsepTransport* transport = GetJsepTransportByName(transport_name); if (!transport) { return false; @@ -450,7 +470,7 @@ std::unique_ptr JsepTransportController::CreateDtlsTransport( const cricket::ContentInfo& content_info, cricket::IceTransportInternal* ice) { - RTC_DCHECK(network_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(network_thread_); std::unique_ptr dtls; @@ -504,7 +524,7 @@ JsepTransportController::CreateUnencryptedRtpTransport( const std::string& transport_name, rtc::PacketTransportInternal* rtp_packet_transport, rtc::PacketTransportInternal* rtcp_packet_transport) { - RTC_DCHECK(network_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(network_thread_); auto unencrypted_rtp_transport = std::make_unique(rtcp_packet_transport == nullptr); unencrypted_rtp_transport->SetRtpPacketTransport(rtp_packet_transport); @@ -519,7 +539,7 @@ JsepTransportController::CreateSdesTransport( const std::string& transport_name, cricket::DtlsTransportInternal* rtp_dtls_transport, cricket::DtlsTransportInternal* rtcp_dtls_transport) { - RTC_DCHECK(network_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(network_thread_); auto srtp_transport = std::make_unique(rtcp_dtls_transport == nullptr); RTC_DCHECK(rtp_dtls_transport); @@ -555,6 +575,7 @@ JsepTransportController::CreateDtlsSrtpTransport( std::vector JsepTransportController::GetDtlsTransports() { + RTC_DCHECK_RUN_ON(network_thread_); std::vector dtls_transports; for (auto it = jsep_transports_by_name_.begin(); it != jsep_transports_by_name_.end(); ++it) { @@ -1066,8 +1087,6 @@ void JsepTransportController::MaybeDestroyJsepTransport( } void JsepTransportController::DestroyAllJsepTransports_n() { - RTC_DCHECK(network_thread_->IsCurrent()); - for (const auto& jsep_transport : jsep_transports_by_name_) { config_.transport_observer->OnTransportChanged(jsep_transport.first, nullptr, nullptr, nullptr); @@ -1077,10 +1096,9 @@ void JsepTransportController::DestroyAllJsepTransports_n() { } void JsepTransportController::SetIceRole_n(cricket::IceRole ice_role) { - RTC_DCHECK(network_thread_->IsCurrent()); - ice_role_ = ice_role; - for (auto& dtls : GetDtlsTransports()) { + auto dtls_transports = GetDtlsTransports(); + for (auto& dtls : dtls_transports) { dtls->ice_transport()->SetIceRole(ice_role_); } } @@ -1135,7 +1153,6 @@ cricket::IceRole JsepTransportController::DetermineIceRole( void JsepTransportController::OnTransportWritableState_n( rtc::PacketTransportInternal* transport) { - RTC_DCHECK(network_thread_->IsCurrent()); RTC_LOG(LS_INFO) << " Transport " << transport->transport_name() << " writability changed to " << transport->writable() << "."; @@ -1144,27 +1161,25 @@ void JsepTransportController::OnTransportWritableState_n( void JsepTransportController::OnTransportReceivingState_n( rtc::PacketTransportInternal* transport) { - RTC_DCHECK(network_thread_->IsCurrent()); UpdateAggregateStates_n(); } void JsepTransportController::OnTransportGatheringState_n( cricket::IceTransportInternal* transport) { - RTC_DCHECK(network_thread_->IsCurrent()); UpdateAggregateStates_n(); } void JsepTransportController::OnTransportCandidateGathered_n( cricket::IceTransportInternal* transport, const cricket::Candidate& candidate) { - RTC_DCHECK(network_thread_->IsCurrent()); - // We should never signal peer-reflexive candidates. if (candidate.type() == cricket::PRFLX_PORT_TYPE) { RTC_NOTREACHED(); return; } std::string transport_name = transport->transport_name(); + // TODO(bugs.webrtc.org/12427): See if we can get rid of this. We should be + // able to just call this directly here. invoker_.AsyncInvoke( RTC_FROM_HERE, signaling_thread_, [this, transport_name, candidate] { signal_ice_candidates_gathered_.Send( @@ -1175,8 +1190,6 @@ void JsepTransportController::OnTransportCandidateGathered_n( void JsepTransportController::OnTransportCandidateError_n( cricket::IceTransportInternal* transport, const cricket::IceCandidateErrorEvent& event) { - RTC_DCHECK(network_thread_->IsCurrent()); - invoker_.AsyncInvoke(RTC_FROM_HERE, signaling_thread_, [this, event] { signal_ice_candidate_error_.Send(event); }); @@ -1197,7 +1210,6 @@ void JsepTransportController::OnTransportCandidatePairChanged_n( void JsepTransportController::OnTransportRoleConflict_n( cricket::IceTransportInternal* transport) { - RTC_DCHECK(network_thread_->IsCurrent()); // Note: since the role conflict is handled entirely on the network thread, // we don't need to worry about role conflicts occurring on two ports at // once. The first one encountered should immediately reverse the role. @@ -1214,7 +1226,6 @@ void JsepTransportController::OnTransportRoleConflict_n( void JsepTransportController::OnTransportStateChanged_n( cricket::IceTransportInternal* transport) { - RTC_DCHECK(network_thread_->IsCurrent()); RTC_LOG(LS_INFO) << transport->transport_name() << " Transport " << transport->component() << " state changed. Check if state is complete."; @@ -1222,8 +1233,6 @@ void JsepTransportController::OnTransportStateChanged_n( } void JsepTransportController::UpdateAggregateStates_n() { - RTC_DCHECK(network_thread_->IsCurrent()); - auto dtls_transports = GetDtlsTransports(); cricket::IceConnectionState new_connection_state = cricket::kIceConnectionConnecting; diff --git a/pc/jsep_transport_controller.h b/pc/jsep_transport_controller.h index 3dab284a76..506a41808a 100644 --- a/pc/jsep_transport_controller.h +++ b/pc/jsep_transport_controller.h @@ -227,6 +227,8 @@ class JsepTransportController : public sigslot::has_slots<> { // F: void(const std::string&, const std::vector&) template void SubscribeIceCandidateGathered(F&& callback) { + // TODO(bugs.webrtc.org/12427): Post this subscription to the network + // thread. signal_ice_candidates_gathered_.AddReceiver(std::forward(callback)); } @@ -294,6 +296,7 @@ class JsepTransportController : public sigslot::has_slots<> { CallbackList signal_ice_gathering_state_; // [mid, candidates] + // TODO(bugs.webrtc.org/12427): Protect this with network_thread_. CallbackList&> signal_ice_candidates_gathered_; @@ -366,9 +369,9 @@ class JsepTransportController : public sigslot::has_slots<> { // Get the JsepTransport without considering the BUNDLE group. Return nullptr // if the JsepTransport is destroyed. const cricket::JsepTransport* GetJsepTransportByName( - const std::string& transport_name) const; + const std::string& transport_name) const RTC_RUN_ON(network_thread_); cricket::JsepTransport* GetJsepTransportByName( - const std::string& transport_name); + const std::string& transport_name) RTC_RUN_ON(network_thread_); // Creates jsep transport. Noop if transport is already created. // Transport is created either during SetLocalDescription (|local| == true) or @@ -454,7 +457,7 @@ class JsepTransportController : public sigslot::has_slots<> { AsyncResolverFactory* const async_resolver_factory_ = nullptr; std::map> - jsep_transports_by_name_; + jsep_transports_by_name_ RTC_GUARDED_BY(network_thread_); // This keeps track of the mapping between media section // (BaseChannel/SctpTransport) and the JsepTransport underneath. std::map mid_to_transport_; diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index c3ffa290d9..2cb43bf408 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -528,6 +528,9 @@ RTCError PeerConnection::Initialize( // The port allocator lives on the network thread and should be initialized // there. + // TODO(bugs.webrtc.org/12427): See if we can piggyback on this call and + // initialize all the |transport_controller_->Subscribe*| calls below on the + // network thread via this invoke. const auto pa_result = network_thread()->Invoke( RTC_FROM_HERE, [this, &stun_servers, &turn_servers, &configuration] { @@ -620,6 +623,10 @@ RTCError PeerConnection::Initialize( // due to lack of unit tests which trigger these scenarios. // TODO(bugs.webrtc.org/12160): Remove above comments. // callbacks for signaling_thread. + // TODO(bugs.webrtc.org/12427): If we can't piggyback on the above network + // Invoke(), then perhaps we could post these subscription calls to the + // network thread so that the transport controller doesn't have to do the + // signaling/network handling internally and use AsyncInvoker. transport_controller_->SubscribeIceConnectionState( [this](cricket::IceConnectionState s) { RTC_DCHECK_RUN_ON(signaling_thread()); @@ -1379,10 +1386,29 @@ RTCError PeerConnection::SetConfiguration( const bool has_local_description = local_description() != nullptr; - // In theory this shouldn't fail. + const bool needs_ice_restart = + modified_config.servers != configuration_.servers || + NeedIceRestart( + configuration_.surface_ice_candidates_on_ice_transport_type_changed, + configuration_.type, modified_config.type) || + modified_config.GetTurnPortPrunePolicy() != + configuration_.GetTurnPortPrunePolicy(); + cricket::IceConfig ice_config = ParseIceConfig(modified_config); + + // Apply part of the configuration on the network thread. In theory this + // shouldn't fail. if (!network_thread()->Invoke( - RTC_FROM_HERE, [this, &stun_servers, &turn_servers, &modified_config, - has_local_description] { + RTC_FROM_HERE, + [this, needs_ice_restart, &ice_config, &stun_servers, &turn_servers, + &modified_config, has_local_description] { + // As described in JSEP, calling setConfiguration with new ICE + // servers or candidate policy must set a "needs-ice-restart" bit so + // that the next offer triggers an ICE restart which will pick up + // the changes. + if (needs_ice_restart) + transport_controller_->SetNeedsIceRestartFlag(); + + transport_controller_->SetIceConfig(ice_config); return ReconfigurePortAllocator_n( stun_servers, turn_servers, modified_config.type, modified_config.ice_candidate_pool_size, @@ -1395,20 +1421,6 @@ RTCError PeerConnection::SetConfiguration( "Failed to apply configuration to PortAllocator."); } - // As described in JSEP, calling setConfiguration with new ICE servers or - // candidate policy must set a "needs-ice-restart" bit so that the next offer - // triggers an ICE restart which will pick up the changes. - if (modified_config.servers != configuration_.servers || - NeedIceRestart( - configuration_.surface_ice_candidates_on_ice_transport_type_changed, - configuration_.type, modified_config.type) || - modified_config.GetTurnPortPrunePolicy() != - configuration_.GetTurnPortPrunePolicy()) { - transport_controller_->SetNeedsIceRestartFlag(); - } - - transport_controller_->SetIceConfig(ParseIceConfig(modified_config)); - if (configuration_.active_reset_srtp_params != modified_config.active_reset_srtp_params) { transport_controller_->SetActiveResetSrtpParams( @@ -2155,6 +2167,8 @@ void PeerConnection::OnTransportControllerConnectionState( void PeerConnection::OnTransportControllerCandidatesGathered( const std::string& transport_name, const cricket::Candidates& candidates) { + // TODO(bugs.webrtc.org/12427): Expect this to come in on the network thread + // (not signaling as it currently does), handle appropriately. int sdp_mline_index; if (!GetLocalCandidateMediaIndex(transport_name, &sdp_mline_index)) { RTC_LOG(LS_ERROR) diff --git a/pc/webrtc_session_description_factory.cc b/pc/webrtc_session_description_factory.cc index 2a9dc3fbd8..348016d2d6 100644 --- a/pc/webrtc_session_description_factory.cc +++ b/pc/webrtc_session_description_factory.cc @@ -194,7 +194,7 @@ WebRtcSessionDescriptionFactory::WebRtcSessionDescriptionFactory( } WebRtcSessionDescriptionFactory::~WebRtcSessionDescriptionFactory() { - RTC_DCHECK(signaling_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(signaling_thread_); // Fail any requests that were asked for before identity generation completed. FailPendingRequests(kFailedDueToSessionShutdown); @@ -222,6 +222,7 @@ void WebRtcSessionDescriptionFactory::CreateOffer( CreateSessionDescriptionObserver* observer, const PeerConnectionInterface::RTCOfferAnswerOptions& options, const cricket::MediaSessionOptions& session_options) { + RTC_DCHECK_RUN_ON(signaling_thread_); std::string error = "CreateOffer"; if (certificate_request_state_ == CERTIFICATE_FAILED) { error += kFailedDueToIdentityFailed; @@ -441,7 +442,7 @@ void WebRtcSessionDescriptionFactory::InternalCreateAnswer( void WebRtcSessionDescriptionFactory::FailPendingRequests( const std::string& reason) { - RTC_DCHECK(signaling_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(signaling_thread_); while (!create_session_description_requests_.empty()) { const CreateSessionDescriptionRequest& request = create_session_description_requests_.front(); @@ -476,7 +477,7 @@ void WebRtcSessionDescriptionFactory::PostCreateSessionDescriptionSucceeded( } void WebRtcSessionDescriptionFactory::OnCertificateRequestFailed() { - RTC_DCHECK(signaling_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(signaling_thread_); RTC_LOG(LS_ERROR) << "Asynchronous certificate generation request failed."; certificate_request_state_ = CERTIFICATE_FAILED;