diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc index 04fdaca770..f93e66e009 100644 --- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc +++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc @@ -790,26 +790,25 @@ double DefaultVideoQualityAnalyzer::GetCpuUsagePercent() { } uint16_t DefaultVideoQualityAnalyzer::StreamState::PopFront(size_t peer) { - absl::optional frame_id = frame_ids_.PopFront(peer); + size_t peer_queue = GetPeerQueueIndex(peer); + size_t alive_frames_queue = GetAliveFramesQueueIndex(); + absl::optional frame_id = frame_ids_.PopFront(peer_queue); RTC_DCHECK(frame_id.has_value()); // If alive's frame queue is longer than all others, than also pop frame from // it, because that frame is received by all receivers. - size_t owner_size = frame_ids_.size(owner_); + size_t alive_size = frame_ids_.size(alive_frames_queue); size_t other_size = 0; for (size_t i = 0; i < frame_ids_.readers_count(); ++i) { size_t cur_size = frame_ids_.size(i); - if (i != owner_ && cur_size > other_size) { + if (i != alive_frames_queue && cur_size > other_size) { other_size = cur_size; } } - // Pops frame from owner queue if owner's queue is the longest and one of - // next conditions is true: - // 1. If `enable_receive_own_stream_` and `peer` == `owner_` - // 2. If !`enable_receive_own_stream_` - if (owner_size > other_size && - (!enable_receive_own_stream_ || peer == owner_)) { - absl::optional alive_frame_id = frame_ids_.PopFront(owner_); + // Pops frame from alive queue if alive's queue is the longest one. + if (alive_size > other_size) { + absl::optional alive_frame_id = + frame_ids_.PopFront(alive_frames_queue); RTC_DCHECK(alive_frame_id.has_value()); RTC_DCHECK_EQ(frame_id.value(), alive_frame_id.value()); } @@ -818,7 +817,8 @@ uint16_t DefaultVideoQualityAnalyzer::StreamState::PopFront(size_t peer) { } uint16_t DefaultVideoQualityAnalyzer::StreamState::MarkNextAliveFrameAsDead() { - absl::optional frame_id = frame_ids_.PopFront(owner_); + absl::optional frame_id = + frame_ids_.PopFront(GetAliveFramesQueueIndex()); RTC_DCHECK(frame_id.has_value()); return frame_id.value(); } @@ -840,6 +840,31 @@ DefaultVideoQualityAnalyzer::StreamState::last_rendered_frame_time( return MaybeGetValue(last_rendered_frame_time_, peer); } +size_t DefaultVideoQualityAnalyzer::StreamState::GetPeerQueueIndex( + size_t peer_index) const { + // When sender isn't expecting to receive its own stream we will use their + // queue for tracking alive frames. Otherwise we will use the queue #0 to + // track alive frames and will shift all other queues for peers on 1. + // It means when `enable_receive_own_stream_` is true peer's queue will have + // index equal to `peer_index` + 1 and when `enable_receive_own_stream_` is + // false peer's queue will have index equal to `peer_index`. + if (!enable_receive_own_stream_) { + return peer_index; + } + return peer_index + 1; +} + +size_t DefaultVideoQualityAnalyzer::StreamState::GetAliveFramesQueueIndex() + const { + // When sender isn't expecting to receive its own stream we will use their + // queue for tracking alive frames. Otherwise we will use the queue #0 to + // track alive frames and will shift all other queues for peers on 1. + if (!enable_receive_own_stream_) { + return owner_; + } + return 0; +} + bool DefaultVideoQualityAnalyzer::FrameInFlight::RemoveFrame() { if (!frame_) { return false; diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h index 57b202e894..04fe5f2cfc 100644 --- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h +++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h @@ -104,7 +104,8 @@ class DefaultVideoQualityAnalyzer : public VideoQualityAnalyzerInterface { : owner_(owner), enable_receive_own_stream_(enable_receive_own_stream), stream_started_time_(stream_started_time), - frame_ids_(peers_count) {} + frame_ids_(enable_receive_own_stream ? peers_count + 1 + : peers_count) {} size_t owner() const { return owner_; } Timestamp stream_started_time() const { return stream_started_time_; } @@ -113,22 +114,37 @@ class DefaultVideoQualityAnalyzer : public VideoQualityAnalyzerInterface { // Crash if state is empty. Guarantees that there can be no alive frames // that are not in the owner queue uint16_t PopFront(size_t peer); - bool IsEmpty(size_t peer) const { return frame_ids_.IsEmpty(peer); } + bool IsEmpty(size_t peer) const { + return frame_ids_.IsEmpty(GetPeerQueueIndex(peer)); + } // Crash if state is empty. - uint16_t Front(size_t peer) const { return frame_ids_.Front(peer).value(); } + uint16_t Front(size_t peer) const { + return frame_ids_.Front(GetPeerQueueIndex(peer)).value(); + } // When new peer is added - all current alive frames will be sent to it as // well. So we need to register them as expected by copying owner_ head to // the new head. - void AddPeer() { frame_ids_.AddHead(owner_); } + void AddPeer() { frame_ids_.AddHead(GetAliveFramesQueueIndex()); } - size_t GetAliveFramesCount() { return frame_ids_.size(owner_); } + size_t GetAliveFramesCount() const { + return frame_ids_.size(GetAliveFramesQueueIndex()); + } uint16_t MarkNextAliveFrameAsDead(); void SetLastRenderedFrameTime(size_t peer, Timestamp time); absl::optional last_rendered_frame_time(size_t peer) const; private: + // Returns index of the `frame_ids_` queue which is used for specified + // `peer_index`. + size_t GetPeerQueueIndex(size_t peer_index) const; + + // Returns index of the `frame_ids_` queue which is used to track alive + // frames for this stream. The frame is alive if it contains VideoFrame + // payload in `captured_frames_in_flight_`. + size_t GetAliveFramesQueueIndex() const; + // Index of the owner. Owner's queue in `frame_ids_` will keep alive frames. const size_t owner_; const bool enable_receive_own_stream_; @@ -145,10 +161,6 @@ class DefaultVideoQualityAnalyzer : public VideoQualityAnalyzerInterface { // If we received frame with id frame_id3, then we will pop frame_id1 and // frame_id2 and consider that frames as dropped and then compare received // frame with the one from `captured_frames_in_flight_` with id frame_id3. - // - // To track alive frames (frames that contains frame's payload in - // `captured_frames_in_flight_`) the head which corresponds to `owner_` will - // be used. So that head will point to the first alive frame in frames list. MultiHeadQueue frame_ids_; std::map last_rendered_frame_time_; }; diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_test.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_test.cc index 50d62f4b89..523bb0aa64 100644 --- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_test.cc +++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_test.cc @@ -965,7 +965,8 @@ TEST(DefaultVideoQualityAnalyzerTest, EXPECT_EQ(frame_counters.rendered, 2); } -TEST(DefaultVideoQualityAnalyzerTest, FrameCanBeReceivedBySender) { +TEST(DefaultVideoQualityAnalyzerTest, + FrameCanBeReceivedBySenderAfterItWasReceivedByReceiver) { std::unique_ptr frame_generator = test::CreateSquareFrameGenerator(kFrameWidth, kFrameHeight, /*type=*/absl::nullopt, @@ -978,37 +979,45 @@ TEST(DefaultVideoQualityAnalyzerTest, FrameCanBeReceivedBySender) { std::vector{kSenderPeerName, kReceiverPeerName}, kAnalyzerMaxThreadsCount); - VideoFrame frame = NextFrame(frame_generator.get(), 1); - - frame.set_id(analyzer.OnFrameCaptured(kSenderPeerName, kStreamLabel, frame)); - analyzer.OnFramePreEncode(kSenderPeerName, frame); - analyzer.OnFrameEncoded(kSenderPeerName, frame.id(), FakeEncode(frame), - VideoQualityAnalyzerInterface::EncoderStats()); + std::vector frames; + for (int i = 0; i < 3; ++i) { + VideoFrame frame = NextFrame(frame_generator.get(), 1); + frame.set_id( + analyzer.OnFrameCaptured(kSenderPeerName, kStreamLabel, frame)); + frames.push_back(frame); + analyzer.OnFramePreEncode(kSenderPeerName, frame); + analyzer.OnFrameEncoded(kSenderPeerName, frame.id(), FakeEncode(frame), + VideoQualityAnalyzerInterface::EncoderStats()); + } // Receive by 2nd peer. - VideoFrame received_frame = DeepCopy(frame); - analyzer.OnFramePreDecode(kReceiverPeerName, received_frame.id(), - FakeEncode(received_frame)); - analyzer.OnFrameDecoded(kReceiverPeerName, received_frame, - VideoQualityAnalyzerInterface::DecoderStats()); - analyzer.OnFrameRendered(kReceiverPeerName, received_frame); + for (VideoFrame& frame : frames) { + VideoFrame received_frame = DeepCopy(frame); + analyzer.OnFramePreDecode(kReceiverPeerName, received_frame.id(), + FakeEncode(received_frame)); + analyzer.OnFrameDecoded(kReceiverPeerName, received_frame, + VideoQualityAnalyzerInterface::DecoderStats()); + analyzer.OnFrameRendered(kReceiverPeerName, received_frame); + } // Check that we still have that frame in flight. AnalyzerStats analyzer_stats = analyzer.GetAnalyzerStats(); std::vector frames_in_flight_sizes = GetSortedSamples(analyzer_stats.frames_in_flight_left_count); - EXPECT_EQ(frames_in_flight_sizes.back().value, 1) + EXPECT_EQ(frames_in_flight_sizes.back().value, 3) << "Expected that frame is still in flight, " << "because it wasn't received by sender" << ToString(frames_in_flight_sizes); // Receive by sender - received_frame = DeepCopy(frame); - analyzer.OnFramePreDecode(kSenderPeerName, received_frame.id(), - FakeEncode(received_frame)); - analyzer.OnFrameDecoded(kSenderPeerName, received_frame, - VideoQualityAnalyzerInterface::DecoderStats()); - analyzer.OnFrameRendered(kSenderPeerName, received_frame); + for (VideoFrame& frame : frames) { + VideoFrame received_frame = DeepCopy(frame); + analyzer.OnFramePreDecode(kSenderPeerName, received_frame.id(), + FakeEncode(received_frame)); + analyzer.OnFrameDecoded(kSenderPeerName, received_frame, + VideoQualityAnalyzerInterface::DecoderStats()); + analyzer.OnFrameRendered(kSenderPeerName, received_frame); + } // Give analyzer some time to process frames on async thread. The computations // have to be fast (heavy metrics are disabled!), so if doesn't fit 100ms it @@ -1017,7 +1026,7 @@ TEST(DefaultVideoQualityAnalyzerTest, FrameCanBeReceivedBySender) { analyzer.Stop(); analyzer_stats = analyzer.GetAnalyzerStats(); - EXPECT_EQ(analyzer_stats.comparisons_done, 2); + EXPECT_EQ(analyzer_stats.comparisons_done, 6); frames_in_flight_sizes = GetSortedSamples(analyzer_stats.frames_in_flight_left_count); @@ -1025,29 +1034,124 @@ TEST(DefaultVideoQualityAnalyzerTest, FrameCanBeReceivedBySender) { << ToString(frames_in_flight_sizes); FrameCounters frame_counters = analyzer.GetGlobalCounters(); - EXPECT_EQ(frame_counters.captured, 1); - EXPECT_EQ(frame_counters.rendered, 2); + EXPECT_EQ(frame_counters.captured, 3); + EXPECT_EQ(frame_counters.rendered, 6); EXPECT_EQ(analyzer.GetStats().size(), 2lu); { FrameCounters stream_conters = analyzer.GetPerStreamCounters().at( StatsKey(kStreamLabel, kSenderPeerName, kReceiverPeerName)); - EXPECT_EQ(stream_conters.captured, 1); - EXPECT_EQ(stream_conters.pre_encoded, 1); - EXPECT_EQ(stream_conters.encoded, 1); - EXPECT_EQ(stream_conters.received, 1); - EXPECT_EQ(stream_conters.decoded, 1); - EXPECT_EQ(stream_conters.rendered, 1); + EXPECT_EQ(stream_conters.captured, 3); + EXPECT_EQ(stream_conters.pre_encoded, 3); + EXPECT_EQ(stream_conters.encoded, 3); + EXPECT_EQ(stream_conters.received, 3); + EXPECT_EQ(stream_conters.decoded, 3); + EXPECT_EQ(stream_conters.rendered, 3); } { FrameCounters stream_conters = analyzer.GetPerStreamCounters().at( StatsKey(kStreamLabel, kSenderPeerName, kSenderPeerName)); - EXPECT_EQ(stream_conters.captured, 1); - EXPECT_EQ(stream_conters.pre_encoded, 1); - EXPECT_EQ(stream_conters.encoded, 1); - EXPECT_EQ(stream_conters.received, 1); - EXPECT_EQ(stream_conters.decoded, 1); - EXPECT_EQ(stream_conters.rendered, 1); + EXPECT_EQ(stream_conters.captured, 3); + EXPECT_EQ(stream_conters.pre_encoded, 3); + EXPECT_EQ(stream_conters.encoded, 3); + EXPECT_EQ(stream_conters.received, 3); + EXPECT_EQ(stream_conters.decoded, 3); + EXPECT_EQ(stream_conters.rendered, 3); + } +} + +TEST(DefaultVideoQualityAnalyzerTest, + FrameCanBeReceivedByRecieverAfterItWasReceivedBySender) { + std::unique_ptr frame_generator = + test::CreateSquareFrameGenerator(kFrameWidth, kFrameHeight, + /*type=*/absl::nullopt, + /*num_squares=*/absl::nullopt); + + DefaultVideoQualityAnalyzerOptions options = AnalyzerOptionsForTest(); + options.enable_receive_own_stream = true; + DefaultVideoQualityAnalyzer analyzer(Clock::GetRealTimeClock(), options); + analyzer.Start("test_case", + std::vector{kSenderPeerName, kReceiverPeerName}, + kAnalyzerMaxThreadsCount); + + std::vector frames; + for (int i = 0; i < 3; ++i) { + VideoFrame frame = NextFrame(frame_generator.get(), 1); + frame.set_id( + analyzer.OnFrameCaptured(kSenderPeerName, kStreamLabel, frame)); + frames.push_back(frame); + analyzer.OnFramePreEncode(kSenderPeerName, frame); + analyzer.OnFrameEncoded(kSenderPeerName, frame.id(), FakeEncode(frame), + VideoQualityAnalyzerInterface::EncoderStats()); + } + + // Receive by sender + for (VideoFrame& frame : frames) { + VideoFrame received_frame = DeepCopy(frame); + analyzer.OnFramePreDecode(kSenderPeerName, received_frame.id(), + FakeEncode(received_frame)); + analyzer.OnFrameDecoded(kSenderPeerName, received_frame, + VideoQualityAnalyzerInterface::DecoderStats()); + analyzer.OnFrameRendered(kSenderPeerName, received_frame); + } + + // Check that we still have that frame in flight. + AnalyzerStats analyzer_stats = analyzer.GetAnalyzerStats(); + std::vector frames_in_flight_sizes = + GetSortedSamples(analyzer_stats.frames_in_flight_left_count); + EXPECT_EQ(frames_in_flight_sizes.back().value, 3) + << "Expected that frame is still in flight, " + << "because it wasn't received by sender" + << ToString(frames_in_flight_sizes); + + // Receive by 2nd peer. + for (VideoFrame& frame : frames) { + VideoFrame received_frame = DeepCopy(frame); + analyzer.OnFramePreDecode(kReceiverPeerName, received_frame.id(), + FakeEncode(received_frame)); + analyzer.OnFrameDecoded(kReceiverPeerName, received_frame, + VideoQualityAnalyzerInterface::DecoderStats()); + analyzer.OnFrameRendered(kReceiverPeerName, received_frame); + } + + // Give analyzer some time to process frames on async thread. The computations + // have to be fast (heavy metrics are disabled!), so if doesn't fit 100ms it + // means we have an issue! + SleepMs(100); + analyzer.Stop(); + + analyzer_stats = analyzer.GetAnalyzerStats(); + EXPECT_EQ(analyzer_stats.comparisons_done, 6); + + frames_in_flight_sizes = + GetSortedSamples(analyzer_stats.frames_in_flight_left_count); + EXPECT_EQ(frames_in_flight_sizes.back().value, 0) + << ToString(frames_in_flight_sizes); + + FrameCounters frame_counters = analyzer.GetGlobalCounters(); + EXPECT_EQ(frame_counters.captured, 3); + EXPECT_EQ(frame_counters.rendered, 6); + + EXPECT_EQ(analyzer.GetStats().size(), 2lu); + { + FrameCounters stream_conters = analyzer.GetPerStreamCounters().at( + StatsKey(kStreamLabel, kSenderPeerName, kReceiverPeerName)); + EXPECT_EQ(stream_conters.captured, 3); + EXPECT_EQ(stream_conters.pre_encoded, 3); + EXPECT_EQ(stream_conters.encoded, 3); + EXPECT_EQ(stream_conters.received, 3); + EXPECT_EQ(stream_conters.decoded, 3); + EXPECT_EQ(stream_conters.rendered, 3); + } + { + FrameCounters stream_conters = analyzer.GetPerStreamCounters().at( + StatsKey(kStreamLabel, kSenderPeerName, kSenderPeerName)); + EXPECT_EQ(stream_conters.captured, 3); + EXPECT_EQ(stream_conters.pre_encoded, 3); + EXPECT_EQ(stream_conters.encoded, 3); + EXPECT_EQ(stream_conters.received, 3); + EXPECT_EQ(stream_conters.decoded, 3); + EXPECT_EQ(stream_conters.rendered, 3); } } diff --git a/test/pc/e2e/analyzer/video/multi_head_queue.h b/test/pc/e2e/analyzer/video/multi_head_queue.h index 0af6a19675..1486780656 100644 --- a/test/pc/e2e/analyzer/video/multi_head_queue.h +++ b/test/pc/e2e/analyzer/video/multi_head_queue.h @@ -36,6 +36,8 @@ class MultiHeadQueue { } // Creates a copy of an existing head. Complexity O(MultiHeadQueue::size()). + // `copy_index` - index of the queue that will be used as a source for + // copying. void AddHead(size_t copy_index) { queues_.push_back(queues_[copy_index]); } // Add value to the end of the queue. Complexity O(readers_count).