diff --git a/BUILD.gn b/BUILD.gn index b3e771071f..4e30a71e7b 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -547,6 +547,7 @@ if (rtc_include_tests) { "rtc_base:weak_ptr_unittests", "rtc_base/experiments:experiments_unittests", "rtc_base/synchronization:sequence_checker_unittests", + "rtc_base/task_utils:pending_task_safety_flag_unittests", "rtc_base/task_utils:to_queued_task_unittests", "sdk:sdk_tests", "test:rtp_test_utils", diff --git a/call/call_perf_tests.cc b/call/call_perf_tests.cc index 2d23087cc8..123be7da4c 100644 --- a/call/call_perf_tests.cc +++ b/call/call_perf_tests.cc @@ -96,21 +96,24 @@ class VideoRtcpAndSyncObserver : public test::RtpRtcpObserver, static const int kMinRunTimeMs = 30000; public: - explicit VideoRtcpAndSyncObserver(Clock* clock, const std::string& test_label) + explicit VideoRtcpAndSyncObserver(TaskQueueBase* task_queue, + Clock* clock, + const std::string& test_label) : test::RtpRtcpObserver(CallPerfTest::kLongTimeoutMs), clock_(clock), test_label_(test_label), creation_time_ms_(clock_->TimeInMilliseconds()), - first_time_in_sync_(-1), - receive_stream_(nullptr) {} + task_queue_(task_queue) {} void OnFrame(const VideoFrame& video_frame) override { - VideoReceiveStream::Stats stats; - { - rtc::CritScope lock(&crit_); - if (receive_stream_) - stats = receive_stream_->GetStats(); - } + task_queue_->PostTask(ToQueuedTask([this]() { CheckStats(); })); + } + + void CheckStats() { + if (!receive_stream_) + return; + + VideoReceiveStream::Stats stats = receive_stream_->GetStats(); if (stats.sync_offset_ms == std::numeric_limits::max()) return; @@ -135,7 +138,8 @@ class VideoRtcpAndSyncObserver : public test::RtpRtcpObserver, } void set_receive_stream(VideoReceiveStream* receive_stream) { - rtc::CritScope lock(&crit_); + RTC_DCHECK_EQ(task_queue_, TaskQueueBase::Current()); + // Note that receive_stream may be nullptr. receive_stream_ = receive_stream; } @@ -148,10 +152,10 @@ class VideoRtcpAndSyncObserver : public test::RtpRtcpObserver, Clock* const clock_; std::string test_label_; const int64_t creation_time_ms_; - int64_t first_time_in_sync_; - rtc::CriticalSection crit_; - VideoReceiveStream* receive_stream_ RTC_GUARDED_BY(crit_); + int64_t first_time_in_sync_ = -1; + VideoReceiveStream* receive_stream_ = nullptr; std::vector sync_offset_ms_list_; + TaskQueueBase* const task_queue_; }; void CallPerfTest::TestAudioVideoSync(FecMode fec, @@ -168,7 +172,8 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec, audio_net_config.queue_delay_ms = 500; audio_net_config.loss_percent = 5; - VideoRtcpAndSyncObserver observer(Clock::GetRealTimeClock(), test_label); + auto observer = std::make_unique( + task_queue(), Clock::GetRealTimeClock(), test_label); std::map audio_pt_map; std::map video_pt_map; @@ -218,7 +223,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec, }); audio_send_transport = std::make_unique( - task_queue(), sender_call_.get(), &observer, + task_queue(), sender_call_.get(), observer.get(), test::PacketTransport::kSender, audio_pt_map, std::make_unique( Clock::GetRealTimeClock(), @@ -226,7 +231,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec, audio_send_transport->SetReceiver(receiver_call_->Receiver()); video_send_transport = std::make_unique( - task_queue(), sender_call_.get(), &observer, + task_queue(), sender_call_.get(), observer.get(), test::PacketTransport::kSender, video_pt_map, std::make_unique(Clock::GetRealTimeClock(), std::make_unique( @@ -234,7 +239,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec, video_send_transport->SetReceiver(receiver_call_->Receiver()); receive_transport = std::make_unique( - task_queue(), receiver_call_.get(), &observer, + task_queue(), receiver_call_.get(), observer.get(), test::PacketTransport::kReceiver, payload_type_map_, std::make_unique(Clock::GetRealTimeClock(), std::make_unique( @@ -259,7 +264,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec, video_receive_configs_[0].rtp.ulpfec_payload_type = kUlpfecPayloadType; } video_receive_configs_[0].rtp.nack.rtp_history_ms = 1000; - video_receive_configs_[0].renderer = &observer; + video_receive_configs_[0].renderer = observer.get(); video_receive_configs_[0].sync_group = kSyncGroup; AudioReceiveStream::Config audio_recv_config; @@ -281,7 +286,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec, receiver_call_->CreateAudioReceiveStream(audio_recv_config); } EXPECT_EQ(1u, video_receive_streams_.size()); - observer.set_receive_stream(video_receive_streams_[0]); + observer->set_receive_stream(video_receive_streams_[0]); drifting_clock = std::make_unique(clock_, video_ntp_speed); CreateFrameGeneratorCapturerWithDrift(drifting_clock.get(), video_rtp_speed, kDefaultFramerate, kDefaultWidth, @@ -293,10 +298,13 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec, audio_receive_stream->Start(); }); - EXPECT_TRUE(observer.Wait()) + EXPECT_TRUE(observer->Wait()) << "Timed out while waiting for audio and video to be synchronized."; SendTask(RTC_FROM_HERE, task_queue(), [&]() { + // Clear the pointer to the receive stream since it will now be deleted. + observer->set_receive_stream(nullptr); + audio_send_stream->Stop(); audio_receive_stream->Stop(); @@ -314,7 +322,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec, DestroyCalls(); }); - observer.PrintResults(); + observer->PrintResults(); // In quick test synchronization may not be achieved in time. if (!field_trial::IsEnabled("WebRTC-QuickPerfTest")) { @@ -323,6 +331,9 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec, EXPECT_METRIC_EQ(1, metrics::NumSamples("WebRTC.Video.AVSyncOffsetInMs")); #endif } + + task_queue()->PostTask( + ToQueuedTask([to_delete = observer.release()]() { delete to_delete; })); } TEST_F(CallPerfTest, PlaysOutAudioAndVideoInSyncWithoutClockDrift) { diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc index 951cd4e410..d7d70342e1 100644 --- a/call/rtp_video_sender_unittest.cc +++ b/call/rtp_video_sender_unittest.cc @@ -526,9 +526,9 @@ TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) { test::NetworkSimulationConfig net_conf; net_conf.bandwidth = DataRate::KilobitsPerSec(300); auto send_node = s.CreateSimulationNode(net_conf); + auto* callee = s.CreateClient("return", call_conf); auto* route = s.CreateRoutes(s.CreateClient("send", call_conf), {send_node}, - s.CreateClient("return", call_conf), - {s.CreateSimulationNode(net_conf)}); + callee, {s.CreateSimulationNode(net_conf)}); test::VideoStreamConfig lossy_config; lossy_config.source.framerate = 5; @@ -556,14 +556,20 @@ TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) { // from initial probing. s.RunFor(TimeDelta::Seconds(1)); rtx_packets = 0; - int decoded_baseline = lossy->receive()->GetStats().frames_decoded; + int decoded_baseline = 0; + callee->SendTask([&decoded_baseline, &lossy]() { + decoded_baseline = lossy->receive()->GetStats().frames_decoded; + }); s.RunFor(TimeDelta::Seconds(1)); // We expect both that RTX packets were sent and that an appropriate number of // frames were received. This is somewhat redundant but reduces the risk of // false positives in future regressions (e.g. RTX is send due to probing). EXPECT_GE(rtx_packets, 1); - int frames_decoded = - lossy->receive()->GetStats().frames_decoded - decoded_baseline; + int frames_decoded = 0; + callee->SendTask([&decoded_baseline, &frames_decoded, &lossy]() { + frames_decoded = + lossy->receive()->GetStats().frames_decoded - decoded_baseline; + }); EXPECT_EQ(frames_decoded, 5); } diff --git a/modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc b/modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc index 1083214fa5..361da92ff2 100644 --- a/modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc +++ b/modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc @@ -537,8 +537,8 @@ DataRate AverageBitrateAfterCrossInducedLoss(std::string name) { auto ret_net = {s.CreateSimulationNode(net_conf)}; auto* client = s.CreateClient("send", CallClientConfig()); - auto* route = s.CreateRoutes( - client, send_net, s.CreateClient("return", CallClientConfig()), ret_net); + auto* callee = s.CreateClient("return", CallClientConfig()); + auto* route = s.CreateRoutes(client, send_net, callee, ret_net); // TODO(srte): Make this work with RTX enabled or remove it. auto* video = s.CreateVideoStream(route->forward(), [](VideoStreamConfig* c) { c->stream.use_rtx = false; @@ -553,9 +553,17 @@ DataRate AverageBitrateAfterCrossInducedLoss(std::string name) { s.net()->StopCrossTraffic(tcp_traffic); s.RunFor(TimeDelta::Seconds(20)); } - return DataSize::Bytes(video->receive() - ->GetStats() - .rtp_stats.packet_counter.TotalBytes()) / + + // Querying the video stats from within the expected runtime environment + // (i.e. the TQ that belongs to the CallClient, not the Scenario TQ that + // we're currently on). + VideoReceiveStream::Stats video_receive_stats; + auto* video_stream = video->receive(); + callee->SendTask([&video_stream, &video_receive_stats]() { + video_receive_stats = video_stream->GetStats(); + }); + return DataSize::Bytes( + video_receive_stats.rtp_stats.packet_counter.TotalBytes()) / s.TimeSinceStart(); } diff --git a/rtc_base/task_utils/BUILD.gn b/rtc_base/task_utils/BUILD.gn index 2e7d53ceb2..8409aa29e5 100644 --- a/rtc_base/task_utils/BUILD.gn +++ b/rtc_base/task_utils/BUILD.gn @@ -26,12 +26,39 @@ rtc_library("repeating_task") { ] } +rtc_library("pending_task_safety_flag") { + sources = [ + "pending_task_safety_flag.cc", + "pending_task_safety_flag.h", + ] + deps = [ + "..:checks", + "..:refcount", + "..:thread_checker", + "../../api:scoped_refptr", + "../synchronization:sequence_checker", + ] +} + rtc_source_set("to_queued_task") { sources = [ "to_queued_task.h" ] deps = [ "../../api/task_queue" ] } if (rtc_include_tests) { + rtc_library("pending_task_safety_flag_unittests") { + testonly = true + sources = [ "pending_task_safety_flag_unittest.cc" ] + deps = [ + ":pending_task_safety_flag", + ":to_queued_task", + "..:rtc_base_approved", + "..:rtc_task_queue", + "..:task_queue_for_test", + "../../test:test_support", + ] + } + rtc_library("repeating_task_unittests") { testonly = true sources = [ "repeating_task_unittest.cc" ] diff --git a/rtc_base/task_utils/pending_task_safety_flag.cc b/rtc_base/task_utils/pending_task_safety_flag.cc new file mode 100644 index 0000000000..307d2d594c --- /dev/null +++ b/rtc_base/task_utils/pending_task_safety_flag.cc @@ -0,0 +1,32 @@ +/* + * Copyright 2020 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "rtc_base/task_utils/pending_task_safety_flag.h" + +#include "rtc_base/ref_counted_object.h" + +namespace webrtc { + +// static +PendingTaskSafetyFlag::Pointer PendingTaskSafetyFlag::Create() { + return new rtc::RefCountedObject(); +} + +void PendingTaskSafetyFlag::SetNotAlive() { + RTC_DCHECK_RUN_ON(&main_sequence_); + alive_ = false; +} + +bool PendingTaskSafetyFlag::alive() const { + RTC_DCHECK_RUN_ON(&main_sequence_); + return alive_; +} + +} // namespace webrtc diff --git a/rtc_base/task_utils/pending_task_safety_flag.h b/rtc_base/task_utils/pending_task_safety_flag.h new file mode 100644 index 0000000000..1b301c8034 --- /dev/null +++ b/rtc_base/task_utils/pending_task_safety_flag.h @@ -0,0 +1,61 @@ +/* + * Copyright 2020 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_ +#define RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_ + +#include "api/scoped_refptr.h" +#include "rtc_base/checks.h" +#include "rtc_base/ref_count.h" +#include "rtc_base/synchronization/sequence_checker.h" + +namespace webrtc { + +// Use this flag to drop pending tasks that have been posted to the "main" +// thread/TQ and end up running after the owning instance has been +// deleted. The owning instance signals deletion by calling SetNotAlive() from +// its destructor. +// +// When posting a task, post a copy (capture by-value in a lambda) of the flag +// instance and before performing the work, check the |alive()| state. Abort if +// alive() returns |false|: +// +// // Running outside of the main thread. +// my_task_queue_->PostTask(ToQueuedTask( +// [safety = pending_task_safety_flag_, this]() { +// // Now running on the main thread. +// if (!safety->alive()) +// return; +// MyMethod(); +// })); +// +// Note that checking the state only works on the construction/destruction +// thread of the ReceiveStatisticsProxy instance. +class PendingTaskSafetyFlag : public rtc::RefCountInterface { + public: + using Pointer = rtc::scoped_refptr; + static Pointer Create(); + + ~PendingTaskSafetyFlag() = default; + + void SetNotAlive(); + bool alive() const; + + protected: + PendingTaskSafetyFlag() = default; + + private: + bool alive_ = true; + SequenceChecker main_sequence_; +}; + +} // namespace webrtc + +#endif // RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_ diff --git a/rtc_base/task_utils/pending_task_safety_flag_unittest.cc b/rtc_base/task_utils/pending_task_safety_flag_unittest.cc new file mode 100644 index 0000000000..0c1c3c8e52 --- /dev/null +++ b/rtc_base/task_utils/pending_task_safety_flag_unittest.cc @@ -0,0 +1,151 @@ +/* + * Copyright 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "rtc_base/task_utils/pending_task_safety_flag.h" + +#include + +#include "rtc_base/event.h" +#include "rtc_base/logging.h" +#include "rtc_base/task_queue_for_test.h" +#include "rtc_base/task_utils/to_queued_task.h" +#include "test/gmock.h" +#include "test/gtest.h" + +namespace webrtc { +namespace { +using ::testing::AtLeast; +using ::testing::Invoke; +using ::testing::MockFunction; +using ::testing::NiceMock; +using ::testing::Return; +} // namespace + +TEST(PendingTaskSafetyFlagTest, Basic) { + PendingTaskSafetyFlag::Pointer safety_flag; + { + // Scope for the |owner| instance. + class Owner { + public: + Owner() = default; + ~Owner() { flag_->SetNotAlive(); } + + PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()}; + } owner; + EXPECT_TRUE(owner.flag_->alive()); + safety_flag = owner.flag_; + EXPECT_TRUE(safety_flag->alive()); + } + EXPECT_FALSE(safety_flag->alive()); +} + +TEST(PendingTaskSafetyFlagTest, PendingTaskSuccess) { + TaskQueueForTest tq1("OwnerHere"); + TaskQueueForTest tq2("OwnerNotHere"); + + class Owner { + public: + Owner() : tq_main_(TaskQueueBase::Current()) { RTC_DCHECK(tq_main_); } + ~Owner() { + RTC_DCHECK(tq_main_->IsCurrent()); + flag_->SetNotAlive(); + } + + void DoStuff() { + RTC_DCHECK(!tq_main_->IsCurrent()); + tq_main_->PostTask(ToQueuedTask([safe = flag_, this]() { + if (!safe->alive()) + return; + stuff_done_ = true; + })); + } + + bool stuff_done() const { return stuff_done_; } + + private: + TaskQueueBase* const tq_main_; + bool stuff_done_ = false; + PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()}; + }; + + std::unique_ptr owner; + tq1.SendTask( + [&owner]() { + owner.reset(new Owner()); + EXPECT_FALSE(owner->stuff_done()); + }, + RTC_FROM_HERE); + ASSERT_TRUE(owner); + tq2.SendTask([&owner]() { owner->DoStuff(); }, RTC_FROM_HERE); + tq1.SendTask( + [&owner]() { + EXPECT_TRUE(owner->stuff_done()); + owner.reset(); + }, + RTC_FROM_HERE); + ASSERT_FALSE(owner); +} + +TEST(PendingTaskSafetyFlagTest, PendingTaskDropped) { + TaskQueueForTest tq1("OwnerHere"); + TaskQueueForTest tq2("OwnerNotHere"); + + class Owner { + public: + explicit Owner(bool* stuff_done) + : tq_main_(TaskQueueBase::Current()), stuff_done_(stuff_done) { + RTC_DCHECK(tq_main_); + *stuff_done_ = false; + } + ~Owner() { + RTC_DCHECK(tq_main_->IsCurrent()); + flag_->SetNotAlive(); + } + + void DoStuff() { + RTC_DCHECK(!tq_main_->IsCurrent()); + tq_main_->PostTask(ToQueuedTask([safe = flag_, this]() { + if (!safe->alive()) + return; + *stuff_done_ = true; + })); + } + + private: + TaskQueueBase* const tq_main_; + bool* const stuff_done_; + PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()}; + }; + + std::unique_ptr owner; + bool stuff_done = false; + tq1.SendTask([&owner, &stuff_done]() { owner.reset(new Owner(&stuff_done)); }, + RTC_FROM_HERE); + ASSERT_TRUE(owner); + // Queue up a task on tq1 that will execute before the 'DoStuff' task + // can, and delete the |owner| before the 'stuff' task can execute. + rtc::Event blocker; + tq1.PostTask([&blocker, &owner]() { + blocker.Wait(rtc::Event::kForever); + owner.reset(); + }); + + // Queue up a DoStuff... + tq2.SendTask([&owner]() { owner->DoStuff(); }, RTC_FROM_HERE); + + ASSERT_TRUE(owner); + blocker.Set(); + + // Run an empty task on tq1 to flush all the queued tasks. + tq1.SendTask([]() {}, RTC_FROM_HERE); + ASSERT_FALSE(owner); + EXPECT_FALSE(stuff_done); +} +} // namespace webrtc diff --git a/test/scenario/call_client.h b/test/scenario/call_client.h index 803b4a8313..33fa2765cb 100644 --- a/test/scenario/call_client.h +++ b/test/scenario/call_client.h @@ -113,6 +113,11 @@ class CallClient : public EmulatedNetworkReceiverInterface { void OnPacketReceived(EmulatedIpPacket packet) override; std::unique_ptr GetLogWriter(std::string name); + // Exposed publicly so that tests can execute tasks such as querying stats + // for media streams in the expected runtime environment (essentially what + // CallClient does internally for GetStats()). + void SendTask(std::function task); + private: friend class Scenario; friend class CallClientPair; @@ -129,7 +134,6 @@ class CallClient : public EmulatedNetworkReceiverInterface { uint32_t GetNextAudioLocalSsrc(); uint32_t GetNextRtxSsrc(); void AddExtensions(std::vector extensions); - void SendTask(std::function task); int16_t Bind(EmulatedEndpoint* endpoint); void UnBind(); diff --git a/test/scenario/stats_collection_unittest.cc b/test/scenario/stats_collection_unittest.cc index fae3365d5d..af3b982838 100644 --- a/test/scenario/stats_collection_unittest.cc +++ b/test/scenario/stats_collection_unittest.cc @@ -25,17 +25,26 @@ void CreateAnalyzedStream(Scenario* s, VideoStreamConfig::Encoder::Implementation::kSoftware; config.hooks.frame_pair_handlers = {analyzer->Handler()}; auto* caller = s->CreateClient("caller", CallClientConfig()); + auto* callee = s->CreateClient("callee", CallClientConfig()); auto route = - s->CreateRoutes(caller, {s->CreateSimulationNode(network_config)}, - s->CreateClient("callee", CallClientConfig()), + s->CreateRoutes(caller, {s->CreateSimulationNode(network_config)}, callee, {s->CreateSimulationNode(NetworkSimulationConfig())}); - auto* video = s->CreateVideoStream(route->forward(), config); + VideoStreamPair* video = s->CreateVideoStream(route->forward(), config); auto* audio = s->CreateAudioStream(route->forward(), AudioStreamConfig()); s->Every(TimeDelta::Seconds(1), [=] { collectors->call.AddStats(caller->GetStats()); - collectors->audio_receive.AddStats(audio->receive()->GetStats()); collectors->video_send.AddStats(video->send()->GetStats(), s->Now()); - collectors->video_receive.AddStats(video->receive()->GetStats()); + collectors->audio_receive.AddStats(audio->receive()->GetStats()); + + // Querying the video stats from within the expected runtime environment + // (i.e. the TQ that belongs to the CallClient, not the Scenario TQ that + // we're currently on). + VideoReceiveStream::Stats video_receive_stats; + auto* video_stream = video->receive(); + callee->SendTask([&video_stream, &video_receive_stats]() { + video_receive_stats = video_stream->GetStats(); + }); + collectors->video_receive.AddStats(video_receive_stats); }); } } // namespace diff --git a/video/BUILD.gn b/video/BUILD.gn index 14109c3494..9d26ee2c37 100644 --- a/video/BUILD.gn +++ b/video/BUILD.gn @@ -115,6 +115,7 @@ rtc_library("video") { "../rtc_base/experiments:rate_control_settings", "../rtc_base/synchronization:sequence_checker", "../rtc_base/system:thread_registry", + "../rtc_base/task_utils:pending_task_safety_flag", "../rtc_base/task_utils:repeating_task", "../rtc_base/task_utils:to_queued_task", "../rtc_base/time:timestamp_extrapolator", diff --git a/video/end_to_end_tests/retransmission_tests.cc b/video/end_to_end_tests/retransmission_tests.cc index 407aa5f2dc..c28b12960f 100644 --- a/video/end_to_end_tests/retransmission_tests.cc +++ b/video/end_to_end_tests/retransmission_tests.cc @@ -18,8 +18,8 @@ #include "call/simulated_network.h" #include "modules/rtp_rtcp/source/rtp_packet.h" #include "modules/video_coding/codecs/vp8/include/vp8.h" +#include "rtc_base/event.h" #include "rtc_base/task_queue_for_test.h" -#include "system_wrappers/include/sleep.h" #include "test/call_test.h" #include "test/field_trial.h" #include "test/gtest.h" @@ -203,7 +203,7 @@ TEST_F(RetransmissionEndToEndTest, ReceivesNackAndRetransmitsAudio) { TEST_F(RetransmissionEndToEndTest, StopSendingKeyframeRequestsForInactiveStream) { - class KeyframeRequestObserver : public test::EndToEndTest { + class KeyframeRequestObserver : public test::EndToEndTest, public QueuedTask { public: explicit KeyframeRequestObserver(TaskQueueBase* task_queue) : clock_(Clock::GetRealTimeClock()), task_queue_(task_queue) {} @@ -216,28 +216,59 @@ TEST_F(RetransmissionEndToEndTest, receive_stream_ = receive_streams[0]; } - void PerformTest() override { - bool frame_decoded = false; - int64_t start_time = clock_->TimeInMilliseconds(); - while (clock_->TimeInMilliseconds() - start_time <= 5000) { - if (receive_stream_->GetStats().frames_decoded > 0) { - frame_decoded = true; - break; - } - SleepMs(100); + Action OnReceiveRtcp(const uint8_t* packet, size_t length) override { + test::RtcpPacketParser parser; + EXPECT_TRUE(parser.Parse(packet, length)); + if (parser.pli()->num_packets() > 0) + task_queue_->PostTask(std::unique_ptr(this)); + return SEND_PACKET; + } + + bool PollStats() { + if (receive_stream_->GetStats().frames_decoded > 0) { + frame_decoded_ = true; + } else if (clock_->TimeInMilliseconds() - start_time_ < 5000) { + task_queue_->PostDelayedTask(std::unique_ptr(this), 100); + return false; } - ASSERT_TRUE(frame_decoded); - SendTask(RTC_FROM_HERE, task_queue_, [this]() { send_stream_->Stop(); }); - SleepMs(10000); - ASSERT_EQ( - 1U, receive_stream_->GetStats().rtcp_packet_type_counts.pli_packets); + return true; + } + + void PerformTest() override { + start_time_ = clock_->TimeInMilliseconds(); + task_queue_->PostTask(std::unique_ptr(this)); + test_done_.Wait(rtc::Event::kForever); + } + + bool Run() override { + if (!frame_decoded_) { + if (PollStats()) { + send_stream_->Stop(); + if (!frame_decoded_) { + test_done_.Set(); + } else { + // Now we wait for the PLI packet. Once we receive it, a task + // will be posted (see OnReceiveRtcp) and we'll check the stats + // once more before signaling that we're done. + } + } + } else { + EXPECT_EQ( + 1U, + receive_stream_->GetStats().rtcp_packet_type_counts.pli_packets); + test_done_.Set(); + } + return false; } private: - Clock* clock_; + Clock* const clock_; VideoSendStream* send_stream_; VideoReceiveStream* receive_stream_; TaskQueueBase* const task_queue_; + rtc::Event test_done_; + bool frame_decoded_ = false; + int64_t start_time_ = 0; } test(task_queue()); RunBaseTest(&test); diff --git a/video/end_to_end_tests/stats_tests.cc b/video/end_to_end_tests/stats_tests.cc index b43f79df0a..32bcedb9c8 100644 --- a/video/end_to_end_tests/stats_tests.cc +++ b/video/end_to_end_tests/stats_tests.cc @@ -297,6 +297,7 @@ TEST_F(StatsEndToEndTest, GetStats) { const std::vector& receive_streams) override { send_stream_ = send_stream; receive_streams_ = receive_streams; + task_queue_ = TaskQueueBase::Current(); } void PerformTest() override { @@ -307,8 +308,10 @@ TEST_F(StatsEndToEndTest, GetStats) { bool send_ok = false; while (now_ms < stop_time_ms) { - if (!receive_ok) - receive_ok = CheckReceiveStats(); + if (!receive_ok && task_queue_) { + SendTask(RTC_FROM_HERE, task_queue_, + [&]() { receive_ok = CheckReceiveStats(); }); + } if (!send_ok) send_ok = CheckSendStats(); @@ -346,6 +349,7 @@ TEST_F(StatsEndToEndTest, GetStats) { rtc::Event check_stats_event_; ReceiveStreamRenderer receive_stream_renderer_; + TaskQueueBase* task_queue_ = nullptr; } test; RunBaseTest(&test); @@ -377,22 +381,28 @@ TEST_F(StatsEndToEndTest, TimingFramesAreReported) { VideoSendStream* send_stream, const std::vector& receive_streams) override { receive_streams_ = receive_streams; + task_queue_ = TaskQueueBase::Current(); } void PerformTest() override { // No frames reported initially. - for (const auto& receive_stream : receive_streams_) { - EXPECT_FALSE(receive_stream->GetStats().timing_frame_info); - } + SendTask(RTC_FROM_HERE, task_queue_, [&]() { + for (const auto& receive_stream : receive_streams_) { + EXPECT_FALSE(receive_stream->GetStats().timing_frame_info); + } + }); // Wait for at least one timing frame to be sent with 100ms grace period. SleepMs(kDefaultTimingFramesDelayMs + 100); // Check that timing frames are reported for each stream. - for (const auto& receive_stream : receive_streams_) { - EXPECT_TRUE(receive_stream->GetStats().timing_frame_info); - } + SendTask(RTC_FROM_HERE, task_queue_, [&]() { + for (const auto& receive_stream : receive_streams_) { + EXPECT_TRUE(receive_stream->GetStats().timing_frame_info); + } + }); } std::vector receive_streams_; + TaskQueueBase* task_queue_ = nullptr; } test; RunBaseTest(&test); @@ -400,7 +410,8 @@ TEST_F(StatsEndToEndTest, TimingFramesAreReported) { TEST_F(StatsEndToEndTest, TestReceivedRtpPacketStats) { static const size_t kNumRtpPacketsToSend = 5; - class ReceivedRtpStatsObserver : public test::EndToEndTest { + class ReceivedRtpStatsObserver : public test::EndToEndTest, + public QueuedTask { public: ReceivedRtpStatsObserver() : EndToEndTest(kDefaultTimeoutMs), @@ -412,14 +423,14 @@ TEST_F(StatsEndToEndTest, TestReceivedRtpPacketStats) { VideoSendStream* send_stream, const std::vector& receive_streams) override { receive_stream_ = receive_streams[0]; + task_queue_ = TaskQueueBase::Current(); + EXPECT_TRUE(task_queue_ != nullptr); } Action OnSendRtp(const uint8_t* packet, size_t length) override { if (sent_rtp_ >= kNumRtpPacketsToSend) { - VideoReceiveStream::Stats stats = receive_stream_->GetStats(); - if (kNumRtpPacketsToSend == stats.rtp_stats.packet_counter.packets) { - observation_complete_.Set(); - } + // Need to check the stats on the correct thread. + task_queue_->PostTask(std::unique_ptr(this)); return DROP_PACKET; } ++sent_rtp_; @@ -431,8 +442,17 @@ TEST_F(StatsEndToEndTest, TestReceivedRtpPacketStats) { << "Timed out while verifying number of received RTP packets."; } + bool Run() override { + VideoReceiveStream::Stats stats = receive_stream_->GetStats(); + if (kNumRtpPacketsToSend == stats.rtp_stats.packet_counter.packets) { + observation_complete_.Set(); + } + return false; + } + VideoReceiveStream* receive_stream_; uint32_t sent_rtp_; + TaskQueueBase* task_queue_ = nullptr; } test; RunBaseTest(&test); @@ -578,7 +598,7 @@ TEST_F(StatsEndToEndTest, MAYBE_ContentTypeSwitches) { TEST_F(StatsEndToEndTest, VerifyNackStats) { static const int kPacketNumberToDrop = 200; - class NackObserver : public test::EndToEndTest { + class NackObserver : public test::EndToEndTest, public QueuedTask { public: NackObserver() : EndToEndTest(kLongTimeoutMs), @@ -598,7 +618,7 @@ TEST_F(StatsEndToEndTest, VerifyNackStats) { dropped_rtp_packet_ = header.sequenceNumber; return DROP_PACKET; } - VerifyStats(); + task_queue_->PostTask(std::unique_ptr(this)); return SEND_PACKET; } @@ -659,6 +679,14 @@ TEST_F(StatsEndToEndTest, VerifyNackStats) { const std::vector& receive_streams) override { send_stream_ = send_stream; receive_streams_ = receive_streams; + task_queue_ = TaskQueueBase::Current(); + EXPECT_TRUE(task_queue_ != nullptr); + } + + bool Run() override { + rtc::CritScope lock(&crit_); + VerifyStats(); + return false; } void PerformTest() override { @@ -673,6 +701,7 @@ TEST_F(StatsEndToEndTest, VerifyNackStats) { std::vector receive_streams_; VideoSendStream* send_stream_; absl::optional start_runtime_ms_; + TaskQueueBase* task_queue_ = nullptr; } test; metrics::Reset();