diff --git a/pc/BUILD.gn b/pc/BUILD.gn index 0da5150faa..2dcbd91363 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -255,7 +255,9 @@ rtc_library("peerconnection") { "../rtc_base", "../rtc_base:checks", "../rtc_base:rtc_base_approved", + "../rtc_base:rtc_operations_chain", "../rtc_base:safe_minmax", + "../rtc_base:weak_ptr", "../rtc_base/experiments:field_trial_parser", "../rtc_base/system:fallthrough", "../rtc_base/system:file_wrapper", diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index 494a649e8f..ffeb260f2b 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -647,6 +647,49 @@ const ContentInfo* FindTransceiverMSection( : nullptr; } +// Wraps a CreateSessionDescriptionObserver and an OperationsChain operation +// complete callback. When the observer is invoked, the wrapped observer is +// invoked followed by invoking the completion callback. +class CreateSessionDescriptionObserverOperationWrapper + : public CreateSessionDescriptionObserver { + public: + CreateSessionDescriptionObserverOperationWrapper( + rtc::scoped_refptr observer, + std::function operation_complete_callback) + : observer_(std::move(observer)), + operation_complete_callback_(std::move(operation_complete_callback)) { + RTC_DCHECK(observer_); + } + ~CreateSessionDescriptionObserverOperationWrapper() override { + RTC_DCHECK(was_called_); + } + + void OnSuccess(SessionDescriptionInterface* desc) override { + RTC_DCHECK(!was_called_); +#ifdef RTC_DCHECK_IS_ON + was_called_ = true; +#endif // RTC_DCHECK_IS_ON + observer_->OnSuccess(desc); + operation_complete_callback_(); + } + + void OnFailure(RTCError error) override { + RTC_DCHECK(!was_called_); +#ifdef RTC_DCHECK_IS_ON + was_called_ = true; +#endif // RTC_DCHECK_IS_ON + observer_->OnFailure(std::move(error)); + operation_complete_callback_(); + } + + private: +#ifdef RTC_DCHECK_IS_ON + bool was_called_ = false; +#endif // RTC_DCHECK_IS_ON + rtc::scoped_refptr observer_; + std::function operation_complete_callback_; +}; + } // namespace class PeerConnection::LocalIceCredentialsToReplace { @@ -892,6 +935,7 @@ PeerConnection::PeerConnection(PeerConnectionFactory* factory, : factory_(factory), event_log_(std::move(event_log)), event_log_ptr_(event_log_.get()), + operations_chain_(rtc::OperationsChain::Create()), datagram_transport_config_( field_trial::FindFullName(kDatagramTransportFieldTrial)), datagram_transport_data_channel_config_( @@ -902,12 +946,15 @@ PeerConnection::PeerConnection(PeerConnectionFactory* factory, call_(std::move(call)), call_ptr_(call_.get()), data_channel_transport_(nullptr), - local_ice_credentials_to_replace_(new LocalIceCredentialsToReplace()) {} + local_ice_credentials_to_replace_(new LocalIceCredentialsToReplace()), + weak_ptr_factory_(this) {} PeerConnection::~PeerConnection() { TRACE_EVENT0("webrtc", "PeerConnection::~PeerConnection"); RTC_DCHECK_RUN_ON(signaling_thread()); + weak_ptr_factory_.InvalidateWeakPtrs(); + // Need to stop transceivers before destroying the stats collector because // AudioRtpSender has a reference to the StatsCollector it will update when // stopping. @@ -944,6 +991,23 @@ PeerConnection::~PeerConnection() { // The event log must outlive call (and any other object that uses it). event_log_.reset(); }); + + // Process all pending notifications in the message queue. If we don't do + // this, requests will linger and not know they succeeded or failed. + rtc::MessageList list; + signaling_thread()->Clear(this, rtc::MQID_ANY, &list); + for (auto& msg : list) { + if (msg.message_id == MSG_CREATE_SESSIONDESCRIPTION_FAILED) { + // Processing CreateOffer() and CreateAnswer() messages ensures their + // observers are invoked even if the PeerConnection is destroyed early. + OnMessage(&msg); + } else { + // TODO(hbos): Consider processing all pending messages. This would mean + // that SetLocalDescription() and SetRemoteDescription() observers are + // informed of successes and failures; this is currently NOT the case. + delete msg.pdata; + } + } } void PeerConnection::DestroyAllChannels() { @@ -2050,7 +2114,37 @@ void PeerConnection::RestartIce() { void PeerConnection::CreateOffer(CreateSessionDescriptionObserver* observer, const RTCOfferAnswerOptions& options) { RTC_DCHECK_RUN_ON(signaling_thread()); - TRACE_EVENT0("webrtc", "PeerConnection::CreateOffer"); + // Chain this operation. If asynchronous operations are pending on the chain, + // this operation will be queued to be invoked, otherwise the contents of the + // lambda will execute immediately. + operations_chain_->ChainOperation( + [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(), + observer_refptr = + rtc::scoped_refptr(observer), + options](std::function operations_chain_callback) { + // Abort early if |this_weak_ptr| is no longer valid. + if (!this_weak_ptr) { + observer_refptr->OnFailure( + RTCError(RTCErrorType::INTERNAL_ERROR, + "CreateOffer failed because the session was shut down")); + operations_chain_callback(); + return; + } + // The operation completes asynchronously when the wrapper is invoked. + rtc::scoped_refptr + observer_wrapper(new rtc::RefCountedObject< + CreateSessionDescriptionObserverOperationWrapper>( + std::move(observer_refptr), + std::move(operations_chain_callback))); + this_weak_ptr->DoCreateOffer(options, observer_wrapper); + }); +} + +void PeerConnection::DoCreateOffer( + const RTCOfferAnswerOptions& options, + rtc::scoped_refptr observer) { + RTC_DCHECK_RUN_ON(signaling_thread()); + TRACE_EVENT0("webrtc", "PeerConnection::DoCreateOffer"); if (!observer) { RTC_LOG(LS_ERROR) << "CreateOffer - observer is NULL."; @@ -2176,7 +2270,37 @@ PeerConnection::GetReceivingTransceiversOfType(cricket::MediaType media_type) { void PeerConnection::CreateAnswer(CreateSessionDescriptionObserver* observer, const RTCOfferAnswerOptions& options) { RTC_DCHECK_RUN_ON(signaling_thread()); - TRACE_EVENT0("webrtc", "PeerConnection::CreateAnswer"); + // Chain this operation. If asynchronous operations are pending on the chain, + // this operation will be queued to be invoked, otherwise the contents of the + // lambda will execute immediately. + operations_chain_->ChainOperation( + [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(), + observer_refptr = + rtc::scoped_refptr(observer), + options](std::function operations_chain_callback) { + // Abort early if |this_weak_ptr| is no longer valid. + if (!this_weak_ptr) { + observer_refptr->OnFailure(RTCError( + RTCErrorType::INTERNAL_ERROR, + "CreateAnswer failed because the session was shut down")); + operations_chain_callback(); + return; + } + // The operation completes asynchronously when the wrapper is invoked. + rtc::scoped_refptr + observer_wrapper(new rtc::RefCountedObject< + CreateSessionDescriptionObserverOperationWrapper>( + std::move(observer_refptr), + std::move(operations_chain_callback))); + this_weak_ptr->DoCreateAnswer(options, observer_wrapper); + }); +} + +void PeerConnection::DoCreateAnswer( + const RTCOfferAnswerOptions& options, + rtc::scoped_refptr observer) { + RTC_DCHECK_RUN_ON(signaling_thread()); + TRACE_EVENT0("webrtc", "PeerConnection::DoCreateAnswer"); if (!observer) { RTC_LOG(LS_ERROR) << "CreateAnswer - observer is NULL."; return; @@ -2230,13 +2354,44 @@ void PeerConnection::SetLocalDescription( SetSessionDescriptionObserver* observer, SessionDescriptionInterface* desc_ptr) { RTC_DCHECK_RUN_ON(signaling_thread()); - TRACE_EVENT0("webrtc", "PeerConnection::SetLocalDescription"); + // Chain this operation. If asynchronous operations are pending on the chain, + // this operation will be queued to be invoked, otherwise the contents of the + // lambda will execute immediately. + operations_chain_->ChainOperation( + [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(), + observer_refptr = + rtc::scoped_refptr(observer), + desc = std::unique_ptr(desc_ptr)]( + std::function operations_chain_callback) mutable { + // Abort early if |this_weak_ptr| is no longer valid. + if (!this_weak_ptr) { + // For consistency with DoSetLocalDescription(), we DO NOT inform the + // |observer_refptr| that the operation failed in this case. + // TODO(hbos): If/when we process SLD messages in ~PeerConnection, + // the consistent thing would be to inform the observer here. + operations_chain_callback(); + return; + } + this_weak_ptr->DoSetLocalDescription(std::move(desc), + std::move(observer_refptr)); + // DoSetLocalDescription() is currently implemented as a synchronous + // operation but where the |observer|'s callbacks are invoked + // asynchronously in a post to OnMessage(). + // For backwards-compatability reasons, we declare the operation as + // completed here (rather than in OnMessage()). This ensures that: + // - This operation is not keeping the PeerConnection alive past this + // point. + // - Subsequent offer/answer operations can start immediately (without + // waiting for OnMessage()). + operations_chain_callback(); + }); +} - // The SetLocalDescription contract is that we take ownership of the session - // description regardless of the outcome, so wrap it in a unique_ptr right - // away. Ideally, SetLocalDescription's signature will be changed to take the - // description as a unique_ptr argument to formalize this agreement. - std::unique_ptr desc(desc_ptr); +void PeerConnection::DoSetLocalDescription( + std::unique_ptr desc, + rtc::scoped_refptr observer) { + RTC_DCHECK_RUN_ON(signaling_thread()); + TRACE_EVENT0("webrtc", "PeerConnection::DoSetLocalDescription"); if (!observer) { RTC_LOG(LS_ERROR) << "SetLocalDescription - observer is NULL."; @@ -2617,18 +2772,83 @@ void PeerConnection::FillInMissingRemoteMids( void PeerConnection::SetRemoteDescription( SetSessionDescriptionObserver* observer, - SessionDescriptionInterface* desc) { - SetRemoteDescription( - std::unique_ptr(desc), - rtc::scoped_refptr( - new SetRemoteDescriptionObserverAdapter(this, observer))); + SessionDescriptionInterface* desc_ptr) { + RTC_DCHECK_RUN_ON(signaling_thread()); + // Chain this operation. If asynchronous operations are pending on the chain, + // this operation will be queued to be invoked, otherwise the contents of the + // lambda will execute immediately. + operations_chain_->ChainOperation( + [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(), + observer_refptr = + rtc::scoped_refptr(observer), + desc = std::unique_ptr(desc_ptr)]( + std::function operations_chain_callback) mutable { + // Abort early if |this_weak_ptr| is no longer valid. + if (!this_weak_ptr) { + // For consistency with SetRemoteDescriptionObserverAdapter, we DO NOT + // inform the |observer_refptr| that the operation failed in this + // case. + // TODO(hbos): If/when we process SRD messages in ~PeerConnection, + // the consistent thing would be to inform the observer here. + operations_chain_callback(); + return; + } + this_weak_ptr->DoSetRemoteDescription( + std::move(desc), + rtc::scoped_refptr( + new SetRemoteDescriptionObserverAdapter( + this_weak_ptr.get(), std::move(observer_refptr)))); + // DoSetRemoteDescription() is currently implemented as a synchronous + // operation but where SetRemoteDescriptionObserverAdapter ensures that + // the |observer|'s callbacks are invoked asynchronously in a post to + // OnMessage(). + // For backwards-compatability reasons, we declare the operation as + // completed here (rather than in OnMessage()). This ensures that: + // - This operation is not keeping the PeerConnection alive past this + // point. + // - Subsequent offer/answer operations can start immediately (without + // waiting for OnMessage()). + operations_chain_callback(); + }); } void PeerConnection::SetRemoteDescription( std::unique_ptr desc, rtc::scoped_refptr observer) { RTC_DCHECK_RUN_ON(signaling_thread()); - TRACE_EVENT0("webrtc", "PeerConnection::SetRemoteDescription"); + // Chain this operation. If asynchronous operations are pending on the chain, + // this operation will be queued to be invoked, otherwise the contents of the + // lambda will execute immediately. + operations_chain_->ChainOperation( + [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(), observer, + desc = std::move(desc)]( + std::function operations_chain_callback) mutable { + // Abort early if |this_weak_ptr| is no longer valid. + if (!this_weak_ptr) { + // For consistency with DoSetRemoteDescription(), we DO inform the + // |observer| that the operation failed in this case. + observer->OnSetRemoteDescriptionComplete(RTCError( + RTCErrorType::INVALID_STATE, + "Failed to set remote offer sdp: failed because the session was " + "shut down")); + operations_chain_callback(); + return; + } + this_weak_ptr->DoSetRemoteDescription(std::move(desc), + std::move(observer)); + // DoSetRemoteDescription() is currently implemented as a synchronous + // operation. The |observer| will already have been informed that it + // completed, and we can mark this operation as complete without any + // loose ends. + operations_chain_callback(); + }); +} + +void PeerConnection::DoSetRemoteDescription( + std::unique_ptr desc, + rtc::scoped_refptr observer) { + RTC_DCHECK_RUN_ON(signaling_thread()); + TRACE_EVENT0("webrtc", "PeerConnection::DoSetRemoteDescription"); if (!observer) { RTC_LOG(LS_ERROR) << "SetRemoteDescription - observer is NULL."; diff --git a/pc/peer_connection.h b/pc/peer_connection.h index baaa14d0bd..7a576f310b 100644 --- a/pc/peer_connection.h +++ b/pc/peer_connection.h @@ -34,8 +34,10 @@ #include "pc/stream_collection.h" #include "pc/webrtc_session_description_factory.h" #include "rtc_base/experiments/field_trial_parser.h" +#include "rtc_base/operations_chain.h" #include "rtc_base/race_checker.h" #include "rtc_base/unique_id_generator.h" +#include "rtc_base/weak_ptr.h" namespace webrtc { @@ -443,6 +445,22 @@ class PeerConnection : public PeerConnectionInternal, rtc::scoped_refptr> GetFirstAudioTransceiver() const RTC_RUN_ON(signaling_thread()); + // Implementation of the offer/answer exchange operations. These are chained + // onto the |operations_chain_| when the public CreateOffer(), CreateAnswer(), + // SetLocalDescription() and SetRemoteDescription() methods are invoked. + void DoCreateOffer( + const RTCOfferAnswerOptions& options, + rtc::scoped_refptr observer); + void DoCreateAnswer( + const RTCOfferAnswerOptions& options, + rtc::scoped_refptr observer); + void DoSetLocalDescription( + std::unique_ptr desc, + rtc::scoped_refptr observer); + void DoSetRemoteDescription( + std::unique_ptr desc, + rtc::scoped_refptr observer); + void CreateAudioReceiver(MediaStreamInterface* stream, const RtpSenderInfo& remote_sender_info) RTC_RUN_ON(signaling_thread()); @@ -1217,6 +1235,14 @@ class PeerConnection : public PeerConnectionInternal, // pointer (but not touch the object) from any thread. RtcEventLog* const event_log_ptr_ RTC_PT_GUARDED_BY(worker_thread()); + // The operations chain is used by the offer/answer exchange methods to ensure + // they are executed in the right order. For example, if + // SetRemoteDescription() is invoked while CreateOffer() is still pending, the + // SRD operation will not start until CreateOffer() has completed. See + // https://w3c.github.io/webrtc-pc/#dfn-operations-chain. + rtc::scoped_refptr operations_chain_ + RTC_GUARDED_BY(signaling_thread()); + SignalingState signaling_state_ RTC_GUARDED_BY(signaling_thread()) = kStable; IceConnectionState ice_connection_state_ RTC_GUARDED_BY(signaling_thread()) = kIceConnectionNew; @@ -1446,6 +1472,9 @@ class PeerConnection : public PeerConnectionInternal, std::unique_ptr local_ice_credentials_to_replace_ RTC_GUARDED_BY(signaling_thread()); bool is_negotiation_needed_ RTC_GUARDED_BY(signaling_thread()) = false; + + rtc::WeakPtrFactory weak_ptr_factory_ + RTC_GUARDED_BY(signaling_thread()); }; } // namespace webrtc diff --git a/pc/peer_connection_signaling_unittest.cc b/pc/peer_connection_signaling_unittest.cc index 9916539431..f544b4a788 100644 --- a/pc/peer_connection_signaling_unittest.cc +++ b/pc/peer_connection_signaling_unittest.cc @@ -41,6 +41,10 @@ using ::testing::Bool; using ::testing::Combine; using ::testing::Values; +namespace { +const int64_t kWaitTimeout = 10000; +} // namespace + class PeerConnectionWrapperForSignalingTest : public PeerConnectionWrapper { public: using PeerConnectionWrapper::PeerConnectionWrapper; @@ -522,6 +526,63 @@ TEST_P(PeerConnectionSignalingTest, CreateOffersAndShutdown) { } } +// Similar to the above test, but by closing the PC first the CreateOffer() will +// fail "early", which triggers a codepath where the PeerConnection is +// reponsible for invoking the observer, instead of the normal codepath where +// the WebRtcSessionDescriptionFactory is responsible for it. +TEST_P(PeerConnectionSignalingTest, CloseCreateOfferAndShutdown) { + auto caller = CreatePeerConnection(); + rtc::scoped_refptr observer = + new rtc::RefCountedObject(); + caller->pc()->Close(); + caller->pc()->CreateOffer(observer, RTCOfferAnswerOptions()); + caller.reset(nullptr); + EXPECT_TRUE(observer->called()); +} + +TEST_P(PeerConnectionSignalingTest, SetRemoteDescriptionExecutesImmediately) { + auto caller = CreatePeerConnectionWithAudioVideo(); + auto callee = CreatePeerConnection(); + + // This offer will cause receivers to be created. + auto offer = caller->CreateOffer(RTCOfferAnswerOptions()); + + // By not waiting for the observer's callback we can verify that the operation + // executed immediately. + callee->pc()->SetRemoteDescription(std::move(offer), + new MockSetRemoteDescriptionObserver()); + EXPECT_EQ(2u, callee->pc()->GetReceivers().size()); +} + +TEST_P(PeerConnectionSignalingTest, CreateOfferBlocksSetRemoteDescription) { + auto caller = CreatePeerConnectionWithAudioVideo(); + auto callee = CreatePeerConnection(); + + // This offer will cause receivers to be created. + auto offer = caller->CreateOffer(RTCOfferAnswerOptions()); + + EXPECT_EQ(0u, callee->pc()->GetReceivers().size()); + rtc::scoped_refptr offer_observer( + new rtc::RefCountedObject()); + // Synchronously invoke CreateOffer() and SetRemoteDescription(). The + // SetRemoteDescription() operation should be chained to be executed + // asynchronously, when CreateOffer() completes. + callee->pc()->CreateOffer(offer_observer, RTCOfferAnswerOptions()); + callee->pc()->SetRemoteDescription(std::move(offer), + new MockSetRemoteDescriptionObserver()); + // CreateOffer() is asynchronous; without message processing this operation + // should not have completed. + EXPECT_FALSE(offer_observer->called()); + // Due to chaining, the receivers should not have been created by the offer + // yet. + EXPECT_EQ(0u, callee->pc()->GetReceivers().size()); + // EXPECT_EQ_WAIT causes messages to be processed... + EXPECT_EQ_WAIT(true, offer_observer->called(), kWaitTimeout); + // Now that the offer has been completed, SetRemoteDescription() will have + // been executed next in the chain. + EXPECT_EQ(2u, callee->pc()->GetReceivers().size()); +} + INSTANTIATE_TEST_SUITE_P(PeerConnectionSignalingTest, PeerConnectionSignalingTest, Values(SdpSemantics::kPlanB,