[PeerConnection] Use an OperationsChain in PeerConnection for async ops.

For background, motivation, requirements and implementation notes, see
https://docs.google.com/document/d/1XLwNN2kUIGGTwz9LQ0NwJNkcybi9oKnynUEZB1jGA14/edit?usp=sharing

Using the OperationsChain will unblock future CLs from chaining multiple
operations together such as implementing parameterless
setLocalDescription().

In this CL, the OperationsChain is used in existing signaling operations
with little intended side-effects. An operation that is chained onto an
empty OperationsChain will for instance execute immediately, and
SetLocalDescription() and SetRemoteDescription() are implemented as
"synchronous operations".

The lifetime of the PeerConnection is not indended to change as a result
of this CL: All chained operations use a raw pointer to the PC that is
ensured not to be used-after-free using an "IsAlive" object.

There is one notable change though: CreateOffer() and CreateAnswer() will
asynchronously delay other signaling methods from executing until they
have completed.

Drive-by fix: This CL also ensures that early failing
CreateOffer/CreateAnswer operation's observers are invoked if the
PeerConnection is destroyed while a PostCreateSessionDescriptionFailure
is pending.

Bug: webrtc:11019
Change-Id: I521333e41d20d9bbfb1e721609f2c9db2a5f93a9
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/157305
Reviewed-by: Steve Anton <steveanton@webrtc.org>
Commit-Queue: Henrik Boström <hbos@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29605}
This commit is contained in:
Henrik Boström 2019-10-24 12:20:01 +02:00 committed by Commit Bot
parent ef0e4d0438
commit 1dddaa1a84
4 changed files with 327 additions and 15 deletions

View File

@ -255,7 +255,9 @@ rtc_library("peerconnection") {
"../rtc_base",
"../rtc_base:checks",
"../rtc_base:rtc_base_approved",
"../rtc_base:rtc_operations_chain",
"../rtc_base:safe_minmax",
"../rtc_base:weak_ptr",
"../rtc_base/experiments:field_trial_parser",
"../rtc_base/system:fallthrough",
"../rtc_base/system:file_wrapper",

View File

@ -647,6 +647,49 @@ const ContentInfo* FindTransceiverMSection(
: nullptr;
}
// Wraps a CreateSessionDescriptionObserver and an OperationsChain operation
// complete callback. When the observer is invoked, the wrapped observer is
// invoked followed by invoking the completion callback.
class CreateSessionDescriptionObserverOperationWrapper
: public CreateSessionDescriptionObserver {
public:
CreateSessionDescriptionObserverOperationWrapper(
rtc::scoped_refptr<CreateSessionDescriptionObserver> observer,
std::function<void()> operation_complete_callback)
: observer_(std::move(observer)),
operation_complete_callback_(std::move(operation_complete_callback)) {
RTC_DCHECK(observer_);
}
~CreateSessionDescriptionObserverOperationWrapper() override {
RTC_DCHECK(was_called_);
}
void OnSuccess(SessionDescriptionInterface* desc) override {
RTC_DCHECK(!was_called_);
#ifdef RTC_DCHECK_IS_ON
was_called_ = true;
#endif // RTC_DCHECK_IS_ON
observer_->OnSuccess(desc);
operation_complete_callback_();
}
void OnFailure(RTCError error) override {
RTC_DCHECK(!was_called_);
#ifdef RTC_DCHECK_IS_ON
was_called_ = true;
#endif // RTC_DCHECK_IS_ON
observer_->OnFailure(std::move(error));
operation_complete_callback_();
}
private:
#ifdef RTC_DCHECK_IS_ON
bool was_called_ = false;
#endif // RTC_DCHECK_IS_ON
rtc::scoped_refptr<CreateSessionDescriptionObserver> observer_;
std::function<void()> operation_complete_callback_;
};
} // namespace
class PeerConnection::LocalIceCredentialsToReplace {
@ -892,6 +935,7 @@ PeerConnection::PeerConnection(PeerConnectionFactory* factory,
: factory_(factory),
event_log_(std::move(event_log)),
event_log_ptr_(event_log_.get()),
operations_chain_(rtc::OperationsChain::Create()),
datagram_transport_config_(
field_trial::FindFullName(kDatagramTransportFieldTrial)),
datagram_transport_data_channel_config_(
@ -902,12 +946,15 @@ PeerConnection::PeerConnection(PeerConnectionFactory* factory,
call_(std::move(call)),
call_ptr_(call_.get()),
data_channel_transport_(nullptr),
local_ice_credentials_to_replace_(new LocalIceCredentialsToReplace()) {}
local_ice_credentials_to_replace_(new LocalIceCredentialsToReplace()),
weak_ptr_factory_(this) {}
PeerConnection::~PeerConnection() {
TRACE_EVENT0("webrtc", "PeerConnection::~PeerConnection");
RTC_DCHECK_RUN_ON(signaling_thread());
weak_ptr_factory_.InvalidateWeakPtrs();
// Need to stop transceivers before destroying the stats collector because
// AudioRtpSender has a reference to the StatsCollector it will update when
// stopping.
@ -944,6 +991,23 @@ PeerConnection::~PeerConnection() {
// The event log must outlive call (and any other object that uses it).
event_log_.reset();
});
// Process all pending notifications in the message queue. If we don't do
// this, requests will linger and not know they succeeded or failed.
rtc::MessageList list;
signaling_thread()->Clear(this, rtc::MQID_ANY, &list);
for (auto& msg : list) {
if (msg.message_id == MSG_CREATE_SESSIONDESCRIPTION_FAILED) {
// Processing CreateOffer() and CreateAnswer() messages ensures their
// observers are invoked even if the PeerConnection is destroyed early.
OnMessage(&msg);
} else {
// TODO(hbos): Consider processing all pending messages. This would mean
// that SetLocalDescription() and SetRemoteDescription() observers are
// informed of successes and failures; this is currently NOT the case.
delete msg.pdata;
}
}
}
void PeerConnection::DestroyAllChannels() {
@ -2050,7 +2114,37 @@ void PeerConnection::RestartIce() {
void PeerConnection::CreateOffer(CreateSessionDescriptionObserver* observer,
const RTCOfferAnswerOptions& options) {
RTC_DCHECK_RUN_ON(signaling_thread());
TRACE_EVENT0("webrtc", "PeerConnection::CreateOffer");
// Chain this operation. If asynchronous operations are pending on the chain,
// this operation will be queued to be invoked, otherwise the contents of the
// lambda will execute immediately.
operations_chain_->ChainOperation(
[this_weak_ptr = weak_ptr_factory_.GetWeakPtr(),
observer_refptr =
rtc::scoped_refptr<CreateSessionDescriptionObserver>(observer),
options](std::function<void()> operations_chain_callback) {
// Abort early if |this_weak_ptr| is no longer valid.
if (!this_weak_ptr) {
observer_refptr->OnFailure(
RTCError(RTCErrorType::INTERNAL_ERROR,
"CreateOffer failed because the session was shut down"));
operations_chain_callback();
return;
}
// The operation completes asynchronously when the wrapper is invoked.
rtc::scoped_refptr<CreateSessionDescriptionObserverOperationWrapper>
observer_wrapper(new rtc::RefCountedObject<
CreateSessionDescriptionObserverOperationWrapper>(
std::move(observer_refptr),
std::move(operations_chain_callback)));
this_weak_ptr->DoCreateOffer(options, observer_wrapper);
});
}
void PeerConnection::DoCreateOffer(
const RTCOfferAnswerOptions& options,
rtc::scoped_refptr<CreateSessionDescriptionObserver> observer) {
RTC_DCHECK_RUN_ON(signaling_thread());
TRACE_EVENT0("webrtc", "PeerConnection::DoCreateOffer");
if (!observer) {
RTC_LOG(LS_ERROR) << "CreateOffer - observer is NULL.";
@ -2176,7 +2270,37 @@ PeerConnection::GetReceivingTransceiversOfType(cricket::MediaType media_type) {
void PeerConnection::CreateAnswer(CreateSessionDescriptionObserver* observer,
const RTCOfferAnswerOptions& options) {
RTC_DCHECK_RUN_ON(signaling_thread());
TRACE_EVENT0("webrtc", "PeerConnection::CreateAnswer");
// Chain this operation. If asynchronous operations are pending on the chain,
// this operation will be queued to be invoked, otherwise the contents of the
// lambda will execute immediately.
operations_chain_->ChainOperation(
[this_weak_ptr = weak_ptr_factory_.GetWeakPtr(),
observer_refptr =
rtc::scoped_refptr<CreateSessionDescriptionObserver>(observer),
options](std::function<void()> operations_chain_callback) {
// Abort early if |this_weak_ptr| is no longer valid.
if (!this_weak_ptr) {
observer_refptr->OnFailure(RTCError(
RTCErrorType::INTERNAL_ERROR,
"CreateAnswer failed because the session was shut down"));
operations_chain_callback();
return;
}
// The operation completes asynchronously when the wrapper is invoked.
rtc::scoped_refptr<CreateSessionDescriptionObserverOperationWrapper>
observer_wrapper(new rtc::RefCountedObject<
CreateSessionDescriptionObserverOperationWrapper>(
std::move(observer_refptr),
std::move(operations_chain_callback)));
this_weak_ptr->DoCreateAnswer(options, observer_wrapper);
});
}
void PeerConnection::DoCreateAnswer(
const RTCOfferAnswerOptions& options,
rtc::scoped_refptr<CreateSessionDescriptionObserver> observer) {
RTC_DCHECK_RUN_ON(signaling_thread());
TRACE_EVENT0("webrtc", "PeerConnection::DoCreateAnswer");
if (!observer) {
RTC_LOG(LS_ERROR) << "CreateAnswer - observer is NULL.";
return;
@ -2230,13 +2354,44 @@ void PeerConnection::SetLocalDescription(
SetSessionDescriptionObserver* observer,
SessionDescriptionInterface* desc_ptr) {
RTC_DCHECK_RUN_ON(signaling_thread());
TRACE_EVENT0("webrtc", "PeerConnection::SetLocalDescription");
// Chain this operation. If asynchronous operations are pending on the chain,
// this operation will be queued to be invoked, otherwise the contents of the
// lambda will execute immediately.
operations_chain_->ChainOperation(
[this_weak_ptr = weak_ptr_factory_.GetWeakPtr(),
observer_refptr =
rtc::scoped_refptr<SetSessionDescriptionObserver>(observer),
desc = std::unique_ptr<SessionDescriptionInterface>(desc_ptr)](
std::function<void()> operations_chain_callback) mutable {
// Abort early if |this_weak_ptr| is no longer valid.
if (!this_weak_ptr) {
// For consistency with DoSetLocalDescription(), we DO NOT inform the
// |observer_refptr| that the operation failed in this case.
// TODO(hbos): If/when we process SLD messages in ~PeerConnection,
// the consistent thing would be to inform the observer here.
operations_chain_callback();
return;
}
this_weak_ptr->DoSetLocalDescription(std::move(desc),
std::move(observer_refptr));
// DoSetLocalDescription() is currently implemented as a synchronous
// operation but where the |observer|'s callbacks are invoked
// asynchronously in a post to OnMessage().
// For backwards-compatability reasons, we declare the operation as
// completed here (rather than in OnMessage()). This ensures that:
// - This operation is not keeping the PeerConnection alive past this
// point.
// - Subsequent offer/answer operations can start immediately (without
// waiting for OnMessage()).
operations_chain_callback();
});
}
// The SetLocalDescription contract is that we take ownership of the session
// description regardless of the outcome, so wrap it in a unique_ptr right
// away. Ideally, SetLocalDescription's signature will be changed to take the
// description as a unique_ptr argument to formalize this agreement.
std::unique_ptr<SessionDescriptionInterface> desc(desc_ptr);
void PeerConnection::DoSetLocalDescription(
std::unique_ptr<SessionDescriptionInterface> desc,
rtc::scoped_refptr<SetSessionDescriptionObserver> observer) {
RTC_DCHECK_RUN_ON(signaling_thread());
TRACE_EVENT0("webrtc", "PeerConnection::DoSetLocalDescription");
if (!observer) {
RTC_LOG(LS_ERROR) << "SetLocalDescription - observer is NULL.";
@ -2617,18 +2772,83 @@ void PeerConnection::FillInMissingRemoteMids(
void PeerConnection::SetRemoteDescription(
SetSessionDescriptionObserver* observer,
SessionDescriptionInterface* desc) {
SetRemoteDescription(
std::unique_ptr<SessionDescriptionInterface>(desc),
SessionDescriptionInterface* desc_ptr) {
RTC_DCHECK_RUN_ON(signaling_thread());
// Chain this operation. If asynchronous operations are pending on the chain,
// this operation will be queued to be invoked, otherwise the contents of the
// lambda will execute immediately.
operations_chain_->ChainOperation(
[this_weak_ptr = weak_ptr_factory_.GetWeakPtr(),
observer_refptr =
rtc::scoped_refptr<SetSessionDescriptionObserver>(observer),
desc = std::unique_ptr<SessionDescriptionInterface>(desc_ptr)](
std::function<void()> operations_chain_callback) mutable {
// Abort early if |this_weak_ptr| is no longer valid.
if (!this_weak_ptr) {
// For consistency with SetRemoteDescriptionObserverAdapter, we DO NOT
// inform the |observer_refptr| that the operation failed in this
// case.
// TODO(hbos): If/when we process SRD messages in ~PeerConnection,
// the consistent thing would be to inform the observer here.
operations_chain_callback();
return;
}
this_weak_ptr->DoSetRemoteDescription(
std::move(desc),
rtc::scoped_refptr<SetRemoteDescriptionObserverInterface>(
new SetRemoteDescriptionObserverAdapter(this, observer)));
new SetRemoteDescriptionObserverAdapter(
this_weak_ptr.get(), std::move(observer_refptr))));
// DoSetRemoteDescription() is currently implemented as a synchronous
// operation but where SetRemoteDescriptionObserverAdapter ensures that
// the |observer|'s callbacks are invoked asynchronously in a post to
// OnMessage().
// For backwards-compatability reasons, we declare the operation as
// completed here (rather than in OnMessage()). This ensures that:
// - This operation is not keeping the PeerConnection alive past this
// point.
// - Subsequent offer/answer operations can start immediately (without
// waiting for OnMessage()).
operations_chain_callback();
});
}
void PeerConnection::SetRemoteDescription(
std::unique_ptr<SessionDescriptionInterface> desc,
rtc::scoped_refptr<SetRemoteDescriptionObserverInterface> observer) {
RTC_DCHECK_RUN_ON(signaling_thread());
TRACE_EVENT0("webrtc", "PeerConnection::SetRemoteDescription");
// Chain this operation. If asynchronous operations are pending on the chain,
// this operation will be queued to be invoked, otherwise the contents of the
// lambda will execute immediately.
operations_chain_->ChainOperation(
[this_weak_ptr = weak_ptr_factory_.GetWeakPtr(), observer,
desc = std::move(desc)](
std::function<void()> operations_chain_callback) mutable {
// Abort early if |this_weak_ptr| is no longer valid.
if (!this_weak_ptr) {
// For consistency with DoSetRemoteDescription(), we DO inform the
// |observer| that the operation failed in this case.
observer->OnSetRemoteDescriptionComplete(RTCError(
RTCErrorType::INVALID_STATE,
"Failed to set remote offer sdp: failed because the session was "
"shut down"));
operations_chain_callback();
return;
}
this_weak_ptr->DoSetRemoteDescription(std::move(desc),
std::move(observer));
// DoSetRemoteDescription() is currently implemented as a synchronous
// operation. The |observer| will already have been informed that it
// completed, and we can mark this operation as complete without any
// loose ends.
operations_chain_callback();
});
}
void PeerConnection::DoSetRemoteDescription(
std::unique_ptr<SessionDescriptionInterface> desc,
rtc::scoped_refptr<SetRemoteDescriptionObserverInterface> observer) {
RTC_DCHECK_RUN_ON(signaling_thread());
TRACE_EVENT0("webrtc", "PeerConnection::DoSetRemoteDescription");
if (!observer) {
RTC_LOG(LS_ERROR) << "SetRemoteDescription - observer is NULL.";

View File

@ -34,8 +34,10 @@
#include "pc/stream_collection.h"
#include "pc/webrtc_session_description_factory.h"
#include "rtc_base/experiments/field_trial_parser.h"
#include "rtc_base/operations_chain.h"
#include "rtc_base/race_checker.h"
#include "rtc_base/unique_id_generator.h"
#include "rtc_base/weak_ptr.h"
namespace webrtc {
@ -443,6 +445,22 @@ class PeerConnection : public PeerConnectionInternal,
rtc::scoped_refptr<RtpTransceiverProxyWithInternal<RtpTransceiver>>
GetFirstAudioTransceiver() const RTC_RUN_ON(signaling_thread());
// Implementation of the offer/answer exchange operations. These are chained
// onto the |operations_chain_| when the public CreateOffer(), CreateAnswer(),
// SetLocalDescription() and SetRemoteDescription() methods are invoked.
void DoCreateOffer(
const RTCOfferAnswerOptions& options,
rtc::scoped_refptr<CreateSessionDescriptionObserver> observer);
void DoCreateAnswer(
const RTCOfferAnswerOptions& options,
rtc::scoped_refptr<CreateSessionDescriptionObserver> observer);
void DoSetLocalDescription(
std::unique_ptr<SessionDescriptionInterface> desc,
rtc::scoped_refptr<SetSessionDescriptionObserver> observer);
void DoSetRemoteDescription(
std::unique_ptr<SessionDescriptionInterface> desc,
rtc::scoped_refptr<SetRemoteDescriptionObserverInterface> observer);
void CreateAudioReceiver(MediaStreamInterface* stream,
const RtpSenderInfo& remote_sender_info)
RTC_RUN_ON(signaling_thread());
@ -1217,6 +1235,14 @@ class PeerConnection : public PeerConnectionInternal,
// pointer (but not touch the object) from any thread.
RtcEventLog* const event_log_ptr_ RTC_PT_GUARDED_BY(worker_thread());
// The operations chain is used by the offer/answer exchange methods to ensure
// they are executed in the right order. For example, if
// SetRemoteDescription() is invoked while CreateOffer() is still pending, the
// SRD operation will not start until CreateOffer() has completed. See
// https://w3c.github.io/webrtc-pc/#dfn-operations-chain.
rtc::scoped_refptr<rtc::OperationsChain> operations_chain_
RTC_GUARDED_BY(signaling_thread());
SignalingState signaling_state_ RTC_GUARDED_BY(signaling_thread()) = kStable;
IceConnectionState ice_connection_state_ RTC_GUARDED_BY(signaling_thread()) =
kIceConnectionNew;
@ -1446,6 +1472,9 @@ class PeerConnection : public PeerConnectionInternal,
std::unique_ptr<LocalIceCredentialsToReplace>
local_ice_credentials_to_replace_ RTC_GUARDED_BY(signaling_thread());
bool is_negotiation_needed_ RTC_GUARDED_BY(signaling_thread()) = false;
rtc::WeakPtrFactory<PeerConnection> weak_ptr_factory_
RTC_GUARDED_BY(signaling_thread());
};
} // namespace webrtc

View File

@ -41,6 +41,10 @@ using ::testing::Bool;
using ::testing::Combine;
using ::testing::Values;
namespace {
const int64_t kWaitTimeout = 10000;
} // namespace
class PeerConnectionWrapperForSignalingTest : public PeerConnectionWrapper {
public:
using PeerConnectionWrapper::PeerConnectionWrapper;
@ -522,6 +526,63 @@ TEST_P(PeerConnectionSignalingTest, CreateOffersAndShutdown) {
}
}
// Similar to the above test, but by closing the PC first the CreateOffer() will
// fail "early", which triggers a codepath where the PeerConnection is
// reponsible for invoking the observer, instead of the normal codepath where
// the WebRtcSessionDescriptionFactory is responsible for it.
TEST_P(PeerConnectionSignalingTest, CloseCreateOfferAndShutdown) {
auto caller = CreatePeerConnection();
rtc::scoped_refptr<MockCreateSessionDescriptionObserver> observer =
new rtc::RefCountedObject<MockCreateSessionDescriptionObserver>();
caller->pc()->Close();
caller->pc()->CreateOffer(observer, RTCOfferAnswerOptions());
caller.reset(nullptr);
EXPECT_TRUE(observer->called());
}
TEST_P(PeerConnectionSignalingTest, SetRemoteDescriptionExecutesImmediately) {
auto caller = CreatePeerConnectionWithAudioVideo();
auto callee = CreatePeerConnection();
// This offer will cause receivers to be created.
auto offer = caller->CreateOffer(RTCOfferAnswerOptions());
// By not waiting for the observer's callback we can verify that the operation
// executed immediately.
callee->pc()->SetRemoteDescription(std::move(offer),
new MockSetRemoteDescriptionObserver());
EXPECT_EQ(2u, callee->pc()->GetReceivers().size());
}
TEST_P(PeerConnectionSignalingTest, CreateOfferBlocksSetRemoteDescription) {
auto caller = CreatePeerConnectionWithAudioVideo();
auto callee = CreatePeerConnection();
// This offer will cause receivers to be created.
auto offer = caller->CreateOffer(RTCOfferAnswerOptions());
EXPECT_EQ(0u, callee->pc()->GetReceivers().size());
rtc::scoped_refptr<MockCreateSessionDescriptionObserver> offer_observer(
new rtc::RefCountedObject<MockCreateSessionDescriptionObserver>());
// Synchronously invoke CreateOffer() and SetRemoteDescription(). The
// SetRemoteDescription() operation should be chained to be executed
// asynchronously, when CreateOffer() completes.
callee->pc()->CreateOffer(offer_observer, RTCOfferAnswerOptions());
callee->pc()->SetRemoteDescription(std::move(offer),
new MockSetRemoteDescriptionObserver());
// CreateOffer() is asynchronous; without message processing this operation
// should not have completed.
EXPECT_FALSE(offer_observer->called());
// Due to chaining, the receivers should not have been created by the offer
// yet.
EXPECT_EQ(0u, callee->pc()->GetReceivers().size());
// EXPECT_EQ_WAIT causes messages to be processed...
EXPECT_EQ_WAIT(true, offer_observer->called(), kWaitTimeout);
// Now that the offer has been completed, SetRemoteDescription() will have
// been executed next in the chain.
EXPECT_EQ(2u, callee->pc()->GetReceivers().size());
}
INSTANTIATE_TEST_SUITE_P(PeerConnectionSignalingTest,
PeerConnectionSignalingTest,
Values(SdpSemantics::kPlanB,