From 02bdf66f95e3639f3c1cccefee1a9bc5f380c0b8 Mon Sep 17 00:00:00 2001 From: Evan Shrubsole Date: Thu, 2 Mar 2023 12:52:50 +0000 Subject: [PATCH] Reland "Launch WebRTC-SendPacketsOnWorkerThread"" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit a09b30dd8a18f809c4a245d7ecd5848a00ccfe0e. Reland OK: Internal test fixed. Bug: webrtc:14502, b/254640777 Change-Id: I4838111169b10099a8b14e18170307b342e45033 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/295864 Reviewed-by: Erik Språng Reviewed-by: Tomas Gunnarsson Commit-Queue: Evan Shrubsole Cr-Commit-Position: refs/heads/main@{#39460} --- .../task_queue_paced_sender_unittest.cc | 85 +------------------ modules/utility/maybe_worker_thread.cc | 2 +- .../utility/maybe_worker_thread_unittests.cc | 24 +++--- test/scenario/BUILD.gn | 1 + test/scenario/network_node.cc | 19 ++++- video/video_send_stream_impl_unittest.cc | 3 +- 6 files changed, 37 insertions(+), 97 deletions(-) diff --git a/modules/pacing/task_queue_paced_sender_unittest.cc b/modules/pacing/task_queue_paced_sender_unittest.cc index 64415d7411..55ec1445c9 100644 --- a/modules/pacing/task_queue_paced_sender_unittest.cc +++ b/modules/pacing/task_queue_paced_sender_unittest.cc @@ -78,83 +78,6 @@ std::vector> GeneratePadding( return padding_packets; } -class TaskQueueWithFakePrecisionFactory : public TaskQueueFactory { - public: - explicit TaskQueueWithFakePrecisionFactory( - TaskQueueFactory* task_queue_factory) - : task_queue_factory_(task_queue_factory) {} - - std::unique_ptr CreateTaskQueue( - absl::string_view name, - Priority priority) const override { - return std::unique_ptr( - new TaskQueueWithFakePrecision( - const_cast(this), - task_queue_factory_)); - } - - int delayed_low_precision_count() const { - return delayed_low_precision_count_; - } - int delayed_high_precision_count() const { - return delayed_high_precision_count_; - } - - private: - friend class TaskQueueWithFakePrecision; - - class TaskQueueWithFakePrecision : public TaskQueueBase { - public: - TaskQueueWithFakePrecision( - TaskQueueWithFakePrecisionFactory* parent_factory, - TaskQueueFactory* task_queue_factory) - : parent_factory_(parent_factory), - task_queue_(task_queue_factory->CreateTaskQueue( - "TaskQueueWithFakePrecision", - TaskQueueFactory::Priority::NORMAL)) {} - ~TaskQueueWithFakePrecision() override {} - - void Delete() override { - // `task_queue_->Delete()` is implicitly called in the destructor due to - // TaskQueueDeleter. - delete this; - } - void PostTaskImpl(absl::AnyInvocable task, - const PostTaskTraits& /*traits*/, - const Location& /*location*/) override { - task_queue_->PostTask(WrapTask(std::move(task))); - } - void PostDelayedTaskImpl(absl::AnyInvocable task, - TimeDelta delay, - const PostDelayedTaskTraits& traits, - const Location& location) override { - if (traits.high_precision) { - ++parent_factory_->delayed_high_precision_count_; - task_queue_->PostDelayedHighPrecisionTask(WrapTask(std::move(task)), - delay); - } else { - ++parent_factory_->delayed_low_precision_count_; - task_queue_->PostDelayedTask(WrapTask(std::move(task)), delay); - } - } - - private: - absl::AnyInvocable WrapTask(absl::AnyInvocable task) { - return [this, task = std::move(task)]() mutable { - CurrentTaskQueueSetter set_current(this); - std::move(task)(); - }; - } - - TaskQueueWithFakePrecisionFactory* parent_factory_; - std::unique_ptr task_queue_; - }; - - TaskQueueFactory* task_queue_factory_; - std::atomic delayed_low_precision_count_ = 0u; - std::atomic delayed_high_precision_count_ = 0u; -}; - } // namespace namespace test { @@ -192,15 +115,15 @@ std::vector> GeneratePackets( return packets; } -constexpr char kSendPacketOnWorkerThreadFieldTrial[] = - "WebRTC-SendPacketsOnWorkerThread/Enabled/"; +constexpr char kSendPacketOnWorkerThreadFieldTrialDisabled[] = + "WebRTC-SendPacketsOnWorkerThread/Disabled/"; std::vector ParameterizedFieldTrials() { - return {{""}, {kSendPacketOnWorkerThreadFieldTrial}}; + return {{""}, {kSendPacketOnWorkerThreadFieldTrialDisabled}}; } bool UsingWorkerThread(absl::string_view field_trials) { - return field_trials.find(kSendPacketOnWorkerThreadFieldTrial) != + return field_trials.find(kSendPacketOnWorkerThreadFieldTrialDisabled) == std::string::npos; } diff --git a/modules/utility/maybe_worker_thread.cc b/modules/utility/maybe_worker_thread.cc index abb52d4691..aaa79bb9f3 100644 --- a/modules/utility/maybe_worker_thread.cc +++ b/modules/utility/maybe_worker_thread.cc @@ -24,7 +24,7 @@ MaybeWorkerThread::MaybeWorkerThread(const FieldTrialsView& field_trials, absl::string_view task_queue_name, TaskQueueFactory* factory) : owned_task_queue_( - field_trials.IsEnabled("WebRTC-SendPacketsOnWorkerThread") + !field_trials.IsDisabled("WebRTC-SendPacketsOnWorkerThread") ? nullptr : factory->CreateTaskQueue(task_queue_name, rtc::TaskQueue::Priority::NORMAL)), diff --git a/modules/utility/maybe_worker_thread_unittests.cc b/modules/utility/maybe_worker_thread_unittests.cc index f6a981f90a..2ce4d19727 100644 --- a/modules/utility/maybe_worker_thread_unittests.cc +++ b/modules/utility/maybe_worker_thread_unittests.cc @@ -23,11 +23,13 @@ namespace webrtc { namespace { -constexpr char kFieldTrialString[] = +constexpr char kFieldTrialEnabledString[] = "WebRTC-SendPacketsOnWorkerThread/Enabled/"; +constexpr char kFieldTrialDisabledString[] = + "WebRTC-SendPacketsOnWorkerThread/Disabled/"; TEST(MaybeWorkerThreadTest, RunOrPostRunOnWorkerThreadInExperiment) { - test::ExplicitKeyValueConfig field_trial(kFieldTrialString); + test::ExplicitKeyValueConfig field_trial(kFieldTrialEnabledString); RealTimeController controller; MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); @@ -41,7 +43,7 @@ TEST(MaybeWorkerThreadTest, RunOrPostRunOnWorkerThreadInExperiment) { } TEST(MaybeWorkerThreadTest, RunOrPostPostsOnTqPerDefault) { - test::ExplicitKeyValueConfig field_trial(""); + test::ExplicitKeyValueConfig field_trial(kFieldTrialDisabledString); RealTimeController controller; MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); @@ -55,7 +57,7 @@ TEST(MaybeWorkerThreadTest, RunOrPostPostsOnTqPerDefault) { } TEST(MaybeWorkerThreadTest, RunSynchronousRunOnWorkerThreadInExperiment) { - test::ExplicitKeyValueConfig field_trial(kFieldTrialString); + test::ExplicitKeyValueConfig field_trial(kFieldTrialEnabledString); RealTimeController controller; MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); @@ -69,7 +71,7 @@ TEST(MaybeWorkerThreadTest, RunSynchronousRunOnWorkerThreadInExperiment) { } TEST(MaybeWorkerThreadTest, RunSynchronousRunOnTqPerDefault) { - test::ExplicitKeyValueConfig field_trial(""); + test::ExplicitKeyValueConfig field_trial(kFieldTrialDisabledString); RealTimeController controller; MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); @@ -86,7 +88,7 @@ TEST(MaybeWorkerThreadTest, MaybeSafeTaskDoesNotReturnSafeTaskPerDefault) { // We cant really test that the return value from MaybeSafeTask is a SafeTask. // But we can test that the safety flag does not have more references after a // call. - test::ExplicitKeyValueConfig field_trial(""); + test::ExplicitKeyValueConfig field_trial(kFieldTrialDisabledString); RealTimeController controller; MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); @@ -101,7 +103,7 @@ TEST(MaybeWorkerThreadTest, MaybeSafeTaskDoesNotReturnSafeTaskInExperiment) { // We cant really test that the return value from MaybeSafeTask is a SafeTask. // But we can test that the safety flag does have one more references after a // call. - test::ExplicitKeyValueConfig field_trial(kFieldTrialString); + test::ExplicitKeyValueConfig field_trial(kFieldTrialEnabledString); RealTimeController controller; MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); @@ -113,7 +115,7 @@ TEST(MaybeWorkerThreadTest, MaybeSafeTaskDoesNotReturnSafeTaskInExperiment) { } TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectPerDefault) { - test::ExplicitKeyValueConfig field_trial(""); + test::ExplicitKeyValueConfig field_trial(kFieldTrialDisabledString); RealTimeController controller; MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); @@ -122,7 +124,7 @@ TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectPerDefault) { } TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectInExperiment) { - test::ExplicitKeyValueConfig field_trial(kFieldTrialString); + test::ExplicitKeyValueConfig field_trial(kFieldTrialEnabledString); RealTimeController controller; MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); @@ -138,7 +140,7 @@ TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectInExperiment) { } TEST(MaybeWorkerThreadTest, IsCurrentCanBeCalledInDestructorPerDefault) { - test::ExplicitKeyValueConfig field_trial(""); + test::ExplicitKeyValueConfig field_trial(kFieldTrialDisabledString); RealTimeController controller; { MaybeWorkerThread m(field_trial, "test_tq", @@ -148,7 +150,7 @@ TEST(MaybeWorkerThreadTest, IsCurrentCanBeCalledInDestructorPerDefault) { } TEST(MaybeWorkerThreadTest, IsCurrentCanBeCalledInDestructorInExperiment) { - test::ExplicitKeyValueConfig field_trial(kFieldTrialString); + test::ExplicitKeyValueConfig field_trial(kFieldTrialEnabledString); RealTimeController controller; { MaybeWorkerThread m(field_trial, "test_tq", diff --git a/test/scenario/BUILD.gn b/test/scenario/BUILD.gn index 5da6dce87d..077d1ae11f 100644 --- a/test/scenario/BUILD.gn +++ b/test/scenario/BUILD.gn @@ -150,6 +150,7 @@ if (rtc_include_tests && !build_with_chromium) { "../time_controller", ] absl_deps = [ + "//third_party/abseil-cpp/absl/cleanup", "//third_party/abseil-cpp/absl/flags:flag", "//third_party/abseil-cpp/absl/flags:parse", "//third_party/abseil-cpp/absl/functional:any_invocable", diff --git a/test/scenario/network_node.cc b/test/scenario/network_node.cc index e149bb11e0..c9aee680b8 100644 --- a/test/scenario/network_node.cc +++ b/test/scenario/network_node.cc @@ -10,9 +10,10 @@ #include "test/scenario/network_node.h" #include +#include #include -#include +#include "absl/cleanup/cleanup.h" #include "rtc_base/net_helper.h" #include "rtc_base/numerics/safe_minmax.h" @@ -127,13 +128,25 @@ void NetworkNodeTransport::Connect(EmulatedEndpoint* endpoint, current_network_route_ = route; } - sender_call_->GetTransportControllerSend()->OnNetworkRouteChanged( - kDummyTransportName, route); + // Must be called from the worker thread. + rtc::Event event; + auto cleanup = absl::MakeCleanup([&event] { event.Set(); }); + auto&& task = [this, &route, cleanup = std::move(cleanup)] { + sender_call_->GetTransportControllerSend()->OnNetworkRouteChanged( + kDummyTransportName, route); + }; + if (!sender_call_->worker_thread()->IsCurrent()) { + sender_call_->worker_thread()->PostTask(std::move(task)); + } else { + std::move(task)(); + } + event.Wait(TimeDelta::Seconds(1)); } void NetworkNodeTransport::Disconnect() { MutexLock lock(&mutex_); current_network_route_.connected = false; + sender_call_->GetTransportControllerSend()->OnNetworkRouteChanged( kDummyTransportName, current_network_route_); current_network_route_ = {}; diff --git a/video/video_send_stream_impl_unittest.cc b/video/video_send_stream_impl_unittest.cc index c38dcd0e1e..0fcc5e44cc 100644 --- a/video/video_send_stream_impl_unittest.cc +++ b/video/video_send_stream_impl_unittest.cc @@ -16,6 +16,7 @@ #include "absl/types/optional.h" #include "api/rtc_event_log/rtc_event_log.h" +#include "api/sequence_checker.h" #include "api/task_queue/task_queue_base.h" #include "api/units/time_delta.h" #include "api/units/timestamp.h" @@ -161,7 +162,7 @@ class VideoSendStreamImplTest : public ::testing::Test { int initial_encoder_max_bitrate, double initial_encoder_bitrate_priority, VideoEncoderConfig::ContentType content_type) { - RTC_DCHECK(!worker_queue_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_queue_); EXPECT_CALL(bitrate_allocator_, GetStartBitrate(_)) .WillOnce(Return(123000));