From 1dddaa1a84330091ca083c950ef2e24a85a48fc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Bostr=C3=B6m?= Date: Thu, 24 Oct 2019 12:20:01 +0200 Subject: [PATCH] [PeerConnection] Use an OperationsChain in PeerConnection for async ops. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For background, motivation, requirements and implementation notes, see https://docs.google.com/document/d/1XLwNN2kUIGGTwz9LQ0NwJNkcybi9oKnynUEZB1jGA14/edit?usp=sharing Using the OperationsChain will unblock future CLs from chaining multiple operations together such as implementing parameterless setLocalDescription(). In this CL, the OperationsChain is used in existing signaling operations with little intended side-effects. An operation that is chained onto an empty OperationsChain will for instance execute immediately, and SetLocalDescription() and SetRemoteDescription() are implemented as "synchronous operations". The lifetime of the PeerConnection is not indended to change as a result of this CL: All chained operations use a raw pointer to the PC that is ensured not to be used-after-free using an "IsAlive" object. There is one notable change though: CreateOffer() and CreateAnswer() will asynchronously delay other signaling methods from executing until they have completed. Drive-by fix: This CL also ensures that early failing CreateOffer/CreateAnswer operation's observers are invoked if the PeerConnection is destroyed while a PostCreateSessionDescriptionFailure is pending. Bug: webrtc:11019 Change-Id: I521333e41d20d9bbfb1e721609f2c9db2a5f93a9 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/157305 Reviewed-by: Steve Anton Commit-Queue: Henrik Boström Cr-Commit-Position: refs/heads/master@{#29605} --- pc/BUILD.gn | 2 + pc/peer_connection.cc | 250 +++++++++++++++++++++-- pc/peer_connection.h | 29 +++ pc/peer_connection_signaling_unittest.cc | 61 ++++++ 4 files changed, 327 insertions(+), 15 deletions(-) diff --git a/pc/BUILD.gn b/pc/BUILD.gn index 0da5150faa..2dcbd91363 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -255,7 +255,9 @@ rtc_library("peerconnection") { "../rtc_base", "../rtc_base:checks", "../rtc_base:rtc_base_approved", + "../rtc_base:rtc_operations_chain", "../rtc_base:safe_minmax", + "../rtc_base:weak_ptr", "../rtc_base/experiments:field_trial_parser", "../rtc_base/system:fallthrough", "../rtc_base/system:file_wrapper", diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index 494a649e8f..ffeb260f2b 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -647,6 +647,49 @@ const ContentInfo* FindTransceiverMSection( : nullptr; } +// Wraps a CreateSessionDescriptionObserver and an OperationsChain operation +// complete callback. When the observer is invoked, the wrapped observer is +// invoked followed by invoking the completion callback. +class CreateSessionDescriptionObserverOperationWrapper + : public CreateSessionDescriptionObserver { + public: + CreateSessionDescriptionObserverOperationWrapper( + rtc::scoped_refptr observer, + std::function operation_complete_callback) + : observer_(std::move(observer)), + operation_complete_callback_(std::move(operation_complete_callback)) { + RTC_DCHECK(observer_); + } + ~CreateSessionDescriptionObserverOperationWrapper() override { + RTC_DCHECK(was_called_); + } + + void OnSuccess(SessionDescriptionInterface* desc) override { + RTC_DCHECK(!was_called_); +#ifdef RTC_DCHECK_IS_ON + was_called_ = true; +#endif // RTC_DCHECK_IS_ON + observer_->OnSuccess(desc); + operation_complete_callback_(); + } + + void OnFailure(RTCError error) override { + RTC_DCHECK(!was_called_); +#ifdef RTC_DCHECK_IS_ON + was_called_ = true; +#endif // RTC_DCHECK_IS_ON + observer_->OnFailure(std::move(error)); + operation_complete_callback_(); + } + + private: +#ifdef RTC_DCHECK_IS_ON + bool was_called_ = false; +#endif // RTC_DCHECK_IS_ON + rtc::scoped_refptr observer_; + std::function operation_complete_callback_; +}; + } // namespace class PeerConnection::LocalIceCredentialsToReplace { @@ -892,6 +935,7 @@ PeerConnection::PeerConnection(PeerConnectionFactory* factory, : factory_(factory), event_log_(std::move(event_log)), event_log_ptr_(event_log_.get()), + operations_chain_(rtc::OperationsChain::Create()), datagram_transport_config_( field_trial::FindFullName(kDatagramTransportFieldTrial)), datagram_transport_data_channel_config_( @@ -902,12 +946,15 @@ PeerConnection::PeerConnection(PeerConnectionFactory* factory, call_(std::move(call)), call_ptr_(call_.get()), data_channel_transport_(nullptr), - local_ice_credentials_to_replace_(new LocalIceCredentialsToReplace()) {} + local_ice_credentials_to_replace_(new LocalIceCredentialsToReplace()), + weak_ptr_factory_(this) {} PeerConnection::~PeerConnection() { TRACE_EVENT0("webrtc", "PeerConnection::~PeerConnection"); RTC_DCHECK_RUN_ON(signaling_thread()); + weak_ptr_factory_.InvalidateWeakPtrs(); + // Need to stop transceivers before destroying the stats collector because // AudioRtpSender has a reference to the StatsCollector it will update when // stopping. @@ -944,6 +991,23 @@ PeerConnection::~PeerConnection() { // The event log must outlive call (and any other object that uses it). event_log_.reset(); }); + + // Process all pending notifications in the message queue. If we don't do + // this, requests will linger and not know they succeeded or failed. + rtc::MessageList list; + signaling_thread()->Clear(this, rtc::MQID_ANY, &list); + for (auto& msg : list) { + if (msg.message_id == MSG_CREATE_SESSIONDESCRIPTION_FAILED) { + // Processing CreateOffer() and CreateAnswer() messages ensures their + // observers are invoked even if the PeerConnection is destroyed early. + OnMessage(&msg); + } else { + // TODO(hbos): Consider processing all pending messages. This would mean + // that SetLocalDescription() and SetRemoteDescription() observers are + // informed of successes and failures; this is currently NOT the case. + delete msg.pdata; + } + } } void PeerConnection::DestroyAllChannels() { @@ -2050,7 +2114,37 @@ void PeerConnection::RestartIce() { void PeerConnection::CreateOffer(CreateSessionDescriptionObserver* observer, const RTCOfferAnswerOptions& options) { RTC_DCHECK_RUN_ON(signaling_thread()); - TRACE_EVENT0("webrtc", "PeerConnection::CreateOffer"); + // Chain this operation. If asynchronous operations are pending on the chain, + // this operation will be queued to be invoked, otherwise the contents of the + // lambda will execute immediately. + operations_chain_->ChainOperation( + [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(), + observer_refptr = + rtc::scoped_refptr(observer), + options](std::function operations_chain_callback) { + // Abort early if |this_weak_ptr| is no longer valid. + if (!this_weak_ptr) { + observer_refptr->OnFailure( + RTCError(RTCErrorType::INTERNAL_ERROR, + "CreateOffer failed because the session was shut down")); + operations_chain_callback(); + return; + } + // The operation completes asynchronously when the wrapper is invoked. + rtc::scoped_refptr + observer_wrapper(new rtc::RefCountedObject< + CreateSessionDescriptionObserverOperationWrapper>( + std::move(observer_refptr), + std::move(operations_chain_callback))); + this_weak_ptr->DoCreateOffer(options, observer_wrapper); + }); +} + +void PeerConnection::DoCreateOffer( + const RTCOfferAnswerOptions& options, + rtc::scoped_refptr observer) { + RTC_DCHECK_RUN_ON(signaling_thread()); + TRACE_EVENT0("webrtc", "PeerConnection::DoCreateOffer"); if (!observer) { RTC_LOG(LS_ERROR) << "CreateOffer - observer is NULL."; @@ -2176,7 +2270,37 @@ PeerConnection::GetReceivingTransceiversOfType(cricket::MediaType media_type) { void PeerConnection::CreateAnswer(CreateSessionDescriptionObserver* observer, const RTCOfferAnswerOptions& options) { RTC_DCHECK_RUN_ON(signaling_thread()); - TRACE_EVENT0("webrtc", "PeerConnection::CreateAnswer"); + // Chain this operation. If asynchronous operations are pending on the chain, + // this operation will be queued to be invoked, otherwise the contents of the + // lambda will execute immediately. + operations_chain_->ChainOperation( + [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(), + observer_refptr = + rtc::scoped_refptr(observer), + options](std::function operations_chain_callback) { + // Abort early if |this_weak_ptr| is no longer valid. + if (!this_weak_ptr) { + observer_refptr->OnFailure(RTCError( + RTCErrorType::INTERNAL_ERROR, + "CreateAnswer failed because the session was shut down")); + operations_chain_callback(); + return; + } + // The operation completes asynchronously when the wrapper is invoked. + rtc::scoped_refptr + observer_wrapper(new rtc::RefCountedObject< + CreateSessionDescriptionObserverOperationWrapper>( + std::move(observer_refptr), + std::move(operations_chain_callback))); + this_weak_ptr->DoCreateAnswer(options, observer_wrapper); + }); +} + +void PeerConnection::DoCreateAnswer( + const RTCOfferAnswerOptions& options, + rtc::scoped_refptr observer) { + RTC_DCHECK_RUN_ON(signaling_thread()); + TRACE_EVENT0("webrtc", "PeerConnection::DoCreateAnswer"); if (!observer) { RTC_LOG(LS_ERROR) << "CreateAnswer - observer is NULL."; return; @@ -2230,13 +2354,44 @@ void PeerConnection::SetLocalDescription( SetSessionDescriptionObserver* observer, SessionDescriptionInterface* desc_ptr) { RTC_DCHECK_RUN_ON(signaling_thread()); - TRACE_EVENT0("webrtc", "PeerConnection::SetLocalDescription"); + // Chain this operation. If asynchronous operations are pending on the chain, + // this operation will be queued to be invoked, otherwise the contents of the + // lambda will execute immediately. + operations_chain_->ChainOperation( + [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(), + observer_refptr = + rtc::scoped_refptr(observer), + desc = std::unique_ptr(desc_ptr)]( + std::function operations_chain_callback) mutable { + // Abort early if |this_weak_ptr| is no longer valid. + if (!this_weak_ptr) { + // For consistency with DoSetLocalDescription(), we DO NOT inform the + // |observer_refptr| that the operation failed in this case. + // TODO(hbos): If/when we process SLD messages in ~PeerConnection, + // the consistent thing would be to inform the observer here. + operations_chain_callback(); + return; + } + this_weak_ptr->DoSetLocalDescription(std::move(desc), + std::move(observer_refptr)); + // DoSetLocalDescription() is currently implemented as a synchronous + // operation but where the |observer|'s callbacks are invoked + // asynchronously in a post to OnMessage(). + // For backwards-compatability reasons, we declare the operation as + // completed here (rather than in OnMessage()). This ensures that: + // - This operation is not keeping the PeerConnection alive past this + // point. + // - Subsequent offer/answer operations can start immediately (without + // waiting for OnMessage()). + operations_chain_callback(); + }); +} - // The SetLocalDescription contract is that we take ownership of the session - // description regardless of the outcome, so wrap it in a unique_ptr right - // away. Ideally, SetLocalDescription's signature will be changed to take the - // description as a unique_ptr argument to formalize this agreement. - std::unique_ptr desc(desc_ptr); +void PeerConnection::DoSetLocalDescription( + std::unique_ptr desc, + rtc::scoped_refptr observer) { + RTC_DCHECK_RUN_ON(signaling_thread()); + TRACE_EVENT0("webrtc", "PeerConnection::DoSetLocalDescription"); if (!observer) { RTC_LOG(LS_ERROR) << "SetLocalDescription - observer is NULL."; @@ -2617,18 +2772,83 @@ void PeerConnection::FillInMissingRemoteMids( void PeerConnection::SetRemoteDescription( SetSessionDescriptionObserver* observer, - SessionDescriptionInterface* desc) { - SetRemoteDescription( - std::unique_ptr(desc), - rtc::scoped_refptr( - new SetRemoteDescriptionObserverAdapter(this, observer))); + SessionDescriptionInterface* desc_ptr) { + RTC_DCHECK_RUN_ON(signaling_thread()); + // Chain this operation. If asynchronous operations are pending on the chain, + // this operation will be queued to be invoked, otherwise the contents of the + // lambda will execute immediately. + operations_chain_->ChainOperation( + [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(), + observer_refptr = + rtc::scoped_refptr(observer), + desc = std::unique_ptr(desc_ptr)]( + std::function operations_chain_callback) mutable { + // Abort early if |this_weak_ptr| is no longer valid. + if (!this_weak_ptr) { + // For consistency with SetRemoteDescriptionObserverAdapter, we DO NOT + // inform the |observer_refptr| that the operation failed in this + // case. + // TODO(hbos): If/when we process SRD messages in ~PeerConnection, + // the consistent thing would be to inform the observer here. + operations_chain_callback(); + return; + } + this_weak_ptr->DoSetRemoteDescription( + std::move(desc), + rtc::scoped_refptr( + new SetRemoteDescriptionObserverAdapter( + this_weak_ptr.get(), std::move(observer_refptr)))); + // DoSetRemoteDescription() is currently implemented as a synchronous + // operation but where SetRemoteDescriptionObserverAdapter ensures that + // the |observer|'s callbacks are invoked asynchronously in a post to + // OnMessage(). + // For backwards-compatability reasons, we declare the operation as + // completed here (rather than in OnMessage()). This ensures that: + // - This operation is not keeping the PeerConnection alive past this + // point. + // - Subsequent offer/answer operations can start immediately (without + // waiting for OnMessage()). + operations_chain_callback(); + }); } void PeerConnection::SetRemoteDescription( std::unique_ptr desc, rtc::scoped_refptr observer) { RTC_DCHECK_RUN_ON(signaling_thread()); - TRACE_EVENT0("webrtc", "PeerConnection::SetRemoteDescription"); + // Chain this operation. If asynchronous operations are pending on the chain, + // this operation will be queued to be invoked, otherwise the contents of the + // lambda will execute immediately. + operations_chain_->ChainOperation( + [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(), observer, + desc = std::move(desc)]( + std::function operations_chain_callback) mutable { + // Abort early if |this_weak_ptr| is no longer valid. + if (!this_weak_ptr) { + // For consistency with DoSetRemoteDescription(), we DO inform the + // |observer| that the operation failed in this case. + observer->OnSetRemoteDescriptionComplete(RTCError( + RTCErrorType::INVALID_STATE, + "Failed to set remote offer sdp: failed because the session was " + "shut down")); + operations_chain_callback(); + return; + } + this_weak_ptr->DoSetRemoteDescription(std::move(desc), + std::move(observer)); + // DoSetRemoteDescription() is currently implemented as a synchronous + // operation. The |observer| will already have been informed that it + // completed, and we can mark this operation as complete without any + // loose ends. + operations_chain_callback(); + }); +} + +void PeerConnection::DoSetRemoteDescription( + std::unique_ptr desc, + rtc::scoped_refptr observer) { + RTC_DCHECK_RUN_ON(signaling_thread()); + TRACE_EVENT0("webrtc", "PeerConnection::DoSetRemoteDescription"); if (!observer) { RTC_LOG(LS_ERROR) << "SetRemoteDescription - observer is NULL."; diff --git a/pc/peer_connection.h b/pc/peer_connection.h index baaa14d0bd..7a576f310b 100644 --- a/pc/peer_connection.h +++ b/pc/peer_connection.h @@ -34,8 +34,10 @@ #include "pc/stream_collection.h" #include "pc/webrtc_session_description_factory.h" #include "rtc_base/experiments/field_trial_parser.h" +#include "rtc_base/operations_chain.h" #include "rtc_base/race_checker.h" #include "rtc_base/unique_id_generator.h" +#include "rtc_base/weak_ptr.h" namespace webrtc { @@ -443,6 +445,22 @@ class PeerConnection : public PeerConnectionInternal, rtc::scoped_refptr> GetFirstAudioTransceiver() const RTC_RUN_ON(signaling_thread()); + // Implementation of the offer/answer exchange operations. These are chained + // onto the |operations_chain_| when the public CreateOffer(), CreateAnswer(), + // SetLocalDescription() and SetRemoteDescription() methods are invoked. + void DoCreateOffer( + const RTCOfferAnswerOptions& options, + rtc::scoped_refptr observer); + void DoCreateAnswer( + const RTCOfferAnswerOptions& options, + rtc::scoped_refptr observer); + void DoSetLocalDescription( + std::unique_ptr desc, + rtc::scoped_refptr observer); + void DoSetRemoteDescription( + std::unique_ptr desc, + rtc::scoped_refptr observer); + void CreateAudioReceiver(MediaStreamInterface* stream, const RtpSenderInfo& remote_sender_info) RTC_RUN_ON(signaling_thread()); @@ -1217,6 +1235,14 @@ class PeerConnection : public PeerConnectionInternal, // pointer (but not touch the object) from any thread. RtcEventLog* const event_log_ptr_ RTC_PT_GUARDED_BY(worker_thread()); + // The operations chain is used by the offer/answer exchange methods to ensure + // they are executed in the right order. For example, if + // SetRemoteDescription() is invoked while CreateOffer() is still pending, the + // SRD operation will not start until CreateOffer() has completed. See + // https://w3c.github.io/webrtc-pc/#dfn-operations-chain. + rtc::scoped_refptr operations_chain_ + RTC_GUARDED_BY(signaling_thread()); + SignalingState signaling_state_ RTC_GUARDED_BY(signaling_thread()) = kStable; IceConnectionState ice_connection_state_ RTC_GUARDED_BY(signaling_thread()) = kIceConnectionNew; @@ -1446,6 +1472,9 @@ class PeerConnection : public PeerConnectionInternal, std::unique_ptr local_ice_credentials_to_replace_ RTC_GUARDED_BY(signaling_thread()); bool is_negotiation_needed_ RTC_GUARDED_BY(signaling_thread()) = false; + + rtc::WeakPtrFactory weak_ptr_factory_ + RTC_GUARDED_BY(signaling_thread()); }; } // namespace webrtc diff --git a/pc/peer_connection_signaling_unittest.cc b/pc/peer_connection_signaling_unittest.cc index 9916539431..f544b4a788 100644 --- a/pc/peer_connection_signaling_unittest.cc +++ b/pc/peer_connection_signaling_unittest.cc @@ -41,6 +41,10 @@ using ::testing::Bool; using ::testing::Combine; using ::testing::Values; +namespace { +const int64_t kWaitTimeout = 10000; +} // namespace + class PeerConnectionWrapperForSignalingTest : public PeerConnectionWrapper { public: using PeerConnectionWrapper::PeerConnectionWrapper; @@ -522,6 +526,63 @@ TEST_P(PeerConnectionSignalingTest, CreateOffersAndShutdown) { } } +// Similar to the above test, but by closing the PC first the CreateOffer() will +// fail "early", which triggers a codepath where the PeerConnection is +// reponsible for invoking the observer, instead of the normal codepath where +// the WebRtcSessionDescriptionFactory is responsible for it. +TEST_P(PeerConnectionSignalingTest, CloseCreateOfferAndShutdown) { + auto caller = CreatePeerConnection(); + rtc::scoped_refptr observer = + new rtc::RefCountedObject(); + caller->pc()->Close(); + caller->pc()->CreateOffer(observer, RTCOfferAnswerOptions()); + caller.reset(nullptr); + EXPECT_TRUE(observer->called()); +} + +TEST_P(PeerConnectionSignalingTest, SetRemoteDescriptionExecutesImmediately) { + auto caller = CreatePeerConnectionWithAudioVideo(); + auto callee = CreatePeerConnection(); + + // This offer will cause receivers to be created. + auto offer = caller->CreateOffer(RTCOfferAnswerOptions()); + + // By not waiting for the observer's callback we can verify that the operation + // executed immediately. + callee->pc()->SetRemoteDescription(std::move(offer), + new MockSetRemoteDescriptionObserver()); + EXPECT_EQ(2u, callee->pc()->GetReceivers().size()); +} + +TEST_P(PeerConnectionSignalingTest, CreateOfferBlocksSetRemoteDescription) { + auto caller = CreatePeerConnectionWithAudioVideo(); + auto callee = CreatePeerConnection(); + + // This offer will cause receivers to be created. + auto offer = caller->CreateOffer(RTCOfferAnswerOptions()); + + EXPECT_EQ(0u, callee->pc()->GetReceivers().size()); + rtc::scoped_refptr offer_observer( + new rtc::RefCountedObject()); + // Synchronously invoke CreateOffer() and SetRemoteDescription(). The + // SetRemoteDescription() operation should be chained to be executed + // asynchronously, when CreateOffer() completes. + callee->pc()->CreateOffer(offer_observer, RTCOfferAnswerOptions()); + callee->pc()->SetRemoteDescription(std::move(offer), + new MockSetRemoteDescriptionObserver()); + // CreateOffer() is asynchronous; without message processing this operation + // should not have completed. + EXPECT_FALSE(offer_observer->called()); + // Due to chaining, the receivers should not have been created by the offer + // yet. + EXPECT_EQ(0u, callee->pc()->GetReceivers().size()); + // EXPECT_EQ_WAIT causes messages to be processed... + EXPECT_EQ_WAIT(true, offer_observer->called(), kWaitTimeout); + // Now that the offer has been completed, SetRemoteDescription() will have + // been executed next in the chain. + EXPECT_EQ(2u, callee->pc()->GetReceivers().size()); +} + INSTANTIATE_TEST_SUITE_P(PeerConnectionSignalingTest, PeerConnectionSignalingTest, Values(SdpSemantics::kPlanB,