From af17595879227c085690c5016c8693a16f309982 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Niels=20M=C3=B6ller?= Date: Mon, 13 Aug 2018 13:23:08 +0200 Subject: [PATCH] Refactor RtpReceiverImpl, extracting CSRC book-keeping to its own class Bug: webrtc:7135 Change-Id: I7ce9afe575241542e4e3f7e2e8459ee3257eec76 Reviewed-on: https://webrtc-review.googlesource.com/93466 Commit-Queue: Niels Moller Reviewed-by: Danil Chapovalov Cr-Commit-Position: refs/heads/master@{#24271} --- modules/rtp_rtcp/BUILD.gn | 3 + .../rtp_rtcp/source/contributing_sources.cc | 70 +++++++++++ .../rtp_rtcp/source/contributing_sources.h | 47 ++++++++ .../source/contributing_sources_unittest.cc | 111 ++++++++++++++++++ modules/rtp_rtcp/source/rtp_receiver_impl.cc | 75 ++---------- modules/rtp_rtcp/source/rtp_receiver_impl.h | 19 +-- .../rtp_rtcp/source/rtp_receiver_unittest.cc | 20 +--- 7 files changed, 251 insertions(+), 94 deletions(-) create mode 100644 modules/rtp_rtcp/source/contributing_sources.cc create mode 100644 modules/rtp_rtcp/source/contributing_sources.h create mode 100644 modules/rtp_rtcp/source/contributing_sources_unittest.cc diff --git a/modules/rtp_rtcp/BUILD.gn b/modules/rtp_rtcp/BUILD.gn index df56b21eeb..8df53805f8 100644 --- a/modules/rtp_rtcp/BUILD.gn +++ b/modules/rtp_rtcp/BUILD.gn @@ -113,6 +113,8 @@ rtc_static_library("rtp_rtcp") { "include/rtp_receiver.h", "include/rtp_rtcp.h", "include/ulpfec_receiver.h", + "source/contributing_sources.cc", + "source/contributing_sources.h", "source/dtmf_queue.cc", "source/dtmf_queue.h", "source/fec_private_tables_bursty.cc", @@ -350,6 +352,7 @@ if (rtc_include_tests) { sources = [ "source/byte_io_unittest.cc", + "source/contributing_sources_unittest.cc", "source/fec_private_tables_bursty_unittest.cc", "source/flexfec_header_reader_writer_unittest.cc", "source/flexfec_receiver_unittest.cc", diff --git a/modules/rtp_rtcp/source/contributing_sources.cc b/modules/rtp_rtcp/source/contributing_sources.cc new file mode 100644 index 0000000000..9b82de56ad --- /dev/null +++ b/modules/rtp_rtcp/source/contributing_sources.cc @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2018 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/rtp_rtcp/source/contributing_sources.h" + +namespace webrtc { + +namespace { + +// Set by the spec, see +// https://www.w3.org/TR/webrtc/#dom-rtcrtpreceiver-getcontributingsources +constexpr int64_t kHistoryMs = 10 * rtc::kNumMillisecsPerSec; + +// Allow some stale records to accumulate before cleaning. +constexpr int64_t kPruningIntervalMs = 15 * rtc::kNumMillisecsPerSec; + +} // namespace + +ContributingSources::ContributingSources() = default; +ContributingSources::~ContributingSources() = default; + +void ContributingSources::Update(int64_t now_ms, + rtc::ArrayView csrcs) { + for (uint32_t csrc : csrcs) { + last_seen_ms_[csrc] = now_ms; + } + if (!next_pruning_ms_) { + next_pruning_ms_ = now_ms + kPruningIntervalMs; + } else if (now_ms > next_pruning_ms_) { + // To prevent unlimited growth, prune it every 15 seconds. + DeleteOldEntries(now_ms); + } +} + +// Return contributing sources seen the last 10 s. +// TODO(nisse): It would be more efficient to delete any stale entries while +// iterating over the mapping, but then we'd have to make the method +// non-const. +std::vector ContributingSources::GetSources(int64_t now_ms) const { + std::vector sources; + for (auto& record : last_seen_ms_) { + if (record.second >= now_ms - kHistoryMs) { + sources.emplace_back(record.second, record.first, RtpSourceType::CSRC); + } + } + + return sources; +} + +// Delete stale entries. +void ContributingSources::DeleteOldEntries(int64_t now_ms) { + for (auto it = last_seen_ms_.begin(); it != last_seen_ms_.end();) { + if (it->second >= now_ms - kHistoryMs) { + // Still relevant. + ++it; + } else { + it = last_seen_ms_.erase(it); + } + } + next_pruning_ms_ = now_ms + kPruningIntervalMs; +} + +} // namespace webrtc diff --git a/modules/rtp_rtcp/source/contributing_sources.h b/modules/rtp_rtcp/source/contributing_sources.h new file mode 100644 index 0000000000..64d7f52831 --- /dev/null +++ b/modules/rtp_rtcp/source/contributing_sources.h @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2018 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_RTP_RTCP_SOURCE_CONTRIBUTING_SOURCES_H_ +#define MODULES_RTP_RTCP_SOURCE_CONTRIBUTING_SOURCES_H_ + +#include + +#include +#include + +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "api/rtpreceiverinterface.h" + +namespace webrtc { + +class ContributingSources { + public: + ContributingSources(); + ~ContributingSources(); + + // TODO(bugs.webrtc.org/3333): Needs to be extended with audio-level, to + // support RFC6465. + void Update(int64_t now_ms, rtc::ArrayView csrcs); + + // Returns contributing sources seen the last 10 s. + std::vector GetSources(int64_t now_ms) const; + + private: + void DeleteOldEntries(int64_t now_ms); + + // Indexed by csrc. + std::map last_seen_ms_; + absl::optional next_pruning_ms_; +}; + +} // namespace webrtc + +#endif // MODULES_RTP_RTCP_SOURCE_CONTRIBUTING_SOURCES_H_ diff --git a/modules/rtp_rtcp/source/contributing_sources_unittest.cc b/modules/rtp_rtcp/source/contributing_sources_unittest.cc new file mode 100644 index 0000000000..8b22d26c94 --- /dev/null +++ b/modules/rtp_rtcp/source/contributing_sources_unittest.cc @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2018 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/rtp_rtcp/source/contributing_sources.h" + +#include "rtc_base/timeutils.h" + +#include "test/gmock.h" +#include "test/gtest.h" + +namespace webrtc { +namespace { + +using ::testing::UnorderedElementsAre; + +constexpr uint32_t kCsrc1 = 111; +constexpr uint32_t kCsrc2 = 222; +constexpr uint32_t kCsrc3 = 333; + +} // namespace + +TEST(ContributingSourcesTest, RecordSources) { + ContributingSources csrcs; + constexpr uint32_t kCsrcs[] = {kCsrc1, kCsrc2}; + constexpr int64_t kTime1 = 10; + csrcs.Update(kTime1, kCsrcs); + EXPECT_THAT( + csrcs.GetSources(kTime1), + UnorderedElementsAre(RtpSource(kTime1, kCsrc1, RtpSourceType::CSRC), + RtpSource(kTime1, kCsrc2, RtpSourceType::CSRC))); +} + +TEST(ContributingSourcesTest, UpdateSources) { + ContributingSources csrcs; + // TODO(nisse): When migrating to absl::Span, the named constant arrays should + // be replaced by unnamed literals where they are passed to csrcs.Update(...). + constexpr uint32_t kCsrcs1[] = {kCsrc1, kCsrc2}; + constexpr uint32_t kCsrcs2[] = {kCsrc2, kCsrc3}; + constexpr int64_t kTime1 = 10; + constexpr int64_t kTime2 = kTime1 + 5 * rtc::kNumMillisecsPerSec; + csrcs.Update(kTime1, kCsrcs1); + EXPECT_THAT( + csrcs.GetSources(kTime1), + UnorderedElementsAre(RtpSource(kTime1, kCsrc1, RtpSourceType::CSRC), + RtpSource(kTime1, kCsrc2, RtpSourceType::CSRC))); + csrcs.Update(kTime2, kCsrcs2); + EXPECT_THAT( + csrcs.GetSources(kTime2), + UnorderedElementsAre(RtpSource(kTime1, kCsrc1, RtpSourceType::CSRC), + RtpSource(kTime2, kCsrc2, RtpSourceType::CSRC), + RtpSource(kTime2, kCsrc3, RtpSourceType::CSRC))); +} + +TEST(ContributingSourcesTest, ReturnRecentOnly) { + ContributingSources csrcs; + constexpr uint32_t kCsrcs1[] = {kCsrc1, kCsrc2}; + constexpr uint32_t kCsrcs2[] = {kCsrc2, kCsrc3}; + constexpr int64_t kTime1 = 10; + constexpr int64_t kTime2 = kTime1 + 5 * rtc::kNumMillisecsPerSec; + constexpr int64_t kTime3 = kTime1 + 12 * rtc::kNumMillisecsPerSec; + csrcs.Update(kTime1, kCsrcs1); + EXPECT_THAT( + csrcs.GetSources(kTime1), + UnorderedElementsAre(RtpSource(kTime1, kCsrc1, RtpSourceType::CSRC), + RtpSource(kTime1, kCsrc2, RtpSourceType::CSRC))); + csrcs.Update(kTime2, kCsrcs2); + EXPECT_THAT( + csrcs.GetSources(kTime3), + UnorderedElementsAre(RtpSource(kTime2, kCsrc2, RtpSourceType::CSRC), + RtpSource(kTime2, kCsrc3, RtpSourceType::CSRC))); +} + +TEST(ContributingSourcesTest, PurgeOldSources) { + ContributingSources csrcs; + constexpr uint32_t kCsrcs1[] = {kCsrc1, kCsrc2}; + constexpr uint32_t kCsrcs2[] = {kCsrc2, kCsrc3}; + constexpr int64_t kTime1 = 10; + constexpr int64_t kTime2 = kTime1 + 10 * rtc::kNumMillisecsPerSec; + constexpr int64_t kTime3 = kTime1 + 20 * rtc::kNumMillisecsPerSec; + csrcs.Update(kTime1, kCsrcs1); + EXPECT_THAT( + csrcs.GetSources(kTime2), + UnorderedElementsAre(RtpSource(kTime1, kCsrc1, RtpSourceType::CSRC), + RtpSource(kTime1, kCsrc2, RtpSourceType::CSRC))); + csrcs.Update(kTime2, kCsrcs2); + EXPECT_THAT( + csrcs.GetSources(kTime2), + UnorderedElementsAre(RtpSource(kTime1, kCsrc1, RtpSourceType::CSRC), + RtpSource(kTime2, kCsrc2, RtpSourceType::CSRC), + RtpSource(kTime2, kCsrc3, RtpSourceType::CSRC))); + csrcs.Update(kTime3, kCsrcs2); + EXPECT_THAT( + csrcs.GetSources(kTime3), + UnorderedElementsAre(RtpSource(kTime3, kCsrc2, RtpSourceType::CSRC), + RtpSource(kTime3, kCsrc3, RtpSourceType::CSRC))); + // Query at an earlier time; check that old sources really have been purged + // and don't reappear. + EXPECT_THAT( + csrcs.GetSources(kTime2), + UnorderedElementsAre(RtpSource(kTime3, kCsrc2, RtpSourceType::CSRC), + RtpSource(kTime3, kCsrc3, RtpSourceType::CSRC))); +} + +} // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtp_receiver_impl.cc b/modules/rtp_rtcp/source/rtp_receiver_impl.cc index 23f8b8b7d5..ea63e5ee19 100644 --- a/modules/rtp_rtcp/source/rtp_receiver_impl.cc +++ b/modules/rtp_rtcp/source/rtp_receiver_impl.cc @@ -85,12 +85,8 @@ RtpReceiverImpl::RtpReceiverImpl(Clock* clock, rtp_payload_registry_(rtp_payload_registry), rtp_media_receiver_(rtp_media_receiver), ssrc_(0), - num_csrcs_(0), - current_remote_csrc_(), last_received_timestamp_(0), - last_received_frame_time_ms_(-1) { - memset(current_remote_csrc_, 0, sizeof(current_remote_csrc_)); -} + last_received_frame_time_ms_(-1) {} RtpReceiverImpl::~RtpReceiverImpl() {} @@ -134,9 +130,17 @@ bool RtpReceiverImpl::IncomingRtpPacket(const RTPHeader& rtp_header, // OK, keep-alive packet. return true; } + int64_t now_ms = clock_->TimeInMilliseconds(); + + { + rtc::CritScope lock(&critical_section_rtp_receiver_); + + csrcs_.Update( + now_ms, rtc::MakeArrayView(rtp_header.arrOfCSRCs, rtp_header.numCSRCs)); + } + WebRtcRTPHeader webrtc_rtp_header{}; webrtc_rtp_header.header = rtp_header; - CheckCSRC(webrtc_rtp_header); auto audio_level = rtp_header.extension.hasAudioLevel @@ -145,8 +149,7 @@ bool RtpReceiverImpl::IncomingRtpPacket(const RTPHeader& rtp_header, UpdateSources(audio_level); int32_t ret_val = rtp_media_receiver_->ParseRtpPacket( - &webrtc_rtp_header, payload_specific, payload, payload_length, - clock_->TimeInMilliseconds()); + &webrtc_rtp_header, payload_specific, payload, payload_length, now_ms); if (ret_val < 0) { return false; @@ -174,16 +177,11 @@ std::vector RtpReceiverImpl::GetSources() const { rtc::CritScope lock(&critical_section_rtp_receiver_); int64_t now_ms = clock_->TimeInMilliseconds(); - std::vector sources; - RTC_DCHECK(std::is_sorted(ssrc_sources_.begin(), ssrc_sources_.end(), [](const RtpSource& lhs, const RtpSource& rhs) { return lhs.timestamp_ms() < rhs.timestamp_ms(); })); - RTC_DCHECK(std::is_sorted(csrc_sources_.begin(), csrc_sources_.end(), - [](const RtpSource& lhs, const RtpSource& rhs) { - return lhs.timestamp_ms() < rhs.timestamp_ms(); - })); + std::vector sources = csrcs_.GetSources(now_ms); std::set selected_ssrcs; for (auto rit = ssrc_sources_.rbegin(); rit != ssrc_sources_.rend(); ++rit) { @@ -194,13 +192,6 @@ std::vector RtpReceiverImpl::GetSources() const { sources.push_back(*rit); } } - - for (auto rit = csrc_sources_.rbegin(); rit != csrc_sources_.rend(); ++rit) { - if ((now_ms - rit->timestamp_ms()) > kGetSourcesTimeoutMs) { - break; - } - sources.push_back(*rit); - } return sources; } @@ -223,44 +214,11 @@ void RtpReceiverImpl::CheckSSRCChanged(const RTPHeader& rtp_header) { ssrc_ = rtp_header.ssrc; } -// Implementation note: must not hold critsect when called. -void RtpReceiverImpl::CheckCSRC(const WebRtcRTPHeader& rtp_header) { - const uint8_t num_csrcs = rtp_header.header.numCSRCs; - if (num_csrcs > kRtpCsrcSize) { - // Ignore. - return; - } - { - rtc::CritScope lock(&critical_section_rtp_receiver_); - - // Copy new. - memcpy(current_remote_csrc_, rtp_header.header.arrOfCSRCs, - num_csrcs * sizeof(uint32_t)); - - num_csrcs_ = num_csrcs; - } // End critsect. -} - void RtpReceiverImpl::UpdateSources( const absl::optional& ssrc_audio_level) { rtc::CritScope lock(&critical_section_rtp_receiver_); int64_t now_ms = clock_->TimeInMilliseconds(); - for (size_t i = 0; i < num_csrcs_; ++i) { - auto map_it = iterator_by_csrc_.find(current_remote_csrc_[i]); - if (map_it == iterator_by_csrc_.end()) { - // If it is a new CSRC, append a new object to the end of the list. - csrc_sources_.emplace_back(now_ms, current_remote_csrc_[i], - RtpSourceType::CSRC); - } else { - // If it is an existing CSRC, move the object to the end of the list. - map_it->second->update_timestamp_ms(now_ms); - csrc_sources_.splice(csrc_sources_.end(), csrc_sources_, map_it->second); - } - // Update the unordered_map. - iterator_by_csrc_[current_remote_csrc_[i]] = std::prev(csrc_sources_.end()); - } - // If this is the first packet or the SSRC is changed, insert a new // contributing source that uses the SSRC. if (ssrc_sources_.empty() || ssrc_sources_.rbegin()->source_id() != ssrc_) { @@ -275,15 +233,6 @@ void RtpReceiverImpl::UpdateSources( } void RtpReceiverImpl::RemoveOutdatedSources(int64_t now_ms) { - std::list::iterator it; - for (it = csrc_sources_.begin(); it != csrc_sources_.end(); ++it) { - if ((now_ms - it->timestamp_ms()) <= kGetSourcesTimeoutMs) { - break; - } - iterator_by_csrc_.erase(it->source_id()); - } - csrc_sources_.erase(csrc_sources_.begin(), it); - std::vector::iterator vec_it; for (vec_it = ssrc_sources_.begin(); vec_it != ssrc_sources_.end(); ++vec_it) { diff --git a/modules/rtp_rtcp/source/rtp_receiver_impl.h b/modules/rtp_rtcp/source/rtp_receiver_impl.h index 229c619539..66265f5fb1 100644 --- a/modules/rtp_rtcp/source/rtp_receiver_impl.h +++ b/modules/rtp_rtcp/source/rtp_receiver_impl.h @@ -11,7 +11,6 @@ #ifndef MODULES_RTP_RTCP_SOURCE_RTP_RECEIVER_IMPL_H_ #define MODULES_RTP_RTCP_SOURCE_RTP_RECEIVER_IMPL_H_ -#include #include #include #include @@ -19,6 +18,7 @@ #include "absl/types/optional.h" #include "modules/rtp_rtcp/include/rtp_receiver.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" +#include "modules/rtp_rtcp/source/contributing_sources.h" #include "modules/rtp_rtcp/source/rtp_receiver_strategy.h" #include "rtc_base/criticalsection.h" @@ -53,17 +53,8 @@ class RtpReceiverImpl : public RtpReceiver { std::vector GetSources() const override; - const std::vector& ssrc_sources_for_testing() const { - return ssrc_sources_; - } - - const std::list& csrc_sources_for_testing() const { - return csrc_sources_; - } - private: void CheckSSRCChanged(const RTPHeader& rtp_header); - void CheckCSRC(const WebRtcRTPHeader& rtp_header); void UpdateSources(const absl::optional& ssrc_audio_level); void RemoveOutdatedSources(int64_t now_ms); @@ -77,9 +68,8 @@ class RtpReceiverImpl : public RtpReceiver { // SSRCs. uint32_t ssrc_ RTC_GUARDED_BY(critical_section_rtp_receiver_); - uint8_t num_csrcs_ RTC_GUARDED_BY(critical_section_rtp_receiver_); - uint32_t current_remote_csrc_[kRtpCsrcSize] RTC_GUARDED_BY( - critical_section_rtp_receiver_); + + ContributingSources csrcs_ RTC_GUARDED_BY(critical_section_rtp_receiver_); // Sequence number and timestamps for the latest in-order packet. absl::optional last_received_sequence_number_ @@ -89,10 +79,7 @@ class RtpReceiverImpl : public RtpReceiver { int64_t last_received_frame_time_ms_ RTC_GUARDED_BY(critical_section_rtp_receiver_); - std::unordered_map::iterator> - iterator_by_csrc_; // The RtpSource objects are sorted chronologically. - std::list csrc_sources_; std::vector ssrc_sources_; }; } // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtp_receiver_unittest.cc b/modules/rtp_rtcp/source/rtp_receiver_unittest.cc index e8dccf43af..1400288ee9 100644 --- a/modules/rtp_rtcp/source/rtp_receiver_unittest.cc +++ b/modules/rtp_rtcp/source/rtp_receiver_unittest.cc @@ -16,7 +16,6 @@ #include "modules/rtp_rtcp/include/rtp_receiver.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "modules/rtp_rtcp/mocks/mock_rtp_rtcp.h" -#include "modules/rtp_rtcp/source/rtp_receiver_impl.h" #include "test/gmock.h" #include "test/gtest.h" @@ -235,20 +234,11 @@ TEST_F(RtpReceiverTest, GetSourcesRemoveOutdatedSource) { header.arrOfCSRCs[0] = kCsrc1; EXPECT_TRUE(rtp_receiver_->IncomingRtpPacket( header, kTestPayload, sizeof(kTestPayload), payload_specific)); - auto* rtp_receiver_impl = static_cast(rtp_receiver_.get()); - auto ssrc_sources = rtp_receiver_impl->ssrc_sources_for_testing(); - ASSERT_EQ(1u, ssrc_sources.size()); - EXPECT_EQ(kSsrc1, ssrc_sources.begin()->source_id()); - EXPECT_EQ(RtpSourceType::SSRC, ssrc_sources.begin()->source_type()); - EXPECT_EQ(fake_clock_.TimeInMilliseconds(), - ssrc_sources.begin()->timestamp_ms()); - - auto csrc_sources = rtp_receiver_impl->csrc_sources_for_testing(); - ASSERT_EQ(1u, csrc_sources.size()); - EXPECT_EQ(kCsrc1, csrc_sources.begin()->source_id()); - EXPECT_EQ(RtpSourceType::CSRC, csrc_sources.begin()->source_type()); - EXPECT_EQ(fake_clock_.TimeInMilliseconds(), - csrc_sources.begin()->timestamp_ms()); + now_ms = fake_clock_.TimeInMilliseconds(); + sources = rtp_receiver_->GetSources(); + EXPECT_THAT(sources, UnorderedElementsAre( + RtpSource(now_ms, kSsrc1, RtpSourceType::SSRC), + RtpSource(now_ms, kCsrc1, RtpSourceType::CSRC))); } // The audio level from the RTPHeader extension should be stored in the