From ee6f4f67ef32e3eb095c6d1ce8b679fe191adcc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Bostr=C3=B6m?= Date: Wed, 6 Nov 2019 12:36:12 +0100 Subject: [PATCH] [PeerConnection] Implement asynchronous version of AddIceCandidate(). MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is the same as the existing version, except it uses the Operations Chain. As such, if an asynchronous operation that uses the chain is currently pending, such as CreateOffer() or CreateAnswer(), AddIceCandidate() will not happen until the previous operation completes. Bug: chromium:1019222 Change-Id: Ie6e5fc386fa9c29b5e2f8e3f65bfbaf9837d351c Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/158741 Commit-Queue: Henrik Boström Reviewed-by: Steve Anton Cr-Commit-Position: refs/heads/master@{#29704} --- api/peer_connection_interface.h | 7 ++ api/peer_connection_proxy.h | 4 + pc/peer_connection.cc | 31 ++++++ pc/peer_connection.h | 2 + pc/peer_connection_ice_unittest.cc | 140 +++++++++++++++++++++++++- rtc_base/operations_chain.h | 9 +- rtc_base/operations_chain_unittest.cc | 55 ++++++++++ 7 files changed, 242 insertions(+), 6 deletions(-) diff --git a/api/peer_connection_interface.h b/api/peer_connection_interface.h index 7567ab1758..52422c0d87 100644 --- a/api/peer_connection_interface.h +++ b/api/peer_connection_interface.h @@ -1039,7 +1039,14 @@ class RTC_EXPORT PeerConnectionInterface : public rtc::RefCountInterface { // A copy of the |candidate| will be created and added to the remote // description. So the caller of this method still has the ownership of the // |candidate|. + // TODO(hbos): The spec mandates chaining this operation onto the operations + // chain; deprecate and remove this version in favor of the callback-based + // signature. virtual bool AddIceCandidate(const IceCandidateInterface* candidate) = 0; + // TODO(hbos): Remove default implementation once implemented by downstream + // projects. + virtual void AddIceCandidate(std::unique_ptr candidate, + std::function callback) {} // Removes a group of remote candidates from the ICE agent. Needed mainly for // continual gathering, to avoid an ever-growing list of candidates as diff --git a/api/peer_connection_proxy.h b/api/peer_connection_proxy.h index 3b9cf792f4..1b4ceeaeff 100644 --- a/api/peer_connection_proxy.h +++ b/api/peer_connection_proxy.h @@ -114,6 +114,10 @@ PROXY_METHOD1(RTCError, SetConfiguration, const PeerConnectionInterface::RTCConfiguration&) PROXY_METHOD1(bool, AddIceCandidate, const IceCandidateInterface*) +PROXY_METHOD2(void, + AddIceCandidate, + std::unique_ptr, + std::function) PROXY_METHOD1(bool, RemoveIceCandidates, const std::vector&) PROXY_METHOD1(RTCError, SetBitrate, const BitrateSettings&) PROXY_METHOD1(void, SetAudioPlayout, bool) diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index 46a61ab517..d09b9c56b5 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -4209,6 +4209,37 @@ bool PeerConnection::AddIceCandidate( } } +void PeerConnection::AddIceCandidate( + std::unique_ptr candidate, + std::function callback) { + RTC_DCHECK_RUN_ON(signaling_thread()); + // Chain this operation. If asynchronous operations are pending on the chain, + // this operation will be queued to be invoked, otherwise the contents of the + // lambda will execute immediately. + operations_chain_->ChainOperation( + [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(), + candidate = std::move(candidate), callback = std::move(callback)]( + std::function operations_chain_callback) { + if (!this_weak_ptr) { + operations_chain_callback(); + callback(RTCError( + RTCErrorType::INVALID_STATE, + "AddIceCandidate failed because the session was shut down")); + return; + } + if (!this_weak_ptr->AddIceCandidate(candidate.get())) { + operations_chain_callback(); + // Fail with an error type and message consistent with Chromium. + // TODO(hbos): Fail with error types according to spec. + callback(RTCError(RTCErrorType::UNSUPPORTED_OPERATION, + "Error processing ICE candidate")); + return; + } + operations_chain_callback(); + callback(RTCError::OK()); + }); +} + bool PeerConnection::RemoveIceCandidates( const std::vector& candidates) { TRACE_EVENT0("webrtc", "PeerConnection::RemoveIceCandidates"); diff --git a/pc/peer_connection.h b/pc/peer_connection.h index dea05ac318..302ff3bfd4 100644 --- a/pc/peer_connection.h +++ b/pc/peer_connection.h @@ -221,6 +221,8 @@ class PeerConnection : public PeerConnectionInternal, RTCError SetConfiguration( const PeerConnectionInterface::RTCConfiguration& configuration) override; bool AddIceCandidate(const IceCandidateInterface* candidate) override; + void AddIceCandidate(std::unique_ptr candidate, + std::function callback) override; bool RemoveIceCandidates( const std::vector& candidates) override; diff --git a/pc/peer_connection_ice_unittest.cc b/pc/peer_connection_ice_unittest.cc index 61034d0138..18a053c51e 100644 --- a/pc/peer_connection_ice_unittest.cc +++ b/pc/peer_connection_ice_unittest.cc @@ -28,6 +28,7 @@ #include "api/video_codecs/builtin_video_decoder_factory.h" #include "api/video_codecs/builtin_video_encoder_factory.h" #include "pc/test/fake_audio_capture_module.h" +#include "pc/test/mock_peer_connection_observers.h" #include "rtc_base/fake_network.h" #include "rtc_base/gunit.h" #include "rtc_base/strings/string_builder.h" @@ -46,21 +47,26 @@ using ::testing::Pair; using ::testing::Values; constexpr int kIceCandidatesTimeout = 10000; +constexpr int64_t kWaitTimeout = 10000; class PeerConnectionWrapperForIceTest : public PeerConnectionWrapper { public: using PeerConnectionWrapper::PeerConnectionWrapper; - // Adds a new ICE candidate to the first transport. - bool AddIceCandidate(cricket::Candidate* candidate) { + std::unique_ptr CreateJsepCandidateForFirstTransport( + cricket::Candidate* candidate) { RTC_DCHECK(pc()->remote_description()); const auto* desc = pc()->remote_description()->description(); RTC_DCHECK(desc->contents().size() > 0); const auto& first_content = desc->contents()[0]; candidate->set_transport_name(first_content.name); - std::unique_ptr jsep_candidate = - CreateIceCandidate(first_content.name, -1, *candidate); - return pc()->AddIceCandidate(jsep_candidate.get()); + return CreateIceCandidate(first_content.name, -1, *candidate); + } + + // Adds a new ICE candidate to the first transport. + bool AddIceCandidate(cricket::Candidate* candidate) { + return pc()->AddIceCandidate( + CreateJsepCandidateForFirstTransport(candidate).get()); } // Returns ICE candidates from the remote session description. @@ -691,6 +697,130 @@ TEST_P(PeerConnectionIceTest, TwoTrickledCandidatesAddedToRemoteDescription) { candidates[1]->candidate()); } +TEST_P(PeerConnectionIceTest, AsyncAddIceCandidateIsAddedToRemoteDescription) { + auto candidate = CreateLocalUdpCandidate(SocketAddress("1.1.1.1", 1111)); + + auto caller = CreatePeerConnectionWithAudioVideo(); + auto callee = CreatePeerConnectionWithAudioVideo(); + + ASSERT_TRUE(callee->SetRemoteDescription(caller->CreateOfferAndSetAsLocal())); + + auto jsep_candidate = + callee->CreateJsepCandidateForFirstTransport(&candidate); + bool operation_completed = false; + callee->pc()->AddIceCandidate(std::move(jsep_candidate), + [&operation_completed](RTCError result) { + EXPECT_TRUE(result.ok()); + operation_completed = true; + }); + EXPECT_TRUE_WAIT(operation_completed, kWaitTimeout); + + auto candidates = callee->GetIceCandidatesFromRemoteDescription(); + ASSERT_EQ(1u, candidates.size()); + EXPECT_PRED_FORMAT2(AssertCandidatesEqual, candidate, + candidates[0]->candidate()); +} + +TEST_P(PeerConnectionIceTest, + AsyncAddIceCandidateCompletesImmediatelyIfNoPendingOperation) { + auto candidate = CreateLocalUdpCandidate(SocketAddress("1.1.1.1", 1111)); + + auto caller = CreatePeerConnectionWithAudioVideo(); + auto callee = CreatePeerConnectionWithAudioVideo(); + + ASSERT_TRUE(callee->SetRemoteDescription(caller->CreateOfferAndSetAsLocal())); + + auto jsep_candidate = + callee->CreateJsepCandidateForFirstTransport(&candidate); + bool operation_completed = false; + callee->pc()->AddIceCandidate( + std::move(jsep_candidate), + [&operation_completed](RTCError result) { operation_completed = true; }); + EXPECT_TRUE(operation_completed); +} + +TEST_P(PeerConnectionIceTest, + AsyncAddIceCandidateCompletesWhenPendingOperationCompletes) { + auto candidate = CreateLocalUdpCandidate(SocketAddress("1.1.1.1", 1111)); + + auto caller = CreatePeerConnectionWithAudioVideo(); + auto callee = CreatePeerConnectionWithAudioVideo(); + + ASSERT_TRUE(callee->SetRemoteDescription(caller->CreateOfferAndSetAsLocal())); + + // Chain an operation that will block AddIceCandidate() from executing. + rtc::scoped_refptr answer_observer( + new rtc::RefCountedObject()); + callee->pc()->CreateAnswer(answer_observer, RTCOfferAnswerOptions()); + + auto jsep_candidate = + callee->CreateJsepCandidateForFirstTransport(&candidate); + bool operation_completed = false; + callee->pc()->AddIceCandidate( + std::move(jsep_candidate), + [&operation_completed](RTCError result) { operation_completed = true; }); + // The operation will not be able to complete until we EXPECT_TRUE_WAIT() + // allowing CreateAnswer() to complete. + EXPECT_FALSE(operation_completed); + EXPECT_TRUE_WAIT(answer_observer->called(), kWaitTimeout); + // As soon as it does, AddIceCandidate() will execute without delay, so it + // must also have completed. + EXPECT_TRUE(operation_completed); +} + +TEST_P(PeerConnectionIceTest, + AsyncAddIceCandidateFailsBeforeSetRemoteDescription) { + auto candidate = CreateLocalUdpCandidate(SocketAddress("1.1.1.1", 1111)); + + auto caller = CreatePeerConnectionWithAudioVideo(); + std::unique_ptr jsep_candidate = + CreateIceCandidate(cricket::CN_AUDIO, 0, candidate); + + bool operation_completed = false; + caller->pc()->AddIceCandidate( + std::move(jsep_candidate), [&operation_completed](RTCError result) { + EXPECT_FALSE(result.ok()); + EXPECT_EQ(result.message(), + std::string("Error processing ICE candidate")); + operation_completed = true; + }); + EXPECT_TRUE_WAIT(operation_completed, kWaitTimeout); +} + +TEST_P(PeerConnectionIceTest, + AsyncAddIceCandidateFailsIfPeerConnectionDestroyed) { + auto candidate = CreateLocalUdpCandidate(SocketAddress("1.1.1.1", 1111)); + + auto caller = CreatePeerConnectionWithAudioVideo(); + auto callee = CreatePeerConnectionWithAudioVideo(); + + ASSERT_TRUE(callee->SetRemoteDescription(caller->CreateOfferAndSetAsLocal())); + + // Chain an operation that will block AddIceCandidate() from executing. + rtc::scoped_refptr answer_observer( + new rtc::RefCountedObject()); + callee->pc()->CreateAnswer(answer_observer, RTCOfferAnswerOptions()); + + auto jsep_candidate = + callee->CreateJsepCandidateForFirstTransport(&candidate); + bool operation_completed = false; + callee->pc()->AddIceCandidate( + std::move(jsep_candidate), [&operation_completed](RTCError result) { + EXPECT_FALSE(result.ok()); + EXPECT_EQ( + result.message(), + std::string( + "AddIceCandidate failed because the session was shut down")); + operation_completed = true; + }); + // The operation will not be able to run until EXPECT_TRUE_WAIT(), giving us + // time to remove all references to the PeerConnection. + EXPECT_FALSE(operation_completed); + // This should delete the callee PC. + callee = nullptr; + EXPECT_TRUE_WAIT(operation_completed, kWaitTimeout); +} + TEST_P(PeerConnectionIceTest, LocalDescriptionUpdatedWhenContinualGathering) { const SocketAddress kLocalAddress("1.1.1.1", 0); diff --git a/rtc_base/operations_chain.h b/rtc_base/operations_chain.h index 94ff57bef7..b6ec46e04a 100644 --- a/rtc_base/operations_chain.h +++ b/rtc_base/operations_chain.h @@ -56,7 +56,14 @@ class OperationWithFunctor final : public Operation { #ifdef RTC_DCHECK_IS_ON has_run_ = true; #endif // RTC_DCHECK_IS_ON - functor_(std::move(callback_)); + // The functor being executed may invoke the callback synchronously, + // marking the operation as complete. As such, |this| OperationWithFunctor + // object may get deleted here, including destroying |functor_|. To + // protect the functor from self-destruction while running, it is moved to + // a local variable. + auto functor = std::move(functor_); + functor(std::move(callback_)); + // |this| may now be deleted; don't touch any member variables. } private: diff --git a/rtc_base/operations_chain_unittest.cc b/rtc_base/operations_chain_unittest.cc index 8dbe607d6d..968f94c060 100644 --- a/rtc_base/operations_chain_unittest.cc +++ b/rtc_base/operations_chain_unittest.cc @@ -172,6 +172,38 @@ class OperationTrackerProxy { scoped_refptr operations_chain_; }; +// On destruction, sets a boolean flag to true. +class SignalOnDestruction final { + public: + SignalOnDestruction(bool* destructor_called) + : destructor_called_(destructor_called) { + RTC_DCHECK(destructor_called_); + } + ~SignalOnDestruction() { + // Moved objects will have |destructor_called_| set to null. Destroying a + // moved SignalOnDestruction should not signal. + if (destructor_called_) { + *destructor_called_ = true; + } + } + + // Move operators. + SignalOnDestruction(SignalOnDestruction&& other) + : SignalOnDestruction(other.destructor_called_) { + other.destructor_called_ = nullptr; + } + SignalOnDestruction& operator=(SignalOnDestruction&& other) { + destructor_called_ = other.destructor_called_; + other.destructor_called_ = nullptr; + return *this; + } + + private: + bool* destructor_called_; + + RTC_DISALLOW_COPY_AND_ASSIGN(SignalOnDestruction); +}; + TEST(OperationsChainTest, SynchronousOperation) { OperationTrackerProxy operation_tracker_proxy; operation_tracker_proxy.Initialize()->Wait(Event::kForever); @@ -312,6 +344,29 @@ TEST(OperationsChainTest, async_operation_completed_event->Wait(Event::kForever); } +TEST(OperationsChainTest, FunctorIsNotDestroyedWhileExecuting) { + scoped_refptr operations_chain = OperationsChain::Create(); + + bool destructor_called = false; + SignalOnDestruction signal_on_destruction(&destructor_called); + + operations_chain->ChainOperation( + [signal_on_destruction = std::move(signal_on_destruction), + &destructor_called](std::function callback) { + EXPECT_FALSE(destructor_called); + // Invoking the callback marks the operation as complete, popping the + // Operation object from the OperationsChain internal queue. + callback(); + // Even though the internal Operation object has been destroyed, + // variables captured by this lambda expression must still be valid (the + // associated functor must not be deleted while executing). + EXPECT_FALSE(destructor_called); + }); + // The lambda having executed synchronously and completed, its captured + // variables should now have been deleted. + EXPECT_TRUE(destructor_called); +} + #if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID) TEST(OperationsChainTest, OperationNotInvokingCallbackShouldCrash) {