From 0fd2ed516bf2abfb7c646f3e57291f1f004d63d2 Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Fri, 1 Jul 2022 13:38:59 +0200 Subject: [PATCH] Delete ProcessThread and related Module interface Bug: webrtc:7219 Change-Id: Id71430a24b21e591494557cf54419d2bc8b3f8c6 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/267400 Reviewed-by: Tomas Gunnarsson Commit-Queue: Danil Chapovalov Auto-Submit: Danil Chapovalov Cr-Commit-Position: refs/heads/main@{#37416} --- audio/BUILD.gn | 1 - audio/voip/BUILD.gn | 3 - audio/voip/test/BUILD.gn | 2 - examples/androidvoip/BUILD.gn | 1 - modules/BUILD.gn | 6 +- modules/audio_device/BUILD.gn | 2 +- .../congestion_controller/goog_cc/BUILD.gn | 1 - modules/include/module.h | 63 ---- modules/pacing/BUILD.gn | 3 - .../task_queue_paced_sender_unittest.cc | 1 - modules/utility/BUILD.gn | 69 +--- .../include/mock/mock_process_thread.h | 41 --- modules/utility/include/process_thread.h | 60 ---- modules/utility/source/process_thread_impl.cc | 298 ---------------- modules/utility/source/process_thread_impl.h | 128 ------- .../source/process_thread_impl_unittest.cc | 331 ------------------ modules/video_processing/BUILD.gn | 1 - pc/BUILD.gn | 1 - 18 files changed, 17 insertions(+), 995 deletions(-) delete mode 100644 modules/include/module.h delete mode 100644 modules/utility/include/mock/mock_process_thread.h delete mode 100644 modules/utility/include/process_thread.h delete mode 100644 modules/utility/source/process_thread_impl.cc delete mode 100644 modules/utility/source/process_thread_impl.h delete mode 100644 modules/utility/source/process_thread_impl_unittest.cc diff --git a/audio/BUILD.gn b/audio/BUILD.gn index 3be1cad3ab..3efd00a9c2 100644 --- a/audio/BUILD.gn +++ b/audio/BUILD.gn @@ -186,7 +186,6 @@ if (rtc_include_tests) { "../modules/pacing", "../modules/rtp_rtcp:mock_rtp_rtcp", "../modules/rtp_rtcp:rtp_rtcp_format", - "../modules/utility", "../rtc_base:checks", "../rtc_base:macromagic", "../rtc_base:refcount", diff --git a/audio/voip/BUILD.gn b/audio/voip/BUILD.gn index b9397336b5..a70d1c382e 100644 --- a/audio/voip/BUILD.gn +++ b/audio/voip/BUILD.gn @@ -23,7 +23,6 @@ rtc_library("voip_core") { "../../modules/audio_device:audio_device_api", "../../modules/audio_mixer:audio_mixer_impl", "../../modules/audio_processing:api", - "../../modules/utility:utility", "../../rtc_base:criticalsection", "../../rtc_base:logging", "../../rtc_base/synchronization:mutex", @@ -46,7 +45,6 @@ rtc_library("audio_channel") { "../../modules/audio_device:audio_device_api", "../../modules/rtp_rtcp", "../../modules/rtp_rtcp:rtp_rtcp_format", - "../../modules/utility", "../../rtc_base:criticalsection", "../../rtc_base:location", "../../rtc_base:logging", @@ -71,7 +69,6 @@ rtc_library("audio_ingress") { "../../modules/audio_coding", "../../modules/rtp_rtcp", "../../modules/rtp_rtcp:rtp_rtcp_format", - "../../modules/utility", "../../rtc_base:criticalsection", "../../rtc_base:logging", "../../rtc_base:safe_minmax", diff --git a/audio/voip/test/BUILD.gn b/audio/voip/test/BUILD.gn index a2cc1c5451..d2ae985bc9 100644 --- a/audio/voip/test/BUILD.gn +++ b/audio/voip/test/BUILD.gn @@ -30,7 +30,6 @@ if (rtc_include_tests) { "../../../api/task_queue:default_task_queue_factory", "../../../modules/audio_device:mock_audio_device", "../../../modules/audio_processing:mocks", - "../../../modules/utility:mock_process_thread", "../../../test:audio_codec_mocks", "../../../test:mock_transport", "../../../test:run_loop", @@ -53,7 +52,6 @@ if (rtc_include_tests) { "../../../modules/audio_mixer:audio_mixer_test_utils", "../../../modules/rtp_rtcp:rtp_rtcp", "../../../modules/rtp_rtcp:rtp_rtcp_format", - "../../../modules/utility", "../../../rtc_base:logging", "../../../test:mock_transport", "../../../test:test_support", diff --git a/examples/androidvoip/BUILD.gn b/examples/androidvoip/BUILD.gn index 3120e0606b..f7f0d90e30 100644 --- a/examples/androidvoip/BUILD.gn +++ b/examples/androidvoip/BUILD.gn @@ -67,7 +67,6 @@ if (is_android) { "//api/task_queue:default_task_queue_factory", "//api/voip:voip_api", "//api/voip:voip_engine_factory", - "//modules/utility:utility", "//rtc_base", "//rtc_base/third_party/sigslot:sigslot", "//sdk/android:native_api_audio_device_module", diff --git a/modules/BUILD.gn b/modules/BUILD.gn index fd408ab8bf..3ad13b22d9 100644 --- a/modules/BUILD.gn +++ b/modules/BUILD.gn @@ -36,10 +36,7 @@ rtc_source_set("module_api_public") { rtc_source_set("module_api") { visibility = [ "*" ] - sources = [ - "include/module.h", - "include/module_common_types.h", - ] + sources = [ "include/module_common_types.h" ] } rtc_source_set("module_fec_api") { @@ -221,7 +218,6 @@ if (rtc_include_tests && !build_with_chromium) { "pacing:pacing_unittests", "remote_bitrate_estimator:remote_bitrate_estimator_unittests", "rtp_rtcp:rtp_rtcp_unittests", - "utility:utility_unittests", "video_coding:video_coding_unittests", "video_coding/timing:timing_unittests", "video_processing:video_processing_unittests", diff --git a/modules/audio_device/BUILD.gn b/modules/audio_device/BUILD.gn index b376955bdc..e565b5aed7 100644 --- a/modules/audio_device/BUILD.gn +++ b/modules/audio_device/BUILD.gn @@ -433,7 +433,6 @@ if (rtc_include_tests && !build_with_chromium) { "../../system_wrappers", "../../test:fileutils", "../../test:test_support", - "../utility", ] absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] if (is_linux || is_chromeos || is_mac || is_win) { @@ -460,6 +459,7 @@ if (rtc_include_tests && !build_with_chromium) { "../../sdk/android:libjingle_peerconnection_java", "../../sdk/android:native_api_jni", "../../sdk/android:native_test_jni_onload", + "../utility", ] } if (!rtc_include_internal_audio_device) { diff --git a/modules/congestion_controller/goog_cc/BUILD.gn b/modules/congestion_controller/goog_cc/BUILD.gn index 00cd2d55c1..e417405a4e 100644 --- a/modules/congestion_controller/goog_cc/BUILD.gn +++ b/modules/congestion_controller/goog_cc/BUILD.gn @@ -30,7 +30,6 @@ rtc_library("goog_cc") { ":probe_controller", ":pushback_controller", ":send_side_bwe", - "../..:module_api", "../../../api:field_trials_view", "../../../api:network_state_predictor_api", "../../../api/rtc_event_log", diff --git a/modules/include/module.h b/modules/include/module.h deleted file mode 100644 index 3046390e70..0000000000 --- a/modules/include/module.h +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef MODULES_INCLUDE_MODULE_H_ -#define MODULES_INCLUDE_MODULE_H_ - -#include - -namespace webrtc { - -class ProcessThread; - -class Module { - public: - // Returns the number of milliseconds until the module wants a worker - // thread to call Process. - // This method is called on the same worker thread as Process will - // be called on. - // TODO(tommi): Almost all implementations of this function, need to know - // the current tick count. Consider passing it as an argument. It could - // also improve the accuracy of when the next callback occurs since the - // thread that calls Process() will also have it's tick count reference - // which might not match with what the implementations use. - virtual int64_t TimeUntilNextProcess() = 0; - - // Process any pending tasks such as timeouts. - // Called on a worker thread. - virtual void Process() = 0; - - // This method is called when the module is attached to a *running* process - // thread or detached from one. In the case of detaching, `process_thread` - // will be nullptr. - // - // This method will be called in the following cases: - // - // * Non-null process_thread: - // * ProcessThread::RegisterModule() is called while the thread is running. - // * ProcessThread::Start() is called and RegisterModule has previously - // been called. The thread will be started immediately after notifying - // all modules. - // - // * Null process_thread: - // * ProcessThread::DeRegisterModule() is called while the thread is - // running. - // * ProcessThread::Stop() was called and the thread has been stopped. - // - // NOTE: This method is not called from the worker thread itself, but from - // the thread that registers/deregisters the module or calls Start/Stop. - virtual void ProcessThreadAttached(ProcessThread* process_thread) {} - - protected: - virtual ~Module() {} -}; -} // namespace webrtc - -#endif // MODULES_INCLUDE_MODULE_H_ diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn index 3095df9b30..b424e2c7d8 100644 --- a/modules/pacing/BUILD.gn +++ b/modules/pacing/BUILD.gn @@ -32,7 +32,6 @@ rtc_library("pacing") { deps = [ ":interval_budget", - "..:module_api", "../../api:field_trials_view", "../../api:field_trials_view", "../../api:function_view", @@ -63,7 +62,6 @@ rtc_library("pacing") { "../../system_wrappers:metrics", "../rtp_rtcp", "../rtp_rtcp:rtp_rtcp_format", - "../utility", ] absl_deps = [ "//third_party/abseil-cpp/absl/memory", @@ -106,7 +104,6 @@ if (rtc_include_tests) { "../../api/units:data_rate", "../../api/units:time_delta", "../../api/units:timestamp", - "../../modules/utility:mock_process_thread", "../../rtc_base:checks", "../../rtc_base:rtc_base_tests_utils", "../../rtc_base/experiments:alr_experiment", diff --git a/modules/pacing/task_queue_paced_sender_unittest.cc b/modules/pacing/task_queue_paced_sender_unittest.cc index 44316efbde..926f4d969f 100644 --- a/modules/pacing/task_queue_paced_sender_unittest.cc +++ b/modules/pacing/task_queue_paced_sender_unittest.cc @@ -23,7 +23,6 @@ #include "api/transport/network_types.h" #include "api/units/data_rate.h" #include "modules/pacing/packet_router.h" -#include "modules/utility/include/mock/mock_process_thread.h" #include "test/gmock.h" #include "test/gtest.h" #include "test/scoped_key_value_config.h" diff --git a/modules/utility/BUILD.gn b/modules/utility/BUILD.gn index fc8512e27e..2b560943a3 100644 --- a/modules/utility/BUILD.gn +++ b/modules/utility/BUILD.gn @@ -8,68 +8,29 @@ import("../../webrtc.gni") -rtc_library("utility") { - visibility = [ "*" ] - sources = [ - "include/process_thread.h", - "source/process_thread_impl.cc", - "source/process_thread_impl.h", - ] +if (is_android) { + rtc_library("utility") { + visibility = [ "*" ] - if (is_android) { - sources += [ + sources = [ "include/helpers_android.h", "include/jvm_android.h", "source/helpers_android.cc", "source/jvm_android.cc", ] - } - if (is_ios) { - frameworks = [ "AVFoundation.framework" ] - } - - deps = [ - "..:module_api", - "../../api:sequence_checker", - "../../api/task_queue", - "../../common_audio", - "../../rtc_base:checks", - "../../rtc_base:event_tracer", - "../../rtc_base:location", - "../../rtc_base:logging", - "../../rtc_base:platform_thread", - "../../rtc_base:rtc_event", - "../../rtc_base:timeutils", - "../../rtc_base/system:arch", - "../../system_wrappers", - ] -} - -rtc_library("mock_process_thread") { - testonly = true - visibility = [ "*" ] - sources = [ "include/mock/mock_process_thread.h" ] - deps = [ - ":utility", - "../../rtc_base:location", - "../../test:test_support", - ] -} - -if (rtc_include_tests) { - rtc_library("utility_unittests") { - testonly = true - - sources = [ "source/process_thread_impl_unittest.cc" ] deps = [ - ":utility", - "..:module_api", - "../../api/task_queue", - "../../api/task_queue:task_queue_test", - "../../rtc_base:location", - "../../rtc_base:timeutils", - "../../test:test_support", + "../../api:sequence_checker", + "../../rtc_base:checks", + "../../rtc_base:logging", + "../../rtc_base:platform_thread", + "../../rtc_base/system:arch", ] } +} else { + # Add an empty source set so that dependent targets may include utility + # unconditionally. + rtc_source_set("utility") { + visibility = [ "*" ] + } } diff --git a/modules/utility/include/mock/mock_process_thread.h b/modules/utility/include/mock/mock_process_thread.h deleted file mode 100644 index e356bca99f..0000000000 --- a/modules/utility/include/mock/mock_process_thread.h +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2014 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef MODULES_UTILITY_INCLUDE_MOCK_MOCK_PROCESS_THREAD_H_ -#define MODULES_UTILITY_INCLUDE_MOCK_MOCK_PROCESS_THREAD_H_ - -#include - -#include "modules/utility/include/process_thread.h" -#include "rtc_base/location.h" -#include "test/gmock.h" - -namespace webrtc { - -class MockProcessThread : public ProcessThread { - public: - MOCK_METHOD(void, Start, (), (override)); - MOCK_METHOD(void, Stop, (), (override)); - MOCK_METHOD(void, Delete, (), (override)); - MOCK_METHOD(void, WakeUp, (Module*), (override)); - MOCK_METHOD(void, PostTask, (std::unique_ptr), (override)); - MOCK_METHOD(void, - PostDelayedTask, - (std::unique_ptr, uint32_t), - (override)); - MOCK_METHOD(void, - RegisterModule, - (Module*, const rtc::Location&), - (override)); - MOCK_METHOD(void, DeRegisterModule, (Module*), (override)); -}; - -} // namespace webrtc -#endif // MODULES_UTILITY_INCLUDE_MOCK_MOCK_PROCESS_THREAD_H_ diff --git a/modules/utility/include/process_thread.h b/modules/utility/include/process_thread.h deleted file mode 100644 index 7786dacf94..0000000000 --- a/modules/utility/include/process_thread.h +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2011 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef MODULES_UTILITY_INCLUDE_PROCESS_THREAD_H_ -#define MODULES_UTILITY_INCLUDE_PROCESS_THREAD_H_ - -#include - -#include "api/task_queue/queued_task.h" -#include "api/task_queue/task_queue_base.h" - -namespace rtc { -class Location; -} - -namespace webrtc { -class Module; - -// TODO(tommi): ProcessThread probably doesn't need to be a virtual -// interface. There exists one override besides ProcessThreadImpl, -// MockProcessThread, but when looking at how it is used, it seems -// a nullptr might suffice (or simply an actual ProcessThread instance). -class ProcessThread : public TaskQueueBase { - public: - ~ProcessThread() override; - - static std::unique_ptr Create(const char* thread_name); - - // Starts the worker thread. Must be called from the construction thread. - virtual void Start() = 0; - - // Stops the worker thread. Must be called from the construction thread. - virtual void Stop() = 0; - - // Wakes the thread up to give a module a chance to do processing right - // away. This causes the worker thread to wake up and requery the specified - // module for when it should be called back. (Typically the module should - // return 0 from TimeUntilNextProcess on the worker thread at that point). - // Can be called on any thread. - virtual void WakeUp(Module* module) = 0; - - // Adds a module that will start to receive callbacks on the worker thread. - // Can be called from any thread. - virtual void RegisterModule(Module* module, const rtc::Location& from) = 0; - - // Removes a previously registered module. - // Can be called from any thread. - virtual void DeRegisterModule(Module* module) = 0; -}; - -} // namespace webrtc - -#endif // MODULES_UTILITY_INCLUDE_PROCESS_THREAD_H_ diff --git a/modules/utility/source/process_thread_impl.cc b/modules/utility/source/process_thread_impl.cc deleted file mode 100644 index 2274aaee91..0000000000 --- a/modules/utility/source/process_thread_impl.cc +++ /dev/null @@ -1,298 +0,0 @@ -/* - * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "modules/utility/source/process_thread_impl.h" - -#include - -#include "modules/include/module.h" -#include "rtc_base/checks.h" -#include "rtc_base/logging.h" -#include "rtc_base/time_utils.h" -#include "rtc_base/trace_event.h" - -namespace webrtc { -namespace { - -// We use this constant internally to signal that a module has requested -// a callback right away. When this is set, no call to TimeUntilNextProcess -// should be made, but Process() should be called directly. -const int64_t kCallProcessImmediately = -1; - -int64_t GetNextCallbackTime(Module* module, int64_t time_now) { - int64_t interval = module->TimeUntilNextProcess(); - if (interval < 0) { - // Falling behind, we should call the callback now. - return time_now; - } - return time_now + interval; -} -} // namespace - -ProcessThread::~ProcessThread() {} - -// static -std::unique_ptr ProcessThread::Create(const char* thread_name) { - return std::unique_ptr(new ProcessThreadImpl(thread_name)); -} - -ProcessThreadImpl::ProcessThreadImpl(const char* thread_name) - : stop_(false), thread_name_(thread_name) {} - -ProcessThreadImpl::~ProcessThreadImpl() { - RTC_DCHECK(thread_checker_.IsCurrent()); - RTC_DCHECK(!stop_); - - while (!delayed_tasks_.empty()) { - delete delayed_tasks_.top().task; - delayed_tasks_.pop(); - } - - while (!queue_.empty()) { - delete queue_.front(); - queue_.pop(); - } -} - -void ProcessThreadImpl::Delete() { - RTC_LOG(LS_WARNING) << "Process thread " << thread_name_ - << " is destroyed as a TaskQueue."; - Stop(); - delete this; -} - -// Doesn't need locking, because the contending thread isn't running. -void ProcessThreadImpl::Start() RTC_NO_THREAD_SAFETY_ANALYSIS { - RTC_DCHECK(thread_checker_.IsCurrent()); - RTC_DCHECK(thread_.empty()); - if (!thread_.empty()) - return; - - RTC_DCHECK(!stop_); - - for (ModuleCallback& m : modules_) - m.module->ProcessThreadAttached(this); - - thread_ = rtc::PlatformThread::SpawnJoinable( - [this] { - CurrentTaskQueueSetter set_current(this); - while (Process()) { - } - }, - thread_name_); -} - -void ProcessThreadImpl::Stop() { - RTC_DCHECK(thread_checker_.IsCurrent()); - if (thread_.empty()) - return; - - { - // Need to take lock, for synchronization with `thread_`. - MutexLock lock(&mutex_); - stop_ = true; - } - - wake_up_.Set(); - thread_.Finalize(); - - StopNoLocks(); -} - -// No locking needed, since this is called after the contending thread is -// stopped. -void ProcessThreadImpl::StopNoLocks() RTC_NO_THREAD_SAFETY_ANALYSIS { - RTC_DCHECK(thread_.empty()); - stop_ = false; - - for (ModuleCallback& m : modules_) - m.module->ProcessThreadAttached(nullptr); -} - -void ProcessThreadImpl::WakeUp(Module* module) { - // Allowed to be called on any thread. - auto holds_mutex = [this] { - if (!IsCurrent()) { - return false; - } - RTC_DCHECK_RUN_ON(this); - return holds_mutex_; - }; - if (holds_mutex()) { - // Avoid locking if called on the ProcessThread, via a module's Process), - WakeUpNoLocks(module); - } else { - MutexLock lock(&mutex_); - WakeUpInternal(module); - } - wake_up_.Set(); -} - -// Must be called only indirectly from Process, which already holds the lock. -void ProcessThreadImpl::WakeUpNoLocks(Module* module) - RTC_NO_THREAD_SAFETY_ANALYSIS { - RTC_DCHECK_RUN_ON(this); - WakeUpInternal(module); -} - -void ProcessThreadImpl::WakeUpInternal(Module* module) { - for (ModuleCallback& m : modules_) { - if (m.module == module) - m.next_callback = kCallProcessImmediately; - } -} - -void ProcessThreadImpl::PostTask(std::unique_ptr task) { - // Allowed to be called on any thread, except from a module's Process method. - if (IsCurrent()) { - RTC_DCHECK_RUN_ON(this); - RTC_DCHECK(!holds_mutex_) << "Calling ProcessThread::PostTask from " - "Module::Process is not supported"; - } - { - MutexLock lock(&mutex_); - queue_.push(task.release()); - } - wake_up_.Set(); -} - -void ProcessThreadImpl::PostDelayedTask(std::unique_ptr task, - uint32_t milliseconds) { - int64_t run_at_ms = rtc::TimeMillis() + milliseconds; - bool recalculate_wakeup_time; - { - MutexLock lock(&mutex_); - recalculate_wakeup_time = - delayed_tasks_.empty() || run_at_ms < delayed_tasks_.top().run_at_ms; - delayed_tasks_.emplace(run_at_ms, sequence_id_++, std::move(task)); - } - if (recalculate_wakeup_time) { - wake_up_.Set(); - } -} - -void ProcessThreadImpl::RegisterModule(Module* module, - const rtc::Location& from) { - TRACE_EVENT0("webrtc", "ProcessThreadImpl::RegisterModule"); - RTC_DCHECK(thread_checker_.IsCurrent()); - RTC_DCHECK(module) << from.ToString(); - -#if RTC_DCHECK_IS_ON - { - // Catch programmer error. - MutexLock lock(&mutex_); - for (const ModuleCallback& mc : modules_) { - RTC_DCHECK(mc.module != module) - << "Already registered here: " << mc.location.ToString() - << "\n" - "Now attempting from here: " - << from.ToString(); - } - } -#endif - - // Now that we know the module isn't in the list, we'll call out to notify - // the module that it's attached to the worker thread. We don't hold - // the lock while we make this call. - if (!thread_.empty()) - module->ProcessThreadAttached(this); - - { - MutexLock lock(&mutex_); - modules_.push_back(ModuleCallback(module, from)); - } - - // Wake the thread calling ProcessThreadImpl::Process() to update the - // waiting time. The waiting time for the just registered module may be - // shorter than all other registered modules. - wake_up_.Set(); -} - -void ProcessThreadImpl::DeRegisterModule(Module* module) { - RTC_DCHECK(thread_checker_.IsCurrent()); - RTC_DCHECK(module); - - { - MutexLock lock(&mutex_); - modules_.remove_if( - [&module](const ModuleCallback& m) { return m.module == module; }); - } - - // Notify the module that it's been detached. - module->ProcessThreadAttached(nullptr); -} - -bool ProcessThreadImpl::Process() { - TRACE_EVENT1("webrtc", "ProcessThreadImpl", "name", thread_name_); - int64_t now = rtc::TimeMillis(); - int64_t next_checkpoint = now + (1000 * 60); - RTC_DCHECK_RUN_ON(this); - { - MutexLock lock(&mutex_); - if (stop_) - return false; - for (ModuleCallback& m : modules_) { - // TODO(tommi): Would be good to measure the time TimeUntilNextProcess - // takes and dcheck if it takes too long (e.g. >=10ms). Ideally this - // operation should not require taking a lock, so querying all modules - // should run in a matter of nanoseconds. - if (m.next_callback == 0) - m.next_callback = GetNextCallbackTime(m.module, now); - - // Set to true for the duration of the calls to modules' Process(). - holds_mutex_ = true; - if (m.next_callback <= now || - m.next_callback == kCallProcessImmediately) { - { - TRACE_EVENT2("webrtc", "ModuleProcess", "function", - m.location.function_name(), "file", - m.location.file_name()); - m.module->Process(); - } - // Use a new 'now' reference to calculate when the next callback - // should occur. We'll continue to use 'now' above for the baseline - // of calculating how long we should wait, to reduce variance. - int64_t new_now = rtc::TimeMillis(); - m.next_callback = GetNextCallbackTime(m.module, new_now); - } - holds_mutex_ = false; - - if (m.next_callback < next_checkpoint) - next_checkpoint = m.next_callback; - } - - while (!delayed_tasks_.empty() && delayed_tasks_.top().run_at_ms <= now) { - queue_.push(delayed_tasks_.top().task); - delayed_tasks_.pop(); - } - - if (!delayed_tasks_.empty()) { - next_checkpoint = - std::min(next_checkpoint, delayed_tasks_.top().run_at_ms); - } - - while (!queue_.empty()) { - QueuedTask* task = queue_.front(); - queue_.pop(); - mutex_.Unlock(); - if (task->Run()) { - delete task; - } - mutex_.Lock(); - } - } - - int64_t time_to_wait = next_checkpoint - rtc::TimeMillis(); - if (time_to_wait > 0) - wake_up_.Wait(static_cast(time_to_wait)); - - return true; -} -} // namespace webrtc diff --git a/modules/utility/source/process_thread_impl.h b/modules/utility/source/process_thread_impl.h deleted file mode 100644 index 0dc7aff591..0000000000 --- a/modules/utility/source/process_thread_impl.h +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright (c) 2011 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef MODULES_UTILITY_SOURCE_PROCESS_THREAD_IMPL_H_ -#define MODULES_UTILITY_SOURCE_PROCESS_THREAD_IMPL_H_ - -#include - -#include -#include -#include - -#include "api/sequence_checker.h" -#include "api/task_queue/queued_task.h" -#include "modules/include/module.h" -#include "modules/utility/include/process_thread.h" -#include "rtc_base/event.h" -#include "rtc_base/location.h" -#include "rtc_base/platform_thread.h" - -namespace webrtc { - -class ProcessThreadImpl : public ProcessThread { - public: - explicit ProcessThreadImpl(const char* thread_name); - ~ProcessThreadImpl() override; - - void Start() override; - void Stop() override; - - void WakeUp(Module* module) override; - void PostTask(std::unique_ptr task) override; - void PostDelayedTask(std::unique_ptr task, - uint32_t milliseconds) override; - - void RegisterModule(Module* module, const rtc::Location& from) override; - void DeRegisterModule(Module* module) override; - - protected: - bool Process(); - - private: - struct ModuleCallback { - ModuleCallback() = delete; - ModuleCallback(ModuleCallback&& cb) = default; - ModuleCallback(const ModuleCallback& cb) = default; - ModuleCallback(Module* module, const rtc::Location& location) - : module(module), location(location) {} - bool operator==(const ModuleCallback& cb) const { - return cb.module == module; - } - - Module* const module; - int64_t next_callback = 0; // Absolute timestamp. - const rtc::Location location; - - private: - ModuleCallback& operator=(ModuleCallback&); - }; - struct DelayedTask { - DelayedTask(int64_t run_at_ms, - uint64_t sequence_id, - std::unique_ptr task) - : run_at_ms(run_at_ms), - sequence_id_(sequence_id), - task(task.release()) {} - friend bool operator<(const DelayedTask& lhs, const DelayedTask& rhs) { - // Earliest DelayedTask should be at the top of the priority queue. - if (lhs.run_at_ms != rhs.run_at_ms) { - return lhs.run_at_ms > rhs.run_at_ms; - } - return lhs.sequence_id_ > rhs.sequence_id_; - } - - int64_t run_at_ms; - uint64_t sequence_id_; - // DelayedTask owns the `task`, but some delayed tasks must be removed from - // the std::priority_queue, but mustn't be deleted. std::priority_queue does - // not give non-const access to the values, so storing unique_ptr would - // delete the task as soon as it is remove from the priority queue. - // Thus lifetime of the `task` is managed manually. - QueuedTask* task; - }; - typedef std::list ModuleList; - - void Delete() override; - // The part of Stop processing that doesn't need any locking. - void StopNoLocks(); - void WakeUpNoLocks(Module* module); - void WakeUpInternal(Module* module) RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_); - - // Members protected by this mutex are accessed on the constructor thread and - // on the spawned process thread, and locking is needed only while the process - // thread is running. - Mutex mutex_; - - SequenceChecker thread_checker_; - rtc::Event wake_up_; - rtc::PlatformThread thread_; - - ModuleList modules_ RTC_GUARDED_BY(mutex_); - // Set to true when calling Process, to allow reentrant calls to WakeUp. - bool holds_mutex_ RTC_GUARDED_BY(this) = false; - std::queue queue_; - // `std::priority_queue` does not guarantee stable sort. For delayed tasks - // with the same wakeup time, use `sequence_id_` to ensure FIFO ordering. - std::priority_queue delayed_tasks_ RTC_GUARDED_BY(mutex_); - uint64_t sequence_id_ RTC_GUARDED_BY(mutex_) = 0; - // The `stop_` flag is modified only by the construction thread, protected by - // `thread_checker_`. It is read also by the spawned `thread_`. The latter - // thread must take `mutex_` before access, and for thread safety, the - // constructor thread needs to take `mutex_` when it modifies `stop_` and - // `thread_` is running. Annotations like RTC_GUARDED_BY doesn't support this - // usage pattern. - bool stop_ RTC_GUARDED_BY(mutex_); - const char* thread_name_; -}; - -} // namespace webrtc - -#endif // MODULES_UTILITY_SOURCE_PROCESS_THREAD_IMPL_H_ diff --git a/modules/utility/source/process_thread_impl_unittest.cc b/modules/utility/source/process_thread_impl_unittest.cc deleted file mode 100644 index 1fef0b6740..0000000000 --- a/modules/utility/source/process_thread_impl_unittest.cc +++ /dev/null @@ -1,331 +0,0 @@ -/* - * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "modules/utility/source/process_thread_impl.h" - -#include -#include - -#include "api/task_queue/queued_task.h" -#include "api/task_queue/task_queue_test.h" -#include "modules/include/module.h" -#include "rtc_base/location.h" -#include "rtc_base/time_utils.h" -#include "test/gmock.h" -#include "test/gtest.h" - -namespace webrtc { - -using ::testing::_; -using ::testing::DoAll; -using ::testing::InSequence; -using ::testing::Invoke; -using ::testing::Return; -using ::testing::SetArgPointee; - -// The length of time, in milliseconds, to wait for an event to become signaled. -// Set to a fairly large value as there is quite a bit of variation on some -// Windows bots. -static const int kEventWaitTimeout = 500; - -class MockModule : public Module { - public: - MOCK_METHOD(int64_t, TimeUntilNextProcess, (), (override)); - MOCK_METHOD(void, Process, (), (override)); - MOCK_METHOD(void, ProcessThreadAttached, (ProcessThread*), (override)); -}; - -class RaiseEventTask : public QueuedTask { - public: - RaiseEventTask(rtc::Event* event) : event_(event) {} - bool Run() override { - event_->Set(); - return true; - } - - private: - rtc::Event* event_; -}; - -ACTION_P(SetEvent, event) { - event->Set(); -} - -ACTION_P(Increment, counter) { - ++(*counter); -} - -ACTION_P(SetTimestamp, ptr) { - *ptr = rtc::TimeMillis(); -} - -TEST(ProcessThreadImpl, StartStop) { - ProcessThreadImpl thread("ProcessThread"); - thread.Start(); - thread.Stop(); -} - -TEST(ProcessThreadImpl, MultipleStartStop) { - ProcessThreadImpl thread("ProcessThread"); - for (int i = 0; i < 5; ++i) { - thread.Start(); - thread.Stop(); - } -} - -// Verifies that we get at least call back to Process() on the worker thread. -TEST(ProcessThreadImpl, ProcessCall) { - ProcessThreadImpl thread("ProcessThread"); - thread.Start(); - - rtc::Event event; - - MockModule module; - EXPECT_CALL(module, TimeUntilNextProcess()) - .WillOnce(Return(0)) - .WillRepeatedly(Return(1)); - EXPECT_CALL(module, Process()) - .WillOnce(DoAll(SetEvent(&event), Return())) - .WillRepeatedly(Return()); - EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1); - - thread.RegisterModule(&module, RTC_FROM_HERE); - EXPECT_TRUE(event.Wait(kEventWaitTimeout)); - - EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1); - thread.Stop(); -} - -// Same as ProcessCall except the module is registered before the -// call to Start(). -TEST(ProcessThreadImpl, ProcessCall2) { - ProcessThreadImpl thread("ProcessThread"); - rtc::Event event; - - MockModule module; - EXPECT_CALL(module, TimeUntilNextProcess()) - .WillOnce(Return(0)) - .WillRepeatedly(Return(1)); - EXPECT_CALL(module, Process()) - .WillOnce(DoAll(SetEvent(&event), Return())) - .WillRepeatedly(Return()); - - thread.RegisterModule(&module, RTC_FROM_HERE); - - EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1); - thread.Start(); - EXPECT_TRUE(event.Wait(kEventWaitTimeout)); - - EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1); - thread.Stop(); -} - -// Tests setting up a module for callbacks and then unregister that module. -// After unregistration, we should not receive any further callbacks. -TEST(ProcessThreadImpl, Deregister) { - ProcessThreadImpl thread("ProcessThread"); - rtc::Event event; - - int process_count = 0; - MockModule module; - EXPECT_CALL(module, TimeUntilNextProcess()) - .WillOnce(Return(0)) - .WillRepeatedly(Return(1)); - EXPECT_CALL(module, Process()) - .WillOnce(DoAll(SetEvent(&event), Increment(&process_count), Return())) - .WillRepeatedly(DoAll(Increment(&process_count), Return())); - - thread.RegisterModule(&module, RTC_FROM_HERE); - - EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1); - thread.Start(); - - EXPECT_TRUE(event.Wait(kEventWaitTimeout)); - - EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1); - thread.DeRegisterModule(&module); - - EXPECT_GE(process_count, 1); - int count_after_deregister = process_count; - - // We shouldn't get any more callbacks. - EXPECT_FALSE(event.Wait(20)); - EXPECT_EQ(count_after_deregister, process_count); - thread.Stop(); -} - -// Helper function for testing receiving a callback after a certain amount of -// time. There's some variance of timing built into it to reduce chance of -// flakiness on bots. -void ProcessCallAfterAFewMs(int64_t milliseconds) { - ProcessThreadImpl thread("ProcessThread"); - thread.Start(); - - rtc::Event event; - - MockModule module; - int64_t start_time = 0; - int64_t called_time = 0; - EXPECT_CALL(module, TimeUntilNextProcess()) - .WillOnce(DoAll(SetTimestamp(&start_time), Return(milliseconds))) - .WillRepeatedly(Return(milliseconds)); - EXPECT_CALL(module, Process()) - .WillOnce(DoAll(SetTimestamp(&called_time), SetEvent(&event), Return())) - .WillRepeatedly(Return()); - - EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1); - thread.RegisterModule(&module, RTC_FROM_HERE); - - // Add a buffer of 50ms due to slowness of some trybots - // (e.g. win_drmemory_light) - EXPECT_TRUE(event.Wait(milliseconds + 50)); - - EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1); - thread.Stop(); - - ASSERT_GT(start_time, 0); - ASSERT_GT(called_time, 0); - // Use >= instead of > since due to rounding and timer accuracy (or lack - // thereof), can make the test run in "0"ms time. - EXPECT_GE(called_time, start_time); - // Check for an acceptable range. - uint32_t diff = called_time - start_time; - EXPECT_GE(diff, milliseconds - 15); - EXPECT_LT(diff, milliseconds + 15); -} - -// DISABLED for now since the virtual build bots are too slow :( -// TODO(tommi): Fix. -TEST(ProcessThreadImpl, DISABLED_ProcessCallAfter5ms) { - ProcessCallAfterAFewMs(5); -} - -// DISABLED for now since the virtual build bots are too slow :( -// TODO(tommi): Fix. -TEST(ProcessThreadImpl, DISABLED_ProcessCallAfter50ms) { - ProcessCallAfterAFewMs(50); -} - -// DISABLED for now since the virtual build bots are too slow :( -// TODO(tommi): Fix. -TEST(ProcessThreadImpl, DISABLED_ProcessCallAfter200ms) { - ProcessCallAfterAFewMs(200); -} - -// Runs callbacks with the goal of getting up to 50 callbacks within a second -// (on average 1 callback every 20ms). On real hardware, we're usually pretty -// close to that, but the test bots that run on virtual machines, will -// typically be in the range 30-40 callbacks. -// DISABLED for now since this can take up to 2 seconds to run on the slowest -// build bots. -// TODO(tommi): Fix. -TEST(ProcessThreadImpl, DISABLED_Process50Times) { - ProcessThreadImpl thread("ProcessThread"); - thread.Start(); - - rtc::Event event; - - MockModule module; - int callback_count = 0; - // Ask for a callback after 20ms. - EXPECT_CALL(module, TimeUntilNextProcess()).WillRepeatedly(Return(20)); - EXPECT_CALL(module, Process()) - .WillRepeatedly(DoAll(Increment(&callback_count), Return())); - - EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1); - thread.RegisterModule(&module, RTC_FROM_HERE); - - EXPECT_TRUE(event.Wait(1000)); - - EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1); - thread.Stop(); - - printf("Callback count: %i\n", callback_count); - // Check that we got called back up to 50 times. - // Some of the try bots run on slow virtual machines, so the lower bound - // is much more relaxed to avoid flakiness. - EXPECT_GE(callback_count, 25); - EXPECT_LE(callback_count, 50); -} - -// Tests that we can wake up the worker thread to give us a callback right -// away when we know the thread is sleeping. -TEST(ProcessThreadImpl, WakeUp) { - ProcessThreadImpl thread("ProcessThread"); - thread.Start(); - - rtc::Event started; - rtc::Event called; - - MockModule module; - int64_t start_time; - int64_t called_time; - - // Ask for a callback after 1000ms. - // TimeUntilNextProcess will be called twice. - // The first time we use it to get the thread into a waiting state. - // Then we wake the thread and there should not be another call made to - // TimeUntilNextProcess before Process() is called. - // The second time TimeUntilNextProcess is then called, is after Process - // has been called and we don't expect any more calls. - EXPECT_CALL(module, TimeUntilNextProcess()) - .WillOnce( - DoAll(SetTimestamp(&start_time), SetEvent(&started), Return(1000))) - .WillOnce(Return(1000)); - EXPECT_CALL(module, Process()) - .WillOnce(DoAll(SetTimestamp(&called_time), SetEvent(&called), Return())) - .WillRepeatedly(Return()); - - EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1); - thread.RegisterModule(&module, RTC_FROM_HERE); - - EXPECT_TRUE(started.Wait(kEventWaitTimeout)); - thread.WakeUp(&module); - EXPECT_TRUE(called.Wait(kEventWaitTimeout)); - - EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1); - thread.Stop(); - - EXPECT_GE(called_time, start_time); - uint32_t diff = called_time - start_time; - // We should have been called back much quicker than 1sec. - EXPECT_LE(diff, 100u); -} - -// Tests that we can post a task that gets run straight away on the worker -// thread. -TEST(ProcessThreadImpl, PostTask) { - ProcessThreadImpl thread("ProcessThread"); - rtc::Event task_ran; - std::unique_ptr task(new RaiseEventTask(&task_ran)); - thread.Start(); - thread.PostTask(std::move(task)); - EXPECT_TRUE(task_ran.Wait(kEventWaitTimeout)); - thread.Stop(); -} - -class ProcessThreadFactory : public TaskQueueFactory { - public: - ~ProcessThreadFactory() override = default; - std::unique_ptr CreateTaskQueue( - absl::string_view name, - Priority priority) const override { - ProcessThreadImpl* process_thread = new ProcessThreadImpl("thread"); - process_thread->Start(); - return std::unique_ptr(process_thread); - } -}; - -INSTANTIATE_TEST_SUITE_P( - ProcessThread, - TaskQueueTest, - testing::Values(std::make_unique)); - -} // namespace webrtc diff --git a/modules/video_processing/BUILD.gn b/modules/video_processing/BUILD.gn index b3461186a2..a5b847f74b 100644 --- a/modules/video_processing/BUILD.gn +++ b/modules/video_processing/BUILD.gn @@ -31,7 +31,6 @@ rtc_library("video_processing") { "../../api/video:video_rtp_headers", "../../common_audio", "../../common_video", - "../../modules/utility", "../../rtc_base:checks", "../../rtc_base/system:arch", "../../system_wrappers", diff --git a/pc/BUILD.gn b/pc/BUILD.gn index 5fac258f33..4195db379a 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -2463,7 +2463,6 @@ if (rtc_include_tests && !build_with_chromium) { "../media:rtc_media_tests_utils", "../modules/audio_processing", "../modules/audio_processing:api", - "../modules/utility", "../p2p:p2p_test_utils", "../p2p:rtc_p2p", "../rtc_base",