From bc32c56f83f17314f8b4dc4e5c3938a74f80cd94 Mon Sep 17 00:00:00 2001 From: Harald Alvestrand Date: Wed, 9 Feb 2022 12:08:47 +0000 Subject: [PATCH] Move pc.transport_controller_ to be network thread only A pointer to the transport controller is now maintained on both the network thread and the signaling thread. We use thread specific accessors to make it explicit which copy we are accessing at any given time. We also move the initial offerer value to the SDP offer/answer class; this is determined on the basis of SDP offer/answer, so there is no need to hop to the network thread for that. Work in progress. Bug: webrtc:9987 Change-Id: Idbe5a7fbf44f667adcd119e486133cf6e43ab1f5 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/251382 Reviewed-by: Tomas Gunnarsson Commit-Queue: Harald Alvestrand Cr-Commit-Position: refs/heads/main@{#35965} --- pc/peer_connection.cc | 70 +++++++++++++++++++---------- pc/peer_connection.h | 22 ++++++--- pc/peer_connection_internal.h | 3 +- pc/sdp_offer_answer.cc | 59 ++++++++++++++++-------- pc/sdp_offer_answer.h | 23 +++++++++- pc/test/fake_peer_connection_base.h | 3 +- 6 files changed, 128 insertions(+), 52 deletions(-) diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index 4e951a534b..b80725f21f 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -567,6 +567,7 @@ PeerConnection::~PeerConnection() { // port_allocator_ and transport_controller_ live on the network thread and // should be destroyed there. + transport_controller_copy_ = nullptr; network_thread()->Invoke(RTC_FROM_HERE, [this] { RTC_DCHECK_RUN_ON(network_thread()); TeardownDataChannelTransport_n(); @@ -615,20 +616,20 @@ RTCError PeerConnection::Initialize( } // Network thread initialization. - network_thread()->Invoke(RTC_FROM_HERE, [this, &stun_servers, - &turn_servers, &configuration, - &dependencies] { - RTC_DCHECK_RUN_ON(network_thread()); - network_thread_safety_ = PendingTaskSafetyFlag::Create(); - InitializePortAllocatorResult pa_result = - InitializePortAllocator_n(stun_servers, turn_servers, configuration); - // Send information about IPv4/IPv6 status. - PeerConnectionAddressFamilyCounter address_family = - pa_result.enable_ipv6 ? kPeerConnection_IPv6 : kPeerConnection_IPv4; - RTC_HISTOGRAM_ENUMERATION("WebRTC.PeerConnection.IPMetrics", address_family, - kPeerConnectionAddressFamilyCounter_Max); - InitializeTransportController_n(configuration, dependencies); - }); + transport_controller_copy_ = + network_thread()->Invoke(RTC_FROM_HERE, [&] { + RTC_DCHECK_RUN_ON(network_thread()); + network_thread_safety_ = PendingTaskSafetyFlag::Create(); + InitializePortAllocatorResult pa_result = InitializePortAllocator_n( + stun_servers, turn_servers, configuration); + // Send information about IPv4/IPv6 status. + PeerConnectionAddressFamilyCounter address_family = + pa_result.enable_ipv6 ? kPeerConnection_IPv6 : kPeerConnection_IPv4; + RTC_HISTOGRAM_ENUMERATION("WebRTC.PeerConnection.IPMetrics", + address_family, + kPeerConnectionAddressFamilyCounter_Max); + return InitializeTransportController_n(configuration, dependencies); + }); configuration_ = configuration; @@ -674,7 +675,7 @@ RTCError PeerConnection::Initialize( return RTCError::OK(); } -void PeerConnection::InitializeTransportController_n( +JsepTransportController* PeerConnection::InitializeTransportController_n( const RTCConfiguration& configuration, const PeerConnectionDependencies& dependencies) { JsepTransportController::Config config; @@ -793,6 +794,7 @@ void PeerConnection::InitializeTransportController_n( }); transport_controller_->SetIceConfig(ParseIceConfig(configuration)); + return transport_controller_.get(); } rtc::scoped_refptr PeerConnection::local_streams() { @@ -914,9 +916,12 @@ PeerConnection::AddTransceiver( } RtpTransportInternal* PeerConnection::GetRtpTransport(const std::string& mid) { + // TODO(bugs.webrtc.org/9987): Avoid the thread jump. + // This might be done by caching the value on the signaling thread. RTC_DCHECK_RUN_ON(signaling_thread()); return network_thread()->Invoke( RTC_FROM_HERE, [this, &mid] { + RTC_DCHECK_RUN_ON(network_thread()); auto rtp_transport = transport_controller_->GetRtpTransport(mid); RTC_DCHECK(rtp_transport); return rtp_transport; @@ -1506,6 +1511,7 @@ RTCError PeerConnection::SetConfiguration( RTC_FROM_HERE, [this, needs_ice_restart, &ice_config, &stun_servers, &turn_servers, &modified_config, has_local_description] { + RTC_DCHECK_RUN_ON(network_thread()); // As described in JSEP, calling setConfiguration with new ICE // servers or candidate policy must set a "needs-ice-restart" bit so // that the next offer triggers an ICE restart which will pick up @@ -1528,9 +1534,12 @@ RTCError PeerConnection::SetConfiguration( if (configuration_.active_reset_srtp_params != modified_config.active_reset_srtp_params) { - // TODO(tommi): move to the network thread - this hides an invoke. - transport_controller_->SetActiveResetSrtpParams( - modified_config.active_reset_srtp_params); + // TODO(tommi): merge invokes + network_thread()->Invoke(RTC_FROM_HERE, [this, &modified_config] { + RTC_DCHECK_RUN_ON(network_thread()); + transport_controller_->SetActiveResetSrtpParams( + modified_config.active_reset_srtp_params); + }); } if (modified_config.allow_codec_switching.has_value()) { @@ -1693,7 +1702,13 @@ PeerConnection::LookupDtlsTransportByMid(const std::string& mid) { rtc::scoped_refptr PeerConnection::LookupDtlsTransportByMidInternal(const std::string& mid) { RTC_DCHECK_RUN_ON(signaling_thread()); - return transport_controller_->LookupDtlsTransportByMid(mid); + // TODO(bugs.webrtc.org/9987): Avoid the thread jump. + // This might be done by caching the value on the signaling thread. + return network_thread()->Invoke>( + RTC_FROM_HERE, [this, mid]() { + RTC_DCHECK_RUN_ON(network_thread()); + return transport_controller_->LookupDtlsTransportByMid(mid); + }); } rtc::scoped_refptr PeerConnection::GetSctpTransport() @@ -1793,6 +1808,7 @@ void PeerConnection::Close() { // TODO(tommi): ^^ That's not exactly optimal since this is yet another // blocking hop to the network thread during Close(). Further still, the // voice/video/data channels will be cleared on the worker thread. + RTC_DCHECK_RUN_ON(network_thread()); transport_controller_.reset(); port_allocator_->DiscardCandidatePool(); if (network_thread_safety_) { @@ -2146,7 +2162,11 @@ bool PeerConnection::GetSctpSslRole(rtc::SSLRole* role) { absl::optional dtls_role; if (sctp_mid_s_) { - dtls_role = transport_controller_->GetDtlsRole(*sctp_mid_s_); + dtls_role = network_thread()->Invoke>( + RTC_FROM_HERE, [this] { + RTC_DCHECK_RUN_ON(network_thread()); + return transport_controller_->GetDtlsRole(*sctp_mid_n_); + }); if (!dtls_role && sdp_handler_->is_caller().has_value()) { dtls_role = *sdp_handler_->is_caller() ? rtc::SSL_SERVER : rtc::SSL_CLIENT; @@ -2167,7 +2187,11 @@ bool PeerConnection::GetSslRole(const std::string& content_name, return false; } - auto dtls_role = transport_controller_->GetDtlsRole(content_name); + auto dtls_role = network_thread()->Invoke>( + RTC_FROM_HERE, [this, content_name]() { + RTC_DCHECK_RUN_ON(network_thread()); + return transport_controller_->GetDtlsRole(content_name); + }); if (dtls_role) { *role = *dtls_role; return true; @@ -2198,7 +2222,7 @@ std::vector PeerConnection::GetDataChannelStats() const { absl::optional PeerConnection::sctp_transport_name() const { RTC_DCHECK_RUN_ON(signaling_thread()); - if (sctp_mid_s_ && transport_controller_) + if (sctp_mid_s_ && transport_controller_copy_) return sctp_transport_name_s_; return absl::optional(); } @@ -2864,7 +2888,7 @@ void PeerConnection::StartSctpTransport(int local_port, network_thread_safety_, [this, mid = *sctp_mid_s_, local_port, remote_port, max_message_size] { rtc::scoped_refptr sctp_transport = - transport_controller()->GetSctpTransport(mid); + transport_controller_n()->GetSctpTransport(mid); if (sctp_transport) sctp_transport->Start(local_port, remote_port, max_message_size); })); diff --git a/pc/peer_connection.h b/pc/peer_connection.h index 653f5f5482..4855d32be1 100644 --- a/pc/peer_connection.h +++ b/pc/peer_connection.h @@ -278,7 +278,7 @@ class PeerConnection : public PeerConnectionInternal, bool initial_offerer() const override { RTC_DCHECK_RUN_ON(signaling_thread()); - return transport_controller_ && transport_controller_->initial_offerer(); + return sdp_handler_->initial_offerer(); } std::vector< @@ -357,7 +357,12 @@ class PeerConnection : public PeerConnectionInternal, } cricket::ChannelManager* channel_manager(); - JsepTransportController* transport_controller() override { + JsepTransportController* transport_controller_s() override { + RTC_DCHECK_RUN_ON(signaling_thread()); + return transport_controller_copy_; + } + JsepTransportController* transport_controller_n() override { + RTC_DCHECK_RUN_ON(network_thread()); return transport_controller_.get(); } cricket::PortAllocator* port_allocator() override { @@ -463,7 +468,7 @@ class PeerConnection : public PeerConnectionInternal, RTCError Initialize( const PeerConnectionInterface::RTCConfiguration& configuration, PeerConnectionDependencies dependencies); - void InitializeTransportController_n( + JsepTransportController* InitializeTransportController_n( const RTCConfiguration& configuration, const PeerConnectionDependencies& dependencies) RTC_RUN_ON(network_thread()); @@ -664,9 +669,14 @@ class PeerConnection : public PeerConnectionInternal, const std::string session_id_; - std::unique_ptr - transport_controller_; // TODO(bugs.webrtc.org/9987): Accessed on both - // signaling and network thread. + // The transport controller is set and used on the network thread. + // Some functions pass the value of the transport_controller_ pointer + // around as arguments while running on the signaling thread; these + // use the transport_controller_copy. + std::unique_ptr transport_controller_ + RTC_GUARDED_BY(network_thread()); + JsepTransportController* transport_controller_copy_ + RTC_GUARDED_BY(signaling_thread()) = nullptr; // `sctp_mid_` is the content name (MID) in SDP. // Note: this is used as the data channel MID by both SCTP and data channel diff --git a/pc/peer_connection_internal.h b/pc/peer_connection_internal.h index 38a1fd1002..16caade6c9 100644 --- a/pc/peer_connection_internal.h +++ b/pc/peer_connection_internal.h @@ -71,7 +71,8 @@ class PeerConnectionSdpMethods { // return the RTCConfiguration.crypto_options if set and will only default // back to the PeerConnectionFactory settings if nothing was set. virtual CryptoOptions GetCryptoOptions() = 0; - virtual JsepTransportController* transport_controller() = 0; + virtual JsepTransportController* transport_controller_s() = 0; + virtual JsepTransportController* transport_controller_n() = 0; virtual DataChannelController* data_channel_controller() = 0; virtual cricket::PortAllocator* port_allocator() = 0; virtual StatsCollector* stats() = 0; diff --git a/pc/sdp_offer_answer.cc b/pc/sdp_offer_answer.cc index 586c0e5f79..8acc19b4ca 100644 --- a/pc/sdp_offer_answer.cc +++ b/pc/sdp_offer_answer.cc @@ -1228,7 +1228,8 @@ void SdpOfferAnswerHandler::Initialize( pc_->dtls_enabled(), std::move(dependencies.cert_generator), certificate, [this](const rtc::scoped_refptr& certificate) { - transport_controller()->SetLocalCertificate(certificate); + RTC_DCHECK_RUN_ON(signaling_thread()); + transport_controller_s()->SetLocalCertificate(certificate); }); if (pc_->options()->disable_encryption) { @@ -1265,12 +1266,19 @@ const TransceiverList* SdpOfferAnswerHandler::transceivers() const { } return pc_->rtp_manager()->transceivers(); } -JsepTransportController* SdpOfferAnswerHandler::transport_controller() { - return pc_->transport_controller(); +JsepTransportController* SdpOfferAnswerHandler::transport_controller_s() { + return pc_->transport_controller_s(); } -const JsepTransportController* SdpOfferAnswerHandler::transport_controller() +JsepTransportController* SdpOfferAnswerHandler::transport_controller_n() { + return pc_->transport_controller_n(); +} +const JsepTransportController* SdpOfferAnswerHandler::transport_controller_s() const { - return pc_->transport_controller(); + return pc_->transport_controller_s(); +} +const JsepTransportController* SdpOfferAnswerHandler::transport_controller_n() + const { + return pc_->transport_controller_n(); } DataChannelController* SdpOfferAnswerHandler::data_channel_controller() { return pc_->data_channel_controller(); @@ -1314,6 +1322,10 @@ rtc::Thread* SdpOfferAnswerHandler::signaling_thread() const { return context_->signaling_thread(); } +rtc::Thread* SdpOfferAnswerHandler::network_thread() const { + return context_->network_thread(); +} + void SdpOfferAnswerHandler::CreateOffer( CreateSessionDescriptionObserver* observer, const PeerConnectionInterface::RTCOfferAnswerOptions& options) { @@ -1502,6 +1514,9 @@ RTCError SdpOfferAnswerHandler::ApplyLocalDescription( replaced_local_description = std::move(pending_local_description_); pending_local_description_ = std::move(desc); } + if (!initial_offerer_) { + initial_offerer_.emplace(type == SdpType::kOffer); + } // The session description to apply now must be accessed by // `local_description()`. RTC_DCHECK(local_description()); @@ -1547,7 +1562,7 @@ RTCError SdpOfferAnswerHandler::ApplyLocalDescription( // information about DTLS transports. if (transceiver->mid()) { auto dtls_transport = LookupDtlsTransportByMid( - context_->network_thread(), transport_controller(), + context_->network_thread(), transport_controller_s(), *transceiver->mid()); transceiver->sender_internal()->set_transport(dtls_transport); transceiver->receiver_internal()->set_transport(dtls_transport); @@ -1792,7 +1807,7 @@ RTCError SdpOfferAnswerHandler::ReplaceRemoteDescription( *session_desc); // NOTE: This will perform an Invoke() to the network thread. - return transport_controller()->SetRemoteDescription(sdp_type, session_desc); + return transport_controller_s()->SetRemoteDescription(sdp_type, session_desc); } void SdpOfferAnswerHandler::ApplyRemoteDescription( @@ -1982,7 +1997,7 @@ void SdpOfferAnswerHandler::ApplyRemoteDescriptionUpdateTransceiverState( // 2.2.8.1.11.[3-6]: Set the transport internal slots. if (transceiver->mid()) { auto dtls_transport = LookupDtlsTransportByMid( - context_->network_thread(), transport_controller(), + context_->network_thread(), transport_controller_s(), *transceiver->mid()); transceiver->sender_internal()->set_transport(dtls_transport); transceiver->receiver_internal()->set_transport(dtls_transport); @@ -2190,7 +2205,7 @@ void SdpOfferAnswerHandler::DoSetLocalDescription( // MaybeStartGathering needs to be called after informing the observer so that // we don't signal any candidates before signaling that SetLocalDescription // completed. - transport_controller()->MaybeStartGathering(); + transport_controller_s()->MaybeStartGathering(); } void SdpOfferAnswerHandler::DoCreateOffer( @@ -2572,7 +2587,7 @@ bool SdpOfferAnswerHandler::RemoveIceCandidates( } // Remove the candidates from the transport controller. - RTCError error = transport_controller()->RemoveRemoteCandidates(candidates); + RTCError error = transport_controller_s()->RemoveRemoteCandidates(candidates); if (!error.ok()) { RTC_LOG(LS_ERROR) << "RemoveIceCandidates: Error when removing remote candidates: " @@ -2920,7 +2935,7 @@ RTCError SdpOfferAnswerHandler::Rollback(SdpType desc_type) { transceiver->internal()->set_mid(state.mid()); transceiver->internal()->set_mline_index(state.mline_index()); } - RTCError e = transport_controller()->RollbackTransports(); + RTCError e = transport_controller_s()->RollbackTransports(); if (!e.ok()) { return e; } @@ -2995,7 +3010,8 @@ bool SdpOfferAnswerHandler::NeedsIceRestart( absl::optional SdpOfferAnswerHandler::GetDtlsRole( const std::string& mid) const { - return transport_controller()->GetDtlsRole(mid); + RTC_DCHECK_RUN_ON(signaling_thread()); + return transport_controller_s()->GetDtlsRole(mid); } void SdpOfferAnswerHandler::UpdateNegotiationNeeded() { @@ -3565,8 +3581,11 @@ RTCError SdpOfferAnswerHandler::UpdateTransceiverChannel( return RTCError(RTCErrorType::INTERNAL_ERROR, "Failed to create channel for mid=" + content.name); } + // Note: this is a thread hop; the lambda will be executed + // on the network thread. transceiver->internal()->SetChannel(channel, [&](const std::string& mid) { - return transport_controller()->GetRtpTransport(mid); + RTC_DCHECK_RUN_ON(network_thread()); + return transport_controller_n()->GetRtpTransport(mid); }); } } @@ -4491,13 +4510,13 @@ RTCError SdpOfferAnswerHandler::PushdownTransportDescription( if (source == cricket::CS_LOCAL) { const SessionDescriptionInterface* sdesc = local_description(); RTC_DCHECK(sdesc); - return transport_controller()->SetLocalDescription(type, - sdesc->description()); + return transport_controller_s()->SetLocalDescription(type, + sdesc->description()); } else { const SessionDescriptionInterface* sdesc = remote_description(); RTC_DCHECK(sdesc); - return transport_controller()->SetRemoteDescription(type, - sdesc->description()); + return transport_controller_s()->SetRemoteDescription(type, + sdesc->description()); } } @@ -4749,7 +4768,8 @@ RTCError SdpOfferAnswerHandler::CreateChannels(const SessionDescription& desc) { } rtp_manager()->GetAudioTransceiver()->internal()->SetChannel( voice_channel, [&](const std::string& mid) { - return transport_controller()->GetRtpTransport(mid); + RTC_DCHECK_RUN_ON(network_thread()); + return transport_controller_n()->GetRtpTransport(mid); }); } @@ -4763,7 +4783,8 @@ RTCError SdpOfferAnswerHandler::CreateChannels(const SessionDescription& desc) { } rtp_manager()->GetVideoTransceiver()->internal()->SetChannel( video_channel, [&](const std::string& mid) { - return transport_controller()->GetRtpTransport(mid); + RTC_DCHECK_RUN_ON(network_thread()); + return transport_controller_n()->GetRtpTransport(mid); }); } diff --git a/pc/sdp_offer_answer.h b/pc/sdp_offer_answer.h index c9c30468c7..67ead47242 100644 --- a/pc/sdp_offer_answer.h +++ b/pc/sdp_offer_answer.h @@ -182,6 +182,14 @@ class SdpOfferAnswerHandler : public SdpStateProvider, rtc::scoped_refptr local_streams(); rtc::scoped_refptr remote_streams(); + bool initial_offerer() { + RTC_DCHECK_RUN_ON(signaling_thread()); + if (initial_offerer_) { + return *initial_offerer_; + } + return false; + } + private: class RemoteDescriptionOperation; class ImplicitCreateSessionDescriptionObserver; @@ -215,6 +223,7 @@ class SdpOfferAnswerHandler : public SdpStateProvider, PeerConnectionDependencies& dependencies); rtc::Thread* signaling_thread() const; + rtc::Thread* network_thread() const; // Non-const versions of local_description()/remote_description(), for use // internally. SessionDescriptionInterface* mutable_local_description() @@ -587,8 +596,14 @@ class SdpOfferAnswerHandler : public SdpStateProvider, const cricket::PortAllocator* port_allocator() const; RtpTransmissionManager* rtp_manager(); const RtpTransmissionManager* rtp_manager() const; - JsepTransportController* transport_controller(); - const JsepTransportController* transport_controller() const; + JsepTransportController* transport_controller_s() + RTC_RUN_ON(signaling_thread()); + const JsepTransportController* transport_controller_s() const + RTC_RUN_ON(signaling_thread()); + JsepTransportController* transport_controller_n() + RTC_RUN_ON(network_thread()); + const JsepTransportController* transport_controller_n() const + RTC_RUN_ON(network_thread()); // =================================================================== const cricket::AudioOptions& audio_options() { return audio_options_; } const cricket::VideoOptions& video_options() { return video_options_; } @@ -683,6 +698,10 @@ class SdpOfferAnswerHandler : public SdpStateProvider, std::unique_ptr video_bitrate_allocator_factory_ RTC_GUARDED_BY(signaling_thread()); + // Whether we are the initial offerer on the association. This + // determines the SSL role. + absl::optional initial_offerer_ RTC_GUARDED_BY(signaling_thread()); + rtc::WeakPtrFactory weak_ptr_factory_ RTC_GUARDED_BY(signaling_thread()); }; diff --git a/pc/test/fake_peer_connection_base.h b/pc/test/fake_peer_connection_base.h index 3462c8c78e..7d64ab8180 100644 --- a/pc/test/fake_peer_connection_base.h +++ b/pc/test/fake_peer_connection_base.h @@ -309,7 +309,8 @@ class FakePeerConnectionBase : public PeerConnectionInternal { } CryptoOptions GetCryptoOptions() override { return CryptoOptions(); } - JsepTransportController* transport_controller() override { return nullptr; } + JsepTransportController* transport_controller_s() override { return nullptr; } + JsepTransportController* transport_controller_n() override { return nullptr; } DataChannelController* data_channel_controller() override { return nullptr; } cricket::PortAllocator* port_allocator() override { return nullptr; } StatsCollector* stats() override { return nullptr; }