Extract activity executor into separate class from PC level fixture impl

Bug: webrtc:11479
Change-Id: Ida9c944d928e9973bf543a2e5b415a7c9007b833
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/173024
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31032}
This commit is contained in:
Artem Titov 2020-04-07 23:36:07 +02:00 committed by Commit Bot
parent f16e550c14
commit 8f888ff546
5 changed files with 252 additions and 145 deletions

View File

@ -304,6 +304,28 @@ if (rtc_include_tests) {
]
}
rtc_library("test_activities_executor") {
visibility = [ "*" ]
testonly = true
sources = [
"test_activities_executor.cc",
"test_activities_executor.h",
]
deps = [
"../../../api/units:time_delta",
"../../../api/units:timestamp",
"../../../rtc_base:checks",
"../../../rtc_base:criticalsection",
"../../../rtc_base:logging",
"../../../rtc_base:rtc_base_approved",
"../../../rtc_base:task_queue_for_test",
"../../../rtc_base/task_utils:repeating_task",
"../../../system_wrappers",
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/types:optional",
]
}
rtc_library("peerconnection_quality_test") {
visibility = [ "*" ]
testonly = true
@ -322,6 +344,7 @@ if (rtc_include_tests) {
":sdp_changer",
":single_process_encoded_image_data_injector",
":stats_poller",
":test_activities_executor",
":test_peer",
":test_peer_factory",
":video_quality_analyzer_injection_helper",
@ -348,7 +371,6 @@ if (rtc_include_tests) {
"../../../rtc_base:rtc_base_approved",
"../../../rtc_base:safe_conversions",
"../../../rtc_base:task_queue_for_test",
"../../../rtc_base/task_utils:repeating_task",
"../../../system_wrappers",
"../../../system_wrappers:field_trial",
]

View File

@ -22,7 +22,6 @@
#include "api/scoped_refptr.h"
#include "api/task_queue/default_task_queue_factory.h"
#include "api/test/video_quality_analyzer_interface.h"
#include "api/units/time_delta.h"
#include "pc/sdp_utils.h"
#include "pc/test/mock_peer_connection_observers.h"
#include "rtc_base/bind.h"
@ -106,7 +105,8 @@ PeerConnectionE2EQualityTest::PeerConnectionE2EQualityTest(
std::unique_ptr<VideoQualityAnalyzerInterface> video_quality_analyzer)
: clock_(Clock::GetRealTimeClock()),
task_queue_factory_(CreateDefaultTaskQueueFactory()),
test_case_name_(std::move(test_case_name)) {
test_case_name_(std::move(test_case_name)),
executor_(std::make_unique<TestActivitiesExecutor>(clock_)) {
// Create default video quality analyzer. We will always create an analyzer,
// even if there are no video streams, because it will be installed into video
// encoder/decoder factories.
@ -129,74 +129,14 @@ PeerConnectionE2EQualityTest::PeerConnectionE2EQualityTest(
void PeerConnectionE2EQualityTest::ExecuteAt(
TimeDelta target_time_since_start,
std::function<void(TimeDelta)> func) {
ExecuteTask(target_time_since_start, absl::nullopt, func);
executor_->ScheduleActivity(target_time_since_start, absl::nullopt, func);
}
void PeerConnectionE2EQualityTest::ExecuteEvery(
TimeDelta initial_delay_since_start,
TimeDelta interval,
std::function<void(TimeDelta)> func) {
ExecuteTask(initial_delay_since_start, interval, func);
}
void PeerConnectionE2EQualityTest::ExecuteTask(
TimeDelta initial_delay_since_start,
absl::optional<TimeDelta> interval,
std::function<void(TimeDelta)> func) {
RTC_CHECK(initial_delay_since_start.IsFinite() &&
initial_delay_since_start >= TimeDelta::Zero());
RTC_CHECK(!interval ||
(interval->IsFinite() && *interval > TimeDelta::Zero()));
rtc::CritScope crit(&lock_);
ScheduledActivity activity(initial_delay_since_start, interval, func);
if (start_time_.IsInfinite()) {
scheduled_activities_.push(std::move(activity));
} else {
PostTask(std::move(activity));
}
}
void PeerConnectionE2EQualityTest::PostTask(ScheduledActivity activity) {
// Because start_time_ will never change at this point copy it to local
// variable to capture in in lambda without requirement to hold a lock.
Timestamp start_time = start_time_;
TimeDelta remaining_delay =
activity.initial_delay_since_start == TimeDelta::Zero()
? TimeDelta::Zero()
: activity.initial_delay_since_start - (Now() - start_time_);
if (remaining_delay < TimeDelta::Zero()) {
RTC_LOG(WARNING) << "Executing late task immediately, late by="
<< ToString(remaining_delay.Abs());
remaining_delay = TimeDelta::Zero();
}
if (activity.interval) {
if (remaining_delay == TimeDelta::Zero()) {
repeating_task_handles_.push_back(RepeatingTaskHandle::Start(
task_queue_->Get(), [activity, start_time, this]() {
activity.func(Now() - start_time);
return *activity.interval;
}));
return;
}
repeating_task_handles_.push_back(RepeatingTaskHandle::DelayedStart(
task_queue_->Get(), remaining_delay, [activity, start_time, this]() {
activity.func(Now() - start_time);
return *activity.interval;
}));
return;
}
if (remaining_delay == TimeDelta::Zero()) {
task_queue_->PostTask(
[activity, start_time, this]() { activity.func(Now() - start_time); });
return;
}
task_queue_->PostDelayedTask(
[activity, start_time, this]() { activity.func(Now() - start_time); },
remaining_delay.ms());
executor_->ScheduleActivity(initial_delay_since_start, interval, func);
}
void PeerConnectionE2EQualityTest::AddQualityMetricsReporter(
@ -342,20 +282,7 @@ void PeerConnectionE2EQualityTest::Run(RunParams run_params) {
RTC_LOG(INFO) << "Configuration is done. Now Alice is calling to Bob...";
// Setup call.
signaling_thread->Invoke<void>(
RTC_FROM_HERE,
rtc::Bind(&PeerConnectionE2EQualityTest::SetupCallOnSignalingThread, this,
run_params));
{
rtc::CritScope crit(&lock_);
start_time_ = Now();
while (!scheduled_activities_.empty()) {
PostTask(std::move(scheduled_activities_.front()));
scheduled_activities_.pop();
}
}
// Setup stats poller.
std::vector<StatsObserverInterface*> observers = {
audio_quality_analyzer_.get(),
video_quality_analyzer_injection_helper_.get()};
@ -364,16 +291,18 @@ void PeerConnectionE2EQualityTest::Run(RunParams run_params) {
}
StatsPoller stats_poller(observers,
{{"alice", alice_.get()}, {"bob", bob_.get()}});
executor_->ScheduleActivity(TimeDelta::Zero(), kStatsUpdateInterval,
[&stats_poller](TimeDelta) {
stats_poller.PollStatsAndNotifyObservers();
});
task_queue_->PostTask([&stats_poller, this]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
stats_polling_task_ =
RepeatingTaskHandle::Start(task_queue_->Get(), [this, &stats_poller]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
stats_poller.PollStatsAndNotifyObservers();
return kStatsUpdateInterval;
});
});
// Setup call.
signaling_thread->Invoke<void>(
RTC_FROM_HERE,
rtc::Bind(&PeerConnectionE2EQualityTest::SetupCallOnSignalingThread, this,
run_params));
executor_->Start(task_queue_.get());
Timestamp start_time = Now();
rtc::Event done;
bool is_quick_test_enabled = field_trial::IsEnabled("WebRTC-QuickPerfTest");
@ -385,30 +314,13 @@ void PeerConnectionE2EQualityTest::Run(RunParams run_params) {
RTC_LOG(INFO) << "Test is done, initiating disconnect sequence.";
task_queue_->SendTask(
[&stats_poller, this]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
stats_polling_task_.Stop();
// Get final end-of-call stats.
stats_poller.PollStatsAndNotifyObservers();
},
RTC_FROM_HERE);
// Stop all client started tasks to prevent their access to any call related
// objects after these objects will be destroyed during call tear down.
executor_->Stop();
// We need to detach AEC dumping from peers, because dump uses |task_queue_|
// inside.
alice_->DetachAecDump();
bob_->DetachAecDump();
// Stop all client started tasks on task queue to prevent their access to any
// call related objects after these objects will be destroyed during call tear
// down.
task_queue_->SendTask(
[this]() {
rtc::CritScope crit(&lock_);
for (auto& handle : repeating_task_handles_) {
handle.Stop();
}
},
RTC_FROM_HERE);
// Tear down the call.
signaling_thread->Invoke<void>(
RTC_FROM_HERE,
@ -418,7 +330,7 @@ void PeerConnectionE2EQualityTest::Run(RunParams run_params) {
RTC_LOG(INFO) << "All peers are disconnected.";
{
rtc::CritScope crit(&lock_);
real_test_duration_ = end_time - start_time_;
real_test_duration_ = end_time - start_time;
}
audio_quality_analyzer_->Stop();
@ -729,13 +641,5 @@ Timestamp PeerConnectionE2EQualityTest::Now() const {
return clock_->CurrentTime();
}
PeerConnectionE2EQualityTest::ScheduledActivity::ScheduledActivity(
TimeDelta initial_delay_since_start,
absl::optional<TimeDelta> interval,
std::function<void(TimeDelta)> func)
: initial_delay_since_start(initial_delay_since_start),
interval(interval),
func(std::move(func)) {}
} // namespace webrtc_pc_e2e
} // namespace webrtc

View File

@ -21,7 +21,6 @@
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "rtc_base/task_queue_for_test.h"
#include "rtc_base/task_utils/repeating_task.h"
#include "rtc_base/thread.h"
#include "rtc_base/thread_annotations.h"
#include "system_wrappers/include/clock.h"
@ -33,6 +32,7 @@
#include "test/pc/e2e/peer_configurer.h"
#include "test/pc/e2e/peer_connection_quality_test_params.h"
#include "test/pc/e2e/sdp/sdp_changer.h"
#include "test/pc/e2e/test_activities_executor.h"
#include "test/pc/e2e/test_peer.h"
namespace webrtc {
@ -79,20 +79,6 @@ class PeerConnectionE2EQualityTest
}
private:
struct ScheduledActivity {
ScheduledActivity(TimeDelta initial_delay_since_start,
absl::optional<TimeDelta> interval,
std::function<void(TimeDelta)> func);
TimeDelta initial_delay_since_start;
absl::optional<TimeDelta> interval;
std::function<void(TimeDelta)> func;
};
void ExecuteTask(TimeDelta initial_delay_since_start,
absl::optional<TimeDelta> interval,
std::function<void(TimeDelta)> func);
void PostTask(ScheduledActivity activity) RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
// For some functionality some field trials have to be enabled, so we will
// enable them here.
void SetupRequiredFieldTrials(const RunParams& run_params);
@ -120,6 +106,7 @@ class PeerConnectionE2EQualityTest
std::unique_ptr<SingleProcessEncodedImageDataInjector>
encoded_image_id_controller_;
std::unique_ptr<AudioQualityAnalyzerInterface> audio_quality_analyzer_;
std::unique_ptr<TestActivitiesExecutor> executor_;
std::vector<std::unique_ptr<PeerConfigurerImpl>> peer_configurations_;
@ -139,20 +126,7 @@ class PeerConnectionE2EQualityTest
AnalyzerHelper analyzer_helper_;
rtc::CriticalSection lock_;
// Time when test call was started. Minus infinity means that call wasn't
// started yet.
Timestamp start_time_ RTC_GUARDED_BY(lock_) = Timestamp::MinusInfinity();
TimeDelta real_test_duration_ RTC_GUARDED_BY(lock_) = TimeDelta::Zero();
// Queue of activities that were added before test call was started.
// Activities from this queue will be posted on the |task_queue_| after test
// call will be set up and then this queue will be unused.
std::queue<ScheduledActivity> scheduled_activities_ RTC_GUARDED_BY(lock_);
// List of task handles for activities, that are posted on |task_queue_| as
// repeated during the call.
std::vector<RepeatingTaskHandle> repeating_task_handles_
RTC_GUARDED_BY(lock_);
RepeatingTaskHandle stats_polling_task_ RTC_GUARDED_BY(&task_queue_);
// Task queue, that is used for running activities during test call.
// This task queue will be created before call set up and will be destroyed

View File

@ -0,0 +1,124 @@
/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "test/pc/e2e/test_activities_executor.h"
#include <memory>
#include <utility>
#include "absl/memory/memory.h"
#include "rtc_base/checks.h"
#include "rtc_base/location.h"
#include "rtc_base/logging.h"
namespace webrtc {
namespace webrtc_pc_e2e {
void TestActivitiesExecutor::Start(TaskQueueForTest* task_queue) {
RTC_DCHECK(task_queue);
task_queue_ = task_queue;
rtc::CritScope crit(&lock_);
start_time_ = Now();
while (!scheduled_activities_.empty()) {
PostActivity(std::move(scheduled_activities_.front()));
scheduled_activities_.pop();
}
}
void TestActivitiesExecutor::Stop() {
if (task_queue_ == nullptr) {
// Already stopped or not started.
return;
}
task_queue_->SendTask(
[this]() {
rtc::CritScope crit(&lock_);
for (auto& handle : repeating_task_handles_) {
handle.Stop();
}
},
RTC_FROM_HERE);
task_queue_ = nullptr;
}
void TestActivitiesExecutor::ScheduleActivity(
TimeDelta initial_delay_since_start,
absl::optional<TimeDelta> interval,
std::function<void(TimeDelta)> func) {
RTC_CHECK(initial_delay_since_start.IsFinite() &&
initial_delay_since_start >= TimeDelta::Zero());
RTC_CHECK(!interval ||
(interval->IsFinite() && *interval > TimeDelta::Zero()));
rtc::CritScope crit(&lock_);
ScheduledActivity activity(initial_delay_since_start, interval, func);
if (start_time_.IsInfinite()) {
scheduled_activities_.push(std::move(activity));
} else {
PostActivity(std::move(activity));
}
}
void TestActivitiesExecutor::PostActivity(ScheduledActivity activity) {
// Because start_time_ will never change at this point copy it to local
// variable to capture in in lambda without requirement to hold a lock.
Timestamp start_time = start_time_;
TimeDelta remaining_delay =
activity.initial_delay_since_start == TimeDelta::Zero()
? TimeDelta::Zero()
: activity.initial_delay_since_start - (Now() - start_time);
if (remaining_delay < TimeDelta::Zero()) {
RTC_LOG(WARNING) << "Executing late task immediately, late by="
<< ToString(remaining_delay.Abs());
remaining_delay = TimeDelta::Zero();
}
if (activity.interval) {
if (remaining_delay == TimeDelta::Zero()) {
repeating_task_handles_.push_back(RepeatingTaskHandle::Start(
task_queue_->Get(), [activity, start_time, this]() {
activity.func(Now() - start_time);
return *activity.interval;
}));
return;
}
repeating_task_handles_.push_back(RepeatingTaskHandle::DelayedStart(
task_queue_->Get(), remaining_delay, [activity, start_time, this]() {
activity.func(Now() - start_time);
return *activity.interval;
}));
return;
}
if (remaining_delay == TimeDelta::Zero()) {
task_queue_->PostTask(
[activity, start_time, this]() { activity.func(Now() - start_time); });
return;
}
task_queue_->PostDelayedTask(
[activity, start_time, this]() { activity.func(Now() - start_time); },
remaining_delay.ms());
}
Timestamp TestActivitiesExecutor::Now() const {
return clock_->CurrentTime();
}
TestActivitiesExecutor::ScheduledActivity::ScheduledActivity(
TimeDelta initial_delay_since_start,
absl::optional<TimeDelta> interval,
std::function<void(TimeDelta)> func)
: initial_delay_since_start(initial_delay_since_start),
interval(interval),
func(std::move(func)) {}
} // namespace webrtc_pc_e2e
} // namespace webrtc

View File

@ -0,0 +1,83 @@
/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef TEST_PC_E2E_TEST_ACTIVITIES_EXECUTOR_H_
#define TEST_PC_E2E_TEST_ACTIVITIES_EXECUTOR_H_
#include <queue>
#include <vector>
#include "absl/types/optional.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "rtc_base/critical_section.h"
#include "rtc_base/task_queue_for_test.h"
#include "rtc_base/task_utils/repeating_task.h"
#include "system_wrappers/include/clock.h"
namespace webrtc {
namespace webrtc_pc_e2e {
class TestActivitiesExecutor {
public:
explicit TestActivitiesExecutor(Clock* clock) : clock_(clock) {}
~TestActivitiesExecutor() { Stop(); }
// Starts scheduled activities according to their schedule. All activities
// that will be scheduled after Start(...) was invoked will be executed
// immediately according to their schedule.
void Start(TaskQueueForTest* task_queue);
void Stop();
// Schedule activity to be executed. If test isn't started yet, then activity
// will be executed according to its schedule after Start() will be invoked.
// If test is started, then it will be executed immediately according to its
// schedule.
void ScheduleActivity(TimeDelta initial_delay_since_start,
absl::optional<TimeDelta> interval,
std::function<void(TimeDelta)> func);
private:
struct ScheduledActivity {
ScheduledActivity(TimeDelta initial_delay_since_start,
absl::optional<TimeDelta> interval,
std::function<void(TimeDelta)> func);
TimeDelta initial_delay_since_start;
absl::optional<TimeDelta> interval;
std::function<void(TimeDelta)> func;
};
void PostActivity(ScheduledActivity activity)
RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
Timestamp Now() const;
Clock* const clock_;
TaskQueueForTest* task_queue_;
rtc::CriticalSection lock_;
// Time when test was started. Minus infinity means that it wasn't started
// yet.
Timestamp start_time_ RTC_GUARDED_BY(lock_) = Timestamp::MinusInfinity();
// Queue of activities that were added before test was started.
// Activities from this queue will be posted on the |task_queue_| after test
// will be set up and then this queue will be unused.
std::queue<ScheduledActivity> scheduled_activities_ RTC_GUARDED_BY(lock_);
// List of task handles for activities, that are posted on |task_queue_| as
// repeated during the call.
std::vector<RepeatingTaskHandle> repeating_task_handles_
RTC_GUARDED_BY(lock_);
};
} // namespace webrtc_pc_e2e
} // namespace webrtc
#endif // TEST_PC_E2E_TEST_ACTIVITIES_EXECUTOR_H_