Reland of the test portion of:

https://webrtc-review.googlesource.com/c/src/+/172847

------------ original description --------------

Preparation for ReceiveStatisticsProxy lock reduction.

Update tests to call VideoReceiveStream::GetStats() in the same or at
least similar way it gets called in production (construction thread,
same TQ/thread).

Mapped out threads and context for ReceiveStatisticsProxy,
VideoQualityObserver and VideoReceiveStream. Added
follow-up TODOs for webrtc:11489.

One functional change in ReceiveStatisticsProxy is that when sender
side RtcpPacketTypesCounterUpdated calls are made, the counter is
updated asynchronously since the sender calls the method on a different
thread than the receiver.

Make CallClient::SendTask public to allow tests to run tasks in the
right context. CallClient already does this internally for GetStats.

Remove 10 sec sleep in StopSendingKeyframeRequestsForInactiveStream.

Bug: webrtc:11489
Change-Id: I491e13344b9fa714de0741dd927d907de7e39e83
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/173583
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31077}
This commit is contained in:
Tommi 2020-04-15 16:45:47 +02:00 committed by Commit Bot
parent 4a5bce96e8
commit 3c9bcc1f7a
13 changed files with 440 additions and 69 deletions

View File

@ -547,6 +547,7 @@ if (rtc_include_tests) {
"rtc_base:weak_ptr_unittests",
"rtc_base/experiments:experiments_unittests",
"rtc_base/synchronization:sequence_checker_unittests",
"rtc_base/task_utils:pending_task_safety_flag_unittests",
"rtc_base/task_utils:to_queued_task_unittests",
"sdk:sdk_tests",
"test:rtp_test_utils",

View File

@ -96,21 +96,24 @@ class VideoRtcpAndSyncObserver : public test::RtpRtcpObserver,
static const int kMinRunTimeMs = 30000;
public:
explicit VideoRtcpAndSyncObserver(Clock* clock, const std::string& test_label)
explicit VideoRtcpAndSyncObserver(TaskQueueBase* task_queue,
Clock* clock,
const std::string& test_label)
: test::RtpRtcpObserver(CallPerfTest::kLongTimeoutMs),
clock_(clock),
test_label_(test_label),
creation_time_ms_(clock_->TimeInMilliseconds()),
first_time_in_sync_(-1),
receive_stream_(nullptr) {}
task_queue_(task_queue) {}
void OnFrame(const VideoFrame& video_frame) override {
VideoReceiveStream::Stats stats;
{
rtc::CritScope lock(&crit_);
if (receive_stream_)
stats = receive_stream_->GetStats();
}
task_queue_->PostTask(ToQueuedTask([this]() { CheckStats(); }));
}
void CheckStats() {
if (!receive_stream_)
return;
VideoReceiveStream::Stats stats = receive_stream_->GetStats();
if (stats.sync_offset_ms == std::numeric_limits<int>::max())
return;
@ -135,7 +138,8 @@ class VideoRtcpAndSyncObserver : public test::RtpRtcpObserver,
}
void set_receive_stream(VideoReceiveStream* receive_stream) {
rtc::CritScope lock(&crit_);
RTC_DCHECK_EQ(task_queue_, TaskQueueBase::Current());
// Note that receive_stream may be nullptr.
receive_stream_ = receive_stream;
}
@ -148,10 +152,10 @@ class VideoRtcpAndSyncObserver : public test::RtpRtcpObserver,
Clock* const clock_;
std::string test_label_;
const int64_t creation_time_ms_;
int64_t first_time_in_sync_;
rtc::CriticalSection crit_;
VideoReceiveStream* receive_stream_ RTC_GUARDED_BY(crit_);
int64_t first_time_in_sync_ = -1;
VideoReceiveStream* receive_stream_ = nullptr;
std::vector<double> sync_offset_ms_list_;
TaskQueueBase* const task_queue_;
};
void CallPerfTest::TestAudioVideoSync(FecMode fec,
@ -168,7 +172,8 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
audio_net_config.queue_delay_ms = 500;
audio_net_config.loss_percent = 5;
VideoRtcpAndSyncObserver observer(Clock::GetRealTimeClock(), test_label);
auto observer = std::make_unique<VideoRtcpAndSyncObserver>(
task_queue(), Clock::GetRealTimeClock(), test_label);
std::map<uint8_t, MediaType> audio_pt_map;
std::map<uint8_t, MediaType> video_pt_map;
@ -218,7 +223,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
});
audio_send_transport = std::make_unique<test::PacketTransport>(
task_queue(), sender_call_.get(), &observer,
task_queue(), sender_call_.get(), observer.get(),
test::PacketTransport::kSender, audio_pt_map,
std::make_unique<FakeNetworkPipe>(
Clock::GetRealTimeClock(),
@ -226,7 +231,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
audio_send_transport->SetReceiver(receiver_call_->Receiver());
video_send_transport = std::make_unique<test::PacketTransport>(
task_queue(), sender_call_.get(), &observer,
task_queue(), sender_call_.get(), observer.get(),
test::PacketTransport::kSender, video_pt_map,
std::make_unique<FakeNetworkPipe>(Clock::GetRealTimeClock(),
std::make_unique<SimulatedNetwork>(
@ -234,7 +239,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
video_send_transport->SetReceiver(receiver_call_->Receiver());
receive_transport = std::make_unique<test::PacketTransport>(
task_queue(), receiver_call_.get(), &observer,
task_queue(), receiver_call_.get(), observer.get(),
test::PacketTransport::kReceiver, payload_type_map_,
std::make_unique<FakeNetworkPipe>(Clock::GetRealTimeClock(),
std::make_unique<SimulatedNetwork>(
@ -259,7 +264,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
video_receive_configs_[0].rtp.ulpfec_payload_type = kUlpfecPayloadType;
}
video_receive_configs_[0].rtp.nack.rtp_history_ms = 1000;
video_receive_configs_[0].renderer = &observer;
video_receive_configs_[0].renderer = observer.get();
video_receive_configs_[0].sync_group = kSyncGroup;
AudioReceiveStream::Config audio_recv_config;
@ -281,7 +286,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
receiver_call_->CreateAudioReceiveStream(audio_recv_config);
}
EXPECT_EQ(1u, video_receive_streams_.size());
observer.set_receive_stream(video_receive_streams_[0]);
observer->set_receive_stream(video_receive_streams_[0]);
drifting_clock = std::make_unique<DriftingClock>(clock_, video_ntp_speed);
CreateFrameGeneratorCapturerWithDrift(drifting_clock.get(), video_rtp_speed,
kDefaultFramerate, kDefaultWidth,
@ -293,10 +298,13 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
audio_receive_stream->Start();
});
EXPECT_TRUE(observer.Wait())
EXPECT_TRUE(observer->Wait())
<< "Timed out while waiting for audio and video to be synchronized.";
SendTask(RTC_FROM_HERE, task_queue(), [&]() {
// Clear the pointer to the receive stream since it will now be deleted.
observer->set_receive_stream(nullptr);
audio_send_stream->Stop();
audio_receive_stream->Stop();
@ -314,7 +322,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
DestroyCalls();
});
observer.PrintResults();
observer->PrintResults();
// In quick test synchronization may not be achieved in time.
if (!field_trial::IsEnabled("WebRTC-QuickPerfTest")) {
@ -323,6 +331,9 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
EXPECT_METRIC_EQ(1, metrics::NumSamples("WebRTC.Video.AVSyncOffsetInMs"));
#endif
}
task_queue()->PostTask(
ToQueuedTask([to_delete = observer.release()]() { delete to_delete; }));
}
TEST_F(CallPerfTest, PlaysOutAudioAndVideoInSyncWithoutClockDrift) {

View File

@ -526,9 +526,9 @@ TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) {
test::NetworkSimulationConfig net_conf;
net_conf.bandwidth = DataRate::KilobitsPerSec(300);
auto send_node = s.CreateSimulationNode(net_conf);
auto* callee = s.CreateClient("return", call_conf);
auto* route = s.CreateRoutes(s.CreateClient("send", call_conf), {send_node},
s.CreateClient("return", call_conf),
{s.CreateSimulationNode(net_conf)});
callee, {s.CreateSimulationNode(net_conf)});
test::VideoStreamConfig lossy_config;
lossy_config.source.framerate = 5;
@ -556,14 +556,20 @@ TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) {
// from initial probing.
s.RunFor(TimeDelta::Seconds(1));
rtx_packets = 0;
int decoded_baseline = lossy->receive()->GetStats().frames_decoded;
int decoded_baseline = 0;
callee->SendTask([&decoded_baseline, &lossy]() {
decoded_baseline = lossy->receive()->GetStats().frames_decoded;
});
s.RunFor(TimeDelta::Seconds(1));
// We expect both that RTX packets were sent and that an appropriate number of
// frames were received. This is somewhat redundant but reduces the risk of
// false positives in future regressions (e.g. RTX is send due to probing).
EXPECT_GE(rtx_packets, 1);
int frames_decoded =
lossy->receive()->GetStats().frames_decoded - decoded_baseline;
int frames_decoded = 0;
callee->SendTask([&decoded_baseline, &frames_decoded, &lossy]() {
frames_decoded =
lossy->receive()->GetStats().frames_decoded - decoded_baseline;
});
EXPECT_EQ(frames_decoded, 5);
}

View File

@ -537,8 +537,8 @@ DataRate AverageBitrateAfterCrossInducedLoss(std::string name) {
auto ret_net = {s.CreateSimulationNode(net_conf)};
auto* client = s.CreateClient("send", CallClientConfig());
auto* route = s.CreateRoutes(
client, send_net, s.CreateClient("return", CallClientConfig()), ret_net);
auto* callee = s.CreateClient("return", CallClientConfig());
auto* route = s.CreateRoutes(client, send_net, callee, ret_net);
// TODO(srte): Make this work with RTX enabled or remove it.
auto* video = s.CreateVideoStream(route->forward(), [](VideoStreamConfig* c) {
c->stream.use_rtx = false;
@ -553,9 +553,17 @@ DataRate AverageBitrateAfterCrossInducedLoss(std::string name) {
s.net()->StopCrossTraffic(tcp_traffic);
s.RunFor(TimeDelta::Seconds(20));
}
return DataSize::Bytes(video->receive()
->GetStats()
.rtp_stats.packet_counter.TotalBytes()) /
// Querying the video stats from within the expected runtime environment
// (i.e. the TQ that belongs to the CallClient, not the Scenario TQ that
// we're currently on).
VideoReceiveStream::Stats video_receive_stats;
auto* video_stream = video->receive();
callee->SendTask([&video_stream, &video_receive_stats]() {
video_receive_stats = video_stream->GetStats();
});
return DataSize::Bytes(
video_receive_stats.rtp_stats.packet_counter.TotalBytes()) /
s.TimeSinceStart();
}

View File

@ -26,12 +26,39 @@ rtc_library("repeating_task") {
]
}
rtc_library("pending_task_safety_flag") {
sources = [
"pending_task_safety_flag.cc",
"pending_task_safety_flag.h",
]
deps = [
"..:checks",
"..:refcount",
"..:thread_checker",
"../../api:scoped_refptr",
"../synchronization:sequence_checker",
]
}
rtc_source_set("to_queued_task") {
sources = [ "to_queued_task.h" ]
deps = [ "../../api/task_queue" ]
}
if (rtc_include_tests) {
rtc_library("pending_task_safety_flag_unittests") {
testonly = true
sources = [ "pending_task_safety_flag_unittest.cc" ]
deps = [
":pending_task_safety_flag",
":to_queued_task",
"..:rtc_base_approved",
"..:rtc_task_queue",
"..:task_queue_for_test",
"../../test:test_support",
]
}
rtc_library("repeating_task_unittests") {
testonly = true
sources = [ "repeating_task_unittest.cc" ]

View File

@ -0,0 +1,32 @@
/*
* Copyright 2020 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/ref_counted_object.h"
namespace webrtc {
// static
PendingTaskSafetyFlag::Pointer PendingTaskSafetyFlag::Create() {
return new rtc::RefCountedObject<PendingTaskSafetyFlag>();
}
void PendingTaskSafetyFlag::SetNotAlive() {
RTC_DCHECK_RUN_ON(&main_sequence_);
alive_ = false;
}
bool PendingTaskSafetyFlag::alive() const {
RTC_DCHECK_RUN_ON(&main_sequence_);
return alive_;
}
} // namespace webrtc

View File

@ -0,0 +1,61 @@
/*
* Copyright 2020 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_
#define RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_
#include "api/scoped_refptr.h"
#include "rtc_base/checks.h"
#include "rtc_base/ref_count.h"
#include "rtc_base/synchronization/sequence_checker.h"
namespace webrtc {
// Use this flag to drop pending tasks that have been posted to the "main"
// thread/TQ and end up running after the owning instance has been
// deleted. The owning instance signals deletion by calling SetNotAlive() from
// its destructor.
//
// When posting a task, post a copy (capture by-value in a lambda) of the flag
// instance and before performing the work, check the |alive()| state. Abort if
// alive() returns |false|:
//
// // Running outside of the main thread.
// my_task_queue_->PostTask(ToQueuedTask(
// [safety = pending_task_safety_flag_, this]() {
// // Now running on the main thread.
// if (!safety->alive())
// return;
// MyMethod();
// }));
//
// Note that checking the state only works on the construction/destruction
// thread of the ReceiveStatisticsProxy instance.
class PendingTaskSafetyFlag : public rtc::RefCountInterface {
public:
using Pointer = rtc::scoped_refptr<PendingTaskSafetyFlag>;
static Pointer Create();
~PendingTaskSafetyFlag() = default;
void SetNotAlive();
bool alive() const;
protected:
PendingTaskSafetyFlag() = default;
private:
bool alive_ = true;
SequenceChecker main_sequence_;
};
} // namespace webrtc
#endif // RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_

View File

@ -0,0 +1,151 @@
/*
* Copyright 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include <memory>
#include "rtc_base/event.h"
#include "rtc_base/logging.h"
#include "rtc_base/task_queue_for_test.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "test/gmock.h"
#include "test/gtest.h"
namespace webrtc {
namespace {
using ::testing::AtLeast;
using ::testing::Invoke;
using ::testing::MockFunction;
using ::testing::NiceMock;
using ::testing::Return;
} // namespace
TEST(PendingTaskSafetyFlagTest, Basic) {
PendingTaskSafetyFlag::Pointer safety_flag;
{
// Scope for the |owner| instance.
class Owner {
public:
Owner() = default;
~Owner() { flag_->SetNotAlive(); }
PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
} owner;
EXPECT_TRUE(owner.flag_->alive());
safety_flag = owner.flag_;
EXPECT_TRUE(safety_flag->alive());
}
EXPECT_FALSE(safety_flag->alive());
}
TEST(PendingTaskSafetyFlagTest, PendingTaskSuccess) {
TaskQueueForTest tq1("OwnerHere");
TaskQueueForTest tq2("OwnerNotHere");
class Owner {
public:
Owner() : tq_main_(TaskQueueBase::Current()) { RTC_DCHECK(tq_main_); }
~Owner() {
RTC_DCHECK(tq_main_->IsCurrent());
flag_->SetNotAlive();
}
void DoStuff() {
RTC_DCHECK(!tq_main_->IsCurrent());
tq_main_->PostTask(ToQueuedTask([safe = flag_, this]() {
if (!safe->alive())
return;
stuff_done_ = true;
}));
}
bool stuff_done() const { return stuff_done_; }
private:
TaskQueueBase* const tq_main_;
bool stuff_done_ = false;
PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
};
std::unique_ptr<Owner> owner;
tq1.SendTask(
[&owner]() {
owner.reset(new Owner());
EXPECT_FALSE(owner->stuff_done());
},
RTC_FROM_HERE);
ASSERT_TRUE(owner);
tq2.SendTask([&owner]() { owner->DoStuff(); }, RTC_FROM_HERE);
tq1.SendTask(
[&owner]() {
EXPECT_TRUE(owner->stuff_done());
owner.reset();
},
RTC_FROM_HERE);
ASSERT_FALSE(owner);
}
TEST(PendingTaskSafetyFlagTest, PendingTaskDropped) {
TaskQueueForTest tq1("OwnerHere");
TaskQueueForTest tq2("OwnerNotHere");
class Owner {
public:
explicit Owner(bool* stuff_done)
: tq_main_(TaskQueueBase::Current()), stuff_done_(stuff_done) {
RTC_DCHECK(tq_main_);
*stuff_done_ = false;
}
~Owner() {
RTC_DCHECK(tq_main_->IsCurrent());
flag_->SetNotAlive();
}
void DoStuff() {
RTC_DCHECK(!tq_main_->IsCurrent());
tq_main_->PostTask(ToQueuedTask([safe = flag_, this]() {
if (!safe->alive())
return;
*stuff_done_ = true;
}));
}
private:
TaskQueueBase* const tq_main_;
bool* const stuff_done_;
PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
};
std::unique_ptr<Owner> owner;
bool stuff_done = false;
tq1.SendTask([&owner, &stuff_done]() { owner.reset(new Owner(&stuff_done)); },
RTC_FROM_HERE);
ASSERT_TRUE(owner);
// Queue up a task on tq1 that will execute before the 'DoStuff' task
// can, and delete the |owner| before the 'stuff' task can execute.
rtc::Event blocker;
tq1.PostTask([&blocker, &owner]() {
blocker.Wait(rtc::Event::kForever);
owner.reset();
});
// Queue up a DoStuff...
tq2.SendTask([&owner]() { owner->DoStuff(); }, RTC_FROM_HERE);
ASSERT_TRUE(owner);
blocker.Set();
// Run an empty task on tq1 to flush all the queued tasks.
tq1.SendTask([]() {}, RTC_FROM_HERE);
ASSERT_FALSE(owner);
EXPECT_FALSE(stuff_done);
}
} // namespace webrtc

View File

@ -113,6 +113,11 @@ class CallClient : public EmulatedNetworkReceiverInterface {
void OnPacketReceived(EmulatedIpPacket packet) override;
std::unique_ptr<RtcEventLogOutput> GetLogWriter(std::string name);
// Exposed publicly so that tests can execute tasks such as querying stats
// for media streams in the expected runtime environment (essentially what
// CallClient does internally for GetStats()).
void SendTask(std::function<void()> task);
private:
friend class Scenario;
friend class CallClientPair;
@ -129,7 +134,6 @@ class CallClient : public EmulatedNetworkReceiverInterface {
uint32_t GetNextAudioLocalSsrc();
uint32_t GetNextRtxSsrc();
void AddExtensions(std::vector<RtpExtension> extensions);
void SendTask(std::function<void()> task);
int16_t Bind(EmulatedEndpoint* endpoint);
void UnBind();

View File

@ -25,17 +25,26 @@ void CreateAnalyzedStream(Scenario* s,
VideoStreamConfig::Encoder::Implementation::kSoftware;
config.hooks.frame_pair_handlers = {analyzer->Handler()};
auto* caller = s->CreateClient("caller", CallClientConfig());
auto* callee = s->CreateClient("callee", CallClientConfig());
auto route =
s->CreateRoutes(caller, {s->CreateSimulationNode(network_config)},
s->CreateClient("callee", CallClientConfig()),
s->CreateRoutes(caller, {s->CreateSimulationNode(network_config)}, callee,
{s->CreateSimulationNode(NetworkSimulationConfig())});
auto* video = s->CreateVideoStream(route->forward(), config);
VideoStreamPair* video = s->CreateVideoStream(route->forward(), config);
auto* audio = s->CreateAudioStream(route->forward(), AudioStreamConfig());
s->Every(TimeDelta::Seconds(1), [=] {
collectors->call.AddStats(caller->GetStats());
collectors->audio_receive.AddStats(audio->receive()->GetStats());
collectors->video_send.AddStats(video->send()->GetStats(), s->Now());
collectors->video_receive.AddStats(video->receive()->GetStats());
collectors->audio_receive.AddStats(audio->receive()->GetStats());
// Querying the video stats from within the expected runtime environment
// (i.e. the TQ that belongs to the CallClient, not the Scenario TQ that
// we're currently on).
VideoReceiveStream::Stats video_receive_stats;
auto* video_stream = video->receive();
callee->SendTask([&video_stream, &video_receive_stats]() {
video_receive_stats = video_stream->GetStats();
});
collectors->video_receive.AddStats(video_receive_stats);
});
}
} // namespace

View File

@ -115,6 +115,7 @@ rtc_library("video") {
"../rtc_base/experiments:rate_control_settings",
"../rtc_base/synchronization:sequence_checker",
"../rtc_base/system:thread_registry",
"../rtc_base/task_utils:pending_task_safety_flag",
"../rtc_base/task_utils:repeating_task",
"../rtc_base/task_utils:to_queued_task",
"../rtc_base/time:timestamp_extrapolator",

View File

@ -18,8 +18,8 @@
#include "call/simulated_network.h"
#include "modules/rtp_rtcp/source/rtp_packet.h"
#include "modules/video_coding/codecs/vp8/include/vp8.h"
#include "rtc_base/event.h"
#include "rtc_base/task_queue_for_test.h"
#include "system_wrappers/include/sleep.h"
#include "test/call_test.h"
#include "test/field_trial.h"
#include "test/gtest.h"
@ -203,7 +203,7 @@ TEST_F(RetransmissionEndToEndTest, ReceivesNackAndRetransmitsAudio) {
TEST_F(RetransmissionEndToEndTest,
StopSendingKeyframeRequestsForInactiveStream) {
class KeyframeRequestObserver : public test::EndToEndTest {
class KeyframeRequestObserver : public test::EndToEndTest, public QueuedTask {
public:
explicit KeyframeRequestObserver(TaskQueueBase* task_queue)
: clock_(Clock::GetRealTimeClock()), task_queue_(task_queue) {}
@ -216,28 +216,59 @@ TEST_F(RetransmissionEndToEndTest,
receive_stream_ = receive_streams[0];
}
void PerformTest() override {
bool frame_decoded = false;
int64_t start_time = clock_->TimeInMilliseconds();
while (clock_->TimeInMilliseconds() - start_time <= 5000) {
if (receive_stream_->GetStats().frames_decoded > 0) {
frame_decoded = true;
break;
}
SleepMs(100);
Action OnReceiveRtcp(const uint8_t* packet, size_t length) override {
test::RtcpPacketParser parser;
EXPECT_TRUE(parser.Parse(packet, length));
if (parser.pli()->num_packets() > 0)
task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
return SEND_PACKET;
}
bool PollStats() {
if (receive_stream_->GetStats().frames_decoded > 0) {
frame_decoded_ = true;
} else if (clock_->TimeInMilliseconds() - start_time_ < 5000) {
task_queue_->PostDelayedTask(std::unique_ptr<QueuedTask>(this), 100);
return false;
}
ASSERT_TRUE(frame_decoded);
SendTask(RTC_FROM_HERE, task_queue_, [this]() { send_stream_->Stop(); });
SleepMs(10000);
ASSERT_EQ(
1U, receive_stream_->GetStats().rtcp_packet_type_counts.pli_packets);
return true;
}
void PerformTest() override {
start_time_ = clock_->TimeInMilliseconds();
task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
test_done_.Wait(rtc::Event::kForever);
}
bool Run() override {
if (!frame_decoded_) {
if (PollStats()) {
send_stream_->Stop();
if (!frame_decoded_) {
test_done_.Set();
} else {
// Now we wait for the PLI packet. Once we receive it, a task
// will be posted (see OnReceiveRtcp) and we'll check the stats
// once more before signaling that we're done.
}
}
} else {
EXPECT_EQ(
1U,
receive_stream_->GetStats().rtcp_packet_type_counts.pli_packets);
test_done_.Set();
}
return false;
}
private:
Clock* clock_;
Clock* const clock_;
VideoSendStream* send_stream_;
VideoReceiveStream* receive_stream_;
TaskQueueBase* const task_queue_;
rtc::Event test_done_;
bool frame_decoded_ = false;
int64_t start_time_ = 0;
} test(task_queue());
RunBaseTest(&test);

View File

@ -297,6 +297,7 @@ TEST_F(StatsEndToEndTest, GetStats) {
const std::vector<VideoReceiveStream*>& receive_streams) override {
send_stream_ = send_stream;
receive_streams_ = receive_streams;
task_queue_ = TaskQueueBase::Current();
}
void PerformTest() override {
@ -307,8 +308,10 @@ TEST_F(StatsEndToEndTest, GetStats) {
bool send_ok = false;
while (now_ms < stop_time_ms) {
if (!receive_ok)
receive_ok = CheckReceiveStats();
if (!receive_ok && task_queue_) {
SendTask(RTC_FROM_HERE, task_queue_,
[&]() { receive_ok = CheckReceiveStats(); });
}
if (!send_ok)
send_ok = CheckSendStats();
@ -346,6 +349,7 @@ TEST_F(StatsEndToEndTest, GetStats) {
rtc::Event check_stats_event_;
ReceiveStreamRenderer receive_stream_renderer_;
TaskQueueBase* task_queue_ = nullptr;
} test;
RunBaseTest(&test);
@ -377,22 +381,28 @@ TEST_F(StatsEndToEndTest, TimingFramesAreReported) {
VideoSendStream* send_stream,
const std::vector<VideoReceiveStream*>& receive_streams) override {
receive_streams_ = receive_streams;
task_queue_ = TaskQueueBase::Current();
}
void PerformTest() override {
// No frames reported initially.
for (const auto& receive_stream : receive_streams_) {
EXPECT_FALSE(receive_stream->GetStats().timing_frame_info);
}
SendTask(RTC_FROM_HERE, task_queue_, [&]() {
for (const auto& receive_stream : receive_streams_) {
EXPECT_FALSE(receive_stream->GetStats().timing_frame_info);
}
});
// Wait for at least one timing frame to be sent with 100ms grace period.
SleepMs(kDefaultTimingFramesDelayMs + 100);
// Check that timing frames are reported for each stream.
for (const auto& receive_stream : receive_streams_) {
EXPECT_TRUE(receive_stream->GetStats().timing_frame_info);
}
SendTask(RTC_FROM_HERE, task_queue_, [&]() {
for (const auto& receive_stream : receive_streams_) {
EXPECT_TRUE(receive_stream->GetStats().timing_frame_info);
}
});
}
std::vector<VideoReceiveStream*> receive_streams_;
TaskQueueBase* task_queue_ = nullptr;
} test;
RunBaseTest(&test);
@ -400,7 +410,8 @@ TEST_F(StatsEndToEndTest, TimingFramesAreReported) {
TEST_F(StatsEndToEndTest, TestReceivedRtpPacketStats) {
static const size_t kNumRtpPacketsToSend = 5;
class ReceivedRtpStatsObserver : public test::EndToEndTest {
class ReceivedRtpStatsObserver : public test::EndToEndTest,
public QueuedTask {
public:
ReceivedRtpStatsObserver()
: EndToEndTest(kDefaultTimeoutMs),
@ -412,14 +423,14 @@ TEST_F(StatsEndToEndTest, TestReceivedRtpPacketStats) {
VideoSendStream* send_stream,
const std::vector<VideoReceiveStream*>& receive_streams) override {
receive_stream_ = receive_streams[0];
task_queue_ = TaskQueueBase::Current();
EXPECT_TRUE(task_queue_ != nullptr);
}
Action OnSendRtp(const uint8_t* packet, size_t length) override {
if (sent_rtp_ >= kNumRtpPacketsToSend) {
VideoReceiveStream::Stats stats = receive_stream_->GetStats();
if (kNumRtpPacketsToSend == stats.rtp_stats.packet_counter.packets) {
observation_complete_.Set();
}
// Need to check the stats on the correct thread.
task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
return DROP_PACKET;
}
++sent_rtp_;
@ -431,8 +442,17 @@ TEST_F(StatsEndToEndTest, TestReceivedRtpPacketStats) {
<< "Timed out while verifying number of received RTP packets.";
}
bool Run() override {
VideoReceiveStream::Stats stats = receive_stream_->GetStats();
if (kNumRtpPacketsToSend == stats.rtp_stats.packet_counter.packets) {
observation_complete_.Set();
}
return false;
}
VideoReceiveStream* receive_stream_;
uint32_t sent_rtp_;
TaskQueueBase* task_queue_ = nullptr;
} test;
RunBaseTest(&test);
@ -578,7 +598,7 @@ TEST_F(StatsEndToEndTest, MAYBE_ContentTypeSwitches) {
TEST_F(StatsEndToEndTest, VerifyNackStats) {
static const int kPacketNumberToDrop = 200;
class NackObserver : public test::EndToEndTest {
class NackObserver : public test::EndToEndTest, public QueuedTask {
public:
NackObserver()
: EndToEndTest(kLongTimeoutMs),
@ -598,7 +618,7 @@ TEST_F(StatsEndToEndTest, VerifyNackStats) {
dropped_rtp_packet_ = header.sequenceNumber;
return DROP_PACKET;
}
VerifyStats();
task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
return SEND_PACKET;
}
@ -659,6 +679,14 @@ TEST_F(StatsEndToEndTest, VerifyNackStats) {
const std::vector<VideoReceiveStream*>& receive_streams) override {
send_stream_ = send_stream;
receive_streams_ = receive_streams;
task_queue_ = TaskQueueBase::Current();
EXPECT_TRUE(task_queue_ != nullptr);
}
bool Run() override {
rtc::CritScope lock(&crit_);
VerifyStats();
return false;
}
void PerformTest() override {
@ -673,6 +701,7 @@ TEST_F(StatsEndToEndTest, VerifyNackStats) {
std::vector<VideoReceiveStream*> receive_streams_;
VideoSendStream* send_stream_;
absl::optional<int64_t> start_runtime_ms_;
TaskQueueBase* task_queue_ = nullptr;
} test;
metrics::Reset();