Refactor RtpReceiverImpl, extracting CSRC book-keeping to its own class

Bug: webrtc:7135
Change-Id: I7ce9afe575241542e4e3f7e2e8459ee3257eec76
Reviewed-on: https://webrtc-review.googlesource.com/93466
Commit-Queue: Niels Moller <nisse@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#24271}
This commit is contained in:
Niels Möller 2018-08-13 13:23:08 +02:00 committed by Commit Bot
parent ced5cfdb35
commit af17595879
7 changed files with 251 additions and 94 deletions

View File

@ -113,6 +113,8 @@ rtc_static_library("rtp_rtcp") {
"include/rtp_receiver.h",
"include/rtp_rtcp.h",
"include/ulpfec_receiver.h",
"source/contributing_sources.cc",
"source/contributing_sources.h",
"source/dtmf_queue.cc",
"source/dtmf_queue.h",
"source/fec_private_tables_bursty.cc",
@ -350,6 +352,7 @@ if (rtc_include_tests) {
sources = [
"source/byte_io_unittest.cc",
"source/contributing_sources_unittest.cc",
"source/fec_private_tables_bursty_unittest.cc",
"source/flexfec_header_reader_writer_unittest.cc",
"source/flexfec_receiver_unittest.cc",

View File

@ -0,0 +1,70 @@
/*
* Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/rtp_rtcp/source/contributing_sources.h"
namespace webrtc {
namespace {
// Set by the spec, see
// https://www.w3.org/TR/webrtc/#dom-rtcrtpreceiver-getcontributingsources
constexpr int64_t kHistoryMs = 10 * rtc::kNumMillisecsPerSec;
// Allow some stale records to accumulate before cleaning.
constexpr int64_t kPruningIntervalMs = 15 * rtc::kNumMillisecsPerSec;
} // namespace
ContributingSources::ContributingSources() = default;
ContributingSources::~ContributingSources() = default;
void ContributingSources::Update(int64_t now_ms,
rtc::ArrayView<const uint32_t> csrcs) {
for (uint32_t csrc : csrcs) {
last_seen_ms_[csrc] = now_ms;
}
if (!next_pruning_ms_) {
next_pruning_ms_ = now_ms + kPruningIntervalMs;
} else if (now_ms > next_pruning_ms_) {
// To prevent unlimited growth, prune it every 15 seconds.
DeleteOldEntries(now_ms);
}
}
// Return contributing sources seen the last 10 s.
// TODO(nisse): It would be more efficient to delete any stale entries while
// iterating over the mapping, but then we'd have to make the method
// non-const.
std::vector<RtpSource> ContributingSources::GetSources(int64_t now_ms) const {
std::vector<RtpSource> sources;
for (auto& record : last_seen_ms_) {
if (record.second >= now_ms - kHistoryMs) {
sources.emplace_back(record.second, record.first, RtpSourceType::CSRC);
}
}
return sources;
}
// Delete stale entries.
void ContributingSources::DeleteOldEntries(int64_t now_ms) {
for (auto it = last_seen_ms_.begin(); it != last_seen_ms_.end();) {
if (it->second >= now_ms - kHistoryMs) {
// Still relevant.
++it;
} else {
it = last_seen_ms_.erase(it);
}
}
next_pruning_ms_ = now_ms + kPruningIntervalMs;
}
} // namespace webrtc

View File

@ -0,0 +1,47 @@
/*
* Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_RTP_RTCP_SOURCE_CONTRIBUTING_SOURCES_H_
#define MODULES_RTP_RTCP_SOURCE_CONTRIBUTING_SOURCES_H_
#include <stdint.h>
#include <map>
#include <vector>
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "api/rtpreceiverinterface.h"
namespace webrtc {
class ContributingSources {
public:
ContributingSources();
~ContributingSources();
// TODO(bugs.webrtc.org/3333): Needs to be extended with audio-level, to
// support RFC6465.
void Update(int64_t now_ms, rtc::ArrayView<const uint32_t> csrcs);
// Returns contributing sources seen the last 10 s.
std::vector<RtpSource> GetSources(int64_t now_ms) const;
private:
void DeleteOldEntries(int64_t now_ms);
// Indexed by csrc.
std::map<uint32_t, int64_t> last_seen_ms_;
absl::optional<int64_t> next_pruning_ms_;
};
} // namespace webrtc
#endif // MODULES_RTP_RTCP_SOURCE_CONTRIBUTING_SOURCES_H_

View File

@ -0,0 +1,111 @@
/*
* Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/rtp_rtcp/source/contributing_sources.h"
#include "rtc_base/timeutils.h"
#include "test/gmock.h"
#include "test/gtest.h"
namespace webrtc {
namespace {
using ::testing::UnorderedElementsAre;
constexpr uint32_t kCsrc1 = 111;
constexpr uint32_t kCsrc2 = 222;
constexpr uint32_t kCsrc3 = 333;
} // namespace
TEST(ContributingSourcesTest, RecordSources) {
ContributingSources csrcs;
constexpr uint32_t kCsrcs[] = {kCsrc1, kCsrc2};
constexpr int64_t kTime1 = 10;
csrcs.Update(kTime1, kCsrcs);
EXPECT_THAT(
csrcs.GetSources(kTime1),
UnorderedElementsAre(RtpSource(kTime1, kCsrc1, RtpSourceType::CSRC),
RtpSource(kTime1, kCsrc2, RtpSourceType::CSRC)));
}
TEST(ContributingSourcesTest, UpdateSources) {
ContributingSources csrcs;
// TODO(nisse): When migrating to absl::Span, the named constant arrays should
// be replaced by unnamed literals where they are passed to csrcs.Update(...).
constexpr uint32_t kCsrcs1[] = {kCsrc1, kCsrc2};
constexpr uint32_t kCsrcs2[] = {kCsrc2, kCsrc3};
constexpr int64_t kTime1 = 10;
constexpr int64_t kTime2 = kTime1 + 5 * rtc::kNumMillisecsPerSec;
csrcs.Update(kTime1, kCsrcs1);
EXPECT_THAT(
csrcs.GetSources(kTime1),
UnorderedElementsAre(RtpSource(kTime1, kCsrc1, RtpSourceType::CSRC),
RtpSource(kTime1, kCsrc2, RtpSourceType::CSRC)));
csrcs.Update(kTime2, kCsrcs2);
EXPECT_THAT(
csrcs.GetSources(kTime2),
UnorderedElementsAre(RtpSource(kTime1, kCsrc1, RtpSourceType::CSRC),
RtpSource(kTime2, kCsrc2, RtpSourceType::CSRC),
RtpSource(kTime2, kCsrc3, RtpSourceType::CSRC)));
}
TEST(ContributingSourcesTest, ReturnRecentOnly) {
ContributingSources csrcs;
constexpr uint32_t kCsrcs1[] = {kCsrc1, kCsrc2};
constexpr uint32_t kCsrcs2[] = {kCsrc2, kCsrc3};
constexpr int64_t kTime1 = 10;
constexpr int64_t kTime2 = kTime1 + 5 * rtc::kNumMillisecsPerSec;
constexpr int64_t kTime3 = kTime1 + 12 * rtc::kNumMillisecsPerSec;
csrcs.Update(kTime1, kCsrcs1);
EXPECT_THAT(
csrcs.GetSources(kTime1),
UnorderedElementsAre(RtpSource(kTime1, kCsrc1, RtpSourceType::CSRC),
RtpSource(kTime1, kCsrc2, RtpSourceType::CSRC)));
csrcs.Update(kTime2, kCsrcs2);
EXPECT_THAT(
csrcs.GetSources(kTime3),
UnorderedElementsAre(RtpSource(kTime2, kCsrc2, RtpSourceType::CSRC),
RtpSource(kTime2, kCsrc3, RtpSourceType::CSRC)));
}
TEST(ContributingSourcesTest, PurgeOldSources) {
ContributingSources csrcs;
constexpr uint32_t kCsrcs1[] = {kCsrc1, kCsrc2};
constexpr uint32_t kCsrcs2[] = {kCsrc2, kCsrc3};
constexpr int64_t kTime1 = 10;
constexpr int64_t kTime2 = kTime1 + 10 * rtc::kNumMillisecsPerSec;
constexpr int64_t kTime3 = kTime1 + 20 * rtc::kNumMillisecsPerSec;
csrcs.Update(kTime1, kCsrcs1);
EXPECT_THAT(
csrcs.GetSources(kTime2),
UnorderedElementsAre(RtpSource(kTime1, kCsrc1, RtpSourceType::CSRC),
RtpSource(kTime1, kCsrc2, RtpSourceType::CSRC)));
csrcs.Update(kTime2, kCsrcs2);
EXPECT_THAT(
csrcs.GetSources(kTime2),
UnorderedElementsAre(RtpSource(kTime1, kCsrc1, RtpSourceType::CSRC),
RtpSource(kTime2, kCsrc2, RtpSourceType::CSRC),
RtpSource(kTime2, kCsrc3, RtpSourceType::CSRC)));
csrcs.Update(kTime3, kCsrcs2);
EXPECT_THAT(
csrcs.GetSources(kTime3),
UnorderedElementsAre(RtpSource(kTime3, kCsrc2, RtpSourceType::CSRC),
RtpSource(kTime3, kCsrc3, RtpSourceType::CSRC)));
// Query at an earlier time; check that old sources really have been purged
// and don't reappear.
EXPECT_THAT(
csrcs.GetSources(kTime2),
UnorderedElementsAre(RtpSource(kTime3, kCsrc2, RtpSourceType::CSRC),
RtpSource(kTime3, kCsrc3, RtpSourceType::CSRC)));
}
} // namespace webrtc

View File

@ -85,12 +85,8 @@ RtpReceiverImpl::RtpReceiverImpl(Clock* clock,
rtp_payload_registry_(rtp_payload_registry),
rtp_media_receiver_(rtp_media_receiver),
ssrc_(0),
num_csrcs_(0),
current_remote_csrc_(),
last_received_timestamp_(0),
last_received_frame_time_ms_(-1) {
memset(current_remote_csrc_, 0, sizeof(current_remote_csrc_));
}
last_received_frame_time_ms_(-1) {}
RtpReceiverImpl::~RtpReceiverImpl() {}
@ -134,9 +130,17 @@ bool RtpReceiverImpl::IncomingRtpPacket(const RTPHeader& rtp_header,
// OK, keep-alive packet.
return true;
}
int64_t now_ms = clock_->TimeInMilliseconds();
{
rtc::CritScope lock(&critical_section_rtp_receiver_);
csrcs_.Update(
now_ms, rtc::MakeArrayView(rtp_header.arrOfCSRCs, rtp_header.numCSRCs));
}
WebRtcRTPHeader webrtc_rtp_header{};
webrtc_rtp_header.header = rtp_header;
CheckCSRC(webrtc_rtp_header);
auto audio_level =
rtp_header.extension.hasAudioLevel
@ -145,8 +149,7 @@ bool RtpReceiverImpl::IncomingRtpPacket(const RTPHeader& rtp_header,
UpdateSources(audio_level);
int32_t ret_val = rtp_media_receiver_->ParseRtpPacket(
&webrtc_rtp_header, payload_specific, payload, payload_length,
clock_->TimeInMilliseconds());
&webrtc_rtp_header, payload_specific, payload, payload_length, now_ms);
if (ret_val < 0) {
return false;
@ -174,16 +177,11 @@ std::vector<RtpSource> RtpReceiverImpl::GetSources() const {
rtc::CritScope lock(&critical_section_rtp_receiver_);
int64_t now_ms = clock_->TimeInMilliseconds();
std::vector<RtpSource> sources;
RTC_DCHECK(std::is_sorted(ssrc_sources_.begin(), ssrc_sources_.end(),
[](const RtpSource& lhs, const RtpSource& rhs) {
return lhs.timestamp_ms() < rhs.timestamp_ms();
}));
RTC_DCHECK(std::is_sorted(csrc_sources_.begin(), csrc_sources_.end(),
[](const RtpSource& lhs, const RtpSource& rhs) {
return lhs.timestamp_ms() < rhs.timestamp_ms();
}));
std::vector<RtpSource> sources = csrcs_.GetSources(now_ms);
std::set<uint32_t> selected_ssrcs;
for (auto rit = ssrc_sources_.rbegin(); rit != ssrc_sources_.rend(); ++rit) {
@ -194,13 +192,6 @@ std::vector<RtpSource> RtpReceiverImpl::GetSources() const {
sources.push_back(*rit);
}
}
for (auto rit = csrc_sources_.rbegin(); rit != csrc_sources_.rend(); ++rit) {
if ((now_ms - rit->timestamp_ms()) > kGetSourcesTimeoutMs) {
break;
}
sources.push_back(*rit);
}
return sources;
}
@ -223,44 +214,11 @@ void RtpReceiverImpl::CheckSSRCChanged(const RTPHeader& rtp_header) {
ssrc_ = rtp_header.ssrc;
}
// Implementation note: must not hold critsect when called.
void RtpReceiverImpl::CheckCSRC(const WebRtcRTPHeader& rtp_header) {
const uint8_t num_csrcs = rtp_header.header.numCSRCs;
if (num_csrcs > kRtpCsrcSize) {
// Ignore.
return;
}
{
rtc::CritScope lock(&critical_section_rtp_receiver_);
// Copy new.
memcpy(current_remote_csrc_, rtp_header.header.arrOfCSRCs,
num_csrcs * sizeof(uint32_t));
num_csrcs_ = num_csrcs;
} // End critsect.
}
void RtpReceiverImpl::UpdateSources(
const absl::optional<uint8_t>& ssrc_audio_level) {
rtc::CritScope lock(&critical_section_rtp_receiver_);
int64_t now_ms = clock_->TimeInMilliseconds();
for (size_t i = 0; i < num_csrcs_; ++i) {
auto map_it = iterator_by_csrc_.find(current_remote_csrc_[i]);
if (map_it == iterator_by_csrc_.end()) {
// If it is a new CSRC, append a new object to the end of the list.
csrc_sources_.emplace_back(now_ms, current_remote_csrc_[i],
RtpSourceType::CSRC);
} else {
// If it is an existing CSRC, move the object to the end of the list.
map_it->second->update_timestamp_ms(now_ms);
csrc_sources_.splice(csrc_sources_.end(), csrc_sources_, map_it->second);
}
// Update the unordered_map.
iterator_by_csrc_[current_remote_csrc_[i]] = std::prev(csrc_sources_.end());
}
// If this is the first packet or the SSRC is changed, insert a new
// contributing source that uses the SSRC.
if (ssrc_sources_.empty() || ssrc_sources_.rbegin()->source_id() != ssrc_) {
@ -275,15 +233,6 @@ void RtpReceiverImpl::UpdateSources(
}
void RtpReceiverImpl::RemoveOutdatedSources(int64_t now_ms) {
std::list<RtpSource>::iterator it;
for (it = csrc_sources_.begin(); it != csrc_sources_.end(); ++it) {
if ((now_ms - it->timestamp_ms()) <= kGetSourcesTimeoutMs) {
break;
}
iterator_by_csrc_.erase(it->source_id());
}
csrc_sources_.erase(csrc_sources_.begin(), it);
std::vector<RtpSource>::iterator vec_it;
for (vec_it = ssrc_sources_.begin(); vec_it != ssrc_sources_.end();
++vec_it) {

View File

@ -11,7 +11,6 @@
#ifndef MODULES_RTP_RTCP_SOURCE_RTP_RECEIVER_IMPL_H_
#define MODULES_RTP_RTCP_SOURCE_RTP_RECEIVER_IMPL_H_
#include <list>
#include <memory>
#include <unordered_map>
#include <vector>
@ -19,6 +18,7 @@
#include "absl/types/optional.h"
#include "modules/rtp_rtcp/include/rtp_receiver.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/contributing_sources.h"
#include "modules/rtp_rtcp/source/rtp_receiver_strategy.h"
#include "rtc_base/criticalsection.h"
@ -53,17 +53,8 @@ class RtpReceiverImpl : public RtpReceiver {
std::vector<RtpSource> GetSources() const override;
const std::vector<RtpSource>& ssrc_sources_for_testing() const {
return ssrc_sources_;
}
const std::list<RtpSource>& csrc_sources_for_testing() const {
return csrc_sources_;
}
private:
void CheckSSRCChanged(const RTPHeader& rtp_header);
void CheckCSRC(const WebRtcRTPHeader& rtp_header);
void UpdateSources(const absl::optional<uint8_t>& ssrc_audio_level);
void RemoveOutdatedSources(int64_t now_ms);
@ -77,9 +68,8 @@ class RtpReceiverImpl : public RtpReceiver {
// SSRCs.
uint32_t ssrc_ RTC_GUARDED_BY(critical_section_rtp_receiver_);
uint8_t num_csrcs_ RTC_GUARDED_BY(critical_section_rtp_receiver_);
uint32_t current_remote_csrc_[kRtpCsrcSize] RTC_GUARDED_BY(
critical_section_rtp_receiver_);
ContributingSources csrcs_ RTC_GUARDED_BY(critical_section_rtp_receiver_);
// Sequence number and timestamps for the latest in-order packet.
absl::optional<uint16_t> last_received_sequence_number_
@ -89,10 +79,7 @@ class RtpReceiverImpl : public RtpReceiver {
int64_t last_received_frame_time_ms_
RTC_GUARDED_BY(critical_section_rtp_receiver_);
std::unordered_map<uint32_t, std::list<RtpSource>::iterator>
iterator_by_csrc_;
// The RtpSource objects are sorted chronologically.
std::list<RtpSource> csrc_sources_;
std::vector<RtpSource> ssrc_sources_;
};
} // namespace webrtc

View File

@ -16,7 +16,6 @@
#include "modules/rtp_rtcp/include/rtp_receiver.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/mocks/mock_rtp_rtcp.h"
#include "modules/rtp_rtcp/source/rtp_receiver_impl.h"
#include "test/gmock.h"
#include "test/gtest.h"
@ -235,20 +234,11 @@ TEST_F(RtpReceiverTest, GetSourcesRemoveOutdatedSource) {
header.arrOfCSRCs[0] = kCsrc1;
EXPECT_TRUE(rtp_receiver_->IncomingRtpPacket(
header, kTestPayload, sizeof(kTestPayload), payload_specific));
auto* rtp_receiver_impl = static_cast<RtpReceiverImpl*>(rtp_receiver_.get());
auto ssrc_sources = rtp_receiver_impl->ssrc_sources_for_testing();
ASSERT_EQ(1u, ssrc_sources.size());
EXPECT_EQ(kSsrc1, ssrc_sources.begin()->source_id());
EXPECT_EQ(RtpSourceType::SSRC, ssrc_sources.begin()->source_type());
EXPECT_EQ(fake_clock_.TimeInMilliseconds(),
ssrc_sources.begin()->timestamp_ms());
auto csrc_sources = rtp_receiver_impl->csrc_sources_for_testing();
ASSERT_EQ(1u, csrc_sources.size());
EXPECT_EQ(kCsrc1, csrc_sources.begin()->source_id());
EXPECT_EQ(RtpSourceType::CSRC, csrc_sources.begin()->source_type());
EXPECT_EQ(fake_clock_.TimeInMilliseconds(),
csrc_sources.begin()->timestamp_ms());
now_ms = fake_clock_.TimeInMilliseconds();
sources = rtp_receiver_->GetSources();
EXPECT_THAT(sources, UnorderedElementsAre(
RtpSource(now_ms, kSsrc1, RtpSourceType::SSRC),
RtpSource(now_ms, kCsrc1, RtpSourceType::CSRC)));
}
// The audio level from the RTPHeader extension should be stored in the