From 5286dcfab671b32841e089fa22fa920dbdbd4955 Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Mon, 18 Jul 2022 17:04:56 +0200 Subject: [PATCH] Migrate rtc_base and rtc_tools to absl::AnyInvocable based TaskQueueBase interface Bug: webrtc:14245 Change-Id: I71abe3db7a23ad33bd175297e23fa8e927fa9628 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268768 Reviewed-by: Mirko Bonadei Commit-Queue: Danil Chapovalov Cr-Commit-Position: refs/heads/main@{#37553} --- rtc_base/BUILD.gn | 15 ++-- rtc_base/async_resolver.cc | 20 ++--- rtc_base/fake_mdns_responder.h | 6 +- rtc_base/memory/BUILD.gn | 1 - rtc_base/memory/fifo_buffer.h | 7 +- rtc_base/network.cc | 21 +++-- rtc_base/openssl_stream_adapter.cc | 7 +- rtc_base/ssl_stream_adapter_unittest.cc | 6 +- rtc_base/task_queue_for_test.h | 32 ++----- rtc_base/task_utils/BUILD.gn | 1 - rtc_base/thread.cc | 5 +- rtc_base/thread.h | 1 - rtc_base/time_utils_unittest.cc | 9 +- rtc_base/unique_id_generator_unittest.cc | 10 ++- rtc_tools/network_tester/BUILD.gn | 6 +- rtc_tools/network_tester/packet_sender.cc | 96 +++++++++------------ rtc_tools/network_tester/packet_sender.h | 2 +- rtc_tools/network_tester/test_controller.cc | 2 +- rtc_tools/video_replay.cc | 12 +-- 19 files changed, 116 insertions(+), 143 deletions(-) diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index 3ee88f4c4a..322fd7a70f 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -948,7 +948,6 @@ rtc_library("threading") { "../api:sequence_checker", "../api/task_queue", "../api/task_queue:pending_task_safety_flag", - "../api/task_queue:to_queued_task", "../api/units:time_delta", "synchronization:mutex", "system:no_unique_address", @@ -1092,8 +1091,8 @@ rtc_library("rtc_base") { "../api/numerics", "../api/task_queue", "../api/task_queue:pending_task_safety_flag", - "../api/task_queue:to_queued_task", "../api/transport:field_trial_based_config", + "../api/units:time_delta", "../system_wrappers:field_trial", "memory:always_valid_pointer", "network:sent_packet", @@ -1378,7 +1377,6 @@ rtc_library("rtc_base_tests_utils") { ":stringutils", ":threading", ":timeutils", - "../api/task_queue:to_queued_task", "../api/units:time_delta", "../api/units:timestamp", "../test:scoped_key_value_config", @@ -1407,11 +1405,14 @@ rtc_library("task_queue_for_test") { ":macromagic", ":rtc_event", ":rtc_task_queue", + "../api:function_view", "../api/task_queue", "../api/task_queue:default_task_queue_factory", - "../api/task_queue:to_queued_task", ] - absl_deps = [ "//third_party/abseil-cpp/absl/strings" ] + absl_deps = [ + "//third_party/abseil-cpp/absl/cleanup", + "//third_party/abseil-cpp/absl/strings", + ] } if (rtc_include_tests) { @@ -1594,7 +1595,6 @@ if (rtc_include_tests) { "../api:make_ref_counted", "../api:scoped_refptr", "../api/numerics", - "../api/task_queue:to_queued_task", "../api/units:time_delta", "../system_wrappers", "../test:fileutils", @@ -1631,6 +1631,7 @@ if (rtc_include_tests) { ":rtc_task_queue", ":task_queue_for_test", ":timeutils", + "../api/units:time_delta", "../test:test_main", "../test:test_support", ] @@ -1736,7 +1737,6 @@ if (rtc_include_tests) { "../api/task_queue", "../api/task_queue:pending_task_safety_flag", "../api/task_queue:task_queue_test", - "../api/task_queue:to_queued_task", "../api/units:time_delta", "../test:field_trial", "../test:fileutils", @@ -1770,6 +1770,7 @@ if (rtc_include_tests) { } absl_deps = [ "//third_party/abseil-cpp/absl/algorithm:container", + "//third_party/abseil-cpp/absl/functional:any_invocable", "//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/strings", "//third_party/abseil-cpp/absl/types:optional", diff --git a/rtc_base/async_resolver.cc b/rtc_base/async_resolver.cc index 2975d9f078..198013c2bc 100644 --- a/rtc_base/async_resolver.cc +++ b/rtc_base/async_resolver.cc @@ -34,7 +34,6 @@ #endif // defined(WEBRTC_POSIX) && !defined(__native_client__) #include "api/task_queue/task_queue_base.h" -#include "api/task_queue/to_queued_task.h" #include "rtc_base/ip_address.h" #include "rtc_base/logging.h" #include "rtc_base/platform_thread.h" @@ -51,17 +50,17 @@ namespace rtc { namespace { void GlobalGcdRunTask(void* context) { - std::unique_ptr task( - static_cast(context)); - task->Run(); + std::unique_ptr> task( + static_cast*>(context)); + std::move (*task)(); } // Post a task into the system-defined global concurrent queue. -void PostTaskToGlobalQueue(std::unique_ptr task) { +void PostTaskToGlobalQueue( + std::unique_ptr> task) { dispatch_queue_global_t global_queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); - webrtc::QueuedTask* context = task.release(); - dispatch_async_f(global_queue, context, &GlobalGcdRunTask); + dispatch_async_f(global_queue, task.release(), &GlobalGcdRunTask); } } // namespace @@ -156,7 +155,7 @@ void AsyncResolver::Start(const SocketAddress& addr) { int error = ResolveHostname(addr.hostname(), addr.family(), &addresses); webrtc::MutexLock lock(&state->mutex); if (state->status == State::Status::kLive) { - caller_task_queue->PostTask(webrtc::ToQueuedTask( + caller_task_queue->PostTask( [this, error, addresses = std::move(addresses), state] { bool live; { @@ -169,11 +168,12 @@ void AsyncResolver::Start(const SocketAddress& addr) { RTC_DCHECK_RUN_ON(&sequence_checker_); ResolveDone(std::move(addresses), error); } - })); + }); } }; #if defined(WEBRTC_MAC) || defined(WEBRTC_IOS) - PostTaskToGlobalQueue(webrtc::ToQueuedTask(std::move(thread_function))); + PostTaskToGlobalQueue( + std::make_unique>(thread_function)); #else PlatformThread::SpawnDetached(std::move(thread_function), "AsyncResolver"); #endif diff --git a/rtc_base/fake_mdns_responder.h b/rtc_base/fake_mdns_responder.h index a7dcb96329..8be6f1ccda 100644 --- a/rtc_base/fake_mdns_responder.h +++ b/rtc_base/fake_mdns_responder.h @@ -16,7 +16,6 @@ #include #include "absl/strings/string_view.h" -#include "api/task_queue/to_queued_task.h" #include "rtc_base/ip_address.h" #include "rtc_base/location.h" #include "rtc_base/mdns_responder_interface.h" @@ -41,8 +40,7 @@ class FakeMdnsResponder : public MdnsResponderInterface { name = std::to_string(next_available_id_++) + ".local"; addr_name_map_[addr] = name; } - thread_->PostTask( - ToQueuedTask([callback, addr, name]() { callback(addr, name); })); + thread_->PostTask([callback, addr, name]() { callback(addr, name); }); } void RemoveNameForAddress(const rtc::IPAddress& addr, NameRemovedCallback callback) override { @@ -51,7 +49,7 @@ class FakeMdnsResponder : public MdnsResponderInterface { addr_name_map_.erase(it); } bool result = it != addr_name_map_.end(); - thread_->PostTask(ToQueuedTask([callback, result]() { callback(result); })); + thread_->PostTask([callback, result]() { callback(result); }); } rtc::IPAddress GetMappedAddressForName(absl::string_view name) const { diff --git a/rtc_base/memory/BUILD.gn b/rtc_base/memory/BUILD.gn index a42c2e16d0..778a396cd8 100644 --- a/rtc_base/memory/BUILD.gn +++ b/rtc_base/memory/BUILD.gn @@ -36,7 +36,6 @@ rtc_library("fifo_buffer") { "..:rtc_base", "..:threading", "../../api/task_queue:pending_task_safety_flag", - "../../api/task_queue:to_queued_task", "../synchronization:mutex", ] } diff --git a/rtc_base/memory/fifo_buffer.h b/rtc_base/memory/fifo_buffer.h index 2fa8213cde..aa3164f09a 100644 --- a/rtc_base/memory/fifo_buffer.h +++ b/rtc_base/memory/fifo_buffer.h @@ -14,7 +14,6 @@ #include #include "api/task_queue/pending_task_safety_flag.h" -#include "api/task_queue/to_queued_task.h" #include "rtc_base/stream.h" #include "rtc_base/synchronization/mutex.h" @@ -81,9 +80,9 @@ class FifoBuffer final : public StreamInterface { private: void PostEvent(int events, int err) { - owner_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() { - SignalEvent(this, events, err); - })); + owner_->PostTask(webrtc::SafeTask( + task_safety_.flag(), + [this, events, err]() { SignalEvent(this, events, err); })); } // Helper method that implements Read. Caller must acquire a lock diff --git a/rtc_base/network.cc b/rtc_base/network.cc index 364a52d6a9..9b53eb36c9 100644 --- a/rtc_base/network.cc +++ b/rtc_base/network.cc @@ -30,8 +30,9 @@ #include "absl/memory/memory.h" #include "absl/strings/match.h" #include "absl/strings/string_view.h" -#include "api/task_queue/to_queued_task.h" +#include "api/task_queue/pending_task_safety_flag.h" #include "api/transport/field_trial_based_config.h" +#include "api/units/time_delta.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" #include "rtc_base/memory/always_valid_pointer.h" @@ -44,6 +45,8 @@ namespace rtc { namespace { +using ::webrtc::SafeTask; +using ::webrtc::TimeDelta; // List of MAC addresses of known VPN (for windows). constexpr uint8_t kVpns[2][6] = { @@ -906,14 +909,14 @@ void BasicNetworkManager::StartUpdating() { // we should trigger network signal immediately for the new clients // to start allocating ports. if (sent_first_update_) - thread_->PostTask(ToQueuedTask(task_safety_flag_, [this] { + thread_->PostTask(SafeTask(task_safety_flag_, [this] { RTC_DCHECK_RUN_ON(thread_); SignalNetworksChanged(); })); } else { RTC_DCHECK(task_safety_flag_ == nullptr); task_safety_flag_ = webrtc::PendingTaskSafetyFlag::Create(); - thread_->PostTask(ToQueuedTask(task_safety_flag_, [this] { + thread_->PostTask(SafeTask(task_safety_flag_, [this] { RTC_DCHECK_RUN_ON(thread_); UpdateNetworksContinually(); })); @@ -1029,12 +1032,12 @@ void BasicNetworkManager::UpdateNetworksOnce() { void BasicNetworkManager::UpdateNetworksContinually() { UpdateNetworksOnce(); - thread_->PostDelayedTask(ToQueuedTask(task_safety_flag_, - [this] { - RTC_DCHECK_RUN_ON(thread_); - UpdateNetworksContinually(); - }), - kNetworksUpdateIntervalMs); + thread_->PostDelayedTask(SafeTask(task_safety_flag_, + [this] { + RTC_DCHECK_RUN_ON(thread_); + UpdateNetworksContinually(); + }), + TimeDelta::Millis(kNetworksUpdateIntervalMs)); } void BasicNetworkManager::DumpNetworks() { diff --git a/rtc_base/openssl_stream_adapter.cc b/rtc_base/openssl_stream_adapter.cc index bc1c5be66d..da484ad3bf 100644 --- a/rtc_base/openssl_stream_adapter.cc +++ b/rtc_base/openssl_stream_adapter.cc @@ -39,7 +39,6 @@ #else #include "rtc_base/openssl_identity.h" #endif -#include "api/task_queue/to_queued_task.h" #include "rtc_base/openssl_utility.h" #include "rtc_base/ssl_certificate.h" #include "rtc_base/stream.h" @@ -60,6 +59,7 @@ namespace rtc { namespace { +using ::webrtc::SafeTask; // SRTP cipher suite table. `internal_name` is used to construct a // colon-separated profile strings which is needed by // SSL_CTX_set_tlsext_use_srtp(). @@ -821,8 +821,9 @@ void OpenSSLStreamAdapter::OnEvent(StreamInterface* stream, } void OpenSSLStreamAdapter::PostEvent(int events, int err) { - owner_->PostTask(webrtc::ToQueuedTask( - task_safety_, [this, events, err]() { SignalEvent(this, events, err); })); + owner_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() { + SignalEvent(this, events, err); + })); } void OpenSSLStreamAdapter::SetTimeout(int delay_ms) { diff --git a/rtc_base/ssl_stream_adapter_unittest.cc b/rtc_base/ssl_stream_adapter_unittest.cc index 8b794a01f8..49cbbe0a02 100644 --- a/rtc_base/ssl_stream_adapter_unittest.cc +++ b/rtc_base/ssl_stream_adapter_unittest.cc @@ -18,7 +18,6 @@ #include "absl/memory/memory.h" #include "absl/strings/string_view.h" #include "api/task_queue/pending_task_safety_flag.h" -#include "api/task_queue/to_queued_task.h" #include "rtc_base/buffer_queue.h" #include "rtc_base/checks.h" #include "rtc_base/gunit.h" @@ -36,6 +35,7 @@ using ::testing::Combine; using ::testing::tuple; using ::testing::Values; using ::testing::WithParamInterface; +using ::webrtc::SafeTask; static const int kBlockSize = 4096; static const char kExporterLabel[] = "label"; @@ -220,7 +220,7 @@ class SSLDummyStreamBase : public rtc::StreamInterface, private: void PostEvent(int events, int err) { - thread_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() { + thread_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() { SignalEvent(this, events, err); })); } @@ -292,7 +292,7 @@ class BufferQueueStream : public rtc::StreamInterface { private: void PostEvent(int events, int err) { - thread_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() { + thread_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() { SignalEvent(this, events, err); })); } diff --git a/rtc_base/task_queue_for_test.h b/rtc_base/task_queue_for_test.h index c1de87412c..450050a549 100644 --- a/rtc_base/task_queue_for_test.h +++ b/rtc_base/task_queue_for_test.h @@ -13,9 +13,10 @@ #include +#include "absl/cleanup/cleanup.h" #include "absl/strings/string_view.h" +#include "api/function_view.h" #include "api/task_queue/task_queue_base.h" -#include "api/task_queue/to_queued_task.h" #include "rtc_base/checks.h" #include "rtc_base/event.h" #include "rtc_base/location.h" @@ -24,13 +25,14 @@ namespace webrtc { -template -void SendTask(rtc::Location loc, TaskQueueBase* task_queue, Closure&& task) { +inline void SendTask(rtc::Location loc, + TaskQueueBase* task_queue, + rtc::FunctionView task) { RTC_CHECK(!task_queue->IsCurrent()) << "Called SendTask to a queue from the same queue at " << loc.ToString(); rtc::Event event; - task_queue->PostTask( - ToQueuedTask(std::forward(task), [&event] { event.Set(); })); + absl::Cleanup cleanup = [&event] { event.Set(); }; + task_queue->PostTask([task, cleanup = std::move(cleanup)] { task(); }); RTC_CHECK(event.Wait(/*give_up_after_ms=*/rtc::Event::kForever, /*warn_after_ms=*/10'000)) << "Waited too long at " << loc.ToString(); @@ -47,24 +49,8 @@ class RTC_LOCKABLE TaskQueueForTest : public rtc::TaskQueue { // A convenience, test-only method that blocks the current thread while // a task executes on the task queue. - // This variant is specifically for posting custom QueuedTask derived - // implementations that tests do not want to pass ownership of over to the - // task queue (i.e. the Run() method always returns `false`.). - template - void SendTask(Closure* task) { - RTC_CHECK(!IsCurrent()); - rtc::Event event; - PostTask(ToQueuedTask( - [&task] { RTC_CHECK_EQ(false, static_cast(task)->Run()); }, - [&event] { event.Set(); })); - event.Wait(rtc::Event::kForever); - } - - // A convenience, test-only method that blocks the current thread while - // a task executes on the task queue. - template - void SendTask(Closure&& task, rtc::Location loc) { - ::webrtc::SendTask(loc, Get(), std::forward(task)); + void SendTask(rtc::FunctionView task, rtc::Location loc) { + ::webrtc::SendTask(loc, Get(), task); } // Wait for the completion of all tasks posted prior to the diff --git a/rtc_base/task_utils/BUILD.gn b/rtc_base/task_utils/BUILD.gn index 732f6d7b0a..fef30eb0e2 100644 --- a/rtc_base/task_utils/BUILD.gn +++ b/rtc_base/task_utils/BUILD.gn @@ -46,7 +46,6 @@ if (rtc_include_tests) { "..:rtc_task_queue", "..:task_queue_for_test", "../../api/task_queue", - "../../api/task_queue:to_queued_task", "../../api/units:time_delta", "../../api/units:timestamp", "../../system_wrappers:system_wrappers", diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc index ef165ffca8..a4eaf2c05e 100644 --- a/rtc_base/thread.cc +++ b/rtc_base/thread.cc @@ -1039,8 +1039,7 @@ void Thread::QueuedTaskHandler::OnMessage(Message* msg) { void Thread::AllowInvokesToThread(Thread* thread) { #if (!defined(NDEBUG) || RTC_DCHECK_IS_ON) if (!IsCurrent()) { - PostTask(webrtc::ToQueuedTask( - [thread, this]() { AllowInvokesToThread(thread); })); + PostTask([thread, this]() { AllowInvokesToThread(thread); }); return; } RTC_DCHECK_RUN_ON(this); @@ -1052,7 +1051,7 @@ void Thread::AllowInvokesToThread(Thread* thread) { void Thread::DisallowAllInvokes() { #if (!defined(NDEBUG) || RTC_DCHECK_IS_ON) if (!IsCurrent()) { - PostTask(webrtc::ToQueuedTask([this]() { DisallowAllInvokes(); })); + PostTask([this]() { DisallowAllInvokes(); }); return; } RTC_DCHECK_RUN_ON(this); diff --git a/rtc_base/thread.h b/rtc_base/thread.h index e87248c01f..7606b37621 100644 --- a/rtc_base/thread.h +++ b/rtc_base/thread.h @@ -32,7 +32,6 @@ #include "api/function_view.h" #include "api/task_queue/queued_task.h" #include "api/task_queue/task_queue_base.h" -#include "api/task_queue/to_queued_task.h" #include "api/units/time_delta.h" #include "rtc_base/checks.h" #include "rtc_base/deprecated/recursive_critical_section.h" diff --git a/rtc_base/time_utils_unittest.cc b/rtc_base/time_utils_unittest.cc index ced6e357c4..33b84d5346 100644 --- a/rtc_base/time_utils_unittest.cc +++ b/rtc_base/time_utils_unittest.cc @@ -12,7 +12,6 @@ #include -#include "api/task_queue/to_queued_task.h" #include "api/units/time_delta.h" #include "rtc_base/event.h" #include "rtc_base/fake_clock.h" @@ -23,6 +22,7 @@ #include "test/gtest.h" namespace rtc { +using ::webrtc::TimeDelta; TEST(TimeTest, TimeInMs) { int64_t ts_earlier = TimeMillis(); @@ -270,10 +270,9 @@ TEST(FakeClock, SettingTimeWakesThreads) { // Post an event that won't be executed for 10 seconds. Event message_handler_dispatched; - worker->PostDelayedTask(webrtc::ToQueuedTask([&message_handler_dispatched] { - message_handler_dispatched.Set(); - }), - /*milliseconds=*/60000); + worker->PostDelayedTask( + [&message_handler_dispatched] { message_handler_dispatched.Set(); }, + TimeDelta::Seconds(60)); // Wait for a bit for the worker thread to be started and enter its socket // select(). Otherwise this test would be trivial since the worker thread diff --git a/rtc_base/unique_id_generator_unittest.cc b/rtc_base/unique_id_generator_unittest.cc index dc5e9c245f..c6eb511e57 100644 --- a/rtc_base/unique_id_generator_unittest.cc +++ b/rtc_base/unique_id_generator_unittest.cc @@ -14,8 +14,10 @@ #include #include "absl/algorithm/container.h" +#include "absl/functional/any_invocable.h" #include "api/array_view.h" #include "api/task_queue/task_queue_base.h" +#include "api/units/time_delta.h" #include "rtc_base/gunit.h" #include "rtc_base/helpers.h" #include "test/gmock.h" @@ -31,9 +33,11 @@ class FakeTaskQueue : public webrtc::TaskQueueBase { FakeTaskQueue() : task_queue_setter_(this) {} void Delete() override {} - void PostTask(std::unique_ptr task) override {} - void PostDelayedTask(std::unique_ptr task, - uint32_t milliseconds) override {} + void PostTask(absl::AnyInvocable task) override {} + void PostDelayedTask(absl::AnyInvocable task, + webrtc::TimeDelta delay) override {} + void PostDelayedHighPrecisionTask(absl::AnyInvocable task, + webrtc::TimeDelta delay) override {} private: CurrentTaskQueueSetter task_queue_setter_; diff --git a/rtc_tools/network_tester/BUILD.gn b/rtc_tools/network_tester/BUILD.gn index bb1f5d9f64..47932ded5c 100644 --- a/rtc_tools/network_tester/BUILD.gn +++ b/rtc_tools/network_tester/BUILD.gn @@ -42,6 +42,7 @@ if (rtc_enable_protobuf) { "../../api:sequence_checker", "../../api/task_queue", "../../api/task_queue:default_task_queue_factory", + "../../api/task_queue:pending_task_safety_flag", "../../p2p:rtc_p2p", "../../rtc_base", "../../rtc_base:checks", @@ -58,7 +59,10 @@ if (rtc_enable_protobuf) { "../../rtc_base/system:no_unique_address", "../../rtc_base/third_party/sigslot", ] - absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] + absl_deps = [ + "//third_party/abseil-cpp/absl/functional:any_invocable", + "//third_party/abseil-cpp/absl/types:optional", + ] } network_tester_unittests_resources = [ diff --git a/rtc_tools/network_tester/packet_sender.cc b/rtc_tools/network_tester/packet_sender.cc index b80bb9872e..c9917373fe 100644 --- a/rtc_tools/network_tester/packet_sender.cc +++ b/rtc_tools/network_tester/packet_sender.cc @@ -15,7 +15,8 @@ #include #include -#include "api/task_queue/queued_task.h" +#include "absl/functional/any_invocable.h" +#include "api/task_queue/pending_task_safety_flag.h" #include "api/task_queue/task_queue_base.h" #include "rtc_base/time_utils.h" #include "rtc_tools/network_tester/config_reader.h" @@ -25,65 +26,47 @@ namespace webrtc { namespace { -class SendPacketTask : public QueuedTask { - public: - explicit SendPacketTask( - PacketSender* packet_sender, - rtc::scoped_refptr task_safety_flag) - : target_time_ms_(rtc::TimeMillis()), - packet_sender_(packet_sender), - task_safety_flag_(task_safety_flag) {} - - private: - bool Run() override { - if (task_safety_flag_->alive() && packet_sender_->IsSending()) { - packet_sender_->SendPacket(); - target_time_ms_ += packet_sender_->GetSendIntervalMs(); - int64_t delay_ms = std::max(static_cast(0), - target_time_ms_ - rtc::TimeMillis()); +absl::AnyInvocable SendPacketTask( + PacketSender* packet_sender, + rtc::scoped_refptr task_safety_flag, + int64_t target_time_ms = rtc::TimeMillis()) { + return [target_time_ms, packet_sender, + task_safety_flag = std::move(task_safety_flag)]() mutable { + if (task_safety_flag->alive() && packet_sender->IsSending()) { + packet_sender->SendPacket(); + target_time_ms += packet_sender->GetSendIntervalMs(); + int64_t delay_ms = + std::max(static_cast(0), target_time_ms - rtc::TimeMillis()); TaskQueueBase::Current()->PostDelayedTask( - std::unique_ptr(this), delay_ms); - return false; - } else { - return true; + SendPacketTask(packet_sender, std::move(task_safety_flag), + target_time_ms), + TimeDelta::Millis(delay_ms)); } - } - int64_t target_time_ms_; - PacketSender* const packet_sender_; - rtc::scoped_refptr task_safety_flag_; -}; + }; +} -class UpdateTestSettingTask : public QueuedTask { - public: - UpdateTestSettingTask( - PacketSender* packet_sender, - std::unique_ptr config_reader, - rtc::scoped_refptr task_safety_flag) - : packet_sender_(packet_sender), - config_reader_(std::move(config_reader)), - task_safety_flag_(task_safety_flag) {} - - private: - bool Run() override { - if (!task_safety_flag_->alive()) { - return true; +absl::AnyInvocable UpdateTestSettingTask( + PacketSender* packet_sender, + std::unique_ptr config_reader, + rtc::scoped_refptr task_safety_flag) { + return [packet_sender, config_reader = std::move(config_reader), + task_safety_flag = std::move(task_safety_flag)]() mutable { + if (!task_safety_flag->alive()) { + return; } - auto config = config_reader_->GetNextConfig(); - if (config) { - packet_sender_->UpdateTestSetting((*config).packet_size, - (*config).packet_send_interval_ms); + if (absl::optional config = + config_reader->GetNextConfig()) { + packet_sender->UpdateTestSetting(config->packet_size, + config->packet_send_interval_ms); TaskQueueBase::Current()->PostDelayedTask( - std::unique_ptr(this), (*config).execution_time_ms); - return false; + UpdateTestSettingTask(packet_sender, std::move(config_reader), + std::move(task_safety_flag)), + TimeDelta::Millis(config->execution_time_ms)); } else { - packet_sender_->StopSending(); - return true; + packet_sender->StopSending(); } - } - PacketSender* const packet_sender_; - const std::unique_ptr config_reader_; - rtc::scoped_refptr task_safety_flag_; -}; + }; +} } // namespace @@ -105,15 +88,14 @@ PacketSender::~PacketSender() = default; void PacketSender::StartSending() { worker_queue_checker_.Detach(); - worker_queue_->PostTask(ToQueuedTask(task_safety_flag_, [this]() { + worker_queue_->PostTask(SafeTask(task_safety_flag_, [this]() { RTC_DCHECK_RUN_ON(&worker_queue_checker_); sending_ = true; })); - worker_queue_->PostTask(std::make_unique( + worker_queue_->PostTask(UpdateTestSettingTask( this, std::make_unique(config_file_path_), task_safety_flag_)); - worker_queue_->PostTask( - std::make_unique(this, task_safety_flag_)); + worker_queue_->PostTask(SendPacketTask(this, task_safety_flag_)); } void PacketSender::StopSending() { diff --git a/rtc_tools/network_tester/packet_sender.h b/rtc_tools/network_tester/packet_sender.h index 323f75bd0c..7fd500f27c 100644 --- a/rtc_tools/network_tester/packet_sender.h +++ b/rtc_tools/network_tester/packet_sender.h @@ -15,10 +15,10 @@ #include #include "api/sequence_checker.h" +#include "api/task_queue/pending_task_safety_flag.h" #include "api/task_queue/task_queue_factory.h" #include "rtc_base/ignore_wundef.h" #include "rtc_base/system/no_unique_address.h" -#include "rtc_base/task_queue.h" #ifdef WEBRTC_NETWORK_TESTER_PROTO RTC_PUSH_IGNORING_WUNDEF() diff --git a/rtc_tools/network_tester/test_controller.cc b/rtc_tools/network_tester/test_controller.cc index 6b16708080..4ba43cc166 100644 --- a/rtc_tools/network_tester/test_controller.cc +++ b/rtc_tools/network_tester/test_controller.cc @@ -67,7 +67,7 @@ void TestController::SendConnectTo(const std::string& hostname, int port) { void TestController::SendData(const NetworkTesterPacket& packet, absl::optional data_size) { if (!packet_sender_thread_->IsCurrent()) { - packet_sender_thread_->PostTask(ToQueuedTask( + packet_sender_thread_->PostTask(SafeTask( task_safety_flag_, [this, packet, data_size]() { this->SendData(packet, data_size); })); return; diff --git a/rtc_tools/video_replay.cc b/rtc_tools/video_replay.cc index 346d962685..3f35a5525d 100644 --- a/rtc_tools/video_replay.cc +++ b/rtc_tools/video_replay.cc @@ -355,7 +355,7 @@ class RtpReplayer final { // Creation of the streams must happen inside a task queue because it is // resued as a worker thread. - worker_thread->PostTask(ToQueuedTask([&]() { + worker_thread->PostTask([&]() { call.reset(Call::Create(call_config)); // Attempt to load the configuration @@ -375,7 +375,7 @@ class RtpReplayer final { receive_stream->Start(); } sync_event.Set(); - })); + }); // Attempt to create an RtpReader from the input file. std::unique_ptr rtp_reader = @@ -392,7 +392,7 @@ class RtpReplayer final { // Destruction of streams and the call must happen on the same thread as // their creation. - worker_thread->PostTask(ToQueuedTask([&]() { + worker_thread->PostTask([&]() { for (const auto& receive_stream : stream_state->receive_streams) { call->DestroyVideoReceiveStream(receive_stream); } @@ -401,7 +401,7 @@ class RtpReplayer final { } call.reset(); sync_event.Set(); - })); + }); sync_event.Wait(/*give_up_after_ms=*/10000); } @@ -606,12 +606,12 @@ class RtpReplayer final { ++num_packets; PacketReceiver::DeliveryStatus result = PacketReceiver::DELIVERY_OK; - worker_thread->PostTask(ToQueuedTask([&]() { + worker_thread->PostTask([&]() { result = call->Receiver()->DeliverPacket(webrtc::MediaType::VIDEO, std::move(packet_buffer), /* packet_time_us */ -1); event.Set(); - })); + }); event.Wait(/*give_up_after_ms=*/10000); switch (result) { case PacketReceiver::DELIVERY_OK: