Replace the implementation of GetContributingSources() on the video side.

This change replaces the `ContributingSources`-implementation of `GetContributingSources()` and `GetSynchronizationSources()` on the video side with the spec-compliant `SourceTracker`-implementation.

The most noticeable impact is that the per-frame dictionaries are now updated when frames are delivered to the RTCRtpReceiver's MediaStreamTrack rather than when RTP packets are received on the network.

Bug: webrtc:10545
Change-Id: I895b5790280ac94c1501801d226c643633c67349
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/143177
Reviewed-by: Niels Moller <nisse@webrtc.org>
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Commit-Queue: Chen Xing <chxg@google.com>
Cr-Commit-Position: refs/heads/master@{#28386}
This commit is contained in:
Chen Xing 2019-06-25 10:16:14 +02:00 committed by Commit Bot
parent 3472b9ae22
commit 90f3b89550
6 changed files with 133 additions and 162 deletions

View File

@ -60,6 +60,8 @@
#include "test/gmock.h"
using ::testing::Field;
using ::testing::IsEmpty;
using ::testing::SizeIs;
using webrtc::BitrateConstraints;
using webrtc::RtpExtension;
@ -7514,137 +7516,44 @@ TEST_F(WebRtcVideoChannelSimulcastTest,
false);
}
class WebRtcVideoFakeClock {
public:
WebRtcVideoFakeClock() {
fake_clock_.AdvanceTime(webrtc::TimeDelta::ms(1)); // avoid time=0
}
rtc::ScopedFakeClock fake_clock_;
};
TEST_F(WebRtcVideoChannelBaseTest, GetSources) {
EXPECT_THAT(channel_->GetSources(kSsrc), IsEmpty());
// The fake clock needs to be initialized before the call, and not
// destroyed until after all threads spawned by the test have been stopped.
// This mixin ensures that.
class WebRtcVideoChannelTestWithClock : public WebRtcVideoFakeClock,
public WebRtcVideoChannelBaseTest {};
TEST_F(WebRtcVideoChannelTestWithClock, GetSources) {
uint8_t data1[] = {0x80, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
EXPECT_EQ(0u, channel_->GetSources(kSsrc).size());
rtc::CopyOnWriteBuffer packet1(data1, sizeof(data1));
rtc::SetBE32(packet1.data() + 8, kSsrc);
channel_->SetSink(kDefaultReceiveSsrc, NULL);
EXPECT_TRUE(channel_->SetSink(kDefaultReceiveSsrc, &renderer_));
EXPECT_TRUE(SetDefaultCodec());
EXPECT_TRUE(SetSend(true));
EXPECT_EQ(0, renderer_.num_rendered_frames());
channel_->OnPacketReceived(packet1, /*packet_time_us=*/-1);
EXPECT_EQ(renderer_.num_rendered_frames(), 0);
std::vector<webrtc::RtpSource> sources = channel_->GetSources(kSsrc);
EXPECT_EQ(1u, sources.size());
EXPECT_EQ(webrtc::RtpSourceType::SSRC, sources[0].source_type());
int64_t timestamp1 = sources[0].timestamp_ms();
// Send and receive one frame.
SendFrame();
EXPECT_FRAME_WAIT(1, kVideoWidth, kVideoHeight, kTimeout);
// a new packet.
int64_t timeDeltaMs = 1;
fake_clock_.AdvanceTime(webrtc::TimeDelta::ms(timeDeltaMs));
channel_->OnPacketReceived(packet1, /*packet_time_us=*/-1);
int64_t timestamp2 = channel_->GetSources(kSsrc)[0].timestamp_ms();
EXPECT_EQ(timestamp2, timestamp1 + timeDeltaMs);
EXPECT_THAT(channel_->GetSources(kSsrc - 1), IsEmpty());
EXPECT_THAT(channel_->GetSources(kSsrc), SizeIs(1));
EXPECT_THAT(channel_->GetSources(kSsrc + 1), IsEmpty());
// It only keeps 10s of history.
fake_clock_.AdvanceTime(webrtc::TimeDelta::seconds(10));
fake_clock_.AdvanceTime(webrtc::TimeDelta::ms(1));
EXPECT_EQ(0u, channel_->GetSources(kSsrc).size());
}
webrtc::RtpSource source = channel_->GetSources(kSsrc)[0];
EXPECT_EQ(source.source_id(), kSsrc);
EXPECT_EQ(source.source_type(), webrtc::RtpSourceType::SSRC);
int64_t rtp_timestamp_1 = source.rtp_timestamp();
int64_t timestamp_ms_1 = source.timestamp_ms();
TEST_F(WebRtcVideoChannelTestWithClock, GetContributingSources) {
uint8_t data1[] = {0x81, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
// Send and receive another frame.
SendFrame();
EXPECT_FRAME_WAIT(2, kVideoWidth, kVideoHeight, kTimeout);
uint32_t kCsrc = 4321u;
EXPECT_EQ(0u, channel_->GetSources(kSsrc).size());
EXPECT_EQ(0u, channel_->GetSources(kCsrc).size());
EXPECT_THAT(channel_->GetSources(kSsrc - 1), IsEmpty());
EXPECT_THAT(channel_->GetSources(kSsrc), SizeIs(1));
EXPECT_THAT(channel_->GetSources(kSsrc + 1), IsEmpty());
rtc::CopyOnWriteBuffer packet1(data1, sizeof(data1));
rtc::SetBE32(packet1.data() + 8, kSsrc);
rtc::SetBE32(packet1.data() + 12, kCsrc);
channel_->SetSink(kDefaultReceiveSsrc, NULL);
EXPECT_TRUE(SetDefaultCodec());
EXPECT_TRUE(SetSend(true));
EXPECT_EQ(0, renderer_.num_rendered_frames());
channel_->OnPacketReceived(packet1, /*packet_time_us=*/-1);
source = channel_->GetSources(kSsrc)[0];
EXPECT_EQ(source.source_id(), kSsrc);
EXPECT_EQ(source.source_type(), webrtc::RtpSourceType::SSRC);
int64_t rtp_timestamp_2 = source.rtp_timestamp();
int64_t timestamp_ms_2 = source.timestamp_ms();
{
ASSERT_EQ(2u, channel_->GetSources(kSsrc).size());
EXPECT_EQ(0u, channel_->GetSources(kCsrc).size());
std::vector<webrtc::RtpSource> sources = channel_->GetSources(kSsrc);
EXPECT_EQ(sources[0].timestamp_ms(), sources[1].timestamp_ms());
// 1 SSRC and 1 CSRC.
EXPECT_EQ(1, absl::c_count_if(sources, [](const webrtc::RtpSource& source) {
return source.source_type() == webrtc::RtpSourceType::SSRC;
}));
EXPECT_EQ(1, absl::c_count_if(sources, [](const webrtc::RtpSource& source) {
return source.source_type() == webrtc::RtpSourceType::CSRC;
}));
}
int64_t timestamp1 = channel_->GetSources(kSsrc)[0].timestamp_ms();
// a new packet with only ssrc (i.e no csrc).
uint8_t data2[] = {0x80, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
rtc::CopyOnWriteBuffer packet2(data2, sizeof(data2));
rtc::SetBE32(packet2.data() + 8, kSsrc);
int64_t timeDeltaMs = 1;
fake_clock_.AdvanceTime(webrtc::TimeDelta::ms(timeDeltaMs));
channel_->OnPacketReceived(packet2, /*packet_time_us=*/-1);
{
ASSERT_EQ(2u, channel_->GetSources(kSsrc).size());
EXPECT_EQ(0u, channel_->GetSources(kCsrc).size());
std::vector<webrtc::RtpSource> sources = channel_->GetSources(kSsrc);
EXPECT_NE(sources[0].timestamp_ms(), sources[1].timestamp_ms());
// 1 SSRC and 1 CSRC.
EXPECT_EQ(1, absl::c_count_if(sources, [](const webrtc::RtpSource& source) {
return source.source_type() == webrtc::RtpSourceType::SSRC;
}));
EXPECT_EQ(1, absl::c_count_if(sources, [](const webrtc::RtpSource& source) {
return source.source_type() == webrtc::RtpSourceType::CSRC;
}));
auto ssrcSource =
absl::c_find_if(sources, [](const webrtc::RtpSource& source) {
return source.source_type() == webrtc::RtpSourceType::SSRC;
});
auto csrcSource =
absl::c_find_if(sources, [](const webrtc::RtpSource& source) {
return source.source_type() == webrtc::RtpSourceType::CSRC;
});
EXPECT_EQ(ssrcSource->timestamp_ms(), timestamp1 + timeDeltaMs);
EXPECT_EQ(csrcSource->timestamp_ms(), timestamp1);
}
// It only keeps 10s of history.
fake_clock_.AdvanceTime(webrtc::TimeDelta::seconds(10));
{
ASSERT_EQ(1u, channel_->GetSources(kSsrc).size());
EXPECT_EQ(0u, channel_->GetSources(kCsrc).size());
std::vector<webrtc::RtpSource> sources = channel_->GetSources(kSsrc);
EXPECT_EQ(1, absl::c_count_if(sources, [](const webrtc::RtpSource& source) {
return source.source_type() == webrtc::RtpSourceType::SSRC;
}));
EXPECT_EQ(0, absl::c_count_if(sources, [](const webrtc::RtpSource& source) {
return source.source_type() == webrtc::RtpSourceType::CSRC;
}));
}
fake_clock_.AdvanceTime(webrtc::TimeDelta::ms(1));
EXPECT_EQ(0u, channel_->GetSources(kSsrc).size());
EXPECT_EQ(0u, channel_->GetSources(kCsrc).size());
EXPECT_GT(rtp_timestamp_2, rtp_timestamp_1);
EXPECT_GT(timestamp_ms_2, timestamp_ms_1);
}
TEST_F(WebRtcVideoChannelTest, SetsRidsOnSendStream) {

View File

@ -307,7 +307,7 @@ absl::optional<Syncable::Info> RtpVideoStreamReceiver::GetSyncInfo() const {
return absl::nullopt;
}
{
rtc::CritScope lock(&rtp_sources_lock_);
rtc::CritScope lock(&sync_info_lock_);
if (!last_received_rtp_timestamp_ || !last_received_rtp_system_time_ms_) {
return absl::nullopt;
}
@ -423,14 +423,9 @@ void RtpVideoStreamReceiver::OnRtpPacket(const RtpPacketReceived& packet) {
// TODO(nisse): Exclude out-of-order packets?
int64_t now_ms = clock_->TimeInMilliseconds();
{
rtc::CritScope cs(&rtp_sources_lock_);
rtc::CritScope cs(&sync_info_lock_);
last_received_rtp_timestamp_ = packet.Timestamp();
last_received_rtp_system_time_ms_ = now_ms;
std::vector<uint32_t> csrcs = packet.Csrcs();
contributing_sources_.Update(now_ms, csrcs,
/* audio level */ absl::nullopt,
packet.Timestamp());
}
// Periodically log the RTP header of incoming packets.
if (now_ms - last_packet_log_ms_ > kPacketLogIntervalMs) {
@ -894,22 +889,4 @@ void RtpVideoStreamReceiver::InsertSpsPpsIntoTracker(uint8_t payload_type) {
sprop_decoder.pps_nalu());
}
std::vector<webrtc::RtpSource> RtpVideoStreamReceiver::GetSources() const {
int64_t now_ms = rtc::TimeMillis();
std::vector<RtpSource> sources;
{
rtc::CritScope cs(&rtp_sources_lock_);
sources = contributing_sources_.GetSources(now_ms);
if (last_received_rtp_system_time_ms_ >=
now_ms - ContributingSources::kHistoryMs) {
RTC_DCHECK(last_received_rtp_timestamp_.has_value());
sources.emplace_back(*last_received_rtp_system_time_ms_,
config_.rtp.remote_ssrc, RtpSourceType::SSRC,
/* audio_level */ absl::nullopt,
*last_received_rtp_timestamp_);
}
}
return sources;
}
} // namespace webrtc

View File

@ -175,8 +175,6 @@ class RtpVideoStreamReceiver : public LossNotificationSender,
void AddSecondarySink(RtpPacketSinkInterface* sink);
void RemoveSecondarySink(const RtpPacketSinkInterface* sink);
std::vector<webrtc::RtpSource> GetSources() const;
private:
// Used for buffering RTCP feedback messages and sending them all together.
// Note:
@ -298,14 +296,13 @@ class RtpVideoStreamReceiver : public LossNotificationSender,
std::vector<RtpPacketSinkInterface*> secondary_sinks_
RTC_GUARDED_BY(worker_task_checker_);
// Info for GetSources and GetSyncInfo is updated on network or worker thread,
// queried on the worker thread.
rtc::CriticalSection rtp_sources_lock_;
ContributingSources contributing_sources_ RTC_GUARDED_BY(&rtp_sources_lock_);
// Info for GetSyncInfo is updated on network or worker thread, and queried on
// the worker thread.
rtc::CriticalSection sync_info_lock_;
absl::optional<uint32_t> last_received_rtp_timestamp_
RTC_GUARDED_BY(rtp_sources_lock_);
RTC_GUARDED_BY(sync_info_lock_);
absl::optional<int64_t> last_received_rtp_system_time_ms_
RTC_GUARDED_BY(rtp_sources_lock_);
RTC_GUARDED_BY(sync_info_lock_);
// Used to validate the buffered frame decryptor is always run on the correct
// thread.

View File

@ -191,6 +191,7 @@ VideoReceiveStream::VideoReceiveStream(
"DecodingThread",
rtc::kHighestPriority),
call_stats_(call_stats),
source_tracker_(clock_),
stats_proxy_(&config_, clock_),
rtp_receive_statistics_(
ReceiveStatistics::Create(clock_, &stats_proxy_, &stats_proxy_)),
@ -503,6 +504,7 @@ void VideoReceiveStream::OnFrame(const VideoFrame& video_frame) {
}
config_.renderer->OnFrame(video_frame);
source_tracker_.OnFrameDelivered(video_frame.packet_infos());
// TODO(tommi): OnRenderFrame grabs a lock too.
stats_proxy_.OnRenderedFrame(video_frame);
}
@ -742,7 +744,7 @@ void VideoReceiveStream::UpdatePlayoutDelays() const {
}
std::vector<webrtc::RtpSource> VideoReceiveStream::GetSources() const {
return rtp_video_stream_receiver_.GetSources();
return source_tracker_.GetSources();
}
} // namespace internal

View File

@ -20,6 +20,7 @@
#include "call/syncable.h"
#include "call/video_receive_stream.h"
#include "modules/rtp_rtcp/include/flexfec_receiver.h"
#include "modules/rtp_rtcp/source/source_tracker.h"
#include "modules/video_coding/frame_buffer2.h"
#include "modules/video_coding/video_coding_impl.h"
#include "rtc_base/synchronization/sequence_checker.h"
@ -164,6 +165,7 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream,
bool decoder_running_ RTC_GUARDED_BY(worker_sequence_checker_) = false;
bool decoder_stopped_ RTC_GUARDED_BY(decode_queue_) = true;
SourceTracker source_tracker_;
ReceiveStatisticsProxy stats_proxy_;
// Shared by media and rtx stream receivers, since the latter has no RtpRtcp
// module of its own.

View File

@ -8,6 +8,7 @@
* be found in the AUTHORS file in the root of the source tree.
*/
#include <algorithm>
#include <utility>
#include <vector>
@ -37,7 +38,10 @@ namespace webrtc {
namespace {
using ::testing::_;
using ::testing::ElementsAreArray;
using ::testing::Invoke;
using ::testing::IsEmpty;
using ::testing::SizeIs;
constexpr int kDefaultTimeOutMs = 50;
@ -107,14 +111,14 @@ class VideoReceiveStreamTest : public ::testing::Test {
null_decoder.decoder_factory = &null_decoder_factory_;
config_.decoders.push_back(null_decoder);
Clock* clock = Clock::GetRealTimeClock();
timing_ = new VCMTiming(clock);
clock_ = Clock::GetRealTimeClock();
timing_ = new VCMTiming(clock_);
video_receive_stream_ =
absl::make_unique<webrtc::internal::VideoReceiveStream>(
task_queue_factory_.get(), &rtp_stream_receiver_controller_,
kDefaultNumCpuCores, &packet_router_, config_.Copy(),
process_thread_.get(), &call_stats_, clock, timing_);
process_thread_.get(), &call_stats_, clock_, timing_);
}
protected:
@ -131,6 +135,7 @@ class VideoReceiveStreamTest : public ::testing::Test {
PacketRouter packet_router_;
RtpStreamReceiverController rtp_stream_receiver_controller_;
std::unique_ptr<webrtc::internal::VideoReceiveStream> video_receive_stream_;
Clock* clock_;
VCMTiming* timing_;
};
@ -243,13 +248,13 @@ class VideoReceiveStreamTestWithFakeDecoder : public ::testing::Test {
fake_decoder.video_format = SdpVideoFormat("VP8");
fake_decoder.decoder_factory = &fake_decoder_factory_;
config_.decoders.push_back(fake_decoder);
Clock* clock = Clock::GetRealTimeClock();
timing_ = new VCMTiming(clock);
clock_ = Clock::GetRealTimeClock();
timing_ = new VCMTiming(clock_);
video_receive_stream_.reset(new webrtc::internal::VideoReceiveStream(
task_queue_factory_.get(), &rtp_stream_receiver_controller_,
kDefaultNumCpuCores, &packet_router_, config_.Copy(),
process_thread_.get(), &call_stats_, clock, timing_));
process_thread_.get(), &call_stats_, clock_, timing_));
}
protected:
@ -263,6 +268,7 @@ class VideoReceiveStreamTestWithFakeDecoder : public ::testing::Test {
PacketRouter packet_router_;
RtpStreamReceiverController rtp_stream_receiver_controller_;
std::unique_ptr<webrtc::internal::VideoReceiveStream> video_receive_stream_;
Clock* clock_;
VCMTiming* timing_;
};
@ -320,7 +326,85 @@ TEST_F(VideoReceiveStreamTestWithFakeDecoder, PassesPacketInfos) {
video_receive_stream_->OnCompleteFrame(std::move(test_frame));
EXPECT_TRUE(fake_renderer_.WaitForRenderedFrame(kDefaultTimeOutMs));
EXPECT_EQ(fake_renderer_.packet_infos().size(), 3U);
EXPECT_THAT(fake_renderer_.packet_infos(), ElementsAreArray(packet_infos));
}
TEST_F(VideoReceiveStreamTestWithFakeDecoder, RenderedFrameUpdatesGetSources) {
constexpr uint32_t kSsrc = 1111;
constexpr uint32_t kCsrc = 9001;
constexpr uint32_t kRtpTimestamp = 12345;
// Prepare one video frame with per-packet information.
auto test_frame = absl::make_unique<FrameObjectFake>();
test_frame->SetPayloadType(99);
test_frame->id.picture_id = 0;
RtpPacketInfos packet_infos;
{
RtpPacketInfos::vector_type infos;
RtpPacketInfo info;
info.set_ssrc(kSsrc);
info.set_csrcs({kCsrc});
info.set_rtp_timestamp(kRtpTimestamp);
info.set_receive_time_ms(clock_->TimeInMilliseconds() - 5000);
infos.push_back(info);
info.set_receive_time_ms(clock_->TimeInMilliseconds() - 3000);
infos.push_back(info);
info.set_receive_time_ms(clock_->TimeInMilliseconds() - 2000);
infos.push_back(info);
info.set_receive_time_ms(clock_->TimeInMilliseconds() - 4000);
infos.push_back(info);
packet_infos = RtpPacketInfos(std::move(infos));
}
test_frame->SetPacketInfos(packet_infos);
// Start receive stream.
video_receive_stream_->Start();
EXPECT_THAT(video_receive_stream_->GetSources(), IsEmpty());
// Render one video frame.
int64_t timestamp_ms_min = clock_->TimeInMilliseconds();
video_receive_stream_->OnCompleteFrame(std::move(test_frame));
EXPECT_TRUE(fake_renderer_.WaitForRenderedFrame(kDefaultTimeOutMs));
int64_t timestamp_ms_max = clock_->TimeInMilliseconds();
// Verify that the per-packet information is passed to the renderer.
EXPECT_THAT(fake_renderer_.packet_infos(), ElementsAreArray(packet_infos));
// Verify that the per-packet information also updates |GetSources()|.
std::vector<RtpSource> sources = video_receive_stream_->GetSources();
ASSERT_THAT(sources, SizeIs(2));
{
auto it = std::find_if(sources.begin(), sources.end(),
[](const RtpSource& source) {
return source.source_type() == RtpSourceType::SSRC;
});
ASSERT_NE(it, sources.end());
EXPECT_EQ(it->source_id(), kSsrc);
EXPECT_EQ(it->source_type(), RtpSourceType::SSRC);
EXPECT_EQ(it->rtp_timestamp(), kRtpTimestamp);
EXPECT_GE(it->timestamp_ms(), timestamp_ms_min);
EXPECT_LE(it->timestamp_ms(), timestamp_ms_max);
}
{
auto it = std::find_if(sources.begin(), sources.end(),
[](const RtpSource& source) {
return source.source_type() == RtpSourceType::CSRC;
});
ASSERT_NE(it, sources.end());
EXPECT_EQ(it->source_id(), kCsrc);
EXPECT_EQ(it->source_type(), RtpSourceType::CSRC);
EXPECT_EQ(it->rtp_timestamp(), kRtpTimestamp);
EXPECT_GE(it->timestamp_ms(), timestamp_ms_min);
EXPECT_LE(it->timestamp_ms(), timestamp_ms_max);
}
}
} // namespace webrtc