Use separate queue for alive frames when self view is enabled in DVQA

Bug: b/195652126
Change-Id: Ief1c6ba5216147e0dbfe280e7c001902e1a4d6fc
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/229100
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34790}
This commit is contained in:
Artem Titov 2021-08-17 20:35:58 +02:00 committed by WebRTC LUCI CQ
parent e57a493301
commit 9d8c3d9010
4 changed files with 198 additions and 55 deletions

View File

@ -790,26 +790,25 @@ double DefaultVideoQualityAnalyzer::GetCpuUsagePercent() {
}
uint16_t DefaultVideoQualityAnalyzer::StreamState::PopFront(size_t peer) {
absl::optional<uint16_t> frame_id = frame_ids_.PopFront(peer);
size_t peer_queue = GetPeerQueueIndex(peer);
size_t alive_frames_queue = GetAliveFramesQueueIndex();
absl::optional<uint16_t> frame_id = frame_ids_.PopFront(peer_queue);
RTC_DCHECK(frame_id.has_value());
// If alive's frame queue is longer than all others, than also pop frame from
// it, because that frame is received by all receivers.
size_t owner_size = frame_ids_.size(owner_);
size_t alive_size = frame_ids_.size(alive_frames_queue);
size_t other_size = 0;
for (size_t i = 0; i < frame_ids_.readers_count(); ++i) {
size_t cur_size = frame_ids_.size(i);
if (i != owner_ && cur_size > other_size) {
if (i != alive_frames_queue && cur_size > other_size) {
other_size = cur_size;
}
}
// Pops frame from owner queue if owner's queue is the longest and one of
// next conditions is true:
// 1. If `enable_receive_own_stream_` and `peer` == `owner_`
// 2. If !`enable_receive_own_stream_`
if (owner_size > other_size &&
(!enable_receive_own_stream_ || peer == owner_)) {
absl::optional<uint16_t> alive_frame_id = frame_ids_.PopFront(owner_);
// Pops frame from alive queue if alive's queue is the longest one.
if (alive_size > other_size) {
absl::optional<uint16_t> alive_frame_id =
frame_ids_.PopFront(alive_frames_queue);
RTC_DCHECK(alive_frame_id.has_value());
RTC_DCHECK_EQ(frame_id.value(), alive_frame_id.value());
}
@ -818,7 +817,8 @@ uint16_t DefaultVideoQualityAnalyzer::StreamState::PopFront(size_t peer) {
}
uint16_t DefaultVideoQualityAnalyzer::StreamState::MarkNextAliveFrameAsDead() {
absl::optional<uint16_t> frame_id = frame_ids_.PopFront(owner_);
absl::optional<uint16_t> frame_id =
frame_ids_.PopFront(GetAliveFramesQueueIndex());
RTC_DCHECK(frame_id.has_value());
return frame_id.value();
}
@ -840,6 +840,31 @@ DefaultVideoQualityAnalyzer::StreamState::last_rendered_frame_time(
return MaybeGetValue(last_rendered_frame_time_, peer);
}
size_t DefaultVideoQualityAnalyzer::StreamState::GetPeerQueueIndex(
size_t peer_index) const {
// When sender isn't expecting to receive its own stream we will use their
// queue for tracking alive frames. Otherwise we will use the queue #0 to
// track alive frames and will shift all other queues for peers on 1.
// It means when `enable_receive_own_stream_` is true peer's queue will have
// index equal to `peer_index` + 1 and when `enable_receive_own_stream_` is
// false peer's queue will have index equal to `peer_index`.
if (!enable_receive_own_stream_) {
return peer_index;
}
return peer_index + 1;
}
size_t DefaultVideoQualityAnalyzer::StreamState::GetAliveFramesQueueIndex()
const {
// When sender isn't expecting to receive its own stream we will use their
// queue for tracking alive frames. Otherwise we will use the queue #0 to
// track alive frames and will shift all other queues for peers on 1.
if (!enable_receive_own_stream_) {
return owner_;
}
return 0;
}
bool DefaultVideoQualityAnalyzer::FrameInFlight::RemoveFrame() {
if (!frame_) {
return false;

View File

@ -104,7 +104,8 @@ class DefaultVideoQualityAnalyzer : public VideoQualityAnalyzerInterface {
: owner_(owner),
enable_receive_own_stream_(enable_receive_own_stream),
stream_started_time_(stream_started_time),
frame_ids_(peers_count) {}
frame_ids_(enable_receive_own_stream ? peers_count + 1
: peers_count) {}
size_t owner() const { return owner_; }
Timestamp stream_started_time() const { return stream_started_time_; }
@ -113,22 +114,37 @@ class DefaultVideoQualityAnalyzer : public VideoQualityAnalyzerInterface {
// Crash if state is empty. Guarantees that there can be no alive frames
// that are not in the owner queue
uint16_t PopFront(size_t peer);
bool IsEmpty(size_t peer) const { return frame_ids_.IsEmpty(peer); }
bool IsEmpty(size_t peer) const {
return frame_ids_.IsEmpty(GetPeerQueueIndex(peer));
}
// Crash if state is empty.
uint16_t Front(size_t peer) const { return frame_ids_.Front(peer).value(); }
uint16_t Front(size_t peer) const {
return frame_ids_.Front(GetPeerQueueIndex(peer)).value();
}
// When new peer is added - all current alive frames will be sent to it as
// well. So we need to register them as expected by copying owner_ head to
// the new head.
void AddPeer() { frame_ids_.AddHead(owner_); }
void AddPeer() { frame_ids_.AddHead(GetAliveFramesQueueIndex()); }
size_t GetAliveFramesCount() { return frame_ids_.size(owner_); }
size_t GetAliveFramesCount() const {
return frame_ids_.size(GetAliveFramesQueueIndex());
}
uint16_t MarkNextAliveFrameAsDead();
void SetLastRenderedFrameTime(size_t peer, Timestamp time);
absl::optional<Timestamp> last_rendered_frame_time(size_t peer) const;
private:
// Returns index of the `frame_ids_` queue which is used for specified
// `peer_index`.
size_t GetPeerQueueIndex(size_t peer_index) const;
// Returns index of the `frame_ids_` queue which is used to track alive
// frames for this stream. The frame is alive if it contains VideoFrame
// payload in `captured_frames_in_flight_`.
size_t GetAliveFramesQueueIndex() const;
// Index of the owner. Owner's queue in `frame_ids_` will keep alive frames.
const size_t owner_;
const bool enable_receive_own_stream_;
@ -145,10 +161,6 @@ class DefaultVideoQualityAnalyzer : public VideoQualityAnalyzerInterface {
// If we received frame with id frame_id3, then we will pop frame_id1 and
// frame_id2 and consider that frames as dropped and then compare received
// frame with the one from `captured_frames_in_flight_` with id frame_id3.
//
// To track alive frames (frames that contains frame's payload in
// `captured_frames_in_flight_`) the head which corresponds to `owner_` will
// be used. So that head will point to the first alive frame in frames list.
MultiHeadQueue<uint16_t> frame_ids_;
std::map<size_t, Timestamp> last_rendered_frame_time_;
};

View File

@ -965,7 +965,8 @@ TEST(DefaultVideoQualityAnalyzerTest,
EXPECT_EQ(frame_counters.rendered, 2);
}
TEST(DefaultVideoQualityAnalyzerTest, FrameCanBeReceivedBySender) {
TEST(DefaultVideoQualityAnalyzerTest,
FrameCanBeReceivedBySenderAfterItWasReceivedByReceiver) {
std::unique_ptr<test::FrameGeneratorInterface> frame_generator =
test::CreateSquareFrameGenerator(kFrameWidth, kFrameHeight,
/*type=*/absl::nullopt,
@ -978,37 +979,45 @@ TEST(DefaultVideoQualityAnalyzerTest, FrameCanBeReceivedBySender) {
std::vector<std::string>{kSenderPeerName, kReceiverPeerName},
kAnalyzerMaxThreadsCount);
VideoFrame frame = NextFrame(frame_generator.get(), 1);
frame.set_id(analyzer.OnFrameCaptured(kSenderPeerName, kStreamLabel, frame));
analyzer.OnFramePreEncode(kSenderPeerName, frame);
analyzer.OnFrameEncoded(kSenderPeerName, frame.id(), FakeEncode(frame),
VideoQualityAnalyzerInterface::EncoderStats());
std::vector<VideoFrame> frames;
for (int i = 0; i < 3; ++i) {
VideoFrame frame = NextFrame(frame_generator.get(), 1);
frame.set_id(
analyzer.OnFrameCaptured(kSenderPeerName, kStreamLabel, frame));
frames.push_back(frame);
analyzer.OnFramePreEncode(kSenderPeerName, frame);
analyzer.OnFrameEncoded(kSenderPeerName, frame.id(), FakeEncode(frame),
VideoQualityAnalyzerInterface::EncoderStats());
}
// Receive by 2nd peer.
VideoFrame received_frame = DeepCopy(frame);
analyzer.OnFramePreDecode(kReceiverPeerName, received_frame.id(),
FakeEncode(received_frame));
analyzer.OnFrameDecoded(kReceiverPeerName, received_frame,
VideoQualityAnalyzerInterface::DecoderStats());
analyzer.OnFrameRendered(kReceiverPeerName, received_frame);
for (VideoFrame& frame : frames) {
VideoFrame received_frame = DeepCopy(frame);
analyzer.OnFramePreDecode(kReceiverPeerName, received_frame.id(),
FakeEncode(received_frame));
analyzer.OnFrameDecoded(kReceiverPeerName, received_frame,
VideoQualityAnalyzerInterface::DecoderStats());
analyzer.OnFrameRendered(kReceiverPeerName, received_frame);
}
// Check that we still have that frame in flight.
AnalyzerStats analyzer_stats = analyzer.GetAnalyzerStats();
std::vector<StatsSample> frames_in_flight_sizes =
GetSortedSamples(analyzer_stats.frames_in_flight_left_count);
EXPECT_EQ(frames_in_flight_sizes.back().value, 1)
EXPECT_EQ(frames_in_flight_sizes.back().value, 3)
<< "Expected that frame is still in flight, "
<< "because it wasn't received by sender"
<< ToString(frames_in_flight_sizes);
// Receive by sender
received_frame = DeepCopy(frame);
analyzer.OnFramePreDecode(kSenderPeerName, received_frame.id(),
FakeEncode(received_frame));
analyzer.OnFrameDecoded(kSenderPeerName, received_frame,
VideoQualityAnalyzerInterface::DecoderStats());
analyzer.OnFrameRendered(kSenderPeerName, received_frame);
for (VideoFrame& frame : frames) {
VideoFrame received_frame = DeepCopy(frame);
analyzer.OnFramePreDecode(kSenderPeerName, received_frame.id(),
FakeEncode(received_frame));
analyzer.OnFrameDecoded(kSenderPeerName, received_frame,
VideoQualityAnalyzerInterface::DecoderStats());
analyzer.OnFrameRendered(kSenderPeerName, received_frame);
}
// Give analyzer some time to process frames on async thread. The computations
// have to be fast (heavy metrics are disabled!), so if doesn't fit 100ms it
@ -1017,7 +1026,7 @@ TEST(DefaultVideoQualityAnalyzerTest, FrameCanBeReceivedBySender) {
analyzer.Stop();
analyzer_stats = analyzer.GetAnalyzerStats();
EXPECT_EQ(analyzer_stats.comparisons_done, 2);
EXPECT_EQ(analyzer_stats.comparisons_done, 6);
frames_in_flight_sizes =
GetSortedSamples(analyzer_stats.frames_in_flight_left_count);
@ -1025,29 +1034,124 @@ TEST(DefaultVideoQualityAnalyzerTest, FrameCanBeReceivedBySender) {
<< ToString(frames_in_flight_sizes);
FrameCounters frame_counters = analyzer.GetGlobalCounters();
EXPECT_EQ(frame_counters.captured, 1);
EXPECT_EQ(frame_counters.rendered, 2);
EXPECT_EQ(frame_counters.captured, 3);
EXPECT_EQ(frame_counters.rendered, 6);
EXPECT_EQ(analyzer.GetStats().size(), 2lu);
{
FrameCounters stream_conters = analyzer.GetPerStreamCounters().at(
StatsKey(kStreamLabel, kSenderPeerName, kReceiverPeerName));
EXPECT_EQ(stream_conters.captured, 1);
EXPECT_EQ(stream_conters.pre_encoded, 1);
EXPECT_EQ(stream_conters.encoded, 1);
EXPECT_EQ(stream_conters.received, 1);
EXPECT_EQ(stream_conters.decoded, 1);
EXPECT_EQ(stream_conters.rendered, 1);
EXPECT_EQ(stream_conters.captured, 3);
EXPECT_EQ(stream_conters.pre_encoded, 3);
EXPECT_EQ(stream_conters.encoded, 3);
EXPECT_EQ(stream_conters.received, 3);
EXPECT_EQ(stream_conters.decoded, 3);
EXPECT_EQ(stream_conters.rendered, 3);
}
{
FrameCounters stream_conters = analyzer.GetPerStreamCounters().at(
StatsKey(kStreamLabel, kSenderPeerName, kSenderPeerName));
EXPECT_EQ(stream_conters.captured, 1);
EXPECT_EQ(stream_conters.pre_encoded, 1);
EXPECT_EQ(stream_conters.encoded, 1);
EXPECT_EQ(stream_conters.received, 1);
EXPECT_EQ(stream_conters.decoded, 1);
EXPECT_EQ(stream_conters.rendered, 1);
EXPECT_EQ(stream_conters.captured, 3);
EXPECT_EQ(stream_conters.pre_encoded, 3);
EXPECT_EQ(stream_conters.encoded, 3);
EXPECT_EQ(stream_conters.received, 3);
EXPECT_EQ(stream_conters.decoded, 3);
EXPECT_EQ(stream_conters.rendered, 3);
}
}
TEST(DefaultVideoQualityAnalyzerTest,
FrameCanBeReceivedByRecieverAfterItWasReceivedBySender) {
std::unique_ptr<test::FrameGeneratorInterface> frame_generator =
test::CreateSquareFrameGenerator(kFrameWidth, kFrameHeight,
/*type=*/absl::nullopt,
/*num_squares=*/absl::nullopt);
DefaultVideoQualityAnalyzerOptions options = AnalyzerOptionsForTest();
options.enable_receive_own_stream = true;
DefaultVideoQualityAnalyzer analyzer(Clock::GetRealTimeClock(), options);
analyzer.Start("test_case",
std::vector<std::string>{kSenderPeerName, kReceiverPeerName},
kAnalyzerMaxThreadsCount);
std::vector<VideoFrame> frames;
for (int i = 0; i < 3; ++i) {
VideoFrame frame = NextFrame(frame_generator.get(), 1);
frame.set_id(
analyzer.OnFrameCaptured(kSenderPeerName, kStreamLabel, frame));
frames.push_back(frame);
analyzer.OnFramePreEncode(kSenderPeerName, frame);
analyzer.OnFrameEncoded(kSenderPeerName, frame.id(), FakeEncode(frame),
VideoQualityAnalyzerInterface::EncoderStats());
}
// Receive by sender
for (VideoFrame& frame : frames) {
VideoFrame received_frame = DeepCopy(frame);
analyzer.OnFramePreDecode(kSenderPeerName, received_frame.id(),
FakeEncode(received_frame));
analyzer.OnFrameDecoded(kSenderPeerName, received_frame,
VideoQualityAnalyzerInterface::DecoderStats());
analyzer.OnFrameRendered(kSenderPeerName, received_frame);
}
// Check that we still have that frame in flight.
AnalyzerStats analyzer_stats = analyzer.GetAnalyzerStats();
std::vector<StatsSample> frames_in_flight_sizes =
GetSortedSamples(analyzer_stats.frames_in_flight_left_count);
EXPECT_EQ(frames_in_flight_sizes.back().value, 3)
<< "Expected that frame is still in flight, "
<< "because it wasn't received by sender"
<< ToString(frames_in_flight_sizes);
// Receive by 2nd peer.
for (VideoFrame& frame : frames) {
VideoFrame received_frame = DeepCopy(frame);
analyzer.OnFramePreDecode(kReceiverPeerName, received_frame.id(),
FakeEncode(received_frame));
analyzer.OnFrameDecoded(kReceiverPeerName, received_frame,
VideoQualityAnalyzerInterface::DecoderStats());
analyzer.OnFrameRendered(kReceiverPeerName, received_frame);
}
// Give analyzer some time to process frames on async thread. The computations
// have to be fast (heavy metrics are disabled!), so if doesn't fit 100ms it
// means we have an issue!
SleepMs(100);
analyzer.Stop();
analyzer_stats = analyzer.GetAnalyzerStats();
EXPECT_EQ(analyzer_stats.comparisons_done, 6);
frames_in_flight_sizes =
GetSortedSamples(analyzer_stats.frames_in_flight_left_count);
EXPECT_EQ(frames_in_flight_sizes.back().value, 0)
<< ToString(frames_in_flight_sizes);
FrameCounters frame_counters = analyzer.GetGlobalCounters();
EXPECT_EQ(frame_counters.captured, 3);
EXPECT_EQ(frame_counters.rendered, 6);
EXPECT_EQ(analyzer.GetStats().size(), 2lu);
{
FrameCounters stream_conters = analyzer.GetPerStreamCounters().at(
StatsKey(kStreamLabel, kSenderPeerName, kReceiverPeerName));
EXPECT_EQ(stream_conters.captured, 3);
EXPECT_EQ(stream_conters.pre_encoded, 3);
EXPECT_EQ(stream_conters.encoded, 3);
EXPECT_EQ(stream_conters.received, 3);
EXPECT_EQ(stream_conters.decoded, 3);
EXPECT_EQ(stream_conters.rendered, 3);
}
{
FrameCounters stream_conters = analyzer.GetPerStreamCounters().at(
StatsKey(kStreamLabel, kSenderPeerName, kSenderPeerName));
EXPECT_EQ(stream_conters.captured, 3);
EXPECT_EQ(stream_conters.pre_encoded, 3);
EXPECT_EQ(stream_conters.encoded, 3);
EXPECT_EQ(stream_conters.received, 3);
EXPECT_EQ(stream_conters.decoded, 3);
EXPECT_EQ(stream_conters.rendered, 3);
}
}

View File

@ -36,6 +36,8 @@ class MultiHeadQueue {
}
// Creates a copy of an existing head. Complexity O(MultiHeadQueue::size()).
// `copy_index` - index of the queue that will be used as a source for
// copying.
void AddHead(size_t copy_index) { queues_.push_back(queues_[copy_index]); }
// Add value to the end of the queue. Complexity O(readers_count).