diff --git a/pc/connection_context.h b/pc/connection_context.h index 02d08a191e..71e2f1eeae 100644 --- a/pc/connection_context.h +++ b/pc/connection_context.h @@ -62,7 +62,6 @@ class ConnectionContext : public rtc::RefCountInterface { // Functions called from PeerConnection and friends SctpTransportFactoryInterface* sctp_transport_factory() const { - RTC_DCHECK_RUN_ON(signaling_thread_); return sctp_factory_.get(); } @@ -123,8 +122,7 @@ class ConnectionContext : public rtc::RefCountInterface { RTC_GUARDED_BY(signaling_thread_); std::unique_ptr media_engine_ RTC_GUARDED_BY(signaling_thread_); - std::unique_ptr const sctp_factory_ - RTC_GUARDED_BY(signaling_thread_); + std::unique_ptr const sctp_factory_; // Accessed both on signaling thread and worker thread. std::unique_ptr const trials_; }; diff --git a/pc/jsep_transport_controller.cc b/pc/jsep_transport_controller.cc index 0ded1de84f..28ba899cb3 100644 --- a/pc/jsep_transport_controller.cc +++ b/pc/jsep_transport_controller.cc @@ -84,13 +84,11 @@ webrtc::RTCError VerifyCandidates(const cricket::Candidates& candidates) { namespace webrtc { JsepTransportController::JsepTransportController( - rtc::Thread* signaling_thread, rtc::Thread* network_thread, cricket::PortAllocator* port_allocator, AsyncResolverFactory* async_resolver_factory, Config config) - : signaling_thread_(signaling_thread), - network_thread_(network_thread), + : network_thread_(network_thread), port_allocator_(port_allocator), async_resolver_factory_(async_resolver_factory), config_(config), @@ -222,12 +220,6 @@ void JsepTransportController::SetNeedsIceRestartFlag() { bool JsepTransportController::NeedsIceRestart( const std::string& transport_name) const { - if (!network_thread_->IsCurrent()) { - RTC_DCHECK_RUN_ON(signaling_thread_); - return network_thread_->Invoke( - RTC_FROM_HERE, [&] { return NeedsIceRestart(transport_name); }); - } - RTC_DCHECK_RUN_ON(network_thread_); const cricket::JsepTransport* transport = @@ -414,11 +406,6 @@ RTCError JsepTransportController::RemoveRemoteCandidates( bool JsepTransportController::GetStats(const std::string& transport_name, cricket::TransportStats* stats) { - if (!network_thread_->IsCurrent()) { - return network_thread_->Invoke( - RTC_FROM_HERE, [=] { return GetStats(transport_name, stats); }); - } - RTC_DCHECK_RUN_ON(network_thread_); cricket::JsepTransport* transport = GetJsepTransportByName(transport_name); @@ -1194,35 +1181,24 @@ void JsepTransportController::OnTransportCandidateGathered_n( RTC_NOTREACHED(); return; } - std::string transport_name = transport->transport_name(); - // TODO(bugs.webrtc.org/12427): See if we can get rid of this. We should be - // able to just call this directly here. - invoker_.AsyncInvoke( - RTC_FROM_HERE, signaling_thread_, [this, transport_name, candidate] { - signal_ice_candidates_gathered_.Send( - transport_name, std::vector{candidate}); - }); + + signal_ice_candidates_gathered_.Send( + transport->transport_name(), std::vector{candidate}); } void JsepTransportController::OnTransportCandidateError_n( cricket::IceTransportInternal* transport, const cricket::IceCandidateErrorEvent& event) { - invoker_.AsyncInvoke(RTC_FROM_HERE, signaling_thread_, [this, event] { - signal_ice_candidate_error_.Send(event); - }); + signal_ice_candidate_error_.Send(event); } void JsepTransportController::OnTransportCandidatesRemoved_n( cricket::IceTransportInternal* transport, const cricket::Candidates& candidates) { - invoker_.AsyncInvoke( - RTC_FROM_HERE, signaling_thread_, - [this, candidates] { signal_ice_candidates_removed_.Send(candidates); }); + signal_ice_candidates_removed_.Send(candidates); } void JsepTransportController::OnTransportCandidatePairChanged_n( const cricket::CandidatePairChangeEvent& event) { - invoker_.AsyncInvoke(RTC_FROM_HERE, signaling_thread_, [this, event] { - signal_ice_candidate_pair_changed_.Send(event); - }); + signal_ice_candidate_pair_changed_.Send(event); } void JsepTransportController::OnTransportRoleConflict_n( @@ -1298,10 +1274,7 @@ void JsepTransportController::UpdateAggregateStates_n() { if (ice_connection_state_ != new_connection_state) { ice_connection_state_ = new_connection_state; - invoker_.AsyncInvoke( - RTC_FROM_HERE, signaling_thread_, [this, new_connection_state] { - signal_ice_connection_state_.Send(new_connection_state); - }); + signal_ice_connection_state_.Send(new_connection_state); } // Compute the current RTCIceConnectionState as described in @@ -1357,17 +1330,11 @@ void JsepTransportController::UpdateAggregateStates_n() { new_ice_connection_state == PeerConnectionInterface::kIceConnectionCompleted) { // Ensure that we never skip over the "connected" state. - invoker_.AsyncInvoke(RTC_FROM_HERE, signaling_thread_, [this] { - signal_standardized_ice_connection_state_.Send( - PeerConnectionInterface::kIceConnectionConnected); - }); + signal_standardized_ice_connection_state_.Send( + PeerConnectionInterface::kIceConnectionConnected); } standardized_ice_connection_state_ = new_ice_connection_state; - invoker_.AsyncInvoke(RTC_FROM_HERE, signaling_thread_, - [this, new_ice_connection_state] { - signal_standardized_ice_connection_state_.Send( - new_ice_connection_state); - }); + signal_standardized_ice_connection_state_.Send(new_ice_connection_state); } // Compute the current RTCPeerConnectionState as described in @@ -1418,10 +1385,7 @@ void JsepTransportController::UpdateAggregateStates_n() { if (combined_connection_state_ != new_combined_state) { combined_connection_state_ = new_combined_state; - invoker_.AsyncInvoke( - RTC_FROM_HERE, signaling_thread_, [this, new_combined_state] { - signal_connection_state_.Send(new_combined_state); - }); + signal_connection_state_.Send(new_combined_state); } // Compute the gathering state. @@ -1434,10 +1398,7 @@ void JsepTransportController::UpdateAggregateStates_n() { } if (ice_gathering_state_ != new_gathering_state) { ice_gathering_state_ = new_gathering_state; - invoker_.AsyncInvoke( - RTC_FROM_HERE, signaling_thread_, [this, new_gathering_state] { - signal_ice_gathering_state_.Send(new_gathering_state); - }); + signal_ice_gathering_state_.Send(new_gathering_state); } } diff --git a/pc/jsep_transport_controller.h b/pc/jsep_transport_controller.h index 59d66a24f2..949c9ad1dc 100644 --- a/pc/jsep_transport_controller.h +++ b/pc/jsep_transport_controller.h @@ -54,7 +54,6 @@ #include "pc/session_description.h" #include "pc/srtp_transport.h" #include "pc/transport_stats.h" -#include "rtc_base/async_invoker.h" #include "rtc_base/callback_list.h" #include "rtc_base/constructor_magic.h" #include "rtc_base/copy_on_write_buffer.h" @@ -137,10 +136,11 @@ class JsepTransportController : public sigslot::has_slots<> { std::function on_dtls_handshake_error_; }; - // The ICE related events are signaled on the |signaling_thread|. - // All the transport related methods are called on the |network_thread|. - JsepTransportController(rtc::Thread* signaling_thread, - rtc::Thread* network_thread, + // The ICE related events are fired on the |network_thread|. + // All the transport related methods are called on the |network_thread| + // and destruction of the JsepTransportController must occur on the + // |network_thread|. + JsepTransportController(rtc::Thread* network_thread, cricket::PortAllocator* port_allocator, AsyncResolverFactory* async_resolver_factory, Config config); @@ -227,26 +227,28 @@ class JsepTransportController : public sigslot::has_slots<> { // F: void(const std::string&, const std::vector&) template void SubscribeIceCandidateGathered(F&& callback) { - // TODO(bugs.webrtc.org/12427): Post this subscription to the network - // thread. + RTC_DCHECK_RUN_ON(network_thread_); signal_ice_candidates_gathered_.AddReceiver(std::forward(callback)); } // F: void(cricket::IceConnectionState) template void SubscribeIceConnectionState(F&& callback) { + RTC_DCHECK_RUN_ON(network_thread_); signal_ice_connection_state_.AddReceiver(std::forward(callback)); } // F: void(PeerConnectionInterface::PeerConnectionState) template void SubscribeConnectionState(F&& callback) { + RTC_DCHECK_RUN_ON(network_thread_); signal_connection_state_.AddReceiver(std::forward(callback)); } // F: void(PeerConnectionInterface::IceConnectionState) template void SubscribeStandardizedIceConnectionState(F&& callback) { + RTC_DCHECK_RUN_ON(network_thread_); signal_standardized_ice_connection_state_.AddReceiver( std::forward(callback)); } @@ -254,60 +256,65 @@ class JsepTransportController : public sigslot::has_slots<> { // F: void(cricket::IceGatheringState) template void SubscribeIceGatheringState(F&& callback) { + RTC_DCHECK_RUN_ON(network_thread_); signal_ice_gathering_state_.AddReceiver(std::forward(callback)); } // F: void(const cricket::IceCandidateErrorEvent&) template void SubscribeIceCandidateError(F&& callback) { + RTC_DCHECK_RUN_ON(network_thread_); signal_ice_candidate_error_.AddReceiver(std::forward(callback)); } // F: void(const std::vector&) template void SubscribeIceCandidatesRemoved(F&& callback) { + RTC_DCHECK_RUN_ON(network_thread_); signal_ice_candidates_removed_.AddReceiver(std::forward(callback)); } // F: void(const cricket::CandidatePairChangeEvent&) template void SubscribeIceCandidatePairChanged(F&& callback) { + RTC_DCHECK_RUN_ON(network_thread_); signal_ice_candidate_pair_changed_.AddReceiver(std::forward(callback)); } private: - // All of these callbacks are fired on the signaling thread. + // All of these callbacks are fired on the network thread. // If any transport failed => failed, // Else if all completed => completed, // Else if all connected => connected, // Else => connecting - CallbackList signal_ice_connection_state_; + CallbackList signal_ice_connection_state_ + RTC_GUARDED_BY(network_thread_); CallbackList - signal_connection_state_; + signal_connection_state_ RTC_GUARDED_BY(network_thread_); CallbackList - signal_standardized_ice_connection_state_; + signal_standardized_ice_connection_state_ RTC_GUARDED_BY(network_thread_); // If all transports done gathering => complete, // Else if any are gathering => gathering, // Else => new - CallbackList signal_ice_gathering_state_; + CallbackList signal_ice_gathering_state_ + RTC_GUARDED_BY(network_thread_); // [mid, candidates] - // TODO(bugs.webrtc.org/12427): Protect this with network_thread_. CallbackList&> - signal_ice_candidates_gathered_; + signal_ice_candidates_gathered_ RTC_GUARDED_BY(network_thread_); CallbackList - signal_ice_candidate_error_; + signal_ice_candidate_error_ RTC_GUARDED_BY(network_thread_); CallbackList&> - signal_ice_candidates_removed_; + signal_ice_candidates_removed_ RTC_GUARDED_BY(network_thread_); CallbackList - signal_ice_candidate_pair_changed_; + signal_ice_candidate_pair_changed_ RTC_GUARDED_BY(network_thread_); RTCError ApplyDescription_n(bool local, SdpType type, @@ -452,7 +459,6 @@ class JsepTransportController : public sigslot::has_slots<> { void OnDtlsHandshakeError(rtc::SSLHandshakeError error); - rtc::Thread* const signaling_thread_ = nullptr; rtc::Thread* const network_thread_ = nullptr; cricket::PortAllocator* const port_allocator_ = nullptr; AsyncResolverFactory* const async_resolver_factory_ = nullptr; @@ -490,7 +496,6 @@ class JsepTransportController : public sigslot::has_slots<> { cricket::IceRole ice_role_ = cricket::ICEROLE_CONTROLLING; uint64_t ice_tiebreaker_ = rtc::CreateRandomId64(); rtc::scoped_refptr certificate_; - rtc::AsyncInvoker invoker_; RTC_DISALLOW_COPY_AND_ASSIGN(JsepTransportController); }; diff --git a/pc/jsep_transport_controller_unittest.cc b/pc/jsep_transport_controller_unittest.cc index 9efa205368..0424afe876 100644 --- a/pc/jsep_transport_controller_unittest.cc +++ b/pc/jsep_transport_controller_unittest.cc @@ -74,7 +74,6 @@ class JsepTransportControllerTest : public JsepTransportController::Observer, void CreateJsepTransportController( JsepTransportController::Config config, - rtc::Thread* signaling_thread = rtc::Thread::Current(), rtc::Thread* network_thread = rtc::Thread::Current(), cricket::PortAllocator* port_allocator = nullptr) { config.transport_observer = this; @@ -84,9 +83,10 @@ class JsepTransportControllerTest : public JsepTransportController::Observer, config.dtls_transport_factory = fake_dtls_transport_factory_.get(); config.on_dtls_handshake_error_ = [](rtc::SSLHandshakeError s) {}; transport_controller_ = std::make_unique( - signaling_thread, network_thread, port_allocator, - nullptr /* async_resolver_factory */, config); - ConnectTransportControllerSignals(); + network_thread, port_allocator, nullptr /* async_resolver_factory */, + config); + network_thread->Invoke(RTC_FROM_HERE, + [&] { ConnectTransportControllerSignals(); }); } void ConnectTransportControllerSignals() { @@ -276,18 +276,14 @@ class JsepTransportControllerTest : public JsepTransportController::Observer, protected: void OnConnectionState(cricket::IceConnectionState state) { - if (!signaling_thread_->IsCurrent()) { - signaled_on_non_signaling_thread_ = true; - } + ice_signaled_on_thread_ = rtc::Thread::Current(); connection_state_ = state; ++connection_state_signal_count_; } void OnStandardizedIceConnectionState( PeerConnectionInterface::IceConnectionState state) { - if (!signaling_thread_->IsCurrent()) { - signaled_on_non_signaling_thread_ = true; - } + ice_signaled_on_thread_ = rtc::Thread::Current(); ice_connection_state_ = state; ++ice_connection_state_signal_count_; } @@ -296,26 +292,20 @@ class JsepTransportControllerTest : public JsepTransportController::Observer, PeerConnectionInterface::PeerConnectionState state) { RTC_LOG(LS_INFO) << "OnCombinedConnectionState: " << static_cast(state); - if (!signaling_thread_->IsCurrent()) { - signaled_on_non_signaling_thread_ = true; - } + ice_signaled_on_thread_ = rtc::Thread::Current(); combined_connection_state_ = state; ++combined_connection_state_signal_count_; } void OnGatheringState(cricket::IceGatheringState state) { - if (!signaling_thread_->IsCurrent()) { - signaled_on_non_signaling_thread_ = true; - } + ice_signaled_on_thread_ = rtc::Thread::Current(); gathering_state_ = state; ++gathering_state_signal_count_; } void OnCandidatesGathered(const std::string& transport_name, const Candidates& candidates) { - if (!signaling_thread_->IsCurrent()) { - signaled_on_non_signaling_thread_ = true; - } + ice_signaled_on_thread_ = rtc::Thread::Current(); candidates_[transport_name].insert(candidates_[transport_name].end(), candidates.begin(), candidates.end()); ++candidates_signal_count_; @@ -360,7 +350,7 @@ class JsepTransportControllerTest : public JsepTransportController::Observer, std::unique_ptr fake_ice_transport_factory_; std::unique_ptr fake_dtls_transport_factory_; rtc::Thread* const signaling_thread_ = nullptr; - bool signaled_on_non_signaling_thread_ = false; + rtc::Thread* ice_signaled_on_thread_ = nullptr; // Used to verify the SignalRtpTransportChanged/SignalDtlsTransportChanged are // signaled correctly. std::map changed_rtp_transport_by_mid_; @@ -883,11 +873,12 @@ TEST_F(JsepTransportControllerTest, SignalCandidatesGathered) { EXPECT_EQ(1u, candidates_[kAudioMid1].size()); } -TEST_F(JsepTransportControllerTest, IceSignalingOccursOnSignalingThread) { +TEST_F(JsepTransportControllerTest, IceSignalingOccursOnNetworkThread) { network_thread_ = rtc::Thread::CreateWithSocketServer(); network_thread_->Start(); + EXPECT_EQ(ice_signaled_on_thread_, nullptr); CreateJsepTransportController(JsepTransportController::Config(), - signaling_thread_, network_thread_.get(), + network_thread_.get(), /*port_allocator=*/nullptr); CreateLocalDescriptionAndCompleteConnectionOnNetworkThread(); @@ -903,7 +894,7 @@ TEST_F(JsepTransportControllerTest, IceSignalingOccursOnSignalingThread) { EXPECT_EQ_WAIT(1u, candidates_[kVideoMid1].size(), kTimeout); EXPECT_EQ(2, candidates_signal_count_); - EXPECT_TRUE(!signaled_on_non_signaling_thread_); + EXPECT_EQ(ice_signaled_on_thread_, network_thread_.get()); network_thread_->Invoke(RTC_FROM_HERE, [&] { transport_controller_.reset(); }); diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index f82fe35c6d..b4e4246766 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -88,7 +88,6 @@ const char kSimulcastNumberOfEncodings[] = static const int REPORT_USAGE_PATTERN_DELAY_MS = 60000; - uint32_t ConvertIceTransportTypeToCandidateFilter( PeerConnectionInterface::IceTransportsType type) { switch (type) { @@ -264,6 +263,20 @@ bool HasRtcpMuxEnabled(const cricket::ContentInfo* content) { return content->media_description()->rtcp_mux(); } +bool DtlsEnabled(const PeerConnectionInterface::RTCConfiguration& configuration, + const PeerConnectionFactoryInterface::Options& options, + const PeerConnectionDependencies& dependencies) { + if (options.disable_encryption) + return false; + + // Enable DTLS by default if we have an identity store or a certificate. + bool default_enabled = + (dependencies.cert_generator || !configuration.certificates.empty()); + + // The |configuration| can override the default value. + return configuration.enable_dtls_srtp.value_or(default_enabled); +} + } // namespace bool PeerConnectionInterface::RTCConfiguration::operator==( @@ -421,11 +434,12 @@ RTCErrorOr> PeerConnection::Create( bool is_unified_plan = configuration.sdp_semantics == SdpSemantics::kUnifiedPlan; + bool dtls_enabled = DtlsEnabled(configuration, options, dependencies); // The PeerConnection constructor consumes some, but not all, dependencies. rtc::scoped_refptr pc( new rtc::RefCountedObject( context, options, is_unified_plan, std::move(event_log), - std::move(call), dependencies)); + std::move(call), dependencies, dtls_enabled)); RTCError init_error = pc->Initialize(configuration, std::move(dependencies)); if (!init_error.ok()) { RTC_LOG(LS_ERROR) << "PeerConnection initialization failed"; @@ -440,7 +454,8 @@ PeerConnection::PeerConnection( bool is_unified_plan, std::unique_ptr event_log, std::unique_ptr call, - PeerConnectionDependencies& dependencies) + PeerConnectionDependencies& dependencies, + bool dtls_enabled) : context_(context), options_(options), observer_(dependencies.observer), @@ -453,9 +468,17 @@ PeerConnection::PeerConnection( tls_cert_verifier_(std::move(dependencies.tls_cert_verifier)), call_(std::move(call)), call_ptr_(call_.get()), + dtls_enabled_(dtls_enabled), data_channel_controller_(this), message_handler_(signaling_thread()), - weak_factory_(this) {} + weak_factory_(this) { + worker_thread()->Invoke(RTC_FROM_HERE, [this] { + RTC_DCHECK_RUN_ON(worker_thread()); + worker_thread_safety_ = PendingTaskSafetyFlag::Create(); + if (!call_) + worker_thread_safety_->SetNotAlive(); + }); +} PeerConnection::~PeerConnection() { TRACE_EVENT0("webrtc", "PeerConnection::~PeerConnection"); @@ -496,15 +519,13 @@ PeerConnection::~PeerConnection() { RTC_DCHECK_RUN_ON(network_thread()); transport_controller_.reset(); port_allocator_.reset(); - if (network_thread_safety_) { + if (network_thread_safety_) network_thread_safety_->SetNotAlive(); - network_thread_safety_ = nullptr; - } }); // call_ and event_log_ must be destroyed on the worker thread. worker_thread()->Invoke(RTC_FROM_HERE, [this] { RTC_DCHECK_RUN_ON(worker_thread()); - call_safety_.reset(); + worker_thread_safety_->SetNotAlive(); call_.reset(); // The event log must outlive call (and any other object that uses it). event_log_.reset(); @@ -531,20 +552,6 @@ RTCError PeerConnection::Initialize( turn_server.turn_logging_id = configuration.turn_logging_id; } - // The port allocator lives on the network thread and should be initialized - // there. Also set up the task safety flag for canceling pending tasks on - // the network thread when closing. - // TODO(bugs.webrtc.org/12427): See if we can piggyback on this call and - // initialize all the |transport_controller_->Subscribe*| calls below on the - // network thread via this invoke. - const auto pa_result = - network_thread()->Invoke( - RTC_FROM_HERE, [this, &stun_servers, &turn_servers, &configuration] { - network_thread_safety_ = PendingTaskSafetyFlag::Create(); - return InitializePortAllocator_n(stun_servers, turn_servers, - configuration); - }); - // Note if STUN or TURN servers were supplied. if (!stun_servers.empty()) { NoteUsageEvent(UsageEvent::STUN_SERVER_ADDED); @@ -553,52 +560,11 @@ RTCError PeerConnection::Initialize( NoteUsageEvent(UsageEvent::TURN_SERVER_ADDED); } - // Send information about IPv4/IPv6 status. - PeerConnectionAddressFamilyCounter address_family; - if (pa_result.enable_ipv6) { - address_family = kPeerConnection_IPv6; - } else { - address_family = kPeerConnection_IPv4; - } - RTC_HISTOGRAM_ENUMERATION("WebRTC.PeerConnection.IPMetrics", address_family, - kPeerConnectionAddressFamilyCounter_Max); - // RFC 3264: The numeric value of the session id and version in the // o line MUST be representable with a "64 bit signed integer". // Due to this constraint session id |session_id_| is max limited to // LLONG_MAX. session_id_ = rtc::ToString(rtc::CreateRandomId64() & LLONG_MAX); - JsepTransportController::Config config; - config.redetermine_role_on_ice_restart = - configuration.redetermine_role_on_ice_restart; - config.ssl_max_version = options_.ssl_max_version; - config.disable_encryption = options_.disable_encryption; - config.bundle_policy = configuration.bundle_policy; - config.rtcp_mux_policy = configuration.rtcp_mux_policy; - // TODO(bugs.webrtc.org/9891) - Remove options_.crypto_options then remove - // this stub. - config.crypto_options = configuration.crypto_options.has_value() - ? *configuration.crypto_options - : options_.crypto_options; - config.transport_observer = this; - config.rtcp_handler = InitializeRtcpCallback(); - config.event_log = event_log_ptr_; -#if defined(ENABLE_EXTERNAL_AUTH) - config.enable_external_auth = true; -#endif - config.active_reset_srtp_params = configuration.active_reset_srtp_params; - - if (options_.disable_encryption) { - dtls_enabled_ = false; - } else { - // Enable DTLS by default if we have an identity store or a certificate. - dtls_enabled_ = - (dependencies.cert_generator || !configuration.certificates.empty()); - // |configuration| can override the default |dtls_enabled_| value. - if (configuration.enable_dtls_srtp) { - dtls_enabled_ = *(configuration.enable_dtls_srtp); - } - } if (configuration.enable_rtp_data_channel) { // Enable creation of RTP data channels if the kEnableRtpDataChannels is @@ -609,77 +575,27 @@ RTCError PeerConnection::Initialize( // DTLS has to be enabled to use SCTP. if (!options_.disable_sctp_data_channels && dtls_enabled_) { data_channel_controller_.set_data_channel_type(cricket::DCT_SCTP); - config.sctp_factory = context_->sctp_transport_factory(); } } - config.ice_transport_factory = ice_transport_factory_.get(); - config.on_dtls_handshake_error_ = - [weak_ptr = weak_factory_.GetWeakPtr()](rtc::SSLHandshakeError s) { - if (weak_ptr) { - weak_ptr->OnTransportControllerDtlsHandshakeError(s); - } - }; - - transport_controller_.reset(new JsepTransportController( - signaling_thread(), network_thread(), port_allocator_.get(), - async_resolver_factory_.get(), config)); - - // The following RTC_DCHECKs are added by looking at the caller thread. - // If this is incorrect there might not be test failures - // due to lack of unit tests which trigger these scenarios. - // TODO(bugs.webrtc.org/12160): Remove above comments. - // callbacks for signaling_thread. - // TODO(bugs.webrtc.org/12427): If we can't piggyback on the above network - // Invoke(), then perhaps we could post these subscription calls to the - // network thread so that the transport controller doesn't have to do the - // signaling/network handling internally and use AsyncInvoker. - transport_controller_->SubscribeIceConnectionState( - [this](cricket::IceConnectionState s) { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportControllerConnectionState(s); - }); - transport_controller_->SubscribeConnectionState( - [this](PeerConnectionInterface::PeerConnectionState s) { - RTC_DCHECK_RUN_ON(signaling_thread()); - SetConnectionState(s); - }); - transport_controller_->SubscribeStandardizedIceConnectionState( - [this](PeerConnectionInterface::IceConnectionState s) { - RTC_DCHECK_RUN_ON(signaling_thread()); - SetStandardizedIceConnectionState(s); - }); - transport_controller_->SubscribeIceGatheringState( - [this](cricket::IceGatheringState s) { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportControllerGatheringState(s); - }); - transport_controller_->SubscribeIceCandidateGathered( - [this](const std::string& transport, - const std::vector& candidates) { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportControllerCandidatesGathered(transport, candidates); - }); - transport_controller_->SubscribeIceCandidateError( - [this](const cricket::IceCandidateErrorEvent& event) { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportControllerCandidateError(event); - }); - transport_controller_->SubscribeIceCandidatesRemoved( - [this](const std::vector& c) { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportControllerCandidatesRemoved(c); - }); - transport_controller_->SubscribeIceCandidatePairChanged( - [this](const cricket::CandidatePairChangeEvent& event) { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportControllerCandidateChanged(event); - }); + // Network thread initialization. + network_thread()->Invoke(RTC_FROM_HERE, [this, &stun_servers, + &turn_servers, &configuration, + &dependencies] { + RTC_DCHECK_RUN_ON(network_thread()); + network_thread_safety_ = PendingTaskSafetyFlag::Create(); + InitializePortAllocatorResult pa_result = + InitializePortAllocator_n(stun_servers, turn_servers, configuration); + // Send information about IPv4/IPv6 status. + PeerConnectionAddressFamilyCounter address_family = + pa_result.enable_ipv6 ? kPeerConnection_IPv6 : kPeerConnection_IPv4; + RTC_HISTOGRAM_ENUMERATION("WebRTC.PeerConnection.IPMetrics", address_family, + kPeerConnectionAddressFamilyCounter_Max); + InitializeTransportController_n(configuration, dependencies); + }); configuration_ = configuration; - transport_controller_->SetIceConfig(ParseIceConfig(configuration)); - stats_ = std::make_unique(this); stats_collector_ = RTCStatsCollector::Create(this); @@ -716,6 +632,125 @@ RTCError PeerConnection::Initialize( return RTCError::OK(); } +void PeerConnection::InitializeTransportController_n( + const RTCConfiguration& configuration, + const PeerConnectionDependencies& dependencies) { + JsepTransportController::Config config; + config.redetermine_role_on_ice_restart = + configuration.redetermine_role_on_ice_restart; + config.ssl_max_version = options_.ssl_max_version; + config.disable_encryption = options_.disable_encryption; + config.bundle_policy = configuration.bundle_policy; + config.rtcp_mux_policy = configuration.rtcp_mux_policy; + // TODO(bugs.webrtc.org/9891) - Remove options_.crypto_options then remove + // this stub. + config.crypto_options = configuration.crypto_options.has_value() + ? *configuration.crypto_options + : options_.crypto_options; + config.transport_observer = this; + config.rtcp_handler = InitializeRtcpCallback(); + config.event_log = event_log_ptr_; +#if defined(ENABLE_EXTERNAL_AUTH) + config.enable_external_auth = true; +#endif + config.active_reset_srtp_params = configuration.active_reset_srtp_params; + + // DTLS has to be enabled to use SCTP. + if (!configuration.enable_rtp_data_channel && + !options_.disable_sctp_data_channels && dtls_enabled_) { + config.sctp_factory = context_->sctp_transport_factory(); + } + + config.ice_transport_factory = ice_transport_factory_.get(); + config.on_dtls_handshake_error_ = + [weak_ptr = weak_factory_.GetWeakPtr()](rtc::SSLHandshakeError s) { + if (weak_ptr) { + weak_ptr->OnTransportControllerDtlsHandshakeError(s); + } + }; + + transport_controller_.reset( + new JsepTransportController(network_thread(), port_allocator_.get(), + async_resolver_factory_.get(), config)); + + transport_controller_->SubscribeIceConnectionState( + [this](cricket::IceConnectionState s) { + RTC_DCHECK_RUN_ON(network_thread()); + signaling_thread()->PostTask( + ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportControllerConnectionState(s); + })); + }); + transport_controller_->SubscribeConnectionState( + [this](PeerConnectionInterface::PeerConnectionState s) { + RTC_DCHECK_RUN_ON(network_thread()); + signaling_thread()->PostTask( + ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() { + RTC_DCHECK_RUN_ON(signaling_thread()); + SetConnectionState(s); + })); + }); + transport_controller_->SubscribeStandardizedIceConnectionState( + [this](PeerConnectionInterface::IceConnectionState s) { + RTC_DCHECK_RUN_ON(network_thread()); + signaling_thread()->PostTask( + ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() { + RTC_DCHECK_RUN_ON(signaling_thread()); + SetStandardizedIceConnectionState(s); + })); + }); + transport_controller_->SubscribeIceGatheringState( + [this](cricket::IceGatheringState s) { + RTC_DCHECK_RUN_ON(network_thread()); + signaling_thread()->PostTask( + ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportControllerGatheringState(s); + })); + }); + transport_controller_->SubscribeIceCandidateGathered( + [this](const std::string& transport, + const std::vector& candidates) { + RTC_DCHECK_RUN_ON(network_thread()); + signaling_thread()->PostTask( + ToQueuedTask(signaling_thread_safety_.flag(), + [this, t = transport, c = candidates]() { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportControllerCandidatesGathered(t, c); + })); + }); + transport_controller_->SubscribeIceCandidateError( + [this](const cricket::IceCandidateErrorEvent& event) { + RTC_DCHECK_RUN_ON(network_thread()); + signaling_thread()->PostTask(ToQueuedTask( + signaling_thread_safety_.flag(), [this, event = event]() { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportControllerCandidateError(event); + })); + }); + transport_controller_->SubscribeIceCandidatesRemoved( + [this](const std::vector& c) { + RTC_DCHECK_RUN_ON(network_thread()); + signaling_thread()->PostTask( + ToQueuedTask(signaling_thread_safety_.flag(), [this, c = c]() { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportControllerCandidatesRemoved(c); + })); + }); + transport_controller_->SubscribeIceCandidatePairChanged( + [this](const cricket::CandidatePairChangeEvent& event) { + RTC_DCHECK_RUN_ON(network_thread()); + signaling_thread()->PostTask(ToQueuedTask( + signaling_thread_safety_.flag(), [this, event = event]() { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportControllerCandidateChanged(event); + })); + }); + + transport_controller_->SetIceConfig(ParseIceConfig(configuration)); +} + rtc::scoped_refptr PeerConnection::local_streams() { RTC_DCHECK_RUN_ON(signaling_thread()); RTC_CHECK(!IsUnifiedPlan()) << "local_streams is not available with Unified " @@ -1440,6 +1475,7 @@ RTCError PeerConnection::SetConfiguration( if (configuration_.active_reset_srtp_params != modified_config.active_reset_srtp_params) { + // TODO(tommi): move to the network thread - this hides an invoke. transport_controller_->SetActiveResetSrtpParams( modified_config.active_reset_srtp_params); } @@ -1594,6 +1630,7 @@ void PeerConnection::StopRtcEventLog() { rtc::scoped_refptr PeerConnection::LookupDtlsTransportByMid(const std::string& mid) { RTC_DCHECK_RUN_ON(signaling_thread()); + // TODO(tommi): Move to the network thread - this hides an invoke. return transport_controller_->LookupDtlsTransportByMid(mid); } @@ -1697,13 +1734,12 @@ void PeerConnection::Close() { port_allocator_->DiscardCandidatePool(); if (network_thread_safety_) { network_thread_safety_->SetNotAlive(); - network_thread_safety_ = nullptr; } }); worker_thread()->Invoke(RTC_FROM_HERE, [this] { RTC_DCHECK_RUN_ON(worker_thread()); - call_safety_.reset(); + worker_thread_safety_->SetNotAlive(); call_.reset(); // The event log must outlive call (and any other object that uses it). event_log_.reset(); @@ -2144,7 +2180,10 @@ bool PeerConnection::IceRestartPending(const std::string& content_name) const { } bool PeerConnection::NeedsIceRestart(const std::string& content_name) const { - return transport_controller_->NeedsIceRestart(content_name); + return network_thread()->Invoke(RTC_FROM_HERE, [this, &content_name] { + RTC_DCHECK_RUN_ON(network_thread()); + return transport_controller_->NeedsIceRestart(content_name); + }); } void PeerConnection::OnTransportControllerConnectionState( @@ -2487,6 +2526,7 @@ void PeerConnection::OnTransportControllerGatheringState( } void PeerConnection::ReportTransportStats() { + rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls; std::map> media_types_by_transport_name; for (const auto& transceiver : rtp_manager()->transceivers()->List()) { @@ -2508,18 +2548,25 @@ void PeerConnection::ReportTransportStats() { cricket::MEDIA_TYPE_DATA); } - for (const auto& entry : media_types_by_transport_name) { - const std::string& transport_name = entry.first; - const std::set media_types = entry.second; - cricket::TransportStats stats; - if (transport_controller_->GetStats(transport_name, &stats)) { - ReportBestConnectionState(stats); - ReportNegotiatedCiphers(stats, media_types); - } - } + // Run the loop that reports the state on the network thread since the + // transport controller requires the stats to be read there (GetStats()). + network_thread()->PostTask(ToQueuedTask( + network_thread_safety_, [this, media_types_by_transport_name = std::move( + media_types_by_transport_name)] { + for (const auto& entry : media_types_by_transport_name) { + const std::string& transport_name = entry.first; + const std::set media_types = entry.second; + cricket::TransportStats stats; + if (transport_controller_->GetStats(transport_name, &stats)) { + ReportBestConnectionState(stats); + ReportNegotiatedCiphers(dtls_enabled_, stats, media_types); + } + } + })); } // Walk through the ConnectionInfos to gather best connection usage // for IPv4 and IPv6. +// static (no member state required) void PeerConnection::ReportBestConnectionState( const cricket::TransportStats& stats) { for (const cricket::TransportChannelStats& channel_stats : @@ -2567,10 +2614,12 @@ void PeerConnection::ReportBestConnectionState( } } +// static void PeerConnection::ReportNegotiatedCiphers( + bool dtls_enabled, const cricket::TransportStats& stats, const std::set& media_types) { - if (!dtls_enabled_ || stats.channel_stats.empty()) { + if (!dtls_enabled || stats.channel_stats.empty()) { return; } @@ -2721,24 +2770,9 @@ void PeerConnection::RequestUsagePatternReportForTesting() { std::function PeerConnection::InitializeRtcpCallback() { - RTC_DCHECK_RUN_ON(signaling_thread()); - - auto flag = - worker_thread()->Invoke>( - RTC_FROM_HERE, [this] { - RTC_DCHECK_RUN_ON(worker_thread()); - if (!call_) - return rtc::scoped_refptr(); - if (!call_safety_) - call_safety_.reset(new ScopedTaskSafety()); - return call_safety_->flag(); - }); - - if (!flag) - return [](const rtc::CopyOnWriteBuffer&, int64_t) {}; - - return [this, flag = std::move(flag)](const rtc::CopyOnWriteBuffer& packet, - int64_t packet_time_us) { + RTC_DCHECK_RUN_ON(network_thread()); + return [this, flag = worker_thread_safety_]( + const rtc::CopyOnWriteBuffer& packet, int64_t packet_time_us) { RTC_DCHECK_RUN_ON(network_thread()); // TODO(bugs.webrtc.org/11993): We should actually be delivering this call // directly to the Call class somehow directly on the network thread and not diff --git a/pc/peer_connection.h b/pc/peer_connection.h index 92e33d2858..75af0ae170 100644 --- a/pc/peer_connection.h +++ b/pc/peer_connection.h @@ -455,7 +455,8 @@ class PeerConnection : public PeerConnectionInternal, bool is_unified_plan, std::unique_ptr event_log, std::unique_ptr call, - PeerConnectionDependencies& dependencies); + PeerConnectionDependencies& dependencies, + bool dtls_enabled); ~PeerConnection() override; @@ -463,6 +464,10 @@ class PeerConnection : public PeerConnectionInternal, RTCError Initialize( const PeerConnectionInterface::RTCConfiguration& configuration, PeerConnectionDependencies dependencies); + void InitializeTransportController_n( + const RTCConfiguration& configuration, + const PeerConnectionDependencies& dependencies) + RTC_RUN_ON(network_thread()); rtc::scoped_refptr> FindTransceiverBySender(rtc::scoped_refptr sender) @@ -573,11 +578,12 @@ class PeerConnection : public PeerConnectionInternal, void ReportTransportStats() RTC_RUN_ON(signaling_thread()); // Gather the usage of IPv4/IPv6 as best connection. - void ReportBestConnectionState(const cricket::TransportStats& stats); + static void ReportBestConnectionState(const cricket::TransportStats& stats); - void ReportNegotiatedCiphers(const cricket::TransportStats& stats, - const std::set& media_types) - RTC_RUN_ON(signaling_thread()); + static void ReportNegotiatedCiphers( + bool dtls_enabled, + const cricket::TransportStats& stats, + const std::set& media_types); void ReportIceCandidateCollected(const cricket::Candidate& candidate) RTC_RUN_ON(signaling_thread()); @@ -627,8 +633,9 @@ class PeerConnection : public PeerConnectionInternal, // TODO(zstein): |async_resolver_factory_| can currently be nullptr if it // is not injected. It should be required once chromium supplies it. - const std::unique_ptr async_resolver_factory_ - RTC_GUARDED_BY(signaling_thread()); + // This member variable is only used by JsepTransportController so we should + // consider moving ownership to there. + const std::unique_ptr async_resolver_factory_; std::unique_ptr port_allocator_; // TODO(bugs.webrtc.org/9987): Accessed on both // signaling and network thread. @@ -646,8 +653,7 @@ class PeerConnection : public PeerConnectionInternal, std::unique_ptr call_ RTC_GUARDED_BY(worker_thread()); ScopedTaskSafety signaling_thread_safety_; rtc::scoped_refptr network_thread_safety_; - std::unique_ptr call_safety_ - RTC_GUARDED_BY(worker_thread()); + rtc::scoped_refptr worker_thread_safety_; // Points to the same thing as `call_`. Since it's const, we may read the // pointer from any thread. @@ -681,7 +687,7 @@ class PeerConnection : public PeerConnectionInternal, std::unique_ptr sdp_handler_ RTC_GUARDED_BY(signaling_thread()); - bool dtls_enabled_ RTC_GUARDED_BY(signaling_thread()) = false; + const bool dtls_enabled_; UsagePattern usage_pattern_ RTC_GUARDED_BY(signaling_thread()); bool return_histogram_very_quickly_ RTC_GUARDED_BY(signaling_thread()) = diff --git a/pc/sdp_offer_answer.cc b/pc/sdp_offer_answer.cc index 9fa4188e10..8588ca8dbf 100644 --- a/pc/sdp_offer_answer.cc +++ b/pc/sdp_offer_answer.cc @@ -2794,7 +2794,7 @@ bool SdpOfferAnswerHandler::IceRestartPending( bool SdpOfferAnswerHandler::NeedsIceRestart( const std::string& content_name) const { - return transport_controller()->NeedsIceRestart(content_name); + return pc_->NeedsIceRestart(content_name); } absl::optional SdpOfferAnswerHandler::GetDtlsRole( diff --git a/test/peer_scenario/scenario_connection.cc b/test/peer_scenario/scenario_connection.cc index 8e5b3162cb..fefaa00c72 100644 --- a/test/peer_scenario/scenario_connection.cc +++ b/test/peer_scenario/scenario_connection.cc @@ -97,8 +97,7 @@ ScenarioIceConnectionImpl::ScenarioIceConnectionImpl( port_allocator_( new cricket::BasicPortAllocator(manager_->network_manager())), jsep_controller_( - new JsepTransportController(signaling_thread_, - network_thread_, + new JsepTransportController(network_thread_, port_allocator_.get(), /*async_resolver_factory*/ nullptr, CreateJsepConfig())) {