diff --git a/p2p/BUILD.gn b/p2p/BUILD.gn index 070a30ee2c..7ccad67347 100644 --- a/p2p/BUILD.gn +++ b/p2p/BUILD.gn @@ -144,6 +144,8 @@ if (rtc_include_tests) { "../api:libjingle_peerconnection_api", "../rtc_base", "../rtc_base:rtc_base_approved", + "../rtc_base/task_utils:pending_task_safety_flag", + "../rtc_base/task_utils:to_queued_task", ] absl_deps = [ "//third_party/abseil-cpp/absl/algorithm:container", diff --git a/p2p/base/fake_ice_transport.h b/p2p/base/fake_ice_transport.h index 58d83d761c..f8be8a9835 100644 --- a/p2p/base/fake_ice_transport.h +++ b/p2p/base/fake_ice_transport.h @@ -20,8 +20,9 @@ #include "absl/types/optional.h" #include "api/ice_transport_interface.h" #include "p2p/base/ice_transport_internal.h" -#include "rtc_base/async_invoker.h" #include "rtc_base/copy_on_write_buffer.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" +#include "rtc_base/task_utils/to_queued_task.h" namespace cricket { @@ -305,12 +306,12 @@ class FakeIceTransport : public IceTransportInternal { if (!combine_outgoing_packets_ || send_packet_.size() > len) { rtc::CopyOnWriteBuffer packet(std::move(send_packet_)); if (async_) { - invoker_.AsyncInvokeDelayed( - RTC_FROM_HERE, network_thread_, - [this, packet] { - RTC_DCHECK_RUN_ON(network_thread_); - FakeIceTransport::SendPacketInternal(packet); - }, + network_thread_->PostDelayedTask( + ToQueuedTask(task_safety_.flag(), + [this, packet] { + RTC_DCHECK_RUN_ON(network_thread_); + FakeIceTransport::SendPacketInternal(packet); + }), async_delay_ms_); } else { SendPacketInternal(packet); @@ -389,7 +390,6 @@ class FakeIceTransport : public IceTransportInternal { } } - rtc::AsyncInvoker invoker_; const std::string name_; const int component_; FakeIceTransport* dest_ RTC_GUARDED_BY(network_thread_) = nullptr; @@ -420,6 +420,7 @@ class FakeIceTransport : public IceTransportInternal { RTC_GUARDED_BY(network_thread_); rtc::CopyOnWriteBuffer last_sent_packet_ RTC_GUARDED_BY(network_thread_); rtc::Thread* const network_thread_; + webrtc::ScopedTaskSafetyDetached task_safety_; }; class FakeIceTransportWrapper : public webrtc::IceTransportInterface { diff --git a/rtc_base/task_utils/pending_task_safety_flag.cc b/rtc_base/task_utils/pending_task_safety_flag.cc index acdbc006bb..595457dc0e 100644 --- a/rtc_base/task_utils/pending_task_safety_flag.cc +++ b/rtc_base/task_utils/pending_task_safety_flag.cc @@ -19,6 +19,13 @@ rtc::scoped_refptr PendingTaskSafetyFlag::Create() { return new rtc::RefCountedObject(); } +rtc::scoped_refptr +PendingTaskSafetyFlag::CreateDetached() { + auto safety_flag = Create(); + safety_flag->main_sequence_.Detach(); + return safety_flag; +} + void PendingTaskSafetyFlag::SetNotAlive() { RTC_DCHECK_RUN_ON(&main_sequence_); alive_ = false; diff --git a/rtc_base/task_utils/pending_task_safety_flag.h b/rtc_base/task_utils/pending_task_safety_flag.h index 4c9d3482af..abfce26ad8 100644 --- a/rtc_base/task_utils/pending_task_safety_flag.h +++ b/rtc_base/task_utils/pending_task_safety_flag.h @@ -58,10 +58,28 @@ namespace webrtc { class PendingTaskSafetyFlag : public rtc::RefCountInterface { public: static rtc::scoped_refptr Create(); + // Creates a flag, but with its SequenceChecker initially detached. Hence, it + // may be created on a different thread than the flag will be used on. + static rtc::scoped_refptr CreateDetached(); ~PendingTaskSafetyFlag() = default; void SetNotAlive(); + // The SetAlive method is intended to support Start/Stop/Restart usecases. + // When a class has called SetNotAlive on a flag used for posted tasks, and + // decides it wants to post new tasks and have them run, there are two + // reasonable ways to do that: + // + // (i) Use the below SetAlive method. One subtlety is that any task posted + // prior to SetNotAlive, and still in the queue, is resurrected and will + // run. + // + // (ii) Create a fresh flag, and just drop the reference to the old one. This + // avoids the above problem, and ensures that tasks poster prior to + // SetNotAlive stay cancelled. Instead, there's a potential data race on + // the flag pointer itself. Some synchronization is required between the + // thread overwriting the flag pointer, and the threads that want to post + // tasks and therefore read that same pointer. void SetAlive(); bool alive() const; @@ -103,6 +121,21 @@ class ScopedTaskSafety { PendingTaskSafetyFlag::Create(); }; +// Like ScopedTaskSafety, but allows construction on a different thread than +// where the flag will be used. +class ScopedTaskSafetyDetached { + public: + ScopedTaskSafetyDetached() = default; + ~ScopedTaskSafetyDetached() { flag_->SetNotAlive(); } + + // Returns a new reference to the safety flag. + rtc::scoped_refptr flag() const { return flag_; } + + private: + rtc::scoped_refptr flag_ = + PendingTaskSafetyFlag::CreateDetached(); +}; + } // namespace webrtc #endif // RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_