Add API to schedule environment changing actions during test in PC E2E framework

Bug: webrtc:10138
Change-Id: Ieebeec823829eb9dcaba4c31e7e9e998814982e3
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/126463
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Peter Slatala <psla@webrtc.org>
Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#27153}
This commit is contained in:
Artem Titov 2019-03-15 15:57:53 +01:00 committed by Commit Bot
parent 6cac21d554
commit ba82e0020d
5 changed files with 175 additions and 12 deletions

View File

@ -210,6 +210,7 @@ if (rtc_include_tests) {
"../../../api:libjingle_peerconnection_api",
"../../../api:scoped_refptr",
"../../../api/units:time_delta",
"../../../api/units:timestamp",
"../../../logging:rtc_event_log_api",
"../../../logging:rtc_event_log_impl_output",
"../../../pc:pc_test_utils",

View File

@ -208,6 +208,20 @@ class PeerConnectionE2EQualityTestFixture {
TimeDelta run_duration;
};
// Add activity that will be executed on the best effort at least after
// |target_time_since_start| after call will be set up (after offer/answer
// exchange, ICE gathering will be done and ICE candidates will passed to
// remote side). |func| param is amount of time spent from the call set up.
virtual void ExecuteAt(TimeDelta target_time_since_start,
std::function<void(TimeDelta)> func) = 0;
// Add activity that will be executed every |interval| with first execution
// on the best effort at least after |initial_delay_since_start| after call
// will be set up (after all participants will be connected). |func| param is
// amount of time spent from the call set up.
virtual void ExecuteEvery(TimeDelta initial_delay_since_start,
TimeDelta interval,
std::function<void(TimeDelta)> func) = 0;
virtual void Run(std::unique_ptr<InjectableComponents> alice_components,
std::unique_ptr<Params> alice_params,
std::unique_ptr<InjectableComponents> bob_components,

View File

@ -64,9 +64,12 @@ TEST(PeerConnectionE2EQualityTestSmokeTest, RunWithEmulatedNetwork) {
std::unique_ptr<NetworkEmulationManager> network_emulation_manager =
CreateNetworkEmulationManager();
auto alice_network_behavior =
absl::make_unique<SimulatedNetwork>(BuiltInNetworkBehaviorConfig());
SimulatedNetwork* alice_network_behavior_ptr = alice_network_behavior.get();
EmulatedNetworkNode* alice_node =
network_emulation_manager->CreateEmulatedNode(
absl::make_unique<SimulatedNetwork>(BuiltInNetworkBehaviorConfig()));
std::move(alice_network_behavior));
EmulatedNetworkNode* bob_node = network_emulation_manager->CreateEmulatedNode(
absl::make_unique<SimulatedNetwork>(BuiltInNetworkBehaviorConfig()));
EmulatedEndpoint* alice_endpoint =
@ -108,6 +111,12 @@ TEST(PeerConnectionE2EQualityTestSmokeTest, RunWithEmulatedNetwork) {
auto fixture = CreatePeerConnectionE2EQualityTestFixture(
"smoke_test", std::move(audio_quality_analyzer),
std::move(video_quality_analyzer));
fixture->ExecuteAt(TimeDelta::seconds(2),
[alice_network_behavior_ptr](TimeDelta) {
BuiltInNetworkBehaviorConfig config;
config.loss_percent = 5;
alice_network_behavior_ptr->SetConfig(config);
});
fixture->Run(std::move(alice_components), std::move(alice_params),
std::move(bob_components), std::move(bob_params),
RunParams{TimeDelta::seconds(5)});

View File

@ -102,8 +102,7 @@ PeerConnectionE2EQualityTest::PeerConnectionE2EQualityTest(
std::unique_ptr<AudioQualityAnalyzerInterface> audio_quality_analyzer,
std::unique_ptr<VideoQualityAnalyzerInterface> video_quality_analyzer)
: clock_(Clock::GetRealTimeClock()),
test_case_name_(std::move(test_case_name)),
task_queue_("pc_e2e_quality_test") {
test_case_name_(std::move(test_case_name)) {
// Create default video quality analyzer. We will always create an analyzer,
// even if there are no video streams, because it will be installed into video
// encoder/decoder factories.
@ -123,6 +122,79 @@ PeerConnectionE2EQualityTest::PeerConnectionE2EQualityTest(
audio_quality_analyzer_.swap(audio_quality_analyzer);
}
void PeerConnectionE2EQualityTest::ExecuteAt(
TimeDelta target_time_since_start,
std::function<void(TimeDelta)> func) {
ExecuteTask(target_time_since_start, absl::nullopt, func);
}
void PeerConnectionE2EQualityTest::ExecuteEvery(
TimeDelta initial_delay_since_start,
TimeDelta interval,
std::function<void(TimeDelta)> func) {
ExecuteTask(initial_delay_since_start, interval, func);
}
void PeerConnectionE2EQualityTest::ExecuteTask(
TimeDelta initial_delay_since_start,
absl::optional<TimeDelta> interval,
std::function<void(TimeDelta)> func) {
RTC_CHECK(initial_delay_since_start.IsFinite() &&
initial_delay_since_start >= TimeDelta::Zero());
RTC_CHECK(!interval ||
(interval->IsFinite() && *interval > TimeDelta::Zero()));
rtc::CritScope crit(&lock_);
ScheduledActivity activity(initial_delay_since_start, interval, func);
if (start_time_.IsInfinite()) {
scheduled_activities_.push(std::move(activity));
} else {
PostTask(std::move(activity));
}
}
void PeerConnectionE2EQualityTest::PostTask(ScheduledActivity activity) {
// Because start_time_ will never change at this point copy it to local
// variable to capture in in lambda without requirement to hold a lock.
Timestamp start_time = start_time_;
TimeDelta remaining_delay =
activity.initial_delay_since_start == TimeDelta::Zero()
? TimeDelta::Zero()
: activity.initial_delay_since_start - (Now() - start_time_);
if (remaining_delay < TimeDelta::Zero()) {
RTC_LOG(WARNING) << "Executing late task immediately, late by="
<< ToString(remaining_delay.Abs());
remaining_delay = TimeDelta::Zero();
}
if (activity.interval) {
if (remaining_delay == TimeDelta::Zero()) {
repeating_task_handles_.push_back(RepeatingTaskHandle::Start(
task_queue_->Get(), [activity, start_time, this]() {
activity.func(Now() - start_time);
return *activity.interval;
}));
return;
}
repeating_task_handles_.push_back(RepeatingTaskHandle::DelayedStart(
task_queue_->Get(), remaining_delay, [activity, start_time, this]() {
activity.func(Now() - start_time);
return *activity.interval;
}));
return;
}
if (remaining_delay == TimeDelta::Zero()) {
task_queue_->PostTask(
[activity, start_time, this]() { activity.func(Now() - start_time); });
return;
}
task_queue_->PostDelayedTask(
[activity, start_time, this]() { activity.func(Now() - start_time); },
remaining_delay.ms());
}
void PeerConnectionE2EQualityTest::Run(
std::unique_ptr<InjectableComponents> alice_components,
std::unique_ptr<Params> alice_params,
@ -216,20 +288,31 @@ void PeerConnectionE2EQualityTest::Run(
webrtc::RtcEventLog::kImmediateOutput);
}
// Create a |task_queue_|.
task_queue_ = absl::make_unique<rtc::TaskQueue>("pc_e2e_quality_test");
// Setup call.
signaling_thread->Invoke<void>(
RTC_FROM_HERE,
rtc::Bind(&PeerConnectionE2EQualityTest::SetupCallOnSignalingThread,
this));
{
rtc::CritScope crit(&lock_);
start_time_ = Now();
while (!scheduled_activities_.empty()) {
PostTask(std::move(scheduled_activities_.front()));
scheduled_activities_.pop();
}
}
StatsPoller stats_poller({audio_quality_analyzer_.get(),
video_quality_analyzer_injection_helper_.get()},
{{"alice", alice_.get()}, {"bob", bob_.get()}});
task_queue_.PostTask([&stats_poller, this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
task_queue_->PostTask([&stats_poller, this]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
stats_polling_task_ =
RepeatingTaskHandle::Start(task_queue_.Get(), [this, &stats_poller]() {
RTC_DCHECK_RUN_ON(&task_queue_);
RepeatingTaskHandle::Start(task_queue_->Get(), [this, &stats_poller]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
stats_poller.PollStatsAndNotifyObservers();
return kStatsUpdateInterval;
});
@ -239,8 +322,8 @@ void PeerConnectionE2EQualityTest::Run(
done.Wait(run_params.run_duration.ms());
rtc::Event stats_polling_stopped;
task_queue_.PostTask([&stats_polling_stopped, this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
task_queue_->PostTask([&stats_polling_stopped, this]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
stats_polling_task_.Stop();
stats_polling_stopped.Set();
});
@ -248,6 +331,11 @@ void PeerConnectionE2EQualityTest::Run(
RTC_CHECK(no_timeout) << "Failed to stop Stats polling after "
<< kStatsPollingStopTimeout.seconds() << " seconds.";
// Destroy |task_queue_|. It is done to stop all running tasks and prevent
// their access to any call related objects after these objects will be
// destroyed during call tear down.
task_queue_.reset();
// Tear down the call.
signaling_thread->Invoke<void>(
RTC_FROM_HERE,
rtc::Bind(&PeerConnectionE2EQualityTest::TearDownCallOnSignalingThread,
@ -559,5 +647,17 @@ VideoFrameWriter* PeerConnectionE2EQualityTest::MaybeCreateVideoWriter(
return out;
}
Timestamp PeerConnectionE2EQualityTest::Now() const {
return Timestamp::us(clock_->TimeInMicroseconds());
}
PeerConnectionE2EQualityTest::ScheduledActivity::ScheduledActivity(
TimeDelta initial_delay_since_start,
absl::optional<TimeDelta> interval,
std::function<void(TimeDelta)> func)
: initial_delay_since_start(initial_delay_since_start),
interval(std::move(interval)),
func(std::move(func)) {}
} // namespace test
} // namespace webrtc

View File

@ -11,9 +11,12 @@
#define TEST_PC_E2E_PEER_CONNECTION_QUALITY_TEST_H_
#include <memory>
#include <queue>
#include <string>
#include <vector>
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "pc/test/frame_generator_capturer_video_track_source.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/task_utils/repeating_task.h"
@ -54,7 +57,27 @@ class PeerConnectionE2EQualityTest
std::unique_ptr<Params> bob_params,
RunParams run_params) override;
void ExecuteAt(TimeDelta target_time_since_start,
std::function<void(TimeDelta)> func) override;
void ExecuteEvery(TimeDelta initial_delay_since_start,
TimeDelta interval,
std::function<void(TimeDelta)> func) override;
private:
struct ScheduledActivity {
ScheduledActivity(TimeDelta initial_delay_since_start,
absl::optional<TimeDelta> interval,
std::function<void(TimeDelta)> func);
TimeDelta initial_delay_since_start;
absl::optional<TimeDelta> interval;
std::function<void(TimeDelta)> func;
};
void ExecuteTask(TimeDelta initial_delay_since_start,
absl::optional<TimeDelta> interval,
std::function<void(TimeDelta)> func);
void PostTask(ScheduledActivity activity) RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Set missing params to default values if it is required:
// * Generate video stream labels if some of them missed
// * Generate audio stream labels if some of them missed
@ -83,6 +106,7 @@ class PeerConnectionE2EQualityTest
VideoFrameWriter* MaybeCreateVideoWriter(
absl::optional<std::string> file_name,
const VideoConfig& config);
Timestamp Now() const;
Clock* const clock_;
std::string test_case_name_;
@ -103,10 +127,25 @@ class PeerConnectionE2EQualityTest
std::vector<std::unique_ptr<rtc::VideoSinkInterface<VideoFrame>>>
output_video_sinks_;
rtc::CriticalSection lock_;
// Time when test call was started. Minus infinity means that call wasn't
// started yet.
Timestamp start_time_ RTC_GUARDED_BY(lock_) = Timestamp::MinusInfinity();
// Queue of activities that were added before test call was started.
// Activities from this queue will be posted on the |task_queue_| after test
// call will be set up and then this queue will be unused.
std::queue<ScheduledActivity> scheduled_activities_ RTC_GUARDED_BY(lock_);
// List of task handles for activities, that are posted on |task_queue_| as
// repeated during the call.
std::vector<RepeatingTaskHandle> repeating_task_handles_
RTC_GUARDED_BY(lock_);
RepeatingTaskHandle stats_polling_task_ RTC_GUARDED_BY(&task_queue_);
// Must be the last field, so it will be deleted first, because tasks
// in the TaskQueue can access other fields of the instance of this class.
rtc::TaskQueue task_queue_;
// Task queue, that is used for running activities during test call.
// This task queue will be created before call set up and will be destroyed
// immediately before call tear down.
std::unique_ptr<rtc::TaskQueue> task_queue_;
};
} // namespace test