diff --git a/pc/BUILD.gn b/pc/BUILD.gn index 440dd0e655..e99fc01025 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -1389,17 +1389,17 @@ rtc_source_set("webrtc_session_description_factory") { "../api:rtc_error", "../api:scoped_refptr", "../api:sequence_checker", + "../api/task_queue", "../p2p:rtc_p2p", "../rtc_base:checks", - "../rtc_base:location", "../rtc_base:logging", "../rtc_base:rtc_base", "../rtc_base:stringutils", - "../rtc_base:threading", "../rtc_base:weak_ptr", ] absl_deps = [ "//third_party/abseil-cpp/absl/algorithm:container", + "//third_party/abseil-cpp/absl/functional:any_invocable", "//third_party/abseil-cpp/absl/types:optional", ] } diff --git a/pc/webrtc_session_description_factory.cc b/pc/webrtc_session_description_factory.cc index 363a7f71a0..3d398e3b6e 100644 --- a/pc/webrtc_session_description_factory.cc +++ b/pc/webrtc_session_description_factory.cc @@ -12,7 +12,7 @@ #include -#include +#include #include #include #include @@ -28,7 +28,6 @@ #include "pc/sdp_state_provider.h" #include "pc/session_description.h" #include "rtc_base/checks.h" -#include "rtc_base/location.h" #include "rtc_base/logging.h" #include "rtc_base/ssl_identity.h" #include "rtc_base/ssl_stream_adapter.h" @@ -67,22 +66,6 @@ static bool ValidMediaSessionOptions( return sender1.track_id == sender2.track_id; }) == sorted_senders.end(); } - -enum { - MSG_CREATE_SESSIONDESCRIPTION_SUCCESS, - MSG_CREATE_SESSIONDESCRIPTION_FAILED, -}; - -struct CreateSessionDescriptionMsg : public rtc::MessageData { - explicit CreateSessionDescriptionMsg( - webrtc::CreateSessionDescriptionObserver* observer, - RTCError error_in) - : observer(observer), error(std::move(error_in)) {} - - rtc::scoped_refptr observer; - RTCError error; - std::unique_ptr description; -}; } // namespace // static @@ -192,12 +175,14 @@ WebRtcSessionDescriptionFactory::~WebRtcSessionDescriptionFactory() { // Fail any requests that were asked for before identity generation completed. FailPendingRequests(kFailedDueToSessionShutdown); - // Process all pending notifications in the message queue. If we don't do - // this, requests will linger and not know they succeeded or failed. - rtc::MessageList list; - signaling_thread_->Clear(this, rtc::MQID_ANY, &list); - for (auto& msg : list) { - OnMessage(&msg); + // Process all pending notifications. If we don't do this, requests will + // linger and not know they succeeded or failed. + // All tasks that suppose to run them are protected with weak_factory_ and + // will be cancelled. If we don't protect them, they might trigger after peer + // connection is destroyed, which might be surprising. + while (!callbacks_.empty()) { + std::move(callbacks_.front())(); + callbacks_.pop(); } } @@ -282,28 +267,6 @@ cricket::SecurePolicy WebRtcSessionDescriptionFactory::SdesPolicy() const { return session_desc_factory_.secure(); } -void WebRtcSessionDescriptionFactory::OnMessage(rtc::Message* msg) { - switch (msg->message_id) { - case MSG_CREATE_SESSIONDESCRIPTION_SUCCESS: { - CreateSessionDescriptionMsg* param = - static_cast(msg->pdata); - param->observer->OnSuccess(param->description.release()); - delete param; - break; - } - case MSG_CREATE_SESSIONDESCRIPTION_FAILED: { - CreateSessionDescriptionMsg* param = - static_cast(msg->pdata); - param->observer->OnFailure(std::move(param->error)); - delete param; - break; - } - default: - RTC_DCHECK_NOTREACHED(); - break; - } -} - void WebRtcSessionDescriptionFactory::InternalCreateOffer( CreateSessionDescriptionRequest request) { if (sdp_info_->local_description()) { @@ -435,21 +398,39 @@ void WebRtcSessionDescriptionFactory::FailPendingRequests( void WebRtcSessionDescriptionFactory::PostCreateSessionDescriptionFailed( CreateSessionDescriptionObserver* observer, const std::string& error) { - CreateSessionDescriptionMsg* msg = new CreateSessionDescriptionMsg( - observer, RTCError(RTCErrorType::INTERNAL_ERROR, std::string(error))); - signaling_thread_->Post(RTC_FROM_HERE, this, - MSG_CREATE_SESSIONDESCRIPTION_FAILED, msg); + Post([observer = + rtc::scoped_refptr(observer), + error]() mutable { + observer->OnFailure( + RTCError(RTCErrorType::INTERNAL_ERROR, std::move(error))); + }); RTC_LOG(LS_ERROR) << "Create SDP failed: " << error; } void WebRtcSessionDescriptionFactory::PostCreateSessionDescriptionSucceeded( CreateSessionDescriptionObserver* observer, std::unique_ptr description) { - CreateSessionDescriptionMsg* msg = - new CreateSessionDescriptionMsg(observer, RTCError::OK()); - msg->description = std::move(description); - signaling_thread_->Post(RTC_FROM_HERE, this, - MSG_CREATE_SESSIONDESCRIPTION_SUCCESS, msg); + Post([observer = + rtc::scoped_refptr(observer), + description = std::move(description)]() mutable { + observer->OnSuccess(description.release()); + }); +} + +void WebRtcSessionDescriptionFactory::Post( + absl::AnyInvocable callback) { + RTC_DCHECK_RUN_ON(signaling_thread_); + callbacks_.push(std::move(callback)); + signaling_thread_->PostTask([weak_ptr = weak_factory_.GetWeakPtr()] { + if (weak_ptr) { + auto& callbacks = weak_ptr->callbacks_; + // Callbacks are pushed from the same thread, thus this task should + // corresond to the first entry in the queue. + RTC_DCHECK(!callbacks.empty()); + std::move(callbacks.front())(); + callbacks.pop(); + } + }); } void WebRtcSessionDescriptionFactory::OnCertificateRequestFailed() { diff --git a/pc/webrtc_session_description_factory.h b/pc/webrtc_session_description_factory.h index f85f712e59..122a720162 100644 --- a/pc/webrtc_session_description_factory.h +++ b/pc/webrtc_session_description_factory.h @@ -18,18 +18,17 @@ #include #include +#include "absl/functional/any_invocable.h" #include "api/jsep.h" #include "api/peer_connection_interface.h" #include "api/scoped_refptr.h" +#include "api/task_queue/task_queue_base.h" #include "p2p/base/transport_description.h" #include "p2p/base/transport_description_factory.h" #include "pc/media_session.h" #include "pc/sdp_state_provider.h" -#include "rtc_base/message_handler.h" #include "rtc_base/rtc_certificate.h" #include "rtc_base/rtc_certificate_generator.h" -#include "rtc_base/thread.h" -#include "rtc_base/thread_message.h" #include "rtc_base/unique_id_generator.h" #include "rtc_base/weak_ptr.h" @@ -39,7 +38,7 @@ namespace webrtc { // asynchronously. It queues the create offer/answer request until the // certificate generation has completed, i.e. when OnCertificateRequestFailed or // OnCertificateReady is called. -class WebRtcSessionDescriptionFactory : public rtc::MessageHandler { +class WebRtcSessionDescriptionFactory { public: // Can specify either a `cert_generator` or `certificate` to enable DTLS. If // a certificate generator is given, starts generating the certificate @@ -55,7 +54,7 @@ class WebRtcSessionDescriptionFactory : public rtc::MessageHandler { std::function&)> on_certificate_ready, const FieldTrialsView& field_trials); - virtual ~WebRtcSessionDescriptionFactory(); + ~WebRtcSessionDescriptionFactory(); WebRtcSessionDescriptionFactory(const WebRtcSessionDescriptionFactory&) = delete; @@ -114,9 +113,6 @@ class WebRtcSessionDescriptionFactory : public rtc::MessageHandler { cricket::MediaSessionOptions options; }; - // MessageHandler implementation. - virtual void OnMessage(rtc::Message* msg); - void InternalCreateOffer(CreateSessionDescriptionRequest request); void InternalCreateAnswer(CreateSessionDescriptionRequest request); // Posts failure notifications for all pending session description requests. @@ -127,13 +123,16 @@ class WebRtcSessionDescriptionFactory : public rtc::MessageHandler { void PostCreateSessionDescriptionSucceeded( CreateSessionDescriptionObserver* observer, std::unique_ptr description); + // Posts `callback` to `signaling_thread_`, and ensures it will be called no + // later than in the destructor. + void Post(absl::AnyInvocable callback); void OnCertificateRequestFailed(); void SetCertificate(rtc::scoped_refptr certificate); std::queue create_session_description_requests_; - rtc::Thread* const signaling_thread_; + TaskQueueBase* const signaling_thread_; cricket::TransportDescriptionFactory transport_desc_factory_; cricket::MediaSessionDescriptionFactory session_desc_factory_; uint64_t session_version_; @@ -141,6 +140,7 @@ class WebRtcSessionDescriptionFactory : public rtc::MessageHandler { const SdpStateProvider* sdp_info_; const std::string session_id_; CertificateRequestState certificate_request_state_; + std::queue> callbacks_; std::function&)> on_certificate_ready_;