[DVQA] Change API to pause and resume all streams from a sender.

Also make it possible to pause an already paused stream by making it a no-op.

Change-Id: Id10f74a4c6464067ae63208162194f020c6470eb
Bug: b/271542055
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/298202
Commit-Queue: Jeremy Leconte <jleconte@google.com>
Reviewed-by: Artem Titov <titovartem@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39620}
This commit is contained in:
Jeremy Leconte 2023-03-21 14:18:16 +01:00 committed by WebRTC LUCI CQ
parent 1fabbac6b6
commit 2148f8ed71
6 changed files with 196 additions and 83 deletions

View File

@ -150,16 +150,15 @@ class VideoQualityAnalyzerInterface
// call.
virtual void UnregisterParticipantInCall(absl::string_view peer_name) {}
// Informs analyzer that peer `peer_name` is expected to receive stream
// `stream_label`.
virtual void OnPeerStartedReceiveVideoStream(absl::string_view peer_name,
absl::string_view stream_label) {
}
// Informs analyzer that peer `peer_name` shouldn't receive stream
// `stream_label`.
virtual void OnPeerStoppedReceiveVideoStream(absl::string_view peer_name,
absl::string_view stream_label) {
}
// Informs analyzer that peer `receiver_peer_name` shouldn't receive all
// streams from sender `sender_peer_name`.
virtual void OnPauseAllStreamsFrom(absl::string_view sender_peer_name,
absl::string_view receiver_peer_name) {}
// Informs analyzer that peer `receiver_peer_name` is expected to receive all
// streams from `sender_peer_name`.
virtual void OnResumeAllStreamsFrom(absl::string_view sender_peer_name,
absl::string_view receiver_peer_name) {}
// Tells analyzer that analysis complete and it should calculate final
// statistics.

View File

@ -719,32 +719,36 @@ void DefaultVideoQualityAnalyzer::UnregisterParticipantInCall(
}
}
void DefaultVideoQualityAnalyzer::OnPeerStartedReceiveVideoStream(
absl::string_view peer_name,
absl::string_view stream_label) {
void DefaultVideoQualityAnalyzer::OnPauseAllStreamsFrom(
absl::string_view sender_peer_name,
absl::string_view receiver_peer_name) {
MutexLock lock(&mutex_);
RTC_CHECK(peers_->HasName(peer_name));
size_t peer_index = peers_->index(peer_name);
RTC_CHECK(streams_.HasName(stream_label));
size_t stream_index = streams_.index(stream_label);
RTC_CHECK(peers_->HasName(sender_peer_name));
size_t sender_peer_index = peers_->index(sender_peer_name);
RTC_CHECK(peers_->HasName(receiver_peer_name));
size_t receiver_peer_index = peers_->index(receiver_peer_name);
auto it = stream_states_.find(stream_index);
RTC_CHECK(it != stream_states_.end());
it->second.GetPausableState(peer_index)->Resume();
for (auto& [unused, stream_state] : stream_states_) {
if (stream_state.sender() == sender_peer_index) {
stream_state.GetPausableState(receiver_peer_index)->Pause();
}
}
}
void DefaultVideoQualityAnalyzer::OnPeerStoppedReceiveVideoStream(
absl::string_view peer_name,
absl::string_view stream_label) {
void DefaultVideoQualityAnalyzer::OnResumeAllStreamsFrom(
absl::string_view sender_peer_name,
absl::string_view receiver_peer_name) {
MutexLock lock(&mutex_);
RTC_CHECK(peers_->HasName(peer_name));
size_t peer_index = peers_->index(peer_name);
RTC_CHECK(streams_.HasName(stream_label));
size_t stream_index = streams_.index(stream_label);
RTC_CHECK(peers_->HasName(sender_peer_name));
size_t sender_peer_index = peers_->index(sender_peer_name);
RTC_CHECK(peers_->HasName(receiver_peer_name));
size_t receiver_peer_index = peers_->index(receiver_peer_name);
auto it = stream_states_.find(stream_index);
RTC_CHECK(it != stream_states_.end());
it->second.GetPausableState(peer_index)->Pause();
for (auto& [unused, stream_state] : stream_states_) {
if (stream_state.sender() == sender_peer_index) {
stream_state.GetPausableState(receiver_peer_index)->Resume();
}
}
}
void DefaultVideoQualityAnalyzer::Stop() {

View File

@ -81,10 +81,10 @@ class DefaultVideoQualityAnalyzer : public VideoQualityAnalyzerInterface {
void RegisterParticipantInCall(absl::string_view peer_name) override;
void UnregisterParticipantInCall(absl::string_view peer_name) override;
void OnPeerStartedReceiveVideoStream(absl::string_view peer_name,
absl::string_view stream_label) override;
void OnPeerStoppedReceiveVideoStream(absl::string_view peer_name,
absl::string_view stream_label) override;
void OnPauseAllStreamsFrom(absl::string_view sender_peer_name,
absl::string_view receiver_peer_name) override;
void OnResumeAllStreamsFrom(absl::string_view sender_peer_name,
absl::string_view receiver_peer_name) override;
void Stop() override;
std::string GetStreamLabel(uint16_t frame_id) override;

View File

@ -144,21 +144,23 @@ void FakeCPULoad() {
void PassFramesThroughAnalyzerSenderOnly(
DefaultVideoQualityAnalyzer& analyzer,
absl::string_view sender,
absl::string_view stream_label,
std::vector<absl::string_view> stream_labels,
std::vector<absl::string_view> receivers,
int frames_count,
test::FrameGeneratorInterface& frame_generator,
int interframe_delay_ms = 0,
TimeController* time_controller = nullptr) {
for (int i = 0; i < frames_count; ++i) {
VideoFrame frame = NextFrame(&frame_generator, /*timestamp_us=*/1);
uint16_t frame_id =
analyzer.OnFrameCaptured(sender, std::string(stream_label), frame);
frame.set_id(frame_id);
analyzer.OnFramePreEncode(sender, frame);
analyzer.OnFrameEncoded(sender, frame.id(), FakeEncode(frame),
VideoQualityAnalyzerInterface::EncoderStats(),
false);
for (absl::string_view stream_label : stream_labels) {
VideoFrame frame = NextFrame(&frame_generator, /*timestamp_us=*/1);
uint16_t frame_id =
analyzer.OnFrameCaptured(sender, std::string(stream_label), frame);
frame.set_id(frame_id);
analyzer.OnFramePreEncode(sender, frame);
analyzer.OnFrameEncoded(sender, frame.id(), FakeEncode(frame),
VideoQualityAnalyzerInterface::EncoderStats(),
false);
}
if (i < frames_count - 1 && interframe_delay_ms > 0) {
if (time_controller == nullptr) {
SleepMs(interframe_delay_ms);
@ -171,28 +173,30 @@ void PassFramesThroughAnalyzerSenderOnly(
void PassFramesThroughAnalyzer(DefaultVideoQualityAnalyzer& analyzer,
absl::string_view sender,
absl::string_view stream_label,
std::vector<absl::string_view> stream_labels,
std::vector<absl::string_view> receivers,
int frames_count,
test::FrameGeneratorInterface& frame_generator,
int interframe_delay_ms = 0,
TimeController* time_controller = nullptr) {
for (int i = 0; i < frames_count; ++i) {
VideoFrame frame = NextFrame(&frame_generator, /*timestamp_us=*/1);
uint16_t frame_id =
analyzer.OnFrameCaptured(sender, std::string(stream_label), frame);
frame.set_id(frame_id);
analyzer.OnFramePreEncode(sender, frame);
analyzer.OnFrameEncoded(sender, frame.id(), FakeEncode(frame),
VideoQualityAnalyzerInterface::EncoderStats(),
false);
for (absl::string_view receiver : receivers) {
VideoFrame received_frame = DeepCopy(frame);
analyzer.OnFramePreDecode(receiver, received_frame.id(),
FakeEncode(received_frame));
analyzer.OnFrameDecoded(receiver, received_frame,
VideoQualityAnalyzerInterface::DecoderStats());
analyzer.OnFrameRendered(receiver, received_frame);
for (absl::string_view stream_label : stream_labels) {
VideoFrame frame = NextFrame(&frame_generator, /*timestamp_us=*/1);
uint16_t frame_id =
analyzer.OnFrameCaptured(sender, std::string(stream_label), frame);
frame.set_id(frame_id);
analyzer.OnFramePreEncode(sender, frame);
analyzer.OnFrameEncoded(sender, frame.id(), FakeEncode(frame),
VideoQualityAnalyzerInterface::EncoderStats(),
false);
for (absl::string_view receiver : receivers) {
VideoFrame received_frame = DeepCopy(frame);
analyzer.OnFramePreDecode(receiver, received_frame.id(),
FakeEncode(received_frame));
analyzer.OnFrameDecoded(receiver, received_frame,
VideoQualityAnalyzerInterface::DecoderStats());
analyzer.OnFrameRendered(receiver, received_frame);
}
}
if (i < frames_count - 1 && interframe_delay_ms > 0) {
if (time_controller == nullptr) {
@ -1612,10 +1616,10 @@ TEST(DefaultVideoQualityAnalyzerTest, ReceiverRemovedBeforeCapturing2ndFrame) {
analyzer.Start("test_case", std::vector<std::string>{"alice", "bob"},
kAnalyzerMaxThreadsCount);
PassFramesThroughAnalyzer(analyzer, "alice", "alice_video", {"bob"},
PassFramesThroughAnalyzer(analyzer, "alice", {"alice_video"}, {"bob"},
/*frames_count=*/1, *frame_generator);
analyzer.UnregisterParticipantInCall("bob");
PassFramesThroughAnalyzer(analyzer, "alice", "alice_video", {},
PassFramesThroughAnalyzer(analyzer, "alice", {"alice_video"}, {},
/*frames_count=*/1, *frame_generator);
// Give analyzer some time to process frames on async thread. The computations
@ -1788,12 +1792,12 @@ TEST(DefaultVideoQualityAnalyzerTest, UnregisterOneAndRegisterAnother) {
std::vector<std::string>{"alice", "bob", "charlie"},
kAnalyzerMaxThreadsCount);
PassFramesThroughAnalyzer(analyzer, "alice", "alice_video",
PassFramesThroughAnalyzer(analyzer, "alice", {"alice_video"},
{"bob", "charlie"},
/*frames_count=*/2, *frame_generator);
analyzer.UnregisterParticipantInCall("bob");
analyzer.RegisterParticipantInCall("david");
PassFramesThroughAnalyzer(analyzer, "alice", "alice_video",
PassFramesThroughAnalyzer(analyzer, "alice", {"alice_video"},
{"charlie", "david"},
/*frames_count=*/4, *frame_generator);
@ -1850,14 +1854,14 @@ TEST(DefaultVideoQualityAnalyzerTest,
std::vector<std::string>{"alice", "bob", "charlie"},
kAnalyzerMaxThreadsCount);
PassFramesThroughAnalyzer(analyzer, "alice", "alice_video",
PassFramesThroughAnalyzer(analyzer, "alice", {"alice_video"},
{"bob", "charlie"},
/*frames_count=*/2, *frame_generator);
analyzer.UnregisterParticipantInCall("bob");
PassFramesThroughAnalyzer(analyzer, "alice", "alice_video", {"charlie"},
PassFramesThroughAnalyzer(analyzer, "alice", {"alice_video"}, {"charlie"},
/*frames_count=*/4, *frame_generator);
analyzer.RegisterParticipantInCall("bob");
PassFramesThroughAnalyzer(analyzer, "alice", "alice_video",
PassFramesThroughAnalyzer(analyzer, "alice", {"alice_video"},
{"bob", "charlie"},
/*frames_count=*/6, *frame_generator);
@ -1940,7 +1944,7 @@ TEST(DefaultVideoQualityAnalyzerTest, InfraMetricsAreReportedWhenRequested) {
analyzer.Start("test_case", std::vector<std::string>{"alice", "bob"},
kAnalyzerMaxThreadsCount);
PassFramesThroughAnalyzer(analyzer, "alice", "alice_video", {"bob"},
PassFramesThroughAnalyzer(analyzer, "alice", {"alice_video"}, {"bob"},
/*frames_count=*/1, *frame_generator);
// Give analyzer some time to process frames on async thread. The computations
@ -1972,7 +1976,7 @@ TEST(DefaultVideoQualityAnalyzerTest, InfraMetricsNotCollectedByDefault) {
analyzer.Start("test_case", std::vector<std::string>{"alice", "bob"},
kAnalyzerMaxThreadsCount);
PassFramesThroughAnalyzer(analyzer, "alice", "alice_video", {"bob"},
PassFramesThroughAnalyzer(analyzer, "alice", {"alice_video"}, {"bob"},
/*frames_count=*/1, *frame_generator);
// Give analyzer some time to process frames on async thread. The computations
@ -2017,7 +2021,7 @@ TEST(DefaultVideoQualityAnalyzerTest,
VideoFrame received_to_be_dropped_frame = DeepCopy(to_be_dropped_frame);
analyzer.OnFramePreDecode("bob", received_to_be_dropped_frame.id(),
FakeEncode(received_to_be_dropped_frame));
PassFramesThroughAnalyzer(analyzer, "alice", "alice_video", {"bob"},
PassFramesThroughAnalyzer(analyzer, "alice", {"alice_video"}, {"bob"},
/*frames_count=*/1, *frame_generator);
// Give analyzer some time to process frames on async thread. The computations
@ -2046,7 +2050,7 @@ TEST_P(DefaultVideoQualityAnalyzerTimeBetweenFreezesTest,
analyzer.Start("test_case", std::vector<std::string>{"alice", "bob"},
kAnalyzerMaxThreadsCount);
PassFramesThroughAnalyzer(analyzer, "alice", "alice_video", {"bob"},
PassFramesThroughAnalyzer(analyzer, "alice", {"alice_video"}, {"bob"},
/*frames_count=*/5, *frame_generator,
/*interframe_delay_ms=*/50);
if (GetParam()) {
@ -2102,25 +2106,25 @@ TEST_F(DefaultVideoQualityAnalyzerSimulatedTimeTest,
kAnalyzerMaxThreadsCount);
// Pass 20 frames as 20 fps.
PassFramesThroughAnalyzer(analyzer, "alice", "alice_video",
PassFramesThroughAnalyzer(analyzer, "alice", {"alice_video"},
{"bob", "charlie"},
/*frames_count=*/20, *frame_generator,
/*interframe_delay_ms=*/50, time_controller());
AdvanceTime(TimeDelta::Millis(50));
// Mark stream paused for Bob, but not for Charlie.
analyzer.OnPeerStoppedReceiveVideoStream("bob", "alice_video");
analyzer.OnPauseAllStreamsFrom("alice", "bob");
// Freeze for 1 second.
PassFramesThroughAnalyzerSenderOnly(
analyzer, "alice", "alice_video", {"bob", "charlie"},
analyzer, "alice", {"alice_video"}, {"bob", "charlie"},
/*frames_count=*/20, *frame_generator,
/*interframe_delay_ms=*/50, time_controller());
AdvanceTime(TimeDelta::Millis(50));
// Unpause stream for Bob.
analyzer.OnPeerStartedReceiveVideoStream("bob", "alice_video");
analyzer.OnResumeAllStreamsFrom("alice", "bob");
// Pass 20 frames as 20 fps.
PassFramesThroughAnalyzer(analyzer, "alice", "alice_video",
PassFramesThroughAnalyzer(analyzer, "alice", {"alice_video"},
{"bob", "charlie"},
/*frames_count=*/20, *frame_generator,
/*interframe_delay_ms=*/50, time_controller());
@ -2162,6 +2166,84 @@ TEST_F(DefaultVideoQualityAnalyzerSimulatedTimeTest,
EXPECT_NEAR(charlie_stream_stats.harmonic_framerate_fps, 2.463465, 1e-6);
}
TEST_F(DefaultVideoQualityAnalyzerSimulatedTimeTest,
PausedAndResumedTwoStreamsAreAccountedInStatsCorrectly) {
std::unique_ptr<test::FrameGeneratorInterface> frame_generator =
test::CreateSquareFrameGenerator(kFrameWidth, kFrameHeight,
/*type=*/absl::nullopt,
/*num_squares=*/absl::nullopt);
DefaultVideoQualityAnalyzer analyzer(
GetClock(), test::GetGlobalMetricsLogger(), AnalyzerOptionsForTest());
analyzer.Start("test_case", std::vector<std::string>{"alice", "bob"},
kAnalyzerMaxThreadsCount);
// Pass 20 frames as 20 fps on 2 streams.
PassFramesThroughAnalyzer(analyzer, "alice",
{"alice_video_1", "alice_video_2"}, {"bob"},
/*frames_count=*/20, *frame_generator,
/*interframe_delay_ms=*/50, time_controller());
AdvanceTime(TimeDelta::Millis(50));
// Mark streams paused.
analyzer.OnPauseAllStreamsFrom("alice", "bob");
// Freeze for 1 second.
PassFramesThroughAnalyzerSenderOnly(
analyzer, "alice", {"alice_video_1", "alice_video_2"}, {"bob"},
/*frames_count=*/20, *frame_generator,
/*interframe_delay_ms=*/50, time_controller());
AdvanceTime(TimeDelta::Millis(50));
// Unpause streams.
analyzer.OnResumeAllStreamsFrom("alice", "bob");
// Pass 20 frames as 20 fps on the 2 streams.
PassFramesThroughAnalyzer(analyzer, "alice",
{"alice_video_1", "alice_video_2"}, {"bob"},
/*frames_count=*/20, *frame_generator,
/*interframe_delay_ms=*/50, time_controller());
analyzer.Stop();
// Bob should have 20 fps without freeze on both streams.
std::map<StatsKey, StreamStats> streams_stats = analyzer.GetStats();
std::map<StatsKey, FrameCounters> frame_counters =
analyzer.GetPerStreamCounters();
StreamStats bob_stream_stats1 =
streams_stats.at(StatsKey("alice_video_1", "bob"));
FrameCounters bob_frame_counters1 =
frame_counters.at(StatsKey("alice_video_1", "bob"));
EXPECT_THAT(bob_frame_counters1.dropped, Eq(0));
EXPECT_THAT(bob_frame_counters1.rendered, Eq(40));
EXPECT_THAT(GetTimeSortedValues(bob_stream_stats1.freeze_time_ms),
ElementsAre(0.0));
// TODO(bugs.webrtc.org/14995): value should exclude pause
EXPECT_THAT(GetTimeSortedValues(bob_stream_stats1.time_between_freezes_ms),
ElementsAre(2950.0));
// TODO(bugs.webrtc.org/14995): Fix capture_frame_rate (has to be ~20.0)
ExpectRateIs(bob_stream_stats1.capture_frame_rate, 13.559322);
// TODO(bugs.webrtc.org/14995): Fix encode_frame_rate (has to be ~20.0)
ExpectRateIs(bob_stream_stats1.encode_frame_rate, 13.559322);
EXPECT_DOUBLE_EQ(bob_stream_stats1.harmonic_framerate_fps, 20.0);
// Bob should have 20 fps without freeze on both streams.
StreamStats bob_stream_stats_2 =
streams_stats.at(StatsKey("alice_video_2", "bob"));
FrameCounters bob_frame_counters_2 =
frame_counters.at(StatsKey("alice_video_2", "bob"));
EXPECT_THAT(bob_frame_counters_2.dropped, Eq(0));
EXPECT_THAT(bob_frame_counters_2.rendered, Eq(40));
EXPECT_THAT(GetTimeSortedValues(bob_stream_stats_2.freeze_time_ms),
ElementsAre(0.0));
// TODO(bugs.webrtc.org/14995): value should exclude pause
EXPECT_THAT(GetTimeSortedValues(bob_stream_stats_2.time_between_freezes_ms),
ElementsAre(2950.0));
// TODO(bugs.webrtc.org/14995): Fix capture_frame_rate (has to be ~20.0)
ExpectRateIs(bob_stream_stats_2.capture_frame_rate, 13.559322);
// TODO(bugs.webrtc.org/14995): Fix encode_frame_rate (has to be ~20.0)
ExpectRateIs(bob_stream_stats_2.encode_frame_rate, 13.559322);
EXPECT_DOUBLE_EQ(bob_stream_stats_2.harmonic_framerate_fps, 20.0);
}
TEST_F(DefaultVideoQualityAnalyzerSimulatedTimeTest,
PausedStreamIsAccountedInStatsCorrectly) {
std::unique_ptr<test::FrameGeneratorInterface> frame_generator =
@ -2178,23 +2260,23 @@ TEST_F(DefaultVideoQualityAnalyzerSimulatedTimeTest,
kAnalyzerMaxThreadsCount);
// Pass 20 frames as 20 fps.
PassFramesThroughAnalyzer(analyzer, "alice", "alice_video",
PassFramesThroughAnalyzer(analyzer, "alice", {"alice_video"},
{"bob", "charlie"},
/*frames_count=*/20, *frame_generator,
/*interframe_delay_ms=*/50, time_controller());
AdvanceTime(TimeDelta::Millis(50));
// Mark stream paused for Bob, but not for Charlie.
analyzer.OnPeerStoppedReceiveVideoStream("bob", "alice_video");
analyzer.OnPauseAllStreamsFrom("alice", "bob");
// Freeze for 1 second.
PassFramesThroughAnalyzerSenderOnly(
analyzer, "alice", "alice_video", {"bob", "charlie"},
analyzer, "alice", {"alice_video"}, {"bob", "charlie"},
/*frames_count=*/20, *frame_generator,
/*interframe_delay_ms=*/50, time_controller());
AdvanceTime(TimeDelta::Millis(50));
// Pass 20 frames as 20 fps.
PassFramesThroughAnalyzer(analyzer, "alice", "alice_video", {"charlie"},
PassFramesThroughAnalyzer(analyzer, "alice", {"alice_video"}, {"charlie"},
/*frames_count=*/20, *frame_generator,
/*interframe_delay_ms=*/50, time_controller());

View File

@ -19,13 +19,15 @@
namespace webrtc {
void PausableState::Pause() {
RTC_CHECK(!IsPaused());
events_.push_back(Event{.time = clock_->CurrentTime(), .is_paused = true});
if (!IsPaused()) {
events_.push_back(Event{.time = clock_->CurrentTime(), .is_paused = true});
}
}
void PausableState::Resume() {
RTC_CHECK(IsPaused());
events_.push_back(Event{.time = clock_->CurrentTime(), .is_paused = false});
if (IsPaused()) {
events_.push_back(Event{.time = clock_->CurrentTime(), .is_paused = false});
}
}
bool PausableState::IsPaused() const {

View File

@ -59,6 +59,32 @@ TEST_F(PausableStateTest, IsActiveAfterResume) {
EXPECT_FALSE(state.IsPaused());
}
TEST_F(PausableStateTest, PauseAlreadyPausedIsNoOp) {
PausableState state(GetClock());
AdvanceTime(TimeDelta::Seconds(1));
Timestamp test_time = Now();
state.Pause();
AdvanceTime(TimeDelta::Seconds(1));
state.Pause();
EXPECT_EQ(state.GetLastEventTime(), test_time);
}
TEST_F(PausableStateTest, ResumeAlreadyResumedIsNoOp) {
PausableState state(GetClock());
AdvanceTime(TimeDelta::Seconds(1));
state.Pause();
AdvanceTime(TimeDelta::Seconds(1));
Timestamp test_time = Now();
state.Resume();
AdvanceTime(TimeDelta::Seconds(1));
state.Resume();
EXPECT_EQ(state.GetLastEventTime(), test_time);
}
TEST_F(PausableStateTest, WasPausedAtFalseWhenMultiplePauseResumeAtSameTime) {
PausableState state(GetClock());