Revert "Launch WebRTC-SendPacketsOnWorkerThread"

This reverts commit 8d33105015183d02978ecefcedef241247af3802.

Reason for revert: Speculative revert, may have caused breakage in post submit tests. E.g. https://ci.chromium.org/p/webrtc/builders/ci/Linux32%20Debug/32343 (waterfall https://ci.chromium.org/p/webrtc/g/ci/console?limit=200)

Original change's description:
> Launch WebRTC-SendPacketsOnWorkerThread
>
> Bug: webrtc:14502, b/254640777
> Change-Id: I61269443b5ce87ba0c5354f863c731292c86dbce
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/293581
> Reviewed-by: Per Kjellander <perkj@webrtc.org>
> Reviewed-by: Erik Språng <sprang@webrtc.org>
> Commit-Queue: Evan Shrubsole <eshr@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#39425}

Bug: webrtc:14502, b/254640777
Change-Id: Iec5d373fb7a73bc07d8cc4af4ca03a0f60331eda
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/295662
Reviewed-by: Evan Shrubsole <eshr@webrtc.org>
Commit-Queue: Andrey Logvin <landrey@webrtc.org>
Bot-Commit: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com>
Owners-Override: Andrey Logvin <landrey@webrtc.org>
Auto-Submit: Andrey Logvin <landrey@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39429}
This commit is contained in:
Andrey Logvin 2023-03-01 08:48:58 +00:00 committed by WebRTC LUCI CQ
parent 348741b8b2
commit a09b30dd8a
6 changed files with 93 additions and 37 deletions

View File

@ -78,6 +78,79 @@ std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
return padding_packets;
}
class TaskQueueWithFakePrecisionFactory : public TaskQueueFactory {
public:
explicit TaskQueueWithFakePrecisionFactory(
TaskQueueFactory* task_queue_factory)
: task_queue_factory_(task_queue_factory) {}
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
absl::string_view name,
Priority priority) const override {
return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
new TaskQueueWithFakePrecision(
const_cast<TaskQueueWithFakePrecisionFactory*>(this),
task_queue_factory_));
}
int delayed_low_precision_count() const {
return delayed_low_precision_count_;
}
int delayed_high_precision_count() const {
return delayed_high_precision_count_;
}
private:
friend class TaskQueueWithFakePrecision;
class TaskQueueWithFakePrecision : public TaskQueueBase {
public:
TaskQueueWithFakePrecision(
TaskQueueWithFakePrecisionFactory* parent_factory,
TaskQueueFactory* task_queue_factory)
: parent_factory_(parent_factory),
task_queue_(task_queue_factory->CreateTaskQueue(
"TaskQueueWithFakePrecision",
TaskQueueFactory::Priority::NORMAL)) {}
~TaskQueueWithFakePrecision() override {}
void Delete() override {
// `task_queue_->Delete()` is implicitly called in the destructor due to
// TaskQueueDeleter.
delete this;
}
void PostTask(absl::AnyInvocable<void() &&> task) override {
task_queue_->PostTask(WrapTask(std::move(task)));
}
void PostDelayedTask(absl::AnyInvocable<void() &&> task,
TimeDelta delay) override {
++parent_factory_->delayed_low_precision_count_;
task_queue_->PostDelayedTask(WrapTask(std::move(task)), delay);
}
void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
TimeDelta delay) override {
++parent_factory_->delayed_high_precision_count_;
task_queue_->PostDelayedHighPrecisionTask(WrapTask(std::move(task)),
delay);
}
private:
absl::AnyInvocable<void() &&> WrapTask(absl::AnyInvocable<void() &&> task) {
return [this, task = std::move(task)]() mutable {
CurrentTaskQueueSetter set_current(this);
std::move(task)();
};
}
TaskQueueWithFakePrecisionFactory* parent_factory_;
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> task_queue_;
};
TaskQueueFactory* task_queue_factory_;
std::atomic<int> delayed_low_precision_count_ = 0u;
std::atomic<int> delayed_high_precision_count_ = 0u;
};
} // namespace
namespace test {
@ -115,15 +188,15 @@ std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePackets(
return packets;
}
constexpr char kSendPacketOnWorkerThreadFieldTrialDisabled[] =
"WebRTC-SendPacketsOnWorkerThread/Disabled/";
constexpr char kSendPacketOnWorkerThreadFieldTrial[] =
"WebRTC-SendPacketsOnWorkerThread/Enabled/";
std::vector<std::string> ParameterizedFieldTrials() {
return {{""}, {kSendPacketOnWorkerThreadFieldTrialDisabled}};
return {{""}, {kSendPacketOnWorkerThreadFieldTrial}};
}
bool UsingWorkerThread(absl::string_view field_trials) {
return field_trials.find(kSendPacketOnWorkerThreadFieldTrialDisabled) ==
return field_trials.find(kSendPacketOnWorkerThreadFieldTrial) !=
std::string::npos;
}

View File

@ -24,7 +24,7 @@ MaybeWorkerThread::MaybeWorkerThread(const FieldTrialsView& field_trials,
absl::string_view task_queue_name,
TaskQueueFactory* factory)
: owned_task_queue_(
!field_trials.IsDisabled("WebRTC-SendPacketsOnWorkerThread")
field_trials.IsEnabled("WebRTC-SendPacketsOnWorkerThread")
? nullptr
: factory->CreateTaskQueue(task_queue_name,
rtc::TaskQueue::Priority::NORMAL)),

View File

@ -23,13 +23,11 @@ namespace webrtc {
namespace {
constexpr char kFieldTrialEnabledString[] =
constexpr char kFieldTrialString[] =
"WebRTC-SendPacketsOnWorkerThread/Enabled/";
constexpr char kFieldTrialDisabledString[] =
"WebRTC-SendPacketsOnWorkerThread/Disabled/";
TEST(MaybeWorkerThreadTest, RunOrPostRunOnWorkerThreadInExperiment) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialEnabledString);
test::ExplicitKeyValueConfig field_trial(kFieldTrialString);
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
@ -43,7 +41,7 @@ TEST(MaybeWorkerThreadTest, RunOrPostRunOnWorkerThreadInExperiment) {
}
TEST(MaybeWorkerThreadTest, RunOrPostPostsOnTqPerDefault) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialDisabledString);
test::ExplicitKeyValueConfig field_trial("");
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
@ -57,7 +55,7 @@ TEST(MaybeWorkerThreadTest, RunOrPostPostsOnTqPerDefault) {
}
TEST(MaybeWorkerThreadTest, RunSynchronousRunOnWorkerThreadInExperiment) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialEnabledString);
test::ExplicitKeyValueConfig field_trial(kFieldTrialString);
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
@ -71,7 +69,7 @@ TEST(MaybeWorkerThreadTest, RunSynchronousRunOnWorkerThreadInExperiment) {
}
TEST(MaybeWorkerThreadTest, RunSynchronousRunOnTqPerDefault) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialDisabledString);
test::ExplicitKeyValueConfig field_trial("");
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
@ -88,7 +86,7 @@ TEST(MaybeWorkerThreadTest, MaybeSafeTaskDoesNotReturnSafeTaskPerDefault) {
// We cant really test that the return value from MaybeSafeTask is a SafeTask.
// But we can test that the safety flag does not have more references after a
// call.
test::ExplicitKeyValueConfig field_trial(kFieldTrialDisabledString);
test::ExplicitKeyValueConfig field_trial("");
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
@ -103,7 +101,7 @@ TEST(MaybeWorkerThreadTest, MaybeSafeTaskDoesNotReturnSafeTaskInExperiment) {
// We cant really test that the return value from MaybeSafeTask is a SafeTask.
// But we can test that the safety flag does have one more references after a
// call.
test::ExplicitKeyValueConfig field_trial(kFieldTrialEnabledString);
test::ExplicitKeyValueConfig field_trial(kFieldTrialString);
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
@ -115,7 +113,7 @@ TEST(MaybeWorkerThreadTest, MaybeSafeTaskDoesNotReturnSafeTaskInExperiment) {
}
TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectPerDefault) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialDisabledString);
test::ExplicitKeyValueConfig field_trial("");
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
@ -124,7 +122,7 @@ TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectPerDefault) {
}
TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectInExperiment) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialEnabledString);
test::ExplicitKeyValueConfig field_trial(kFieldTrialString);
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
@ -140,7 +138,7 @@ TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectInExperiment) {
}
TEST(MaybeWorkerThreadTest, IsCurrentCanBeCalledInDestructorPerDefault) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialDisabledString);
test::ExplicitKeyValueConfig field_trial("");
RealTimeController controller;
{
MaybeWorkerThread m(field_trial, "test_tq",
@ -150,7 +148,7 @@ TEST(MaybeWorkerThreadTest, IsCurrentCanBeCalledInDestructorPerDefault) {
}
TEST(MaybeWorkerThreadTest, IsCurrentCanBeCalledInDestructorInExperiment) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialEnabledString);
test::ExplicitKeyValueConfig field_trial(kFieldTrialString);
RealTimeController controller;
{
MaybeWorkerThread m(field_trial, "test_tq",

View File

@ -150,7 +150,6 @@ if (rtc_include_tests && !build_with_chromium) {
"../time_controller",
]
absl_deps = [
"//third_party/abseil-cpp/absl/cleanup",
"//third_party/abseil-cpp/absl/flags:flag",
"//third_party/abseil-cpp/absl/flags:parse",
"//third_party/abseil-cpp/absl/functional:any_invocable",

View File

@ -10,10 +10,9 @@
#include "test/scenario/network_node.h"
#include <algorithm>
#include <memory>
#include <vector>
#include "absl/cleanup/cleanup.h"
#include <memory>
#include "rtc_base/net_helper.h"
#include "rtc_base/numerics/safe_minmax.h"
@ -128,25 +127,13 @@ void NetworkNodeTransport::Connect(EmulatedEndpoint* endpoint,
current_network_route_ = route;
}
// Must be called from the worker thread.
rtc::Event event;
auto cleanup = absl::MakeCleanup([&event] { event.Set(); });
auto&& task = [this, &route, cleanup = std::move(cleanup)] {
sender_call_->GetTransportControllerSend()->OnNetworkRouteChanged(
kDummyTransportName, route);
};
if (!sender_call_->worker_thread()->IsCurrent()) {
sender_call_->worker_thread()->PostTask(std::move(task));
} else {
std::move(task)();
}
event.Wait(TimeDelta::Seconds(1));
sender_call_->GetTransportControllerSend()->OnNetworkRouteChanged(
kDummyTransportName, route);
}
void NetworkNodeTransport::Disconnect() {
MutexLock lock(&mutex_);
current_network_route_.connected = false;
sender_call_->GetTransportControllerSend()->OnNetworkRouteChanged(
kDummyTransportName, current_network_route_);
current_network_route_ = {};

View File

@ -16,7 +16,6 @@
#include "absl/types/optional.h"
#include "api/rtc_event_log/rtc_event_log.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
@ -162,7 +161,7 @@ class VideoSendStreamImplTest : public ::testing::Test {
int initial_encoder_max_bitrate,
double initial_encoder_bitrate_priority,
VideoEncoderConfig::ContentType content_type) {
RTC_DCHECK_RUN_ON(&worker_queue_);
RTC_DCHECK(!worker_queue_.IsCurrent());
EXPECT_CALL(bitrate_allocator_, GetStartBitrate(_))
.WillOnce(Return(123000));