Reland "Launch WebRTC-SendPacketsOnWorkerThread""

This reverts commit a09b30dd8a18f809c4a245d7ecd5848a00ccfe0e.

Reland OK: Internal test fixed.

Bug: webrtc:14502, b/254640777
Change-Id: I4838111169b10099a8b14e18170307b342e45033
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/295864
Reviewed-by: Erik Språng <sprang@webrtc.org>
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Evan Shrubsole <eshr@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39460}
This commit is contained in:
Evan Shrubsole 2023-03-02 12:52:50 +00:00 committed by WebRTC LUCI CQ
parent a76487ffd2
commit 02bdf66f95
6 changed files with 37 additions and 97 deletions

View File

@ -78,83 +78,6 @@ std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
return padding_packets; return padding_packets;
} }
class TaskQueueWithFakePrecisionFactory : public TaskQueueFactory {
public:
explicit TaskQueueWithFakePrecisionFactory(
TaskQueueFactory* task_queue_factory)
: task_queue_factory_(task_queue_factory) {}
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
absl::string_view name,
Priority priority) const override {
return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
new TaskQueueWithFakePrecision(
const_cast<TaskQueueWithFakePrecisionFactory*>(this),
task_queue_factory_));
}
int delayed_low_precision_count() const {
return delayed_low_precision_count_;
}
int delayed_high_precision_count() const {
return delayed_high_precision_count_;
}
private:
friend class TaskQueueWithFakePrecision;
class TaskQueueWithFakePrecision : public TaskQueueBase {
public:
TaskQueueWithFakePrecision(
TaskQueueWithFakePrecisionFactory* parent_factory,
TaskQueueFactory* task_queue_factory)
: parent_factory_(parent_factory),
task_queue_(task_queue_factory->CreateTaskQueue(
"TaskQueueWithFakePrecision",
TaskQueueFactory::Priority::NORMAL)) {}
~TaskQueueWithFakePrecision() override {}
void Delete() override {
// `task_queue_->Delete()` is implicitly called in the destructor due to
// TaskQueueDeleter.
delete this;
}
void PostTaskImpl(absl::AnyInvocable<void() &&> task,
const PostTaskTraits& /*traits*/,
const Location& /*location*/) override {
task_queue_->PostTask(WrapTask(std::move(task)));
}
void PostDelayedTaskImpl(absl::AnyInvocable<void() &&> task,
TimeDelta delay,
const PostDelayedTaskTraits& traits,
const Location& location) override {
if (traits.high_precision) {
++parent_factory_->delayed_high_precision_count_;
task_queue_->PostDelayedHighPrecisionTask(WrapTask(std::move(task)),
delay);
} else {
++parent_factory_->delayed_low_precision_count_;
task_queue_->PostDelayedTask(WrapTask(std::move(task)), delay);
}
}
private:
absl::AnyInvocable<void() &&> WrapTask(absl::AnyInvocable<void() &&> task) {
return [this, task = std::move(task)]() mutable {
CurrentTaskQueueSetter set_current(this);
std::move(task)();
};
}
TaskQueueWithFakePrecisionFactory* parent_factory_;
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> task_queue_;
};
TaskQueueFactory* task_queue_factory_;
std::atomic<int> delayed_low_precision_count_ = 0u;
std::atomic<int> delayed_high_precision_count_ = 0u;
};
} // namespace } // namespace
namespace test { namespace test {
@ -192,15 +115,15 @@ std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePackets(
return packets; return packets;
} }
constexpr char kSendPacketOnWorkerThreadFieldTrial[] = constexpr char kSendPacketOnWorkerThreadFieldTrialDisabled[] =
"WebRTC-SendPacketsOnWorkerThread/Enabled/"; "WebRTC-SendPacketsOnWorkerThread/Disabled/";
std::vector<std::string> ParameterizedFieldTrials() { std::vector<std::string> ParameterizedFieldTrials() {
return {{""}, {kSendPacketOnWorkerThreadFieldTrial}}; return {{""}, {kSendPacketOnWorkerThreadFieldTrialDisabled}};
} }
bool UsingWorkerThread(absl::string_view field_trials) { bool UsingWorkerThread(absl::string_view field_trials) {
return field_trials.find(kSendPacketOnWorkerThreadFieldTrial) != return field_trials.find(kSendPacketOnWorkerThreadFieldTrialDisabled) ==
std::string::npos; std::string::npos;
} }

View File

@ -24,7 +24,7 @@ MaybeWorkerThread::MaybeWorkerThread(const FieldTrialsView& field_trials,
absl::string_view task_queue_name, absl::string_view task_queue_name,
TaskQueueFactory* factory) TaskQueueFactory* factory)
: owned_task_queue_( : owned_task_queue_(
field_trials.IsEnabled("WebRTC-SendPacketsOnWorkerThread") !field_trials.IsDisabled("WebRTC-SendPacketsOnWorkerThread")
? nullptr ? nullptr
: factory->CreateTaskQueue(task_queue_name, : factory->CreateTaskQueue(task_queue_name,
rtc::TaskQueue::Priority::NORMAL)), rtc::TaskQueue::Priority::NORMAL)),

View File

@ -23,11 +23,13 @@ namespace webrtc {
namespace { namespace {
constexpr char kFieldTrialString[] = constexpr char kFieldTrialEnabledString[] =
"WebRTC-SendPacketsOnWorkerThread/Enabled/"; "WebRTC-SendPacketsOnWorkerThread/Enabled/";
constexpr char kFieldTrialDisabledString[] =
"WebRTC-SendPacketsOnWorkerThread/Disabled/";
TEST(MaybeWorkerThreadTest, RunOrPostRunOnWorkerThreadInExperiment) { TEST(MaybeWorkerThreadTest, RunOrPostRunOnWorkerThreadInExperiment) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialString); test::ExplicitKeyValueConfig field_trial(kFieldTrialEnabledString);
RealTimeController controller; RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
@ -41,7 +43,7 @@ TEST(MaybeWorkerThreadTest, RunOrPostRunOnWorkerThreadInExperiment) {
} }
TEST(MaybeWorkerThreadTest, RunOrPostPostsOnTqPerDefault) { TEST(MaybeWorkerThreadTest, RunOrPostPostsOnTqPerDefault) {
test::ExplicitKeyValueConfig field_trial(""); test::ExplicitKeyValueConfig field_trial(kFieldTrialDisabledString);
RealTimeController controller; RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
@ -55,7 +57,7 @@ TEST(MaybeWorkerThreadTest, RunOrPostPostsOnTqPerDefault) {
} }
TEST(MaybeWorkerThreadTest, RunSynchronousRunOnWorkerThreadInExperiment) { TEST(MaybeWorkerThreadTest, RunSynchronousRunOnWorkerThreadInExperiment) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialString); test::ExplicitKeyValueConfig field_trial(kFieldTrialEnabledString);
RealTimeController controller; RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
@ -69,7 +71,7 @@ TEST(MaybeWorkerThreadTest, RunSynchronousRunOnWorkerThreadInExperiment) {
} }
TEST(MaybeWorkerThreadTest, RunSynchronousRunOnTqPerDefault) { TEST(MaybeWorkerThreadTest, RunSynchronousRunOnTqPerDefault) {
test::ExplicitKeyValueConfig field_trial(""); test::ExplicitKeyValueConfig field_trial(kFieldTrialDisabledString);
RealTimeController controller; RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
@ -86,7 +88,7 @@ TEST(MaybeWorkerThreadTest, MaybeSafeTaskDoesNotReturnSafeTaskPerDefault) {
// We cant really test that the return value from MaybeSafeTask is a SafeTask. // We cant really test that the return value from MaybeSafeTask is a SafeTask.
// But we can test that the safety flag does not have more references after a // But we can test that the safety flag does not have more references after a
// call. // call.
test::ExplicitKeyValueConfig field_trial(""); test::ExplicitKeyValueConfig field_trial(kFieldTrialDisabledString);
RealTimeController controller; RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
@ -101,7 +103,7 @@ TEST(MaybeWorkerThreadTest, MaybeSafeTaskDoesNotReturnSafeTaskInExperiment) {
// We cant really test that the return value from MaybeSafeTask is a SafeTask. // We cant really test that the return value from MaybeSafeTask is a SafeTask.
// But we can test that the safety flag does have one more references after a // But we can test that the safety flag does have one more references after a
// call. // call.
test::ExplicitKeyValueConfig field_trial(kFieldTrialString); test::ExplicitKeyValueConfig field_trial(kFieldTrialEnabledString);
RealTimeController controller; RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
@ -113,7 +115,7 @@ TEST(MaybeWorkerThreadTest, MaybeSafeTaskDoesNotReturnSafeTaskInExperiment) {
} }
TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectPerDefault) { TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectPerDefault) {
test::ExplicitKeyValueConfig field_trial(""); test::ExplicitKeyValueConfig field_trial(kFieldTrialDisabledString);
RealTimeController controller; RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
@ -122,7 +124,7 @@ TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectPerDefault) {
} }
TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectInExperiment) { TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectInExperiment) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialString); test::ExplicitKeyValueConfig field_trial(kFieldTrialEnabledString);
RealTimeController controller; RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory()); MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
@ -138,7 +140,7 @@ TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectInExperiment) {
} }
TEST(MaybeWorkerThreadTest, IsCurrentCanBeCalledInDestructorPerDefault) { TEST(MaybeWorkerThreadTest, IsCurrentCanBeCalledInDestructorPerDefault) {
test::ExplicitKeyValueConfig field_trial(""); test::ExplicitKeyValueConfig field_trial(kFieldTrialDisabledString);
RealTimeController controller; RealTimeController controller;
{ {
MaybeWorkerThread m(field_trial, "test_tq", MaybeWorkerThread m(field_trial, "test_tq",
@ -148,7 +150,7 @@ TEST(MaybeWorkerThreadTest, IsCurrentCanBeCalledInDestructorPerDefault) {
} }
TEST(MaybeWorkerThreadTest, IsCurrentCanBeCalledInDestructorInExperiment) { TEST(MaybeWorkerThreadTest, IsCurrentCanBeCalledInDestructorInExperiment) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialString); test::ExplicitKeyValueConfig field_trial(kFieldTrialEnabledString);
RealTimeController controller; RealTimeController controller;
{ {
MaybeWorkerThread m(field_trial, "test_tq", MaybeWorkerThread m(field_trial, "test_tq",

View File

@ -150,6 +150,7 @@ if (rtc_include_tests && !build_with_chromium) {
"../time_controller", "../time_controller",
] ]
absl_deps = [ absl_deps = [
"//third_party/abseil-cpp/absl/cleanup",
"//third_party/abseil-cpp/absl/flags:flag", "//third_party/abseil-cpp/absl/flags:flag",
"//third_party/abseil-cpp/absl/flags:parse", "//third_party/abseil-cpp/absl/flags:parse",
"//third_party/abseil-cpp/absl/functional:any_invocable", "//third_party/abseil-cpp/absl/functional:any_invocable",

View File

@ -10,9 +10,10 @@
#include "test/scenario/network_node.h" #include "test/scenario/network_node.h"
#include <algorithm> #include <algorithm>
#include <memory>
#include <vector> #include <vector>
#include <memory> #include "absl/cleanup/cleanup.h"
#include "rtc_base/net_helper.h" #include "rtc_base/net_helper.h"
#include "rtc_base/numerics/safe_minmax.h" #include "rtc_base/numerics/safe_minmax.h"
@ -127,13 +128,25 @@ void NetworkNodeTransport::Connect(EmulatedEndpoint* endpoint,
current_network_route_ = route; current_network_route_ = route;
} }
sender_call_->GetTransportControllerSend()->OnNetworkRouteChanged( // Must be called from the worker thread.
kDummyTransportName, route); rtc::Event event;
auto cleanup = absl::MakeCleanup([&event] { event.Set(); });
auto&& task = [this, &route, cleanup = std::move(cleanup)] {
sender_call_->GetTransportControllerSend()->OnNetworkRouteChanged(
kDummyTransportName, route);
};
if (!sender_call_->worker_thread()->IsCurrent()) {
sender_call_->worker_thread()->PostTask(std::move(task));
} else {
std::move(task)();
}
event.Wait(TimeDelta::Seconds(1));
} }
void NetworkNodeTransport::Disconnect() { void NetworkNodeTransport::Disconnect() {
MutexLock lock(&mutex_); MutexLock lock(&mutex_);
current_network_route_.connected = false; current_network_route_.connected = false;
sender_call_->GetTransportControllerSend()->OnNetworkRouteChanged( sender_call_->GetTransportControllerSend()->OnNetworkRouteChanged(
kDummyTransportName, current_network_route_); kDummyTransportName, current_network_route_);
current_network_route_ = {}; current_network_route_ = {};

View File

@ -16,6 +16,7 @@
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include "api/rtc_event_log/rtc_event_log.h" #include "api/rtc_event_log/rtc_event_log.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_base.h" #include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h" #include "api/units/time_delta.h"
#include "api/units/timestamp.h" #include "api/units/timestamp.h"
@ -161,7 +162,7 @@ class VideoSendStreamImplTest : public ::testing::Test {
int initial_encoder_max_bitrate, int initial_encoder_max_bitrate,
double initial_encoder_bitrate_priority, double initial_encoder_bitrate_priority,
VideoEncoderConfig::ContentType content_type) { VideoEncoderConfig::ContentType content_type) {
RTC_DCHECK(!worker_queue_.IsCurrent()); RTC_DCHECK_RUN_ON(&worker_queue_);
EXPECT_CALL(bitrate_allocator_, GetStartBitrate(_)) EXPECT_CALL(bitrate_allocator_, GetStartBitrate(_))
.WillOnce(Return(123000)); .WillOnce(Return(123000));