From edcae05bd4d66a564dca12741cc4912458d1c8a5 Mon Sep 17 00:00:00 2001 From: Per Kjellander Date: Mon, 3 Oct 2022 11:48:23 +0200 Subject: [PATCH] Add utility class MaybeWorkerThread The class will be used in experiment aiming at reducing the number of used threads. The experiment will remove the need for the Pacer TQ and RTP module worker TQ. The helper ensure calls are made on either the worker thread a TQ depending on the field trial "WebRTC-SendPacketsOnWorkerThread" Bug: webrtc:14502 Change-Id: I47581e3e3203712a244f1cb76952cd94734cc3f1 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/277444 Reviewed-by: Evan Shrubsole Commit-Queue: Per Kjellander Reviewed-by: Mirko Bonadei Cr-Commit-Position: refs/heads/main@{#38289} --- modules/BUILD.gn | 1 + modules/utility/BUILD.gn | 58 +++++-- modules/utility/maybe_worker_thread.cc | 94 ++++++++++++ modules/utility/maybe_worker_thread.h | 86 +++++++++++ .../utility/maybe_worker_thread_unittests.cc | 141 ++++++++++++++++++ 5 files changed, 370 insertions(+), 10 deletions(-) create mode 100644 modules/utility/maybe_worker_thread.cc create mode 100644 modules/utility/maybe_worker_thread.h create mode 100644 modules/utility/maybe_worker_thread_unittests.cc diff --git a/modules/BUILD.gn b/modules/BUILD.gn index bd20f73516..ffb7491b31 100644 --- a/modules/BUILD.gn +++ b/modules/BUILD.gn @@ -227,6 +227,7 @@ if (rtc_include_tests && !build_with_chromium) { "pacing:pacing_unittests", "remote_bitrate_estimator:remote_bitrate_estimator_unittests", "rtp_rtcp:rtp_rtcp_unittests", + "utility:utility_unittests", "video_coding:video_coding_unittests", "video_coding/timing:timing_unittests", ] diff --git a/modules/utility/BUILD.gn b/modules/utility/BUILD.gn index 2b560943a3..3fe4ca8c92 100644 --- a/modules/utility/BUILD.gn +++ b/modules/utility/BUILD.gn @@ -8,18 +8,39 @@ import("../../webrtc.gni") -if (is_android) { - rtc_library("utility") { - visibility = [ "*" ] +rtc_source_set("utility") { + sources = [ + "maybe_worker_thread.cc", + "maybe_worker_thread.h", + ] - sources = [ + deps = [ + "../../api:field_trials_view", + "../../api:sequence_checker", + "../../api/task_queue", + "../../api/task_queue:pending_task_safety_flag", + "../../rtc_base:checks", + "../../rtc_base:logging", + "../../rtc_base:macromagic", + "../../rtc_base:rtc_event", + "../../rtc_base:rtc_task_queue", + ] + + absl_deps = [ + "//third_party/abseil-cpp/absl/functional:any_invocable", + "//third_party/abseil-cpp/absl/strings:strings", + ] + + if (is_android) { + visibility = [ "*" ] + sources += [ "include/helpers_android.h", "include/jvm_android.h", "source/helpers_android.cc", "source/jvm_android.cc", ] - deps = [ + deps += [ "../../api:sequence_checker", "../../rtc_base:checks", "../../rtc_base:logging", @@ -27,10 +48,27 @@ if (is_android) { "../../rtc_base/system:arch", ] } -} else { - # Add an empty source set so that dependent targets may include utility - # unconditionally. - rtc_source_set("utility") { - visibility = [ "*" ] +} + +if (rtc_include_tests) { + rtc_library("utility_unittests") { + testonly = true + + sources = [ "maybe_worker_thread_unittests.cc" ] + deps = [ + ":utility", + "../../api:sequence_checker", + "../../api/task_queue", + "../../api/task_queue:default_task_queue_factory", + "../../api/task_queue:pending_task_safety_flag", + "../../api/units:time_delta", + "../../rtc_base:rtc_event", + "../../rtc_base:threading", + "../../test:explicit_key_value_config", + "../../test:field_trial", + "../../test:test_main", + "../../test:test_support", + "../../test/time_controller", + ] } } diff --git a/modules/utility/maybe_worker_thread.cc b/modules/utility/maybe_worker_thread.cc new file mode 100644 index 0000000000..3d761c5334 --- /dev/null +++ b/modules/utility/maybe_worker_thread.cc @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "modules/utility/maybe_worker_thread.h" + +#include + +#include "api/task_queue/pending_task_safety_flag.h" +#include "api/task_queue/task_queue_base.h" +#include "rtc_base/checks.h" +#include "rtc_base/event.h" +#include "rtc_base/logging.h" +#include "rtc_base/task_queue.h" + +namespace webrtc { + +MaybeWorkerThread::MaybeWorkerThread(const FieldTrialsView& field_trials, + absl::string_view task_queue_name, + TaskQueueFactory* factory) + : owned_task_queue_( + field_trials.IsEnabled("WebRTC-SendPacketsOnWorkerThread") + ? nullptr + : factory->CreateTaskQueue(task_queue_name, + rtc::TaskQueue::Priority::NORMAL)), + worker_thread_(TaskQueueBase::Current()) { + RTC_DCHECK(worker_thread_); + RTC_LOG(LS_INFO) << "WebRTC-SendPacketsOnWorkerThread" + << (owned_task_queue_ ? " Disabled" : " Enabled"); +} + +MaybeWorkerThread::~MaybeWorkerThread() { + RTC_DCHECK_RUN_ON(&sequence_checker_); +} + +void MaybeWorkerThread::RunSynchronous(absl::AnyInvocable task) { + if (owned_task_queue_) { + rtc::Event thread_sync_event; + auto closure = [&thread_sync_event, task = std::move(task)]() mutable { + std::move(task)(); + thread_sync_event.Set(); + }; + owned_task_queue_->PostTask(std::move(closure)); + thread_sync_event.Wait(rtc::Event::kForever); + } else { + RTC_DCHECK_RUN_ON(&sequence_checker_); + std::move(task)(); + } +} + +void MaybeWorkerThread::RunOrPost(absl::AnyInvocable task) { + if (owned_task_queue_) { + owned_task_queue_->PostTask(std::move(task)); + } else { + RTC_DCHECK_RUN_ON(&sequence_checker_); + std::move(task)(); + } +} + +TaskQueueBase* MaybeWorkerThread::TaskQueueForDelayedTasks() const { + RTC_DCHECK(IsCurrent()); + return owned_task_queue_ ? owned_task_queue_.get() : worker_thread_; +} + +TaskQueueBase* MaybeWorkerThread::TaskQueueForPost() const { + RTC_DLOG_IF(LS_WARNING, IsCurrent()) + << "TaskQueueForPost called on the current thread. Ok only in unit " + "tests."; + return owned_task_queue_ ? owned_task_queue_.get() : worker_thread_; +} + +bool MaybeWorkerThread::IsCurrent() const { + if (owned_task_queue_) { + return owned_task_queue_->IsCurrent(); + } + return worker_thread_->IsCurrent(); +} + +absl::AnyInvocable MaybeWorkerThread::MaybeSafeTask( + rtc::scoped_refptr flag, + absl::AnyInvocable task) { + if (owned_task_queue_) { + return task; + } else { + return SafeTask(std::move(flag), std::move(task)); + } +} + +} // namespace webrtc diff --git a/modules/utility/maybe_worker_thread.h b/modules/utility/maybe_worker_thread.h new file mode 100644 index 0000000000..6d41ad5189 --- /dev/null +++ b/modules/utility/maybe_worker_thread.h @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef MODULES_UTILITY_MAYBE_WORKER_THREAD_H_ +#define MODULES_UTILITY_MAYBE_WORKER_THREAD_H_ + +#include + +#include "absl/strings/string_view.h" +#include "api/field_trials_view.h" +#include "api/sequence_checker.h" +#include "api/task_queue/pending_task_safety_flag.h" +#include "api/task_queue/task_queue_base.h" +#include "api/task_queue/task_queue_factory.h" +#include "rtc_base/thread_annotations.h" + +namespace webrtc { + +// Helper class used by experiment to replace usage of the +// RTP worker task queue owned by RtpTransportControllerSend, and the pacer task +// queue owned by TaskQueuePacedSender with the one and only worker thread. +// Tasks will run on the target sequence which is either the worker thread or +// one of these task queues depending on the field trial +// "WebRTC-SendPacketsOnWorkerThread". +// This class is assumed to be created on the worker thread and the worker +// thread is assumed to outlive an instance of this class. +// +// Experiment can be tracked in +// https://bugs.chromium.org/p/webrtc/issues/detail?id=14502 +// +// After experiment evaluation, this class should be deleted. +// Calls to RunOrPost and RunSynchronous should be removed and the task should +// be invoked immediately. +// Instead of MaybeSafeTask a SafeTask should be used when posting tasks. +class RTC_LOCKABLE MaybeWorkerThread { + public: + MaybeWorkerThread(const FieldTrialsView& field_trials, + absl::string_view task_queue_name, + TaskQueueFactory* factory); + ~MaybeWorkerThread(); + + // Runs `task` immediately on the worker thread if in experiment, otherwise + // post the task on the task queue. + void RunOrPost(absl::AnyInvocable task); + // Runs `task` immediately on the worker thread if in experiment, otherwise + // post the task on the task queue and use an even to wait for completion. + void RunSynchronous(absl::AnyInvocable task); + + // Used for posting delayed or repeated tasks on the worker thread or task + // queue depending on the field trial. DCHECKs that this method is called on + // the target sequence. + TaskQueueBase* TaskQueueForDelayedTasks() const; + + // Used when a task has to be posted from one sequence to the target + // sequence. A task should only be posted if a sequence hop is needed. + TaskQueueBase* TaskQueueForPost() const; + + // Workaround to use a SafeTask only if the target sequence is the worker + // thread. This is used when a SafeTask can not be used because the object + // that posted the task is not destroyed on the target sequence. Instead, the + // caller has to guarantee that this MaybeWorkerThread is destroyed first + // since that guarantee that the posted task is deleted or run before the + // owning class. + absl::AnyInvocable MaybeSafeTask( + rtc::scoped_refptr flag, + absl::AnyInvocable task); + + // To implement macro RTC_DCHECK_RUN_ON. + // Implementation delegate to the actual used sequence. + bool IsCurrent() const; + + private: + SequenceChecker sequence_checker_; + const std::unique_ptr owned_task_queue_; + TaskQueueBase* const worker_thread_; +}; + +} // namespace webrtc + +#endif // MODULES_UTILITY_MAYBE_WORKER_THREAD_H_ diff --git a/modules/utility/maybe_worker_thread_unittests.cc b/modules/utility/maybe_worker_thread_unittests.cc new file mode 100644 index 0000000000..ec051242e7 --- /dev/null +++ b/modules/utility/maybe_worker_thread_unittests.cc @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include + +#include "api/sequence_checker.h" +#include "api/task_queue/pending_task_safety_flag.h" +#include "api/task_queue/task_queue_factory.h" +#include "api/units/time_delta.h" +#include "modules/utility/maybe_worker_thread.h" +#include "rtc_base/event.h" +#include "test/explicit_key_value_config.h" +#include "test/gtest.h" +#include "test/time_controller/real_time_controller.h" + +namespace webrtc { + +namespace { + +constexpr char kFieldTrialString[] = + "WebRTC-SendPacketsOnWorkerThread/Enabled/"; + +TEST(MaybeWorkerThreadTest, RunOrPostRunOnWorkerThreadInExperiment) { + test::ExplicitKeyValueConfig field_trial(kFieldTrialString); + RealTimeController controller; + MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); + + SequenceChecker checker; + bool run = false; + m.RunOrPost([&] { + EXPECT_TRUE(checker.IsCurrent()); + run = true; + }); + EXPECT_TRUE(run); +} + +TEST(MaybeWorkerThreadTest, RunOrPostPostsOnTqPerDefault) { + test::ExplicitKeyValueConfig field_trial(""); + RealTimeController controller; + MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); + + SequenceChecker checker; + rtc::Event event; + m.RunOrPost([&] { + EXPECT_FALSE(checker.IsCurrent()); + event.Set(); + }); + EXPECT_TRUE(event.Wait(TimeDelta::Seconds(10))); +} + +TEST(MaybeWorkerThreadTest, RunSynchronousRunOnWorkerThreadInExperiment) { + test::ExplicitKeyValueConfig field_trial(kFieldTrialString); + RealTimeController controller; + MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); + + SequenceChecker checker; + bool run = false; + m.RunSynchronous([&] { + EXPECT_TRUE(checker.IsCurrent()); + run = true; + }); + EXPECT_TRUE(run); +} + +TEST(MaybeWorkerThreadTest, RunSynchronousRunOnTqPerDefault) { + test::ExplicitKeyValueConfig field_trial(""); + RealTimeController controller; + MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); + + SequenceChecker checker; + bool run = false; + m.RunSynchronous([&] { + EXPECT_FALSE(checker.IsCurrent()); + run = true; + }); + EXPECT_TRUE(run); +} + +TEST(MaybeWorkerThreadTest, MaybeSafeTaskDoesNotReturnSafeTaskPerDefault) { + // We cant really test that the return value from MaybeSafeTask is a SafeTask. + // But we can test that the safety flag does not have more references after a + // call. + test::ExplicitKeyValueConfig field_trial(""); + RealTimeController controller; + MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); + + rtc::scoped_refptr flag = + PendingTaskSafetyFlag::Create(); + auto closure = m.MaybeSafeTask(flag, [] {}); + EXPECT_EQ(flag->Release(), rtc::RefCountReleaseStatus::kDroppedLastRef); + flag.release(); +} + +TEST(MaybeWorkerThreadTest, MaybeSafeTaskDoesNotReturnSafeTaskInExperiment) { + // We cant really test that the return value from MaybeSafeTask is a SafeTask. + // But we can test that the safety flag does have one more references after a + // call. + test::ExplicitKeyValueConfig field_trial(kFieldTrialString); + RealTimeController controller; + MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); + + rtc::scoped_refptr flag = + PendingTaskSafetyFlag::Create(); + auto closure = m.MaybeSafeTask(flag, [] {}); + EXPECT_EQ(flag->Release(), rtc::RefCountReleaseStatus::kOtherRefsRemained); + flag.release(); +} + +TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectPerDefault) { + test::ExplicitKeyValueConfig field_trial(""); + RealTimeController controller; + MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); + + EXPECT_FALSE(m.IsCurrent()); + m.RunSynchronous([&] { EXPECT_TRUE(m.IsCurrent()); }); +} + +TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectInExperiment) { + test::ExplicitKeyValueConfig field_trial(kFieldTrialString); + RealTimeController controller; + MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); + + EXPECT_TRUE(m.IsCurrent()); + auto tq = controller.GetTaskQueueFactory()->CreateTaskQueue( + "tq", TaskQueueFactory::Priority::NORMAL); + rtc::Event event; + tq->PostTask([&] { + EXPECT_FALSE(m.IsCurrent()); + event.Set(); + }); + ASSERT_TRUE(event.Wait(TimeDelta::Seconds(10))); +} + +} // namespace +} // namespace webrtc