diff --git a/test/scenario/BUILD.gn b/test/scenario/BUILD.gn index ac4a73632f..2dcd3f923b 100644 --- a/test/scenario/BUILD.gn +++ b/test/scenario/BUILD.gn @@ -129,10 +129,12 @@ if (rtc_include_tests) { "../../rtc_base:safe_minmax", "../../rtc_base:sequenced_task_checker", "../../rtc_base:task_queue_for_test", + "../../rtc_base/task_utils:repeating_task", "../../system_wrappers", "../../system_wrappers:field_trial", "../../video", "../logging:log_writer", + "../time_controller", "network:emulated_network", "//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/types:optional", diff --git a/test/scenario/audio_stream.cc b/test/scenario/audio_stream.cc index d73d2fe6c0..13921555f6 100644 --- a/test/scenario/audio_stream.cc +++ b/test/scenario/audio_stream.cc @@ -126,24 +126,29 @@ SendAudioStream::SendAudioStream( send_config.track_id, config.encoder.priority_rate->bps())); } - send_stream_ = sender_->call_->CreateAudioSendStream(send_config); - if (field_trial::IsEnabled("WebRTC-SendSideBwe-WithOverhead")) { - sender->call_->OnAudioTransportOverheadChanged( - sender_->transport_.packet_overhead().bytes()); - } + sender_->SendTask([&] { + send_stream_ = sender_->call_->CreateAudioSendStream(send_config); + if (field_trial::IsEnabled("WebRTC-SendSideBwe-WithOverhead")) { + sender->call_->OnAudioTransportOverheadChanged( + sender_->transport_->packet_overhead().bytes()); + } + }); } SendAudioStream::~SendAudioStream() { - sender_->call_->DestroyAudioSendStream(send_stream_); + sender_->SendTask( + [this] { sender_->call_->DestroyAudioSendStream(send_stream_); }); } void SendAudioStream::Start() { - send_stream_->Start(); - sender_->call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp); + sender_->SendTask([this] { + send_stream_->Start(); + sender_->call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp); + }); } void SendAudioStream::Stop() { - send_stream_->Stop(); + sender_->SendTask([this] { send_stream_->Stop(); }); } void SendAudioStream::SetMuted(bool mute) { @@ -154,8 +159,10 @@ ColumnPrinter SendAudioStream::StatsPrinter() { return ColumnPrinter::Lambda( "audio_target_rate", [this](rtc::SimpleStringBuilder& sb) { - AudioSendStream::Stats stats = send_stream_->GetStats(); - sb.AppendFormat("%.0lf", stats.target_bitrate_bps / 8.0); + sender_->SendTask([this, &sb] { + AudioSendStream::Stats stats = send_stream_->GetStats(); + sb.AppendFormat("%.0lf", stats.target_bitrate_bps / 8.0); + }); }, 64); } @@ -182,19 +189,24 @@ ReceiveAudioStream::ReceiveAudioStream( recv_config.decoder_map = { {CallTest::kAudioSendPayloadType, {"opus", 48000, 2}}}; recv_config.sync_group = config.render.sync_group; - receive_stream_ = receiver_->call_->CreateAudioReceiveStream(recv_config); + receiver_->SendTask([&] { + receive_stream_ = receiver_->call_->CreateAudioReceiveStream(recv_config); + }); } ReceiveAudioStream::~ReceiveAudioStream() { - receiver_->call_->DestroyAudioReceiveStream(receive_stream_); + receiver_->SendTask( + [&] { receiver_->call_->DestroyAudioReceiveStream(receive_stream_); }); } void ReceiveAudioStream::Start() { - receive_stream_->Start(); - receiver_->call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp); + receiver_->SendTask([&] { + receive_stream_->Start(); + receiver_->call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp); + }); } void ReceiveAudioStream::Stop() { - receive_stream_->Stop(); + receiver_->SendTask([&] { receive_stream_->Stop(); }); } AudioStreamPair::~AudioStreamPair() = default; @@ -206,12 +218,12 @@ AudioStreamPair::AudioStreamPair( rtc::scoped_refptr decoder_factory, AudioStreamConfig config) : config_(config), - send_stream_(sender, config, encoder_factory, &sender->transport_), + send_stream_(sender, config, encoder_factory, sender->transport_.get()), receive_stream_(receiver, config, &send_stream_, decoder_factory, - &receiver->transport_) {} + receiver->transport_.get()) {} } // namespace test } // namespace webrtc diff --git a/test/scenario/call_client.cc b/test/scenario/call_client.cc index 3a8ce11b0f..7df4e030de 100644 --- a/test/scenario/call_client.cc +++ b/test/scenario/call_client.cc @@ -30,12 +30,13 @@ const uint32_t kReceiverLocalAudioSsrc = 0x1234567; const char* kPriorityStreamId = "priority-track"; -CallClientFakeAudio InitAudio() { +CallClientFakeAudio InitAudio(TimeController* time_controller) { CallClientFakeAudio setup; auto capturer = TestAudioDeviceModule::CreatePulsedNoiseCapturer(256, 48000); auto renderer = TestAudioDeviceModule::CreateDiscardRenderer(48000); - setup.fake_audio_device = TestAudioDeviceModule::CreateTestAudioDeviceModule( - std::move(capturer), std::move(renderer), 1.f); + setup.fake_audio_device = TestAudioDeviceModule::Create( + time_controller->GetTaskQueueFactory(), std::move(capturer), + std::move(renderer), 1.f); setup.apm = AudioProcessingBuilder().Create(); setup.fake_audio_device->Init(); AudioState::Config audio_state_config; @@ -48,30 +49,37 @@ CallClientFakeAudio InitAudio() { return setup; } -Call* CreateCall(CallClientConfig config, - LoggingNetworkControllerFactory* network_controller_factory_, +Call* CreateCall(TimeController* time_controller, + CallClientConfig config, + LoggingNetworkControllerFactory* network_controller_factory, rtc::scoped_refptr audio_state) { - CallConfig call_config(network_controller_factory_->GetEventLog()); + CallConfig call_config(network_controller_factory->GetEventLog()); call_config.bitrate_config.max_bitrate_bps = config.transport.rates.max_rate.bps_or(-1); call_config.bitrate_config.min_bitrate_bps = config.transport.rates.min_rate.bps(); call_config.bitrate_config.start_bitrate_bps = config.transport.rates.start_rate.bps(); - call_config.network_controller_factory = network_controller_factory_; + call_config.network_controller_factory = network_controller_factory; call_config.audio_state = audio_state; - return Call::Create(call_config); + return Call::Create(call_config, time_controller->GetClock(), + time_controller->CreateProcessThread("CallModules"), + time_controller->CreateProcessThread("Pacer"), + time_controller->GetTaskQueueFactory()); } } LoggingNetworkControllerFactory::LoggingNetworkControllerFactory( + TimeController* time_controller, LogWriterFactoryInterface* log_writer_factory, - TransportControllerConfig config) { + TransportControllerConfig config) + : time_controller_(time_controller) { std::unique_ptr cc_out; if (!log_writer_factory) { event_log_ = RtcEventLog::CreateNull(); } else { - event_log_ = RtcEventLog::Create(RtcEventLog::EncodingType::Legacy); + event_log_ = RtcEventLog::Create(RtcEventLog::EncodingType::Legacy, + time_controller->GetTaskQueueFactory()); bool success = event_log_->StartLogging( log_writer_factory->Create(".rtc.dat"), RtcEventLog::kImmediateOutput); RTC_CHECK(success); @@ -118,6 +126,7 @@ LoggingNetworkControllerFactory::LoggingNetworkControllerFactory( } LoggingNetworkControllerFactory::~LoggingNetworkControllerFactory() { + time_controller_->InvokeWithControlledYield([this]() { event_log_.reset(); }); } void LoggingNetworkControllerFactory::LogCongestionControllerStats( @@ -140,21 +149,33 @@ TimeDelta LoggingNetworkControllerFactory::GetProcessInterval() const { } CallClient::CallClient( - Clock* clock, + TimeController* time_controller, std::unique_ptr log_writer_factory, CallClientConfig config) - : clock_(clock), + : time_controller_(time_controller), + clock_(time_controller->GetClock()), log_writer_factory_(std::move(log_writer_factory)), - network_controller_factory_(log_writer_factory_.get(), config.transport), - fake_audio_setup_(InitAudio()), - call_(CreateCall(config, - &network_controller_factory_, - fake_audio_setup_.audio_state)), - transport_(clock_, call_.get()), - header_parser_(RtpHeaderParser::Create()) {} + network_controller_factory_(time_controller, + log_writer_factory_.get(), + config.transport), + header_parser_(RtpHeaderParser::Create()), + task_queue_(time_controller->GetTaskQueueFactory()->CreateTaskQueue( + "CallClient", + TaskQueueFactory::Priority::NORMAL)) { + SendTask([this, config] { + fake_audio_setup_ = InitAudio(time_controller_); + call_.reset(CreateCall(time_controller_, config, + &network_controller_factory_, + fake_audio_setup_.audio_state)); + transport_ = absl::make_unique(clock_, call_.get()); + }); +} CallClient::~CallClient() { - delete header_parser_; + SendTask([&] { + call_.reset(); + fake_audio_setup_ = {}; + }); } ColumnPrinter CallClient::StatsPrinter() { @@ -185,8 +206,16 @@ void CallClient::OnPacketReceived(EmulatedIpPacket packet) { RTC_CHECK(ssrc.has_value()); media_type = ssrc_media_types_[*ssrc]; } - call_->Receiver()->DeliverPacket(media_type, packet.data, - packet.arrival_time.us()); + struct Closure { + void operator()() { + call->Receiver()->DeliverPacket(media_type, packet.data, + packet.arrival_time.us()); + } + Call* call; + MediaType media_type; + EmulatedIpPacket packet; + }; + task_queue_.PostTask(Closure{call_.get(), media_type, std::move(packet)}); } std::unique_ptr CallClient::GetLogWriter(std::string name) { @@ -232,6 +261,11 @@ void CallClient::AddExtensions(std::vector extensions) { header_parser_->RegisterRtpHeaderExtension(extension); } +void CallClient::SendTask(std::function task) { + time_controller_->InvokeWithControlledYield( + [&] { task_queue_.SendTask(std::move(task)); }); +} + CallClientPair::~CallClientPair() = default; } // namespace test diff --git a/test/scenario/call_client.h b/test/scenario/call_client.h index 4671b1de8a..14c283d874 100644 --- a/test/scenario/call_client.h +++ b/test/scenario/call_client.h @@ -22,11 +22,13 @@ #include "modules/congestion_controller/test/controller_printer.h" #include "modules/rtp_rtcp/include/rtp_header_parser.h" #include "rtc_base/constructor_magic.h" +#include "rtc_base/task_queue_for_test.h" #include "test/logging/log_writer.h" #include "test/scenario/column_printer.h" #include "test/scenario/network/network_emulation.h" #include "test/scenario/network_node.h" #include "test/scenario/scenario_config.h" +#include "test/time_controller/time_controller.h" namespace webrtc { @@ -34,7 +36,8 @@ namespace test { class LoggingNetworkControllerFactory : public NetworkControllerFactoryInterface { public: - LoggingNetworkControllerFactory(LogWriterFactoryInterface* log_writer_factory, + LoggingNetworkControllerFactory(TimeController* time_controller, + LogWriterFactoryInterface* log_writer_factory, TransportControllerConfig config); RTC_DISALLOW_COPY_AND_ASSIGN(LoggingNetworkControllerFactory); ~LoggingNetworkControllerFactory(); @@ -46,6 +49,7 @@ class LoggingNetworkControllerFactory RtcEventLog* GetEventLog() const; private: + TimeController* time_controller_; std::unique_ptr event_log_; std::unique_ptr owned_cc_factory_; NetworkControllerFactoryInterface* cc_factory_ = nullptr; @@ -62,7 +66,7 @@ struct CallClientFakeAudio { // stream session. class CallClient : public EmulatedNetworkReceiverInterface { public: - CallClient(Clock* clock, + CallClient(TimeController* time_controller, std::unique_ptr log_writer_factory, CallClientConfig config); RTC_DISALLOW_COPY_AND_ASSIGN(CallClient); @@ -94,14 +98,16 @@ class CallClient : public EmulatedNetworkReceiverInterface { uint32_t GetNextRtxSsrc(); std::string GetNextPriorityId(); void AddExtensions(std::vector extensions); + void SendTask(std::function task); + TimeController* const time_controller_; Clock* clock_; const std::unique_ptr log_writer_factory_; LoggingNetworkControllerFactory network_controller_factory_; CallClientFakeAudio fake_audio_setup_; std::unique_ptr call_; - NetworkNodeTransport transport_; - RtpHeaderParser* const header_parser_; + std::unique_ptr transport_; + std::unique_ptr const header_parser_; std::unique_ptr fec_controller_factory_; // Stores the configured overhead per known destination endpoint. This is used @@ -114,6 +120,8 @@ class CallClient : public EmulatedNetworkReceiverInterface { int next_audio_local_ssrc_index_ = 0; int next_priority_index_ = 0; std::map ssrc_media_types_; + // Defined last so it's destroyed first. + TaskQueueForTest task_queue_; }; class CallClientPair { diff --git a/test/scenario/scenario.cc b/test/scenario/scenario.cc index 47b1d912d2..09e70f0849 100644 --- a/test/scenario/scenario.cc +++ b/test/scenario/scenario.cc @@ -19,6 +19,8 @@ #include "test/logging/file_log_writer.h" #include "test/scenario/network/network_emulation.h" #include "test/testsupport/file_utils.h" +#include "test/time_controller/real_time_controller.h" +#include "test/time_controller/simulated_time_controller.h" WEBRTC_DEFINE_bool(scenario_logs, false, "Save logs from scenario framework."); WEBRTC_DEFINE_string(scenario_logs_root, @@ -28,7 +30,8 @@ WEBRTC_DEFINE_string(scenario_logs_root, namespace webrtc { namespace test { namespace { -int64_t kMicrosPerSec = 1000000; +const Timestamp kSimulatedStartTime = Timestamp::seconds(100000); + std::unique_ptr GetScenarioLogManager( std::string file_name) { if (FLAG_scenario_logs && !file_name.empty()) { @@ -42,31 +45,14 @@ std::unique_ptr GetScenarioLogManager( } return nullptr; } -} - -RepeatedActivity::RepeatedActivity(TimeDelta interval, - std::function function) - : interval_(interval), function_(function) {} - -void RepeatedActivity::Stop() { - interval_ = TimeDelta::PlusInfinity(); -} - -void RepeatedActivity::Poll(Timestamp time) { - RTC_DCHECK(last_update_.IsFinite()); - if (time >= last_update_ + interval_) { - function_(time - last_update_); - last_update_ = time; +std::unique_ptr CreateTimeController(bool real_time) { + if (real_time) { + return absl::make_unique(); + } else { + return absl::make_unique( + kSimulatedStartTime); } } - -void RepeatedActivity::SetStartTime(Timestamp time) { - last_update_ = time; -} - -Timestamp RepeatedActivity::NextTime() { - RTC_DCHECK(last_update_.IsFinite()); - return last_update_ + interval_; } Scenario::Scenario() @@ -81,22 +67,17 @@ Scenario::Scenario( std::unique_ptr log_writer_factory, bool real_time) : log_writer_factory_(std::move(log_writer_factory)), - real_time_mode_(real_time), - sim_clock_(100000 * kMicrosPerSec), - clock_(real_time ? Clock::GetRealTimeClock() : &sim_clock_), + time_controller_(CreateTimeController(real_time)), + clock_(time_controller_->GetClock()), audio_decoder_factory_(CreateBuiltinAudioDecoderFactory()), - audio_encoder_factory_(CreateBuiltinAudioEncoderFactory()) { - if (!real_time_mode_ && log_writer_factory_) { - rtc::SetClockForTesting(&event_log_fake_clock_); - event_log_fake_clock_.SetTimeNanos(sim_clock_.TimeInMicroseconds() * 1000); - } -} + audio_encoder_factory_(CreateBuiltinAudioEncoderFactory()), + task_queue_(time_controller_->GetTaskQueueFactory()->CreateTaskQueue( + "Scenario", + TaskQueueFactory::Priority::NORMAL)) {} Scenario::~Scenario() { if (start_time_.IsFinite()) Stop(); - if (!real_time_mode_) - rtc::SetClockForTesting(nullptr); } ColumnPrinter Scenario::TimePrinter() { @@ -123,9 +104,8 @@ StatesPrinter* Scenario::CreatePrinter(std::string name, } CallClient* Scenario::CreateClient(std::string name, CallClientConfig config) { - RTC_DCHECK(real_time_mode_); CallClient* client = - new CallClient(clock_, GetLogWriterFactory(name), config); + new CallClient(time_controller_.get(), GetLogWriterFactory(name), config); if (config.transport.state_log_interval.IsFinite()) { Every(config.transport.state_log_interval, [this, client]() { client->network_controller_factory_.LogCongestionControllerStats(Now()); @@ -178,7 +158,7 @@ void Scenario::ChangeRoute(std::pair clients, uint64_t route_id = next_route_id_++; clients.second->route_overhead_.insert({route_id, overhead}); EmulatedNetworkNode::CreateRoute(route_id, over_nodes, clients.second); - clients.first->transport_.Connect(over_nodes.front(), route_id, overhead); + clients.first->transport_->Connect(over_nodes.front(), route_id, overhead); } SimulatedTimeClient* Scenario::CreateSimulatedTimeClient( @@ -190,17 +170,20 @@ SimulatedTimeClient* Scenario::CreateSimulatedTimeClient( uint64_t send_id = next_route_id_++; uint64_t return_id = next_route_id_++; SimulatedTimeClient* client = new SimulatedTimeClient( - GetLogWriterFactory(name), config, stream_configs, send_link, return_link, - send_id, return_id, Now()); + time_controller_.get(), GetLogWriterFactory(name), config, stream_configs, + send_link, return_link, send_id, return_id, Now()); if (log_writer_factory_ && !name.empty() && config.transport.state_log_interval.IsFinite()) { Every(config.transport.state_log_interval, [this, client]() { client->network_controller_factory_.LogCongestionControllerStats(Now()); }); } - - Every(client->GetNetworkControllerProcessInterval(), - [this, client] { client->CongestionProcess(Now()); }); + if (client->GetNetworkControllerProcessInterval().IsFinite()) { + Every(client->GetNetworkControllerProcessInterval(), + [this, client] { client->CongestionProcess(Now()); }); + } else { + task_queue_.PostTask([this, client] { client->CongestionProcess(Now()); }); + } Every(TimeDelta::ms(5), [this, client] { client->PacerProcess(Now()); }); simulated_time_clients_.emplace_back(client); return client; @@ -316,32 +299,34 @@ AudioStreamPair* Scenario::CreateAudioStream( void Scenario::Every(TimeDelta interval, std::function function) { - repeated_activities_.emplace_back(new RepeatedActivity(interval, function)); - if (start_time_.IsFinite()) { - repeated_activities_.back()->SetStartTime(Now()); - } + RepeatingTaskHandle::DelayedStart(task_queue_.Get(), interval, + [interval, function] { + function(interval); + return interval; + }); } void Scenario::Every(TimeDelta interval, std::function function) { - auto function_with_argument = [function](TimeDelta) { function(); }; - repeated_activities_.emplace_back( - new RepeatedActivity(interval, function_with_argument)); - if (start_time_.IsFinite()) { - repeated_activities_.back()->SetStartTime(Now()); - } + RepeatingTaskHandle::DelayedStart(task_queue_.Get(), interval, + [interval, function] { + function(); + return interval; + }); } void Scenario::At(TimeDelta offset, std::function function) { - pending_activities_.emplace_back(new PendingActivity{offset, function}); + RTC_DCHECK_GT(offset.ms(), TimeSinceStart().ms()); + task_queue_.PostDelayedTask(function, TimeUntilTarget(offset).ms()); } void Scenario::RunFor(TimeDelta duration) { - RunUntil(TimeSinceStart() + duration); + if (start_time_.IsInfinite()) + Start(); + time_controller_->Sleep(duration); } void Scenario::RunUntil(TimeDelta target_time_since_start) { - RunUntil(target_time_since_start, TimeDelta::PlusInfinity(), - []() { return false; }); + RunFor(TimeUntilTarget(target_time_since_start)); } void Scenario::RunUntil(TimeDelta target_time_since_start, @@ -349,43 +334,16 @@ void Scenario::RunUntil(TimeDelta target_time_since_start, std::function exit_function) { if (start_time_.IsInfinite()) Start(); - - rtc::Event done_; - while (!exit_function() && TimeSinceStart() < target_time_since_start) { - Timestamp current_time = Now(); - TimeDelta duration = current_time - start_time_; - Timestamp next_time = current_time + check_interval; - for (auto& activity : repeated_activities_) { - activity->Poll(current_time); - next_time = std::min(next_time, activity->NextTime()); - } - for (auto activity = pending_activities_.begin(); - activity < pending_activities_.end(); activity++) { - if (duration > (*activity)->after_duration) { - (*activity)->function(); - pending_activities_.erase(activity); - } - } - TimeDelta wait_time = next_time - current_time; - if (real_time_mode_) { - done_.Wait(wait_time.ms()); - } else { - sim_clock_.AdvanceTimeMicroseconds(wait_time.us()); - // The fake clock is quite slow to update, we only update it if logging is - // turned on to save time. - if (log_writer_factory_) - event_log_fake_clock_.SetTimeNanos(sim_clock_.TimeInMicroseconds() * - 1000); - } + while (check_interval >= TimeUntilTarget(target_time_since_start)) { + time_controller_->Sleep(check_interval); + if (exit_function()) + return; } + time_controller_->Sleep(TimeUntilTarget(target_time_since_start)); } void Scenario::Start() { start_time_ = Timestamp::us(clock_->TimeInMicroseconds()); - for (auto& activity : repeated_activities_) { - activity->SetStartTime(start_time_); - } - for (auto& stream_pair : video_streams_) stream_pair->receive()->Start(); for (auto& stream_pair : audio_streams_) @@ -426,5 +384,9 @@ TimeDelta Scenario::TimeSinceStart() { return Now() - start_time_; } +TimeDelta Scenario::TimeUntilTarget(TimeDelta target_time_offset) { + return target_time_offset - TimeSinceStart(); +} + } // namespace test } // namespace webrtc diff --git a/test/scenario/scenario.h b/test/scenario/scenario.h index 6de1d78f7d..cf949df517 100644 --- a/test/scenario/scenario.h +++ b/test/scenario/scenario.h @@ -17,6 +17,8 @@ #include "absl/memory/memory.h" #include "rtc_base/constructor_magic.h" #include "rtc_base/fake_clock.h" +#include "rtc_base/task_queue.h" +#include "rtc_base/task_utils/repeating_task.h" #include "test/logging/log_writer.h" #include "test/scenario/audio_stream.h" #include "test/scenario/call_client.h" @@ -25,33 +27,10 @@ #include "test/scenario/scenario_config.h" #include "test/scenario/simulated_time.h" #include "test/scenario/video_stream.h" +#include "test/time_controller/time_controller.h" namespace webrtc { namespace test { -// RepeatedActivity is created by the Scenario class and can be used to stop a -// running activity at runtime. -class RepeatedActivity { - public: - void Stop(); - - private: - friend class Scenario; - RepeatedActivity(TimeDelta interval, std::function function); - - void Poll(Timestamp time); - void SetStartTime(Timestamp time); - Timestamp NextTime(); - - TimeDelta interval_; - std::function function_; - Timestamp last_update_ = Timestamp::MinusInfinity(); -}; - -struct PendingActivity { - TimeDelta after_duration; - std::function function; -}; - // Scenario is a class owning everything for a test scenario. It creates and // holds network nodes, call clients and media streams. It also provides methods // for changing behavior at runtime. Since it always keeps ownership of the @@ -128,11 +107,14 @@ class Scenario { std::vector over_nodes, CrossTrafficConfig config); - // Runs the provided function with a fixed interval. + // Runs the provided function with a fixed interval. For real time tests, + // |function| starts being called after |interval| from the call to Every(). void Every(TimeDelta interval, std::function function); void Every(TimeDelta interval, std::function function); - // Runs the provided function after given duration has passed in a session. + // Runs the provided function after given duration has passed. For real time + // tests, |function| is called after |target_time_since_start| from the call + // to Every(). void At(TimeDelta offset, std::function function); // Sends a packet over the nodes and runs |action| when it has been delivered. @@ -140,13 +122,13 @@ class Scenario { size_t packet_size, std::function action); - // Runs the scenario for the given time or until the exit function returns - // true. + // Runs the scenario for the given time. void RunFor(TimeDelta duration); + // Runs the scenario until |target_time_since_start|. void RunUntil(TimeDelta target_time_since_start); - // Will check |exit_function| every |check_interval|. It stops after a check - // if either |target_time_since_start| has passed or if |exit_function| - // returns true. + // Runs the scenario until |target_time_since_start| or |exit_function| + // returns true. |exit_function| is polled after each |check_interval| has + // passed. void RunUntil(TimeDelta target_time_since_start, TimeDelta check_interval, std::function exit_function); @@ -182,14 +164,12 @@ class Scenario { } private: + TimeDelta TimeUntilTarget(TimeDelta target_time_offset); + NullReceiver null_receiver_; - std::unique_ptr log_writer_factory_; - const bool real_time_mode_; - SimulatedClock sim_clock_; + const std::unique_ptr log_writer_factory_; + std::unique_ptr time_controller_; Clock* clock_; - // Event logs use a global clock instance, this is used to override that - // instance when not running in real time. - rtc::FakeClock event_log_fake_clock_; std::vector> clients_; std::vector> client_pairs_; @@ -200,9 +180,7 @@ class Scenario { std::vector> simulated_time_clients_; - std::vector> repeated_activities_; std::vector> action_receivers_; - std::vector> pending_activities_; std::vector> printers_; int64_t next_route_id_ = 40000; @@ -210,6 +188,8 @@ class Scenario { rtc::scoped_refptr audio_encoder_factory_; Timestamp start_time_ = Timestamp::PlusInfinity(); + // Defined last so it's destroyed first. + rtc::TaskQueue task_queue_; }; } // namespace test } // namespace webrtc diff --git a/test/scenario/scenario_unittest.cc b/test/scenario/scenario_unittest.cc index 97285d3914..4a4b7f638d 100644 --- a/test/scenario/scenario_unittest.cc +++ b/test/scenario/scenario_unittest.cc @@ -7,12 +7,15 @@ * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ +#include #include "test/scenario/scenario.h" #include "test/gtest.h" namespace webrtc { namespace test { TEST(ScenarioTest, StartsAndStopsWithoutErrors) { + std::atomic packet_received(false); + std::atomic bitrate_changed(false); Scenario s; CallClientConfig call_client_config; call_client_config.transport.rates.start_rate = DataRate::kbps(300); @@ -38,10 +41,8 @@ TEST(ScenarioTest, StartsAndStopsWithoutErrors) { CrossTrafficConfig cross_traffic_config; s.CreateCrossTraffic({alice_net}, cross_traffic_config); - bool packet_received = false; s.NetworkDelayedAction({alice_net, bob_net}, 100, [&packet_received] { packet_received = true; }); - bool bitrate_changed = false; s.Every(TimeDelta::ms(10), [alice, bob, &bitrate_changed] { if (alice->GetStats().send_bandwidth_bps != 300000 && bob->GetStats().send_bandwidth_bps != 300000) diff --git a/test/scenario/simulated_time.cc b/test/scenario/simulated_time.cc index 5824c5fe21..efc0b72ab4 100644 --- a/test/scenario/simulated_time.cc +++ b/test/scenario/simulated_time.cc @@ -245,6 +245,7 @@ void SimulatedFeedback::OnPacketReceived(EmulatedIpPacket packet) { } SimulatedTimeClient::SimulatedTimeClient( + TimeController* time_controller, std::unique_ptr log_writer_factory, SimulatedTimeClientConfig config, std::vector stream_configs, @@ -254,7 +255,9 @@ SimulatedTimeClient::SimulatedTimeClient( uint64_t return_receiver_id, Timestamp at_time) : log_writer_factory_(std::move(log_writer_factory)), - network_controller_factory_(log_writer_factory_.get(), config.transport), + network_controller_factory_(time_controller, + log_writer_factory_.get(), + config.transport), send_link_(send_link), return_link_(return_link), sender_(send_link.front(), send_receiver_id), diff --git a/test/scenario/simulated_time.h b/test/scenario/simulated_time.h index 762748dc66..9214a8dd59 100644 --- a/test/scenario/simulated_time.h +++ b/test/scenario/simulated_time.h @@ -121,6 +121,7 @@ class SimulatedSender { class SimulatedTimeClient : EmulatedNetworkReceiverInterface { public: SimulatedTimeClient( + TimeController* time_controller, std::unique_ptr log_writer_factory, SimulatedTimeClientConfig config, std::vector stream_configs, diff --git a/test/scenario/video_stream.cc b/test/scenario/video_stream.cc index 02ee327818..f85972c853 100644 --- a/test/scenario/video_stream.cc +++ b/test/scenario/video_stream.cc @@ -339,7 +339,8 @@ SendVideoStream::SendVideoStream(CallClient* sender, : sender_(sender), config_(config) { video_capturer_ = absl::make_unique( sender_->clock_, CreateFrameGenerator(sender_->clock_, config.source), - config.source.framerate); + config.source.framerate, + *sender->time_controller_->GetTaskQueueFactory()); video_capturer_->Init(); using Encoder = VideoStreamConfig::Encoder; @@ -384,59 +385,66 @@ SendVideoStream::SendVideoStream(CallClient* sender, send_config.encoder_settings.bitrate_allocator_factory = bitrate_allocator_factory_.get(); - send_stream_ = sender_->call_->CreateVideoSendStream( - std::move(send_config), std::move(encoder_config)); - std::vector > - frame_info_handlers; - if (config.analyzer.frame_quality_handler) - frame_info_handlers.push_back(config.analyzer.frame_quality_handler); + sender_->SendTask([&] { + send_stream_ = sender_->call_->CreateVideoSendStream( + std::move(send_config), std::move(encoder_config)); + std::vector > + frame_info_handlers; + if (config.analyzer.frame_quality_handler) + frame_info_handlers.push_back(config.analyzer.frame_quality_handler); - if (analyzer->Active()) { - frame_tap_.reset(new ForwardingCapturedFrameTap(sender_->clock_, analyzer, - video_capturer_.get())); - send_stream_->SetSource(frame_tap_.get(), - config.encoder.degradation_preference); - } else { - send_stream_->SetSource(video_capturer_.get(), - config.encoder.degradation_preference); - } + if (analyzer->Active()) { + frame_tap_.reset(new ForwardingCapturedFrameTap(sender_->clock_, analyzer, + video_capturer_.get())); + send_stream_->SetSource(frame_tap_.get(), + config.encoder.degradation_preference); + } else { + send_stream_->SetSource(video_capturer_.get(), + config.encoder.degradation_preference); + } + }); } SendVideoStream::~SendVideoStream() { - sender_->call_->DestroyVideoSendStream(send_stream_); + sender_->SendTask( + [this] { sender_->call_->DestroyVideoSendStream(send_stream_); }); } void SendVideoStream::Start() { - send_stream_->Start(); - sender_->call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp); + sender_->SendTask([this] { + send_stream_->Start(); + sender_->call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp); + }); } void SendVideoStream::Stop() { - send_stream_->Stop(); + sender_->SendTask([this] { send_stream_->Stop(); }); } void SendVideoStream::UpdateConfig( std::function modifier) { - rtc::CritScope cs(&crit_); - VideoStreamConfig prior_config = config_; - modifier(&config_); - if (prior_config.encoder.fake.max_rate != config_.encoder.fake.max_rate) { - for (auto* encoder : fake_encoders_) { - encoder->SetMaxBitrate(config_.encoder.fake.max_rate.kbps()); + sender_->SendTask([&] { + rtc::CritScope cs(&crit_); + VideoStreamConfig prior_config = config_; + modifier(&config_); + if (prior_config.encoder.fake.max_rate != config_.encoder.fake.max_rate) { + for (auto* encoder : fake_encoders_) { + encoder->SetMaxBitrate(config_.encoder.fake.max_rate.kbps()); + } } - } - // TODO(srte): Add more conditions that should cause reconfiguration. - if (prior_config.encoder.max_framerate != config_.encoder.max_framerate) { - VideoEncoderConfig encoder_config = CreateVideoEncoderConfig(config_); - send_stream_->ReconfigureVideoEncoder(std::move(encoder_config)); - } - if (prior_config.source.framerate != config_.source.framerate) { - SetCaptureFramerate(config_.source.framerate); - } + // TODO(srte): Add more conditions that should cause reconfiguration. + if (prior_config.encoder.max_framerate != config_.encoder.max_framerate) { + VideoEncoderConfig encoder_config = CreateVideoEncoderConfig(config_); + send_stream_->ReconfigureVideoEncoder(std::move(encoder_config)); + } + if (prior_config.source.framerate != config_.source.framerate) { + SetCaptureFramerate(config_.source.framerate); + } + }); } void SendVideoStream::SetCaptureFramerate(int framerate) { - video_capturer_->ChangeFramerate(framerate); + sender_->SendTask([&] { video_capturer_->ChangeFramerate(framerate); }); } VideoSendStream::Stats SendVideoStream::GetStats() const { @@ -508,27 +516,35 @@ ReceiveVideoStream::ReceiveVideoStream(CallClient* receiver, MediaType::VIDEO; if (config.stream.use_rtx) receiver_->ssrc_media_types_[recv_config.rtp.rtx_ssrc] = MediaType::VIDEO; - receive_streams_.push_back( - receiver_->call_->CreateVideoReceiveStream(std::move(recv_config))); + receiver_->SendTask([this, &recv_config] { + receive_streams_.push_back( + receiver_->call_->CreateVideoReceiveStream(std::move(recv_config))); + }); } } ReceiveVideoStream::~ReceiveVideoStream() { - for (auto* recv_stream : receive_streams_) - receiver_->call_->DestroyVideoReceiveStream(recv_stream); - if (flecfec_stream_) - receiver_->call_->DestroyFlexfecReceiveStream(flecfec_stream_); + receiver_->SendTask([this] { + for (auto* recv_stream : receive_streams_) + receiver_->call_->DestroyVideoReceiveStream(recv_stream); + if (flecfec_stream_) + receiver_->call_->DestroyFlexfecReceiveStream(flecfec_stream_); + }); } void ReceiveVideoStream::Start() { - for (auto* recv_stream : receive_streams_) - recv_stream->Start(); - receiver_->call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp); + receiver_->SendTask([this] { + for (auto* recv_stream : receive_streams_) + recv_stream->Start(); + receiver_->call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp); + }); } void ReceiveVideoStream::Stop() { - for (auto* recv_stream : receive_streams_) - recv_stream->Stop(); + receiver_->SendTask([this] { + for (auto* recv_stream : receive_streams_) + recv_stream->Stop(); + }); } VideoStreamPair::~VideoStreamPair() = default; @@ -541,12 +557,12 @@ VideoStreamPair::VideoStreamPair( : config_(config), analyzer_(std::move(quality_writer), config.analyzer.frame_quality_handler), - send_stream_(sender, config, &sender->transport_, &analyzer_), + send_stream_(sender, config, sender->transport_.get(), &analyzer_), receive_stream_(receiver, config, &send_stream_, /*chosen_stream=*/0, - &receiver->transport_, + receiver->transport_.get(), &analyzer_) {} } // namespace test