diff --git a/modules/BUILD.gn b/modules/BUILD.gn index bd20f73516..ffb7491b31 100644 --- a/modules/BUILD.gn +++ b/modules/BUILD.gn @@ -227,6 +227,7 @@ if (rtc_include_tests && !build_with_chromium) { "pacing:pacing_unittests", "remote_bitrate_estimator:remote_bitrate_estimator_unittests", "rtp_rtcp:rtp_rtcp_unittests", + "utility:utility_unittests", "video_coding:video_coding_unittests", "video_coding/timing:timing_unittests", ] diff --git a/modules/utility/BUILD.gn b/modules/utility/BUILD.gn index 2b560943a3..3fe4ca8c92 100644 --- a/modules/utility/BUILD.gn +++ b/modules/utility/BUILD.gn @@ -8,18 +8,39 @@ import("../../webrtc.gni") -if (is_android) { - rtc_library("utility") { - visibility = [ "*" ] +rtc_source_set("utility") { + sources = [ + "maybe_worker_thread.cc", + "maybe_worker_thread.h", + ] - sources = [ + deps = [ + "../../api:field_trials_view", + "../../api:sequence_checker", + "../../api/task_queue", + "../../api/task_queue:pending_task_safety_flag", + "../../rtc_base:checks", + "../../rtc_base:logging", + "../../rtc_base:macromagic", + "../../rtc_base:rtc_event", + "../../rtc_base:rtc_task_queue", + ] + + absl_deps = [ + "//third_party/abseil-cpp/absl/functional:any_invocable", + "//third_party/abseil-cpp/absl/strings:strings", + ] + + if (is_android) { + visibility = [ "*" ] + sources += [ "include/helpers_android.h", "include/jvm_android.h", "source/helpers_android.cc", "source/jvm_android.cc", ] - deps = [ + deps += [ "../../api:sequence_checker", "../../rtc_base:checks", "../../rtc_base:logging", @@ -27,10 +48,27 @@ if (is_android) { "../../rtc_base/system:arch", ] } -} else { - # Add an empty source set so that dependent targets may include utility - # unconditionally. - rtc_source_set("utility") { - visibility = [ "*" ] +} + +if (rtc_include_tests) { + rtc_library("utility_unittests") { + testonly = true + + sources = [ "maybe_worker_thread_unittests.cc" ] + deps = [ + ":utility", + "../../api:sequence_checker", + "../../api/task_queue", + "../../api/task_queue:default_task_queue_factory", + "../../api/task_queue:pending_task_safety_flag", + "../../api/units:time_delta", + "../../rtc_base:rtc_event", + "../../rtc_base:threading", + "../../test:explicit_key_value_config", + "../../test:field_trial", + "../../test:test_main", + "../../test:test_support", + "../../test/time_controller", + ] } } diff --git a/modules/utility/maybe_worker_thread.cc b/modules/utility/maybe_worker_thread.cc new file mode 100644 index 0000000000..3d761c5334 --- /dev/null +++ b/modules/utility/maybe_worker_thread.cc @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "modules/utility/maybe_worker_thread.h" + +#include + +#include "api/task_queue/pending_task_safety_flag.h" +#include "api/task_queue/task_queue_base.h" +#include "rtc_base/checks.h" +#include "rtc_base/event.h" +#include "rtc_base/logging.h" +#include "rtc_base/task_queue.h" + +namespace webrtc { + +MaybeWorkerThread::MaybeWorkerThread(const FieldTrialsView& field_trials, + absl::string_view task_queue_name, + TaskQueueFactory* factory) + : owned_task_queue_( + field_trials.IsEnabled("WebRTC-SendPacketsOnWorkerThread") + ? nullptr + : factory->CreateTaskQueue(task_queue_name, + rtc::TaskQueue::Priority::NORMAL)), + worker_thread_(TaskQueueBase::Current()) { + RTC_DCHECK(worker_thread_); + RTC_LOG(LS_INFO) << "WebRTC-SendPacketsOnWorkerThread" + << (owned_task_queue_ ? " Disabled" : " Enabled"); +} + +MaybeWorkerThread::~MaybeWorkerThread() { + RTC_DCHECK_RUN_ON(&sequence_checker_); +} + +void MaybeWorkerThread::RunSynchronous(absl::AnyInvocable task) { + if (owned_task_queue_) { + rtc::Event thread_sync_event; + auto closure = [&thread_sync_event, task = std::move(task)]() mutable { + std::move(task)(); + thread_sync_event.Set(); + }; + owned_task_queue_->PostTask(std::move(closure)); + thread_sync_event.Wait(rtc::Event::kForever); + } else { + RTC_DCHECK_RUN_ON(&sequence_checker_); + std::move(task)(); + } +} + +void MaybeWorkerThread::RunOrPost(absl::AnyInvocable task) { + if (owned_task_queue_) { + owned_task_queue_->PostTask(std::move(task)); + } else { + RTC_DCHECK_RUN_ON(&sequence_checker_); + std::move(task)(); + } +} + +TaskQueueBase* MaybeWorkerThread::TaskQueueForDelayedTasks() const { + RTC_DCHECK(IsCurrent()); + return owned_task_queue_ ? owned_task_queue_.get() : worker_thread_; +} + +TaskQueueBase* MaybeWorkerThread::TaskQueueForPost() const { + RTC_DLOG_IF(LS_WARNING, IsCurrent()) + << "TaskQueueForPost called on the current thread. Ok only in unit " + "tests."; + return owned_task_queue_ ? owned_task_queue_.get() : worker_thread_; +} + +bool MaybeWorkerThread::IsCurrent() const { + if (owned_task_queue_) { + return owned_task_queue_->IsCurrent(); + } + return worker_thread_->IsCurrent(); +} + +absl::AnyInvocable MaybeWorkerThread::MaybeSafeTask( + rtc::scoped_refptr flag, + absl::AnyInvocable task) { + if (owned_task_queue_) { + return task; + } else { + return SafeTask(std::move(flag), std::move(task)); + } +} + +} // namespace webrtc diff --git a/modules/utility/maybe_worker_thread.h b/modules/utility/maybe_worker_thread.h new file mode 100644 index 0000000000..6d41ad5189 --- /dev/null +++ b/modules/utility/maybe_worker_thread.h @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef MODULES_UTILITY_MAYBE_WORKER_THREAD_H_ +#define MODULES_UTILITY_MAYBE_WORKER_THREAD_H_ + +#include + +#include "absl/strings/string_view.h" +#include "api/field_trials_view.h" +#include "api/sequence_checker.h" +#include "api/task_queue/pending_task_safety_flag.h" +#include "api/task_queue/task_queue_base.h" +#include "api/task_queue/task_queue_factory.h" +#include "rtc_base/thread_annotations.h" + +namespace webrtc { + +// Helper class used by experiment to replace usage of the +// RTP worker task queue owned by RtpTransportControllerSend, and the pacer task +// queue owned by TaskQueuePacedSender with the one and only worker thread. +// Tasks will run on the target sequence which is either the worker thread or +// one of these task queues depending on the field trial +// "WebRTC-SendPacketsOnWorkerThread". +// This class is assumed to be created on the worker thread and the worker +// thread is assumed to outlive an instance of this class. +// +// Experiment can be tracked in +// https://bugs.chromium.org/p/webrtc/issues/detail?id=14502 +// +// After experiment evaluation, this class should be deleted. +// Calls to RunOrPost and RunSynchronous should be removed and the task should +// be invoked immediately. +// Instead of MaybeSafeTask a SafeTask should be used when posting tasks. +class RTC_LOCKABLE MaybeWorkerThread { + public: + MaybeWorkerThread(const FieldTrialsView& field_trials, + absl::string_view task_queue_name, + TaskQueueFactory* factory); + ~MaybeWorkerThread(); + + // Runs `task` immediately on the worker thread if in experiment, otherwise + // post the task on the task queue. + void RunOrPost(absl::AnyInvocable task); + // Runs `task` immediately on the worker thread if in experiment, otherwise + // post the task on the task queue and use an even to wait for completion. + void RunSynchronous(absl::AnyInvocable task); + + // Used for posting delayed or repeated tasks on the worker thread or task + // queue depending on the field trial. DCHECKs that this method is called on + // the target sequence. + TaskQueueBase* TaskQueueForDelayedTasks() const; + + // Used when a task has to be posted from one sequence to the target + // sequence. A task should only be posted if a sequence hop is needed. + TaskQueueBase* TaskQueueForPost() const; + + // Workaround to use a SafeTask only if the target sequence is the worker + // thread. This is used when a SafeTask can not be used because the object + // that posted the task is not destroyed on the target sequence. Instead, the + // caller has to guarantee that this MaybeWorkerThread is destroyed first + // since that guarantee that the posted task is deleted or run before the + // owning class. + absl::AnyInvocable MaybeSafeTask( + rtc::scoped_refptr flag, + absl::AnyInvocable task); + + // To implement macro RTC_DCHECK_RUN_ON. + // Implementation delegate to the actual used sequence. + bool IsCurrent() const; + + private: + SequenceChecker sequence_checker_; + const std::unique_ptr owned_task_queue_; + TaskQueueBase* const worker_thread_; +}; + +} // namespace webrtc + +#endif // MODULES_UTILITY_MAYBE_WORKER_THREAD_H_ diff --git a/modules/utility/maybe_worker_thread_unittests.cc b/modules/utility/maybe_worker_thread_unittests.cc new file mode 100644 index 0000000000..ec051242e7 --- /dev/null +++ b/modules/utility/maybe_worker_thread_unittests.cc @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include + +#include "api/sequence_checker.h" +#include "api/task_queue/pending_task_safety_flag.h" +#include "api/task_queue/task_queue_factory.h" +#include "api/units/time_delta.h" +#include "modules/utility/maybe_worker_thread.h" +#include "rtc_base/event.h" +#include "test/explicit_key_value_config.h" +#include "test/gtest.h" +#include "test/time_controller/real_time_controller.h" + +namespace webrtc { + +namespace { + +constexpr char kFieldTrialString[] = + "WebRTC-SendPacketsOnWorkerThread/Enabled/"; + +TEST(MaybeWorkerThreadTest, RunOrPostRunOnWorkerThreadInExperiment) { + test::ExplicitKeyValueConfig field_trial(kFieldTrialString); + RealTimeController controller; + MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); + + SequenceChecker checker; + bool run = false; + m.RunOrPost([&] { + EXPECT_TRUE(checker.IsCurrent()); + run = true; + }); + EXPECT_TRUE(run); +} + +TEST(MaybeWorkerThreadTest, RunOrPostPostsOnTqPerDefault) { + test::ExplicitKeyValueConfig field_trial(""); + RealTimeController controller; + MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); + + SequenceChecker checker; + rtc::Event event; + m.RunOrPost([&] { + EXPECT_FALSE(checker.IsCurrent()); + event.Set(); + }); + EXPECT_TRUE(event.Wait(TimeDelta::Seconds(10))); +} + +TEST(MaybeWorkerThreadTest, RunSynchronousRunOnWorkerThreadInExperiment) { + test::ExplicitKeyValueConfig field_trial(kFieldTrialString); + RealTimeController controller; + MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); + + SequenceChecker checker; + bool run = false; + m.RunSynchronous([&] { + EXPECT_TRUE(checker.IsCurrent()); + run = true; + }); + EXPECT_TRUE(run); +} + +TEST(MaybeWorkerThreadTest, RunSynchronousRunOnTqPerDefault) { + test::ExplicitKeyValueConfig field_trial(""); + RealTimeController controller; + MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); + + SequenceChecker checker; + bool run = false; + m.RunSynchronous([&] { + EXPECT_FALSE(checker.IsCurrent()); + run = true; + }); + EXPECT_TRUE(run); +} + +TEST(MaybeWorkerThreadTest, MaybeSafeTaskDoesNotReturnSafeTaskPerDefault) { + // We cant really test that the return value from MaybeSafeTask is a SafeTask. + // But we can test that the safety flag does not have more references after a + // call. + test::ExplicitKeyValueConfig field_trial(""); + RealTimeController controller; + MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); + + rtc::scoped_refptr flag = + PendingTaskSafetyFlag::Create(); + auto closure = m.MaybeSafeTask(flag, [] {}); + EXPECT_EQ(flag->Release(), rtc::RefCountReleaseStatus::kDroppedLastRef); + flag.release(); +} + +TEST(MaybeWorkerThreadTest, MaybeSafeTaskDoesNotReturnSafeTaskInExperiment) { + // We cant really test that the return value from MaybeSafeTask is a SafeTask. + // But we can test that the safety flag does have one more references after a + // call. + test::ExplicitKeyValueConfig field_trial(kFieldTrialString); + RealTimeController controller; + MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); + + rtc::scoped_refptr flag = + PendingTaskSafetyFlag::Create(); + auto closure = m.MaybeSafeTask(flag, [] {}); + EXPECT_EQ(flag->Release(), rtc::RefCountReleaseStatus::kOtherRefsRemained); + flag.release(); +} + +TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectPerDefault) { + test::ExplicitKeyValueConfig field_trial(""); + RealTimeController controller; + MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); + + EXPECT_FALSE(m.IsCurrent()); + m.RunSynchronous([&] { EXPECT_TRUE(m.IsCurrent()); }); +} + +TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectInExperiment) { + test::ExplicitKeyValueConfig field_trial(kFieldTrialString); + RealTimeController controller; + MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); + + EXPECT_TRUE(m.IsCurrent()); + auto tq = controller.GetTaskQueueFactory()->CreateTaskQueue( + "tq", TaskQueueFactory::Priority::NORMAL); + rtc::Event event; + tq->PostTask([&] { + EXPECT_FALSE(m.IsCurrent()); + event.Set(); + }); + ASSERT_TRUE(event.Wait(TimeDelta::Seconds(10))); +} + +} // namespace +} // namespace webrtc