From 105a10aef0a06dd7e467bf225aa48da9cb97d103 Mon Sep 17 00:00:00 2001 From: Sebastian Jansson Date: Mon, 1 Apr 2019 09:18:14 +0200 Subject: [PATCH] Using TimeController for Scenario test framework As part of this change, a task queue is used to handle packet processing in real time mode. This requires that we also do most call and media stream related operation on the same task queue to satisfy thread checkers. Bug: webrtc:10365 Change-Id: Icdd9d56e4ca14f2c944dc655c91e29392e3765f7 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/127544 Commit-Queue: Sebastian Jansson Reviewed-by: Artem Titov Cr-Commit-Position: refs/heads/master@{#27379} --- test/scenario/BUILD.gn | 2 + test/scenario/audio_stream.cc | 48 ++++++---- test/scenario/call_client.cc | 78 +++++++++++----- test/scenario/call_client.h | 16 +++- test/scenario/scenario.cc | 140 +++++++++++------------------ test/scenario/scenario.h | 58 ++++-------- test/scenario/scenario_unittest.cc | 5 +- test/scenario/simulated_time.cc | 5 +- test/scenario/simulated_time.h | 1 + test/scenario/video_stream.cc | 114 +++++++++++++---------- 10 files changed, 243 insertions(+), 224 deletions(-) diff --git a/test/scenario/BUILD.gn b/test/scenario/BUILD.gn index ac4a73632f..2dcd3f923b 100644 --- a/test/scenario/BUILD.gn +++ b/test/scenario/BUILD.gn @@ -129,10 +129,12 @@ if (rtc_include_tests) { "../../rtc_base:safe_minmax", "../../rtc_base:sequenced_task_checker", "../../rtc_base:task_queue_for_test", + "../../rtc_base/task_utils:repeating_task", "../../system_wrappers", "../../system_wrappers:field_trial", "../../video", "../logging:log_writer", + "../time_controller", "network:emulated_network", "//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/types:optional", diff --git a/test/scenario/audio_stream.cc b/test/scenario/audio_stream.cc index d73d2fe6c0..13921555f6 100644 --- a/test/scenario/audio_stream.cc +++ b/test/scenario/audio_stream.cc @@ -126,24 +126,29 @@ SendAudioStream::SendAudioStream( send_config.track_id, config.encoder.priority_rate->bps())); } - send_stream_ = sender_->call_->CreateAudioSendStream(send_config); - if (field_trial::IsEnabled("WebRTC-SendSideBwe-WithOverhead")) { - sender->call_->OnAudioTransportOverheadChanged( - sender_->transport_.packet_overhead().bytes()); - } + sender_->SendTask([&] { + send_stream_ = sender_->call_->CreateAudioSendStream(send_config); + if (field_trial::IsEnabled("WebRTC-SendSideBwe-WithOverhead")) { + sender->call_->OnAudioTransportOverheadChanged( + sender_->transport_->packet_overhead().bytes()); + } + }); } SendAudioStream::~SendAudioStream() { - sender_->call_->DestroyAudioSendStream(send_stream_); + sender_->SendTask( + [this] { sender_->call_->DestroyAudioSendStream(send_stream_); }); } void SendAudioStream::Start() { - send_stream_->Start(); - sender_->call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp); + sender_->SendTask([this] { + send_stream_->Start(); + sender_->call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp); + }); } void SendAudioStream::Stop() { - send_stream_->Stop(); + sender_->SendTask([this] { send_stream_->Stop(); }); } void SendAudioStream::SetMuted(bool mute) { @@ -154,8 +159,10 @@ ColumnPrinter SendAudioStream::StatsPrinter() { return ColumnPrinter::Lambda( "audio_target_rate", [this](rtc::SimpleStringBuilder& sb) { - AudioSendStream::Stats stats = send_stream_->GetStats(); - sb.AppendFormat("%.0lf", stats.target_bitrate_bps / 8.0); + sender_->SendTask([this, &sb] { + AudioSendStream::Stats stats = send_stream_->GetStats(); + sb.AppendFormat("%.0lf", stats.target_bitrate_bps / 8.0); + }); }, 64); } @@ -182,19 +189,24 @@ ReceiveAudioStream::ReceiveAudioStream( recv_config.decoder_map = { {CallTest::kAudioSendPayloadType, {"opus", 48000, 2}}}; recv_config.sync_group = config.render.sync_group; - receive_stream_ = receiver_->call_->CreateAudioReceiveStream(recv_config); + receiver_->SendTask([&] { + receive_stream_ = receiver_->call_->CreateAudioReceiveStream(recv_config); + }); } ReceiveAudioStream::~ReceiveAudioStream() { - receiver_->call_->DestroyAudioReceiveStream(receive_stream_); + receiver_->SendTask( + [&] { receiver_->call_->DestroyAudioReceiveStream(receive_stream_); }); } void ReceiveAudioStream::Start() { - receive_stream_->Start(); - receiver_->call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp); + receiver_->SendTask([&] { + receive_stream_->Start(); + receiver_->call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp); + }); } void ReceiveAudioStream::Stop() { - receive_stream_->Stop(); + receiver_->SendTask([&] { receive_stream_->Stop(); }); } AudioStreamPair::~AudioStreamPair() = default; @@ -206,12 +218,12 @@ AudioStreamPair::AudioStreamPair( rtc::scoped_refptr decoder_factory, AudioStreamConfig config) : config_(config), - send_stream_(sender, config, encoder_factory, &sender->transport_), + send_stream_(sender, config, encoder_factory, sender->transport_.get()), receive_stream_(receiver, config, &send_stream_, decoder_factory, - &receiver->transport_) {} + receiver->transport_.get()) {} } // namespace test } // namespace webrtc diff --git a/test/scenario/call_client.cc b/test/scenario/call_client.cc index 3a8ce11b0f..7df4e030de 100644 --- a/test/scenario/call_client.cc +++ b/test/scenario/call_client.cc @@ -30,12 +30,13 @@ const uint32_t kReceiverLocalAudioSsrc = 0x1234567; const char* kPriorityStreamId = "priority-track"; -CallClientFakeAudio InitAudio() { +CallClientFakeAudio InitAudio(TimeController* time_controller) { CallClientFakeAudio setup; auto capturer = TestAudioDeviceModule::CreatePulsedNoiseCapturer(256, 48000); auto renderer = TestAudioDeviceModule::CreateDiscardRenderer(48000); - setup.fake_audio_device = TestAudioDeviceModule::CreateTestAudioDeviceModule( - std::move(capturer), std::move(renderer), 1.f); + setup.fake_audio_device = TestAudioDeviceModule::Create( + time_controller->GetTaskQueueFactory(), std::move(capturer), + std::move(renderer), 1.f); setup.apm = AudioProcessingBuilder().Create(); setup.fake_audio_device->Init(); AudioState::Config audio_state_config; @@ -48,30 +49,37 @@ CallClientFakeAudio InitAudio() { return setup; } -Call* CreateCall(CallClientConfig config, - LoggingNetworkControllerFactory* network_controller_factory_, +Call* CreateCall(TimeController* time_controller, + CallClientConfig config, + LoggingNetworkControllerFactory* network_controller_factory, rtc::scoped_refptr audio_state) { - CallConfig call_config(network_controller_factory_->GetEventLog()); + CallConfig call_config(network_controller_factory->GetEventLog()); call_config.bitrate_config.max_bitrate_bps = config.transport.rates.max_rate.bps_or(-1); call_config.bitrate_config.min_bitrate_bps = config.transport.rates.min_rate.bps(); call_config.bitrate_config.start_bitrate_bps = config.transport.rates.start_rate.bps(); - call_config.network_controller_factory = network_controller_factory_; + call_config.network_controller_factory = network_controller_factory; call_config.audio_state = audio_state; - return Call::Create(call_config); + return Call::Create(call_config, time_controller->GetClock(), + time_controller->CreateProcessThread("CallModules"), + time_controller->CreateProcessThread("Pacer"), + time_controller->GetTaskQueueFactory()); } } LoggingNetworkControllerFactory::LoggingNetworkControllerFactory( + TimeController* time_controller, LogWriterFactoryInterface* log_writer_factory, - TransportControllerConfig config) { + TransportControllerConfig config) + : time_controller_(time_controller) { std::unique_ptr cc_out; if (!log_writer_factory) { event_log_ = RtcEventLog::CreateNull(); } else { - event_log_ = RtcEventLog::Create(RtcEventLog::EncodingType::Legacy); + event_log_ = RtcEventLog::Create(RtcEventLog::EncodingType::Legacy, + time_controller->GetTaskQueueFactory()); bool success = event_log_->StartLogging( log_writer_factory->Create(".rtc.dat"), RtcEventLog::kImmediateOutput); RTC_CHECK(success); @@ -118,6 +126,7 @@ LoggingNetworkControllerFactory::LoggingNetworkControllerFactory( } LoggingNetworkControllerFactory::~LoggingNetworkControllerFactory() { + time_controller_->InvokeWithControlledYield([this]() { event_log_.reset(); }); } void LoggingNetworkControllerFactory::LogCongestionControllerStats( @@ -140,21 +149,33 @@ TimeDelta LoggingNetworkControllerFactory::GetProcessInterval() const { } CallClient::CallClient( - Clock* clock, + TimeController* time_controller, std::unique_ptr log_writer_factory, CallClientConfig config) - : clock_(clock), + : time_controller_(time_controller), + clock_(time_controller->GetClock()), log_writer_factory_(std::move(log_writer_factory)), - network_controller_factory_(log_writer_factory_.get(), config.transport), - fake_audio_setup_(InitAudio()), - call_(CreateCall(config, - &network_controller_factory_, - fake_audio_setup_.audio_state)), - transport_(clock_, call_.get()), - header_parser_(RtpHeaderParser::Create()) {} + network_controller_factory_(time_controller, + log_writer_factory_.get(), + config.transport), + header_parser_(RtpHeaderParser::Create()), + task_queue_(time_controller->GetTaskQueueFactory()->CreateTaskQueue( + "CallClient", + TaskQueueFactory::Priority::NORMAL)) { + SendTask([this, config] { + fake_audio_setup_ = InitAudio(time_controller_); + call_.reset(CreateCall(time_controller_, config, + &network_controller_factory_, + fake_audio_setup_.audio_state)); + transport_ = absl::make_unique(clock_, call_.get()); + }); +} CallClient::~CallClient() { - delete header_parser_; + SendTask([&] { + call_.reset(); + fake_audio_setup_ = {}; + }); } ColumnPrinter CallClient::StatsPrinter() { @@ -185,8 +206,16 @@ void CallClient::OnPacketReceived(EmulatedIpPacket packet) { RTC_CHECK(ssrc.has_value()); media_type = ssrc_media_types_[*ssrc]; } - call_->Receiver()->DeliverPacket(media_type, packet.data, - packet.arrival_time.us()); + struct Closure { + void operator()() { + call->Receiver()->DeliverPacket(media_type, packet.data, + packet.arrival_time.us()); + } + Call* call; + MediaType media_type; + EmulatedIpPacket packet; + }; + task_queue_.PostTask(Closure{call_.get(), media_type, std::move(packet)}); } std::unique_ptr CallClient::GetLogWriter(std::string name) { @@ -232,6 +261,11 @@ void CallClient::AddExtensions(std::vector extensions) { header_parser_->RegisterRtpHeaderExtension(extension); } +void CallClient::SendTask(std::function task) { + time_controller_->InvokeWithControlledYield( + [&] { task_queue_.SendTask(std::move(task)); }); +} + CallClientPair::~CallClientPair() = default; } // namespace test diff --git a/test/scenario/call_client.h b/test/scenario/call_client.h index 4671b1de8a..14c283d874 100644 --- a/test/scenario/call_client.h +++ b/test/scenario/call_client.h @@ -22,11 +22,13 @@ #include "modules/congestion_controller/test/controller_printer.h" #include "modules/rtp_rtcp/include/rtp_header_parser.h" #include "rtc_base/constructor_magic.h" +#include "rtc_base/task_queue_for_test.h" #include "test/logging/log_writer.h" #include "test/scenario/column_printer.h" #include "test/scenario/network/network_emulation.h" #include "test/scenario/network_node.h" #include "test/scenario/scenario_config.h" +#include "test/time_controller/time_controller.h" namespace webrtc { @@ -34,7 +36,8 @@ namespace test { class LoggingNetworkControllerFactory : public NetworkControllerFactoryInterface { public: - LoggingNetworkControllerFactory(LogWriterFactoryInterface* log_writer_factory, + LoggingNetworkControllerFactory(TimeController* time_controller, + LogWriterFactoryInterface* log_writer_factory, TransportControllerConfig config); RTC_DISALLOW_COPY_AND_ASSIGN(LoggingNetworkControllerFactory); ~LoggingNetworkControllerFactory(); @@ -46,6 +49,7 @@ class LoggingNetworkControllerFactory RtcEventLog* GetEventLog() const; private: + TimeController* time_controller_; std::unique_ptr event_log_; std::unique_ptr owned_cc_factory_; NetworkControllerFactoryInterface* cc_factory_ = nullptr; @@ -62,7 +66,7 @@ struct CallClientFakeAudio { // stream session. class CallClient : public EmulatedNetworkReceiverInterface { public: - CallClient(Clock* clock, + CallClient(TimeController* time_controller, std::unique_ptr log_writer_factory, CallClientConfig config); RTC_DISALLOW_COPY_AND_ASSIGN(CallClient); @@ -94,14 +98,16 @@ class CallClient : public EmulatedNetworkReceiverInterface { uint32_t GetNextRtxSsrc(); std::string GetNextPriorityId(); void AddExtensions(std::vector extensions); + void SendTask(std::function task); + TimeController* const time_controller_; Clock* clock_; const std::unique_ptr log_writer_factory_; LoggingNetworkControllerFactory network_controller_factory_; CallClientFakeAudio fake_audio_setup_; std::unique_ptr call_; - NetworkNodeTransport transport_; - RtpHeaderParser* const header_parser_; + std::unique_ptr transport_; + std::unique_ptr const header_parser_; std::unique_ptr fec_controller_factory_; // Stores the configured overhead per known destination endpoint. This is used @@ -114,6 +120,8 @@ class CallClient : public EmulatedNetworkReceiverInterface { int next_audio_local_ssrc_index_ = 0; int next_priority_index_ = 0; std::map ssrc_media_types_; + // Defined last so it's destroyed first. + TaskQueueForTest task_queue_; }; class CallClientPair { diff --git a/test/scenario/scenario.cc b/test/scenario/scenario.cc index 47b1d912d2..09e70f0849 100644 --- a/test/scenario/scenario.cc +++ b/test/scenario/scenario.cc @@ -19,6 +19,8 @@ #include "test/logging/file_log_writer.h" #include "test/scenario/network/network_emulation.h" #include "test/testsupport/file_utils.h" +#include "test/time_controller/real_time_controller.h" +#include "test/time_controller/simulated_time_controller.h" WEBRTC_DEFINE_bool(scenario_logs, false, "Save logs from scenario framework."); WEBRTC_DEFINE_string(scenario_logs_root, @@ -28,7 +30,8 @@ WEBRTC_DEFINE_string(scenario_logs_root, namespace webrtc { namespace test { namespace { -int64_t kMicrosPerSec = 1000000; +const Timestamp kSimulatedStartTime = Timestamp::seconds(100000); + std::unique_ptr GetScenarioLogManager( std::string file_name) { if (FLAG_scenario_logs && !file_name.empty()) { @@ -42,31 +45,14 @@ std::unique_ptr GetScenarioLogManager( } return nullptr; } -} - -RepeatedActivity::RepeatedActivity(TimeDelta interval, - std::function function) - : interval_(interval), function_(function) {} - -void RepeatedActivity::Stop() { - interval_ = TimeDelta::PlusInfinity(); -} - -void RepeatedActivity::Poll(Timestamp time) { - RTC_DCHECK(last_update_.IsFinite()); - if (time >= last_update_ + interval_) { - function_(time - last_update_); - last_update_ = time; +std::unique_ptr CreateTimeController(bool real_time) { + if (real_time) { + return absl::make_unique(); + } else { + return absl::make_unique( + kSimulatedStartTime); } } - -void RepeatedActivity::SetStartTime(Timestamp time) { - last_update_ = time; -} - -Timestamp RepeatedActivity::NextTime() { - RTC_DCHECK(last_update_.IsFinite()); - return last_update_ + interval_; } Scenario::Scenario() @@ -81,22 +67,17 @@ Scenario::Scenario( std::unique_ptr log_writer_factory, bool real_time) : log_writer_factory_(std::move(log_writer_factory)), - real_time_mode_(real_time), - sim_clock_(100000 * kMicrosPerSec), - clock_(real_time ? Clock::GetRealTimeClock() : &sim_clock_), + time_controller_(CreateTimeController(real_time)), + clock_(time_controller_->GetClock()), audio_decoder_factory_(CreateBuiltinAudioDecoderFactory()), - audio_encoder_factory_(CreateBuiltinAudioEncoderFactory()) { - if (!real_time_mode_ && log_writer_factory_) { - rtc::SetClockForTesting(&event_log_fake_clock_); - event_log_fake_clock_.SetTimeNanos(sim_clock_.TimeInMicroseconds() * 1000); - } -} + audio_encoder_factory_(CreateBuiltinAudioEncoderFactory()), + task_queue_(time_controller_->GetTaskQueueFactory()->CreateTaskQueue( + "Scenario", + TaskQueueFactory::Priority::NORMAL)) {} Scenario::~Scenario() { if (start_time_.IsFinite()) Stop(); - if (!real_time_mode_) - rtc::SetClockForTesting(nullptr); } ColumnPrinter Scenario::TimePrinter() { @@ -123,9 +104,8 @@ StatesPrinter* Scenario::CreatePrinter(std::string name, } CallClient* Scenario::CreateClient(std::string name, CallClientConfig config) { - RTC_DCHECK(real_time_mode_); CallClient* client = - new CallClient(clock_, GetLogWriterFactory(name), config); + new CallClient(time_controller_.get(), GetLogWriterFactory(name), config); if (config.transport.state_log_interval.IsFinite()) { Every(config.transport.state_log_interval, [this, client]() { client->network_controller_factory_.LogCongestionControllerStats(Now()); @@ -178,7 +158,7 @@ void Scenario::ChangeRoute(std::pair clients, uint64_t route_id = next_route_id_++; clients.second->route_overhead_.insert({route_id, overhead}); EmulatedNetworkNode::CreateRoute(route_id, over_nodes, clients.second); - clients.first->transport_.Connect(over_nodes.front(), route_id, overhead); + clients.first->transport_->Connect(over_nodes.front(), route_id, overhead); } SimulatedTimeClient* Scenario::CreateSimulatedTimeClient( @@ -190,17 +170,20 @@ SimulatedTimeClient* Scenario::CreateSimulatedTimeClient( uint64_t send_id = next_route_id_++; uint64_t return_id = next_route_id_++; SimulatedTimeClient* client = new SimulatedTimeClient( - GetLogWriterFactory(name), config, stream_configs, send_link, return_link, - send_id, return_id, Now()); + time_controller_.get(), GetLogWriterFactory(name), config, stream_configs, + send_link, return_link, send_id, return_id, Now()); if (log_writer_factory_ && !name.empty() && config.transport.state_log_interval.IsFinite()) { Every(config.transport.state_log_interval, [this, client]() { client->network_controller_factory_.LogCongestionControllerStats(Now()); }); } - - Every(client->GetNetworkControllerProcessInterval(), - [this, client] { client->CongestionProcess(Now()); }); + if (client->GetNetworkControllerProcessInterval().IsFinite()) { + Every(client->GetNetworkControllerProcessInterval(), + [this, client] { client->CongestionProcess(Now()); }); + } else { + task_queue_.PostTask([this, client] { client->CongestionProcess(Now()); }); + } Every(TimeDelta::ms(5), [this, client] { client->PacerProcess(Now()); }); simulated_time_clients_.emplace_back(client); return client; @@ -316,32 +299,34 @@ AudioStreamPair* Scenario::CreateAudioStream( void Scenario::Every(TimeDelta interval, std::function function) { - repeated_activities_.emplace_back(new RepeatedActivity(interval, function)); - if (start_time_.IsFinite()) { - repeated_activities_.back()->SetStartTime(Now()); - } + RepeatingTaskHandle::DelayedStart(task_queue_.Get(), interval, + [interval, function] { + function(interval); + return interval; + }); } void Scenario::Every(TimeDelta interval, std::function function) { - auto function_with_argument = [function](TimeDelta) { function(); }; - repeated_activities_.emplace_back( - new RepeatedActivity(interval, function_with_argument)); - if (start_time_.IsFinite()) { - repeated_activities_.back()->SetStartTime(Now()); - } + RepeatingTaskHandle::DelayedStart(task_queue_.Get(), interval, + [interval, function] { + function(); + return interval; + }); } void Scenario::At(TimeDelta offset, std::function function) { - pending_activities_.emplace_back(new PendingActivity{offset, function}); + RTC_DCHECK_GT(offset.ms(), TimeSinceStart().ms()); + task_queue_.PostDelayedTask(function, TimeUntilTarget(offset).ms()); } void Scenario::RunFor(TimeDelta duration) { - RunUntil(TimeSinceStart() + duration); + if (start_time_.IsInfinite()) + Start(); + time_controller_->Sleep(duration); } void Scenario::RunUntil(TimeDelta target_time_since_start) { - RunUntil(target_time_since_start, TimeDelta::PlusInfinity(), - []() { return false; }); + RunFor(TimeUntilTarget(target_time_since_start)); } void Scenario::RunUntil(TimeDelta target_time_since_start, @@ -349,43 +334,16 @@ void Scenario::RunUntil(TimeDelta target_time_since_start, std::function exit_function) { if (start_time_.IsInfinite()) Start(); - - rtc::Event done_; - while (!exit_function() && TimeSinceStart() < target_time_since_start) { - Timestamp current_time = Now(); - TimeDelta duration = current_time - start_time_; - Timestamp next_time = current_time + check_interval; - for (auto& activity : repeated_activities_) { - activity->Poll(current_time); - next_time = std::min(next_time, activity->NextTime()); - } - for (auto activity = pending_activities_.begin(); - activity < pending_activities_.end(); activity++) { - if (duration > (*activity)->after_duration) { - (*activity)->function(); - pending_activities_.erase(activity); - } - } - TimeDelta wait_time = next_time - current_time; - if (real_time_mode_) { - done_.Wait(wait_time.ms()); - } else { - sim_clock_.AdvanceTimeMicroseconds(wait_time.us()); - // The fake clock is quite slow to update, we only update it if logging is - // turned on to save time. - if (log_writer_factory_) - event_log_fake_clock_.SetTimeNanos(sim_clock_.TimeInMicroseconds() * - 1000); - } + while (check_interval >= TimeUntilTarget(target_time_since_start)) { + time_controller_->Sleep(check_interval); + if (exit_function()) + return; } + time_controller_->Sleep(TimeUntilTarget(target_time_since_start)); } void Scenario::Start() { start_time_ = Timestamp::us(clock_->TimeInMicroseconds()); - for (auto& activity : repeated_activities_) { - activity->SetStartTime(start_time_); - } - for (auto& stream_pair : video_streams_) stream_pair->receive()->Start(); for (auto& stream_pair : audio_streams_) @@ -426,5 +384,9 @@ TimeDelta Scenario::TimeSinceStart() { return Now() - start_time_; } +TimeDelta Scenario::TimeUntilTarget(TimeDelta target_time_offset) { + return target_time_offset - TimeSinceStart(); +} + } // namespace test } // namespace webrtc diff --git a/test/scenario/scenario.h b/test/scenario/scenario.h index 6de1d78f7d..cf949df517 100644 --- a/test/scenario/scenario.h +++ b/test/scenario/scenario.h @@ -17,6 +17,8 @@ #include "absl/memory/memory.h" #include "rtc_base/constructor_magic.h" #include "rtc_base/fake_clock.h" +#include "rtc_base/task_queue.h" +#include "rtc_base/task_utils/repeating_task.h" #include "test/logging/log_writer.h" #include "test/scenario/audio_stream.h" #include "test/scenario/call_client.h" @@ -25,33 +27,10 @@ #include "test/scenario/scenario_config.h" #include "test/scenario/simulated_time.h" #include "test/scenario/video_stream.h" +#include "test/time_controller/time_controller.h" namespace webrtc { namespace test { -// RepeatedActivity is created by the Scenario class and can be used to stop a -// running activity at runtime. -class RepeatedActivity { - public: - void Stop(); - - private: - friend class Scenario; - RepeatedActivity(TimeDelta interval, std::function function); - - void Poll(Timestamp time); - void SetStartTime(Timestamp time); - Timestamp NextTime(); - - TimeDelta interval_; - std::function function_; - Timestamp last_update_ = Timestamp::MinusInfinity(); -}; - -struct PendingActivity { - TimeDelta after_duration; - std::function function; -}; - // Scenario is a class owning everything for a test scenario. It creates and // holds network nodes, call clients and media streams. It also provides methods // for changing behavior at runtime. Since it always keeps ownership of the @@ -128,11 +107,14 @@ class Scenario { std::vector over_nodes, CrossTrafficConfig config); - // Runs the provided function with a fixed interval. + // Runs the provided function with a fixed interval. For real time tests, + // |function| starts being called after |interval| from the call to Every(). void Every(TimeDelta interval, std::function function); void Every(TimeDelta interval, std::function function); - // Runs the provided function after given duration has passed in a session. + // Runs the provided function after given duration has passed. For real time + // tests, |function| is called after |target_time_since_start| from the call + // to Every(). void At(TimeDelta offset, std::function function); // Sends a packet over the nodes and runs |action| when it has been delivered. @@ -140,13 +122,13 @@ class Scenario { size_t packet_size, std::function action); - // Runs the scenario for the given time or until the exit function returns - // true. + // Runs the scenario for the given time. void RunFor(TimeDelta duration); + // Runs the scenario until |target_time_since_start|. void RunUntil(TimeDelta target_time_since_start); - // Will check |exit_function| every |check_interval|. It stops after a check - // if either |target_time_since_start| has passed or if |exit_function| - // returns true. + // Runs the scenario until |target_time_since_start| or |exit_function| + // returns true. |exit_function| is polled after each |check_interval| has + // passed. void RunUntil(TimeDelta target_time_since_start, TimeDelta check_interval, std::function exit_function); @@ -182,14 +164,12 @@ class Scenario { } private: + TimeDelta TimeUntilTarget(TimeDelta target_time_offset); + NullReceiver null_receiver_; - std::unique_ptr log_writer_factory_; - const bool real_time_mode_; - SimulatedClock sim_clock_; + const std::unique_ptr log_writer_factory_; + std::unique_ptr time_controller_; Clock* clock_; - // Event logs use a global clock instance, this is used to override that - // instance when not running in real time. - rtc::FakeClock event_log_fake_clock_; std::vector> clients_; std::vector> client_pairs_; @@ -200,9 +180,7 @@ class Scenario { std::vector> simulated_time_clients_; - std::vector> repeated_activities_; std::vector> action_receivers_; - std::vector> pending_activities_; std::vector> printers_; int64_t next_route_id_ = 40000; @@ -210,6 +188,8 @@ class Scenario { rtc::scoped_refptr audio_encoder_factory_; Timestamp start_time_ = Timestamp::PlusInfinity(); + // Defined last so it's destroyed first. + rtc::TaskQueue task_queue_; }; } // namespace test } // namespace webrtc diff --git a/test/scenario/scenario_unittest.cc b/test/scenario/scenario_unittest.cc index 97285d3914..4a4b7f638d 100644 --- a/test/scenario/scenario_unittest.cc +++ b/test/scenario/scenario_unittest.cc @@ -7,12 +7,15 @@ * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ +#include #include "test/scenario/scenario.h" #include "test/gtest.h" namespace webrtc { namespace test { TEST(ScenarioTest, StartsAndStopsWithoutErrors) { + std::atomic packet_received(false); + std::atomic bitrate_changed(false); Scenario s; CallClientConfig call_client_config; call_client_config.transport.rates.start_rate = DataRate::kbps(300); @@ -38,10 +41,8 @@ TEST(ScenarioTest, StartsAndStopsWithoutErrors) { CrossTrafficConfig cross_traffic_config; s.CreateCrossTraffic({alice_net}, cross_traffic_config); - bool packet_received = false; s.NetworkDelayedAction({alice_net, bob_net}, 100, [&packet_received] { packet_received = true; }); - bool bitrate_changed = false; s.Every(TimeDelta::ms(10), [alice, bob, &bitrate_changed] { if (alice->GetStats().send_bandwidth_bps != 300000 && bob->GetStats().send_bandwidth_bps != 300000) diff --git a/test/scenario/simulated_time.cc b/test/scenario/simulated_time.cc index 5824c5fe21..efc0b72ab4 100644 --- a/test/scenario/simulated_time.cc +++ b/test/scenario/simulated_time.cc @@ -245,6 +245,7 @@ void SimulatedFeedback::OnPacketReceived(EmulatedIpPacket packet) { } SimulatedTimeClient::SimulatedTimeClient( + TimeController* time_controller, std::unique_ptr log_writer_factory, SimulatedTimeClientConfig config, std::vector stream_configs, @@ -254,7 +255,9 @@ SimulatedTimeClient::SimulatedTimeClient( uint64_t return_receiver_id, Timestamp at_time) : log_writer_factory_(std::move(log_writer_factory)), - network_controller_factory_(log_writer_factory_.get(), config.transport), + network_controller_factory_(time_controller, + log_writer_factory_.get(), + config.transport), send_link_(send_link), return_link_(return_link), sender_(send_link.front(), send_receiver_id), diff --git a/test/scenario/simulated_time.h b/test/scenario/simulated_time.h index 762748dc66..9214a8dd59 100644 --- a/test/scenario/simulated_time.h +++ b/test/scenario/simulated_time.h @@ -121,6 +121,7 @@ class SimulatedSender { class SimulatedTimeClient : EmulatedNetworkReceiverInterface { public: SimulatedTimeClient( + TimeController* time_controller, std::unique_ptr log_writer_factory, SimulatedTimeClientConfig config, std::vector stream_configs, diff --git a/test/scenario/video_stream.cc b/test/scenario/video_stream.cc index 02ee327818..f85972c853 100644 --- a/test/scenario/video_stream.cc +++ b/test/scenario/video_stream.cc @@ -339,7 +339,8 @@ SendVideoStream::SendVideoStream(CallClient* sender, : sender_(sender), config_(config) { video_capturer_ = absl::make_unique( sender_->clock_, CreateFrameGenerator(sender_->clock_, config.source), - config.source.framerate); + config.source.framerate, + *sender->time_controller_->GetTaskQueueFactory()); video_capturer_->Init(); using Encoder = VideoStreamConfig::Encoder; @@ -384,59 +385,66 @@ SendVideoStream::SendVideoStream(CallClient* sender, send_config.encoder_settings.bitrate_allocator_factory = bitrate_allocator_factory_.get(); - send_stream_ = sender_->call_->CreateVideoSendStream( - std::move(send_config), std::move(encoder_config)); - std::vector > - frame_info_handlers; - if (config.analyzer.frame_quality_handler) - frame_info_handlers.push_back(config.analyzer.frame_quality_handler); + sender_->SendTask([&] { + send_stream_ = sender_->call_->CreateVideoSendStream( + std::move(send_config), std::move(encoder_config)); + std::vector > + frame_info_handlers; + if (config.analyzer.frame_quality_handler) + frame_info_handlers.push_back(config.analyzer.frame_quality_handler); - if (analyzer->Active()) { - frame_tap_.reset(new ForwardingCapturedFrameTap(sender_->clock_, analyzer, - video_capturer_.get())); - send_stream_->SetSource(frame_tap_.get(), - config.encoder.degradation_preference); - } else { - send_stream_->SetSource(video_capturer_.get(), - config.encoder.degradation_preference); - } + if (analyzer->Active()) { + frame_tap_.reset(new ForwardingCapturedFrameTap(sender_->clock_, analyzer, + video_capturer_.get())); + send_stream_->SetSource(frame_tap_.get(), + config.encoder.degradation_preference); + } else { + send_stream_->SetSource(video_capturer_.get(), + config.encoder.degradation_preference); + } + }); } SendVideoStream::~SendVideoStream() { - sender_->call_->DestroyVideoSendStream(send_stream_); + sender_->SendTask( + [this] { sender_->call_->DestroyVideoSendStream(send_stream_); }); } void SendVideoStream::Start() { - send_stream_->Start(); - sender_->call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp); + sender_->SendTask([this] { + send_stream_->Start(); + sender_->call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp); + }); } void SendVideoStream::Stop() { - send_stream_->Stop(); + sender_->SendTask([this] { send_stream_->Stop(); }); } void SendVideoStream::UpdateConfig( std::function modifier) { - rtc::CritScope cs(&crit_); - VideoStreamConfig prior_config = config_; - modifier(&config_); - if (prior_config.encoder.fake.max_rate != config_.encoder.fake.max_rate) { - for (auto* encoder : fake_encoders_) { - encoder->SetMaxBitrate(config_.encoder.fake.max_rate.kbps()); + sender_->SendTask([&] { + rtc::CritScope cs(&crit_); + VideoStreamConfig prior_config = config_; + modifier(&config_); + if (prior_config.encoder.fake.max_rate != config_.encoder.fake.max_rate) { + for (auto* encoder : fake_encoders_) { + encoder->SetMaxBitrate(config_.encoder.fake.max_rate.kbps()); + } } - } - // TODO(srte): Add more conditions that should cause reconfiguration. - if (prior_config.encoder.max_framerate != config_.encoder.max_framerate) { - VideoEncoderConfig encoder_config = CreateVideoEncoderConfig(config_); - send_stream_->ReconfigureVideoEncoder(std::move(encoder_config)); - } - if (prior_config.source.framerate != config_.source.framerate) { - SetCaptureFramerate(config_.source.framerate); - } + // TODO(srte): Add more conditions that should cause reconfiguration. + if (prior_config.encoder.max_framerate != config_.encoder.max_framerate) { + VideoEncoderConfig encoder_config = CreateVideoEncoderConfig(config_); + send_stream_->ReconfigureVideoEncoder(std::move(encoder_config)); + } + if (prior_config.source.framerate != config_.source.framerate) { + SetCaptureFramerate(config_.source.framerate); + } + }); } void SendVideoStream::SetCaptureFramerate(int framerate) { - video_capturer_->ChangeFramerate(framerate); + sender_->SendTask([&] { video_capturer_->ChangeFramerate(framerate); }); } VideoSendStream::Stats SendVideoStream::GetStats() const { @@ -508,27 +516,35 @@ ReceiveVideoStream::ReceiveVideoStream(CallClient* receiver, MediaType::VIDEO; if (config.stream.use_rtx) receiver_->ssrc_media_types_[recv_config.rtp.rtx_ssrc] = MediaType::VIDEO; - receive_streams_.push_back( - receiver_->call_->CreateVideoReceiveStream(std::move(recv_config))); + receiver_->SendTask([this, &recv_config] { + receive_streams_.push_back( + receiver_->call_->CreateVideoReceiveStream(std::move(recv_config))); + }); } } ReceiveVideoStream::~ReceiveVideoStream() { - for (auto* recv_stream : receive_streams_) - receiver_->call_->DestroyVideoReceiveStream(recv_stream); - if (flecfec_stream_) - receiver_->call_->DestroyFlexfecReceiveStream(flecfec_stream_); + receiver_->SendTask([this] { + for (auto* recv_stream : receive_streams_) + receiver_->call_->DestroyVideoReceiveStream(recv_stream); + if (flecfec_stream_) + receiver_->call_->DestroyFlexfecReceiveStream(flecfec_stream_); + }); } void ReceiveVideoStream::Start() { - for (auto* recv_stream : receive_streams_) - recv_stream->Start(); - receiver_->call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp); + receiver_->SendTask([this] { + for (auto* recv_stream : receive_streams_) + recv_stream->Start(); + receiver_->call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp); + }); } void ReceiveVideoStream::Stop() { - for (auto* recv_stream : receive_streams_) - recv_stream->Stop(); + receiver_->SendTask([this] { + for (auto* recv_stream : receive_streams_) + recv_stream->Stop(); + }); } VideoStreamPair::~VideoStreamPair() = default; @@ -541,12 +557,12 @@ VideoStreamPair::VideoStreamPair( : config_(config), analyzer_(std::move(quality_writer), config.analyzer.frame_quality_handler), - send_stream_(sender, config, &sender->transport_, &analyzer_), + send_stream_(sender, config, sender->transport_.get(), &analyzer_), receive_stream_(receiver, config, &send_stream_, /*chosen_stream=*/0, - &receiver->transport_, + receiver->transport_.get(), &analyzer_) {} } // namespace test