Add 1 sec timer to ModuleRtpRtcpImpl2 instead of frequent polling.

This reduces the number of times we grab a few locks down from
somewhere upwards of around a thousand time a second to a few times.

* Update the RTT value on the worker thread and fire callbacks.
* Trigger NotifyTmmbrUpdated() calls from the worker.
* Update the tests to use a GlobalSimulatedTimeController.

Change-Id: Ib81582494066b9460ae0aa84271f32311f30fbce
Bug: webrtc:11581
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/177664
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31602}
This commit is contained in:
Tomas Gunnarsson 2020-07-01 08:53:21 +02:00 committed by Commit Bot
parent 20f45823e3
commit ba0ba71e93
8 changed files with 254 additions and 197 deletions

View File

@ -560,6 +560,7 @@ if (rtc_include_tests) {
"../../test:rtp_test_utils",
"../../test:test_common",
"../../test:test_support",
"../../test/time_controller:time_controller",
"../video_coding:codec_globals_headers",
]
absl_deps = [

View File

@ -12,6 +12,7 @@
#include <string.h>
#include <algorithm>
#include <limits>
#include <map>
#include <memory>
@ -63,8 +64,8 @@ const int64_t kRtcpMinFrameLengthMs = 17;
// Maximum number of received RRTRs that will be stored.
const size_t kMaxNumberOfStoredRrtrs = 300;
constexpr int32_t kDefaultVideoReportInterval = 1000;
constexpr int32_t kDefaultAudioReportInterval = 5000;
constexpr TimeDelta kDefaultVideoReportInterval = TimeDelta::Seconds(1);
constexpr TimeDelta kDefaultAudioReportInterval = TimeDelta::Seconds(5);
std::set<uint32_t> GetRegisteredSsrcs(
const RtpRtcpInterface::Configuration& config) {
@ -81,6 +82,22 @@ std::set<uint32_t> GetRegisteredSsrcs(
}
return ssrcs;
}
// Returns true if the |timestamp| has exceeded the |interval *
// kRrTimeoutIntervals| period and was reset (set to PlusInfinity()). Returns
// false if the timer was either already reset or if it has not expired.
bool ResetTimestampIfExpired(const Timestamp now,
Timestamp& timestamp,
TimeDelta interval) {
if (timestamp.IsInfinite() ||
now <= timestamp + interval * kRrTimeoutIntervals) {
return false;
}
timestamp = Timestamp::PlusInfinity();
return true;
}
} // namespace
struct RTCPReceiver::PacketInformation {
@ -150,8 +167,8 @@ RTCPReceiver::RTCPReceiver(const RtpRtcpInterface::Configuration& config,
network_state_estimate_observer_(config.network_state_estimate_observer),
transport_feedback_observer_(config.transport_feedback_callback),
bitrate_allocation_observer_(config.bitrate_allocation_observer),
report_interval_ms_(config.rtcp_report_interval_ms > 0
? config.rtcp_report_interval_ms
report_interval_(config.rtcp_report_interval_ms > 0
? TimeDelta::Millis(config.rtcp_report_interval_ms)
: (config.audio ? kDefaultAudioReportInterval
: kDefaultVideoReportInterval)),
// TODO(bugs.webrtc.org/10774): Remove fallback.
@ -160,8 +177,6 @@ RTCPReceiver::RTCPReceiver(const RtpRtcpInterface::Configuration& config,
xr_rrtr_status_(false),
xr_rr_rtt_ms_(0),
oldest_tmmbr_info_ms_(0),
last_received_rb_ms_(0),
last_increased_sequence_number_ms_(0),
stats_callback_(config.rtcp_statistics_callback),
cname_callback_(config.rtcp_cname_callback),
report_block_data_observer_(config.report_block_data_observer),
@ -185,9 +200,11 @@ void RTCPReceiver::IncomingPacket(rtc::ArrayView<const uint8_t> packet) {
TriggerCallbacksFromRtcpPacket(packet_information);
}
// This method is only used by test and legacy code, so we should be able to
// remove it soon.
int64_t RTCPReceiver::LastReceivedReportBlockMs() const {
rtc::CritScope lock(&rtcp_receiver_lock_);
return last_received_rb_ms_;
return last_received_rb_.IsFinite() ? last_received_rb_.ms() : 0;
}
void RTCPReceiver::SetRemoteSSRC(uint32_t ssrc) {
@ -255,6 +272,60 @@ bool RTCPReceiver::GetAndResetXrRrRtt(int64_t* rtt_ms) {
return true;
}
// Called regularly (1/sec) on the worker thread to do rtt calculations.
absl::optional<TimeDelta> RTCPReceiver::OnPeriodicRttUpdate(
Timestamp newer_than,
bool sending) {
// Running on the worker thread (same as construction thread).
absl::optional<TimeDelta> rtt;
if (sending) {
// Check if we've received a report block within the last kRttUpdateInterval
// amount of time.
rtc::CritScope lock(&rtcp_receiver_lock_);
if (last_received_rb_.IsInfinite() || last_received_rb_ > newer_than) {
// Stow away the report block for the main ssrc. We'll use the associated
// data map to look up each sender and check the last_rtt_ms().
auto main_report_it = received_report_blocks_.find(main_ssrc_);
if (main_report_it != received_report_blocks_.end()) {
const ReportBlockDataMap& main_data_map = main_report_it->second;
int64_t max_rtt = 0;
for (const auto& reports_per_receiver : received_report_blocks_) {
for (const auto& report : reports_per_receiver.second) {
const RTCPReportBlock& block = report.second.report_block();
auto it_info = main_data_map.find(block.sender_ssrc);
if (it_info != main_data_map.end()) {
const ReportBlockData* report_block_data = &it_info->second;
if (report_block_data->num_rtts() > 0) {
max_rtt = std::max(report_block_data->last_rtt_ms(), max_rtt);
}
}
}
}
if (max_rtt)
rtt.emplace(TimeDelta::Millis(max_rtt));
}
}
// Check for expired timers and if so, log and reset.
auto now = clock_->CurrentTime();
if (RtcpRrTimeoutLocked(now)) {
RTC_LOG_F(LS_WARNING) << "Timeout: No RTCP RR received.";
} else if (RtcpRrSequenceNumberTimeoutLocked(now)) {
RTC_LOG_F(LS_WARNING) << "Timeout: No increase in RTCP RR extended "
"highest sequence number.";
}
} else {
// Report rtt from receiver.
int64_t rtt_ms;
if (GetAndResetXrRrRtt(&rtt_ms)) {
rtt.emplace(TimeDelta::Millis(rtt_ms));
}
}
return rtt;
}
bool RTCPReceiver::NTP(uint32_t* received_ntp_secs,
uint32_t* received_ntp_frac,
uint32_t* rtcp_arrival_time_secs,
@ -499,8 +570,7 @@ void RTCPReceiver::HandleReportBlock(const ReportBlock& report_block,
if (registered_ssrcs_.count(report_block.source_ssrc()) == 0)
return;
const Timestamp now = clock_->CurrentTime();
last_received_rb_ms_ = now.ms();
last_received_rb_ = clock_->CurrentTime();
ReportBlockData* report_block_data =
&received_report_blocks_[report_block.source_ssrc()][remote_ssrc];
@ -513,7 +583,7 @@ void RTCPReceiver::HandleReportBlock(const ReportBlock& report_block,
report_block_data->report_block().extended_highest_sequence_number) {
// We have successfully delivered new RTP packets to the remote side after
// the last RR was sent from the remote side.
last_increased_sequence_number_ms_ = now.ms();
last_increased_sequence_number_ = last_received_rb_;
}
rtcp_report_block.extended_highest_sequence_number =
report_block.extended_high_seq_num();
@ -539,7 +609,8 @@ void RTCPReceiver::HandleReportBlock(const ReportBlock& report_block,
if (send_time_ntp != 0) {
uint32_t delay_ntp = report_block.delay_since_last_sr();
// Local NTP time.
uint32_t receive_time_ntp = CompactNtp(TimeMicrosToNtp(now.us()));
uint32_t receive_time_ntp =
CompactNtp(TimeMicrosToNtp(last_received_rb_.us()));
// RTT in 1/(2^16) seconds.
uint32_t rtt_ntp = receive_time_ntp - delay_ntp - send_time_ntp;
@ -578,33 +649,18 @@ RTCPReceiver::TmmbrInformation* RTCPReceiver::GetTmmbrInformation(
return &it->second;
}
// These two methods (RtcpRrTimeout and RtcpRrSequenceNumberTimeout) only exist
// for tests and legacy code (rtp_rtcp_impl.cc). We should be able to to delete
// the methods and require that access to the locked variables only happens on
// the worker thread and thus no locking is needed.
bool RTCPReceiver::RtcpRrTimeout() {
rtc::CritScope lock(&rtcp_receiver_lock_);
if (last_received_rb_ms_ == 0)
return false;
int64_t time_out_ms = kRrTimeoutIntervals * report_interval_ms_;
if (clock_->TimeInMilliseconds() > last_received_rb_ms_ + time_out_ms) {
// Reset the timer to only trigger one log.
last_received_rb_ms_ = 0;
return true;
}
return false;
return RtcpRrTimeoutLocked(clock_->CurrentTime());
}
bool RTCPReceiver::RtcpRrSequenceNumberTimeout() {
rtc::CritScope lock(&rtcp_receiver_lock_);
if (last_increased_sequence_number_ms_ == 0)
return false;
int64_t time_out_ms = kRrTimeoutIntervals * report_interval_ms_;
if (clock_->TimeInMilliseconds() >
last_increased_sequence_number_ms_ + time_out_ms) {
// Reset the timer to only trigger one log.
last_increased_sequence_number_ms_ = 0;
return true;
}
return false;
return RtcpRrSequenceNumberTimeoutLocked(clock_->CurrentTime());
}
bool RTCPReceiver::UpdateTmmbrTimers() {
@ -1153,4 +1209,13 @@ std::vector<rtcp::TmmbItem> RTCPReceiver::TmmbrReceived() {
return candidates;
}
bool RTCPReceiver::RtcpRrTimeoutLocked(Timestamp now) {
return ResetTimestampIfExpired(now, last_received_rb_, report_interval_);
}
bool RTCPReceiver::RtcpRrSequenceNumberTimeoutLocked(Timestamp now) {
return ResetTimestampIfExpired(now, last_increased_sequence_number_,
report_interval_);
}
} // namespace webrtc

View File

@ -89,6 +89,11 @@ class RTCPReceiver final {
void SetRtcpXrRrtrStatus(bool enable);
bool GetAndResetXrRrRtt(int64_t* rtt_ms);
// Called once per second on the worker thread to do rtt calculations.
// Returns an optional rtt value if one is available.
absl::optional<TimeDelta> OnPeriodicRttUpdate(Timestamp newer_than,
bool sending);
// Get statistics.
int32_t StatisticsReceived(std::vector<RTCPReportBlock>* receiveBlocks) const;
// A snapshot of Report Blocks with additional data of interest to statistics.
@ -210,6 +215,12 @@ class RTCPReceiver final {
PacketInformation* packet_information)
RTC_EXCLUSIVE_LOCKS_REQUIRED(rtcp_receiver_lock_);
bool RtcpRrTimeoutLocked(Timestamp now)
RTC_EXCLUSIVE_LOCKS_REQUIRED(rtcp_receiver_lock_);
bool RtcpRrSequenceNumberTimeoutLocked(Timestamp now)
RTC_EXCLUSIVE_LOCKS_REQUIRED(rtcp_receiver_lock_);
Clock* const clock_;
const bool receiver_only_;
ModuleRtpRtcp* const rtp_rtcp_;
@ -222,7 +233,7 @@ class RTCPReceiver final {
NetworkStateEstimateObserver* const network_state_estimate_observer_;
TransportFeedbackObserver* const transport_feedback_observer_;
VideoBitrateAllocationObserver* const bitrate_allocation_observer_;
const int report_interval_ms_;
const TimeDelta report_interval_;
rtc::CriticalSection rtcp_receiver_lock_;
uint32_t remote_ssrc_ RTC_GUARDED_BY(rtcp_receiver_lock_);
@ -256,11 +267,12 @@ class RTCPReceiver final {
RTC_GUARDED_BY(rtcp_receiver_lock_);
// The last time we received an RTCP Report block for this module.
int64_t last_received_rb_ms_ RTC_GUARDED_BY(rtcp_receiver_lock_);
Timestamp last_received_rb_ RTC_GUARDED_BY(rtcp_receiver_lock_) =
Timestamp::PlusInfinity();
// The time we last received an RTCP RR telling we have successfully
// delivered RTP packet to the remote side.
int64_t last_increased_sequence_number_ms_;
Timestamp last_increased_sequence_number_ = Timestamp::PlusInfinity();
RtcpStatisticsCallback* const stats_callback_;
RtcpCnameCallback* const cname_callback_;

View File

@ -33,8 +33,9 @@
namespace webrtc {
namespace {
const int64_t kRtpRtcpMaxIdleTimeProcessMs = 5;
const int64_t kRtpRtcpRttProcessTimeMs = 1000;
const int64_t kDefaultExpectedRetransmissionTimeMs = 125;
constexpr TimeDelta kRttUpdateInterval = TimeDelta::Millis(1000);
} // namespace
ModuleRtpRtcpImpl2::RtpSenderContext::RtpSenderContext(
@ -75,10 +76,19 @@ ModuleRtpRtcpImpl2::ModuleRtpRtcpImpl2(const Configuration& configuration)
// webrtc::VideoSendStream::Config::Rtp::kDefaultMaxPacketSize.
const size_t kTcpOverIpv4HeaderSize = 40;
SetMaxRtpPacketSize(IP_PACKET_SIZE - kTcpOverIpv4HeaderSize);
if (rtt_stats_) {
rtt_update_task_ = RepeatingTaskHandle::DelayedStart(
worker_queue_, kRttUpdateInterval, [this]() {
PeriodicUpdate();
return kRttUpdateInterval;
});
}
}
ModuleRtpRtcpImpl2::~ModuleRtpRtcpImpl2() {
RTC_DCHECK_RUN_ON(worker_queue_);
rtt_update_task_.Stop();
}
// static
@ -100,53 +110,17 @@ int64_t ModuleRtpRtcpImpl2::TimeUntilNextProcess() {
// Process any pending tasks such as timeouts (non time critical events).
void ModuleRtpRtcpImpl2::Process() {
RTC_DCHECK_RUN_ON(&process_thread_checker_);
const int64_t now = clock_->TimeInMilliseconds();
const Timestamp now = clock_->CurrentTime();
// TODO(bugs.webrtc.org/11581): Figure out why we need to call Process() 200
// times a second.
next_process_time_ = now + kRtpRtcpMaxIdleTimeProcessMs;
next_process_time_ = now.ms() + kRtpRtcpMaxIdleTimeProcessMs;
// TODO(bugs.webrtc.org/11581): We update the RTT once a second, whereas other
// things that run in this method are updated much more frequently. Move the
// RTT checking over to the worker thread, which matches better with where the
// stats are maintained.
bool process_rtt = now >= last_rtt_process_time_ + kRtpRtcpRttProcessTimeMs;
if (rtcp_sender_.Sending()) {
// Process RTT if we have received a report block and we haven't
// processed RTT for at least |kRtpRtcpRttProcessTimeMs| milliseconds.
// Note that LastReceivedReportBlockMs() grabs a lock, so check
// |process_rtt| first.
if (process_rtt &&
rtcp_receiver_.LastReceivedReportBlockMs() > last_rtt_process_time_) {
std::vector<RTCPReportBlock> receive_blocks;
rtcp_receiver_.StatisticsReceived(&receive_blocks);
int64_t max_rtt = 0;
for (std::vector<RTCPReportBlock>::iterator it = receive_blocks.begin();
it != receive_blocks.end(); ++it) {
int64_t rtt = 0;
rtcp_receiver_.RTT(it->sender_ssrc, &rtt, NULL, NULL, NULL);
max_rtt = (rtt > max_rtt) ? rtt : max_rtt;
}
// Report the rtt.
if (rtt_stats_ && max_rtt != 0)
rtt_stats_->OnRttUpdate(max_rtt);
}
// Verify receiver reports are delivered and the reported sequence number
// is increasing.
// TODO(bugs.webrtc.org/11581): The timeout value needs to be checked every
// few seconds (see internals of RtcpRrTimeout). Here, we may be polling it
// a couple of hundred times a second, which isn't great since it grabs a
// lock. Note also that LastReceivedReportBlockMs() (called above) and
// RtcpRrTimeout() both grab the same lock and check the same timer, so
// it should be possible to consolidate that work somehow.
if (rtcp_receiver_.RtcpRrTimeout()) {
RTC_LOG_F(LS_WARNING) << "Timeout: No RTCP RR received.";
} else if (rtcp_receiver_.RtcpRrSequenceNumberTimeout()) {
RTC_LOG_F(LS_WARNING) << "Timeout: No increase in RTCP RR extended "
"highest sequence number.";
}
if (remote_bitrate_ && rtcp_sender_.TMMBR()) {
// TODO(bugs.webrtc.org/11581): once we don't use Process() to trigger
// calls to SendRTCP(), the only remaining timer will require remote_bitrate_
// to be not null. In that case, we can disable the timer when it is null.
if (remote_bitrate_ && rtcp_sender_.Sending() && rtcp_sender_.TMMBR()) {
unsigned int target_bitrate = 0;
std::vector<unsigned int> ssrcs;
if (remote_bitrate_->LatestEstimate(&ssrcs, &target_bitrate)) {
@ -156,38 +130,11 @@ void ModuleRtpRtcpImpl2::Process() {
rtcp_sender_.SetTargetBitrate(target_bitrate);
}
}
} else {
// Report rtt from receiver.
if (process_rtt) {
int64_t rtt_ms;
if (rtt_stats_ && rtcp_receiver_.GetAndResetXrRrRtt(&rtt_ms)) {
rtt_stats_->OnRttUpdate(rtt_ms);
}
}
}
// Get processed rtt.
if (process_rtt) {
last_rtt_process_time_ = now;
// TODO(bugs.webrtc.org/11581): Is this a bug? At the top of the function,
// next_process_time_ is incremented by 5ms, here we effectively do a
// std::min() of (now + 5ms, now + 1000ms). Seems like this is a no-op?
next_process_time_ = std::min(
next_process_time_, last_rtt_process_time_ + kRtpRtcpRttProcessTimeMs);
if (rtt_stats_) {
// Make sure we have a valid RTT before setting.
int64_t last_rtt = rtt_stats_->LastProcessedRtt();
if (last_rtt >= 0)
set_rtt_ms(last_rtt);
}
}
// TODO(bugs.webrtc.org/11581): Run this on a separate set of delayed tasks
// based off of next_time_to_send_rtcp_ in RTCPSender.
if (rtcp_sender_.TimeToSendRTCPReport())
rtcp_sender_.SendRTCP(GetFeedbackState(), kRtcpReport);
if (rtcp_sender_.TMMBR() && rtcp_receiver_.UpdateTmmbrTimers()) {
rtcp_receiver_.NotifyTmmbrUpdated();
}
}
void ModuleRtpRtcpImpl2::SetRtxSendStatus(int mode) {
@ -290,7 +237,7 @@ void ModuleRtpRtcpImpl2::SetCsrcs(const std::vector<uint32_t>& csrcs) {
// feedbacks).
RTCPSender::FeedbackState ModuleRtpRtcpImpl2::GetFeedbackState() {
// TODO(bugs.webrtc.org/11581): Called by potentially multiple threads.
// "Send*" methods and on the ProcessThread. Make sure it's only called on the
// Mostly "Send*" methods. Make sure it's only called on the
// construction thread.
RTCPSender::FeedbackState state;
@ -465,7 +412,10 @@ int32_t ModuleRtpRtcpImpl2::RemoteNTP(uint32_t* received_ntpsecs,
: -1;
}
// Get RoundTripTime.
// TODO(tommi): Check if |avg_rtt_ms|, |min_rtt_ms|, |max_rtt_ms| params are
// actually used in practice (some callers ask for it but don't use it). It
// could be that only |rtt| is needed and if so, then the fast path could be to
// just call rtt_ms() and rely on the calculation being done periodically.
int32_t ModuleRtpRtcpImpl2::RTT(const uint32_t remote_ssrc,
int64_t* rtt,
int64_t* avg_rtt,
@ -484,7 +434,7 @@ int64_t ModuleRtpRtcpImpl2::ExpectedRetransmissionTimeMs() const {
if (expected_retransmission_time_ms > 0) {
return expected_retransmission_time_ms;
}
// No rtt available (|kRtpRtcpRttProcessTimeMs| not yet passed?), so try to
// No rtt available (|kRttUpdateInterval| not yet passed?), so try to
// poll avg_rtt_ms directly from rtcp receiver.
if (rtcp_receiver_.RTT(rtcp_receiver_.RemoteSSRC(), nullptr,
&expected_retransmission_time_ms, nullptr,
@ -731,6 +681,7 @@ bool ModuleRtpRtcpImpl2::LastReceivedNTP(
}
void ModuleRtpRtcpImpl2::set_rtt_ms(int64_t rtt_ms) {
RTC_DCHECK_RUN_ON(worker_queue_);
{
rtc::CritScope cs(&critical_section_rtt_);
rtt_ms_ = rtt_ms;
@ -758,4 +709,23 @@ const RTPSender* ModuleRtpRtcpImpl2::RtpSender() const {
return rtp_sender_ ? &rtp_sender_->packet_generator : nullptr;
}
void ModuleRtpRtcpImpl2::PeriodicUpdate() {
RTC_DCHECK_RUN_ON(worker_queue_);
Timestamp check_since = clock_->CurrentTime() - kRttUpdateInterval;
absl::optional<TimeDelta> rtt =
rtcp_receiver_.OnPeriodicRttUpdate(check_since, rtcp_sender_.Sending());
if (rtt) {
rtt_stats_->OnRttUpdate(rtt->ms());
set_rtt_ms(rtt->ms());
}
// kTmmbrTimeoutIntervalMs is 25 seconds, so an order of seconds.
// Instead of this polling approach, consider having an optional timer in the
// RTCPReceiver class that is started/stopped based on the state of
// rtcp_sender_.TMMBR().
if (rtcp_sender_.TMMBR() && rtcp_receiver_.UpdateTmmbrTimers())
rtcp_receiver_.NotifyTmmbrUpdated();
}
} // namespace webrtc

View File

@ -37,6 +37,8 @@
#include "rtc_base/critical_section.h"
#include "rtc_base/gtest_prod_util.h"
#include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/task_utils/repeating_task.h"
namespace webrtc {
@ -284,6 +286,10 @@ class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface,
bool TimeToSendFullNackList(int64_t now) const;
// Called on a timer, once a second, on the worker_queue_, to update the RTT,
// check if we need to send RTCP report, send TMMBR updates and fire events.
void PeriodicUpdate();
TaskQueueBase* const worker_queue_;
SequenceChecker process_thread_checker_;
@ -305,6 +311,7 @@ class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface,
RemoteBitrateEstimator* const remote_bitrate_;
RtcpRttStats* const rtt_stats_;
RepeatingTaskHandle rtt_update_task_ RTC_GUARDED_BY(worker_queue_);
// The processed RTT from RtcpRttStats.
rtc::CriticalSection critical_section_rtt_;

View File

@ -27,6 +27,7 @@
#include "test/rtcp_packet_parser.h"
#include "test/rtp_header_parser.h"
#include "test/run_loop.h"
#include "test/time_controller/simulated_time_controller.h"
using ::testing::ElementsAre;
@ -53,14 +54,14 @@ class SendTransport : public Transport {
public:
SendTransport()
: receiver_(nullptr),
clock_(nullptr),
time_controller_(nullptr),
delay_ms_(0),
rtp_packets_sent_(0),
rtcp_packets_sent_(0) {}
void SetRtpRtcpModule(ModuleRtpRtcpImpl2* receiver) { receiver_ = receiver; }
void SimulateNetworkDelay(int64_t delay_ms, SimulatedClock* clock) {
clock_ = clock;
void SimulateNetworkDelay(int64_t delay_ms, TimeController* time_controller) {
time_controller_ = time_controller;
delay_ms_ = delay_ms;
}
bool SendRtp(const uint8_t* data,
@ -78,17 +79,19 @@ class SendTransport : public Transport {
parser.Parse(data, len);
last_nack_list_ = parser.nack()->packet_ids();
if (clock_) {
clock_->AdvanceTimeMilliseconds(delay_ms_);
if (time_controller_) {
time_controller_->AdvanceTime(TimeDelta::Millis(delay_ms_));
}
EXPECT_TRUE(receiver_);
receiver_->IncomingRtcpPacket(data, len);
++rtcp_packets_sent_;
return true;
}
size_t NumRtcpSent() { return rtcp_packets_sent_; }
ModuleRtpRtcpImpl2* receiver_;
SimulatedClock* clock_;
TimeController* time_controller_;
int64_t delay_ms_;
int rtp_packets_sent_;
size_t rtcp_packets_sent_;
@ -98,12 +101,13 @@ class SendTransport : public Transport {
class RtpRtcpModule : public RtcpPacketTypeCounterObserver {
public:
RtpRtcpModule(SimulatedClock* clock, bool is_sender)
RtpRtcpModule(TimeController* time_controller, bool is_sender)
: is_sender_(is_sender),
receive_statistics_(ReceiveStatistics::Create(clock)),
clock_(clock) {
receive_statistics_(
ReceiveStatistics::Create(time_controller->GetClock())),
time_controller_(time_controller) {
CreateModuleImpl();
transport_.SimulateNetworkDelay(kOneWayNetworkDelayMs, clock);
transport_.SimulateNetworkDelay(kOneWayNetworkDelayMs, time_controller);
}
const bool is_sender_;
@ -146,7 +150,7 @@ class RtpRtcpModule : public RtcpPacketTypeCounterObserver {
void CreateModuleImpl() {
RtpRtcpInterface::Configuration config;
config.audio = false;
config.clock = clock_;
config.clock = time_controller_->GetClock();
config.outgoing_transport = &transport_;
config.receive_statistics = receive_statistics_.get();
config.rtcp_packet_type_counter_observer = this;
@ -160,7 +164,7 @@ class RtpRtcpModule : public RtcpPacketTypeCounterObserver {
impl_->SetRTCPStatus(RtcpMode::kCompound);
}
SimulatedClock* const clock_;
TimeController* const time_controller_;
std::map<uint32_t, RtcpPacketTypeCounter> counter_map_;
};
} // namespace
@ -168,9 +172,9 @@ class RtpRtcpModule : public RtcpPacketTypeCounterObserver {
class RtpRtcpImpl2Test : public ::testing::Test {
protected:
RtpRtcpImpl2Test()
: clock_(133590000000000),
sender_(&clock_, /*is_sender=*/true),
receiver_(&clock_, /*is_sender=*/false) {}
: time_controller_(Timestamp::Micros(133590000000000)),
sender_(&time_controller_, /*is_sender=*/true),
receiver_(&time_controller_, /*is_sender=*/false) {}
void SetUp() override {
// Send module.
@ -181,7 +185,7 @@ class RtpRtcpImpl2Test : public ::testing::Test {
FieldTrialBasedConfig field_trials;
RTPSenderVideo::Config video_config;
video_config.clock = &clock_;
video_config.clock = time_controller_.GetClock();
video_config.rtp_sender = sender_.impl_->RtpSender();
video_config.field_trials = &field_trials;
sender_video_ = std::make_unique<RTPSenderVideo>(video_config);
@ -199,8 +203,13 @@ class RtpRtcpImpl2Test : public ::testing::Test {
receiver_.transport_.SetRtpRtcpModule(sender_.impl_.get());
}
test::RunLoop loop_;
SimulatedClock clock_;
void AdvanceTimeMs(int64_t milliseconds) {
time_controller_.AdvanceTime(TimeDelta::Millis(milliseconds));
}
GlobalSimulatedTimeController time_controller_;
// test::RunLoop loop_;
// SimulatedClock clock_;
RtpRtcpModule sender_;
std::unique_ptr<RTPSenderVideo> sender_video_;
RtpRtcpModule receiver_;
@ -256,7 +265,7 @@ TEST_F(RtpRtcpImpl2Test, RetransmitsAllLayers) {
EXPECT_EQ(kSequenceNumber + 2, sender_.LastRtpSequenceNumber());
// Min required delay until retransmit = 5 + RTT ms (RTT = 0).
clock_.AdvanceTimeMilliseconds(5);
AdvanceTimeMs(5);
// Frame with kBaseLayerTid re-sent.
IncomingRtcpNack(&sender_, kSequenceNumber);
@ -286,7 +295,7 @@ TEST_F(RtpRtcpImpl2Test, Rtt) {
EXPECT_EQ(0, sender_.impl_->SendRTCP(kRtcpReport));
// Receiver module should send a RR with a response to the last received SR.
clock_.AdvanceTimeMilliseconds(1000);
AdvanceTimeMs(1000);
EXPECT_EQ(0, receiver_.impl_->SendRTCP(kRtcpReport));
// Verify RTT.
@ -308,7 +317,8 @@ TEST_F(RtpRtcpImpl2Test, Rtt) {
// Verify RTT from rtt_stats config.
EXPECT_EQ(0, sender_.rtt_stats_.LastProcessedRtt());
EXPECT_EQ(0, sender_.impl_->rtt_ms());
sender_.impl_->Process();
AdvanceTimeMs(1000);
EXPECT_NEAR(2 * kOneWayNetworkDelayMs, sender_.rtt_stats_.LastProcessedRtt(),
1);
EXPECT_NEAR(2 * kOneWayNetworkDelayMs, sender_.impl_->rtt_ms(), 1);
@ -327,7 +337,7 @@ TEST_F(RtpRtcpImpl2Test, RttForReceiverOnly) {
EXPECT_EQ(0, receiver_.impl_->SendRTCP(kRtcpReport));
// Sender module should send a response to the last received RTRR (DLRR).
clock_.AdvanceTimeMilliseconds(1000);
AdvanceTimeMs(1000);
// Send Frame before sending a SR.
SendFrame(&sender_, sender_video_.get(), kBaseLayerTid);
EXPECT_EQ(0, sender_.impl_->SendRTCP(kRtcpReport));
@ -335,7 +345,7 @@ TEST_F(RtpRtcpImpl2Test, RttForReceiverOnly) {
// Verify RTT.
EXPECT_EQ(0, receiver_.rtt_stats_.LastProcessedRtt());
EXPECT_EQ(0, receiver_.impl_->rtt_ms());
receiver_.impl_->Process();
AdvanceTimeMs(1000);
EXPECT_NEAR(2 * kOneWayNetworkDelayMs,
receiver_.rtt_stats_.LastProcessedRtt(), 1);
EXPECT_NEAR(2 * kOneWayNetworkDelayMs, receiver_.impl_->rtt_ms(), 1);
@ -343,16 +353,16 @@ TEST_F(RtpRtcpImpl2Test, RttForReceiverOnly) {
TEST_F(RtpRtcpImpl2Test, NoSrBeforeMedia) {
// Ignore fake transport delays in this test.
sender_.transport_.SimulateNetworkDelay(0, &clock_);
receiver_.transport_.SimulateNetworkDelay(0, &clock_);
sender_.transport_.SimulateNetworkDelay(0, &time_controller_);
receiver_.transport_.SimulateNetworkDelay(0, &time_controller_);
sender_.impl_->Process();
EXPECT_EQ(-1, sender_.RtcpSent().first_packet_time_ms);
// Verify no SR is sent before media has been sent, RR should still be sent
// from the receiving module though.
clock_.AdvanceTimeMilliseconds(2000);
int64_t current_time = clock_.TimeInMilliseconds();
AdvanceTimeMs(2000);
int64_t current_time = time_controller_.GetClock()->TimeInMilliseconds();
sender_.impl_->Process();
receiver_.impl_->Process();
EXPECT_EQ(-1, sender_.RtcpSent().first_packet_time_ms);
@ -460,7 +470,7 @@ TEST_F(RtpRtcpImpl2Test, SendsExtendedNackList) {
}
TEST_F(RtpRtcpImpl2Test, ReSendsNackListAfterRttMs) {
sender_.transport_.SimulateNetworkDelay(0, &clock_);
sender_.transport_.SimulateNetworkDelay(0, &time_controller_);
// Send module sends a NACK.
const uint16_t kNackLength = 2;
uint16_t nack_list[kNackLength] = {123, 125};
@ -473,19 +483,19 @@ TEST_F(RtpRtcpImpl2Test, ReSendsNackListAfterRttMs) {
// Same list not re-send, rtt interval has not passed.
const int kStartupRttMs = 100;
clock_.AdvanceTimeMilliseconds(kStartupRttMs);
AdvanceTimeMs(kStartupRttMs);
EXPECT_EQ(0, sender_.impl_->SendNACK(nack_list, kNackLength));
EXPECT_EQ(1U, sender_.RtcpSent().nack_packets);
// Rtt interval passed, full list sent.
clock_.AdvanceTimeMilliseconds(1);
AdvanceTimeMs(1);
EXPECT_EQ(0, sender_.impl_->SendNACK(nack_list, kNackLength));
EXPECT_EQ(2U, sender_.RtcpSent().nack_packets);
EXPECT_THAT(sender_.LastNackListSent(), ElementsAre(123, 125));
}
TEST_F(RtpRtcpImpl2Test, UniqueNackRequests) {
receiver_.transport_.SimulateNetworkDelay(0, &clock_);
receiver_.transport_.SimulateNetworkDelay(0, &time_controller_);
EXPECT_EQ(0U, receiver_.RtcpSent().nack_packets);
EXPECT_EQ(0U, receiver_.RtcpSent().nack_requests);
EXPECT_EQ(0U, receiver_.RtcpSent().unique_nack_requests);
@ -508,7 +518,7 @@ TEST_F(RtpRtcpImpl2Test, UniqueNackRequests) {
// Receive module sends new request with duplicated packets.
const int kStartupRttMs = 100;
clock_.AdvanceTimeMilliseconds(kStartupRttMs + 1);
AdvanceTimeMs(kStartupRttMs + 1);
const uint16_t kNackLength2 = 4;
uint16_t nack_list2[kNackLength2] = {11, 18, 20, 21};
EXPECT_EQ(0, receiver_.impl_->SendNACK(nack_list2, kNackLength2));
@ -539,13 +549,13 @@ TEST_F(RtpRtcpImpl2Test, ConfigurableRtcpReportInterval) {
EXPECT_EQ(0u, sender_.transport_.NumRtcpSent());
// Move ahead to the last ms before a rtcp is expected, no action.
clock_.AdvanceTimeMilliseconds(kVideoReportInterval / 2 - 1);
AdvanceTimeMs(kVideoReportInterval / 2 - 1);
sender_.impl_->Process();
EXPECT_EQ(sender_.RtcpSent().first_packet_time_ms, -1);
EXPECT_EQ(sender_.transport_.NumRtcpSent(), 0u);
// Move ahead to the first rtcp. Send RTCP.
clock_.AdvanceTimeMilliseconds(1);
AdvanceTimeMs(1);
sender_.impl_->Process();
EXPECT_GT(sender_.RtcpSent().first_packet_time_ms, -1);
EXPECT_EQ(sender_.transport_.NumRtcpSent(), 1u);
@ -553,21 +563,21 @@ TEST_F(RtpRtcpImpl2Test, ConfigurableRtcpReportInterval) {
SendFrame(&sender_, sender_video_.get(), kBaseLayerTid);
// Move ahead to the last possible second before second rtcp is expected.
clock_.AdvanceTimeMilliseconds(kVideoReportInterval * 1 / 2 - 1);
AdvanceTimeMs(kVideoReportInterval * 1 / 2 - 1);
sender_.impl_->Process();
EXPECT_EQ(sender_.transport_.NumRtcpSent(), 1u);
// Move ahead into the range of second rtcp, the second rtcp may be sent.
clock_.AdvanceTimeMilliseconds(1);
AdvanceTimeMs(1);
sender_.impl_->Process();
EXPECT_GE(sender_.transport_.NumRtcpSent(), 1u);
clock_.AdvanceTimeMilliseconds(kVideoReportInterval / 2);
AdvanceTimeMs(kVideoReportInterval / 2);
sender_.impl_->Process();
EXPECT_GE(sender_.transport_.NumRtcpSent(), 1u);
// Move out the range of second rtcp, the second rtcp must have been sent.
clock_.AdvanceTimeMilliseconds(kVideoReportInterval / 2);
AdvanceTimeMs(kVideoReportInterval / 2);
sender_.impl_->Process();
EXPECT_EQ(sender_.transport_.NumRtcpSent(), 2u);
}
@ -588,7 +598,7 @@ TEST_F(RtpRtcpImpl2Test, StoresPacketInfoForSentPackets) {
packet.set_first_packet_of_frame(true);
packet.SetMarker(true);
sender_.impl_->TrySendPacket(&packet, pacing_info);
loop_.Flush();
AdvanceTimeMs(1);
std::vector<RtpSequenceNumberMap::Info> seqno_info =
sender_.impl_->GetSentRtpPacketInfos(std::vector<uint16_t>{1});
@ -613,7 +623,7 @@ TEST_F(RtpRtcpImpl2Test, StoresPacketInfoForSentPackets) {
packet.SetMarker(true);
sender_.impl_->TrySendPacket(&packet, pacing_info);
loop_.Flush();
AdvanceTimeMs(1);
seqno_info =
sender_.impl_->GetSentRtpPacketInfos(std::vector<uint16_t>{2, 3, 4});

View File

@ -12,6 +12,7 @@
#include <algorithm>
#include <memory>
#include <utility>
#include "absl/algorithm/container.h"
#include "modules/utility/include/process_thread.h"
@ -95,24 +96,17 @@ CallStats::~CallStats() {
void CallStats::UpdateAndReport() {
RTC_DCHECK_RUN_ON(&construction_thread_checker_);
// |avg_rtt_ms_| is allowed to be read on the construction thread since that's
// the only thread that modifies the value.
int64_t avg_rtt_ms = avg_rtt_ms_;
RemoveOldReports(clock_->CurrentTime().ms(), &reports_);
max_rtt_ms_ = GetMaxRttMs(reports_);
avg_rtt_ms = GetNewAvgRttMs(reports_, avg_rtt_ms);
{
rtc::CritScope lock(&avg_rtt_ms_lock_);
avg_rtt_ms_ = avg_rtt_ms;
}
avg_rtt_ms_ = GetNewAvgRttMs(reports_, avg_rtt_ms_);
// If there is a valid rtt, update all observers with the max rtt.
if (max_rtt_ms_ >= 0) {
RTC_DCHECK_GE(avg_rtt_ms, 0);
RTC_DCHECK_GE(avg_rtt_ms_, 0);
for (CallStatsObserver* observer : observers_)
observer->OnRttUpdate(avg_rtt_ms, max_rtt_ms_);
observer->OnRttUpdate(avg_rtt_ms_, max_rtt_ms_);
// Sum for Histogram of average RTT reported over the entire call.
sum_avg_rtt_ms_ += avg_rtt_ms;
sum_avg_rtt_ms_ += avg_rtt_ms_;
++num_avg_rtt_;
}
}
@ -134,23 +128,24 @@ int64_t CallStats::LastProcessedRtt() const {
return avg_rtt_ms_;
}
int64_t CallStats::LastProcessedRttFromProcessThread() const {
RTC_DCHECK_RUN_ON(&process_thread_checker_);
rtc::CritScope lock(&avg_rtt_ms_lock_);
return avg_rtt_ms_;
}
void CallStats::OnRttUpdate(int64_t rtt) {
RTC_DCHECK_RUN_ON(&process_thread_checker_);
// This callback may for some RtpRtcp module instances (video send stream) be
// invoked from a separate task queue, in other cases, we should already be
// on the correct TQ.
int64_t now_ms = clock_->TimeInMilliseconds();
task_queue_->PostTask(ToQueuedTask(task_safety_, [this, rtt, now_ms]() {
auto update = [this, rtt, now_ms]() {
RTC_DCHECK_RUN_ON(&construction_thread_checker_);
reports_.push_back(RttTime(rtt, now_ms));
if (time_of_first_rtt_ms_ == -1)
time_of_first_rtt_ms_ = now_ms;
UpdateAndReport();
}));
};
if (task_queue_->IsCurrent()) {
update();
} else {
task_queue_->PostTask(ToQueuedTask(task_safety_, std::move(update)));
}
}
void CallStats::UpdateHistograms() {

View File

@ -70,7 +70,6 @@ class CallStats {
private:
// Part of the RtcpRttStats implementation. Called by RtcpRttStatsImpl.
void OnRttUpdate(int64_t rtt);
int64_t LastProcessedRttFromProcessThread() const;
void UpdateAndReport();
@ -80,24 +79,28 @@ class CallStats {
class RtcpRttStatsImpl : public RtcpRttStats {
public:
explicit RtcpRttStatsImpl(CallStats* owner) : owner_(owner) {
process_thread_checker_.Detach();
}
explicit RtcpRttStatsImpl(CallStats* owner) : owner_(owner) {}
~RtcpRttStatsImpl() override = default;
private:
void OnRttUpdate(int64_t rtt) override {
RTC_DCHECK_RUN_ON(&process_thread_checker_);
// For video send streams (video/video_send_stream.cc), the RtpRtcp module
// is currently created on a transport worker TaskQueue and not the worker
// thread - which is what happens in other cases. We should probably fix
// that so that the call consistently comes in on the right thread.
owner_->OnRttUpdate(rtt);
}
int64_t LastProcessedRtt() const override {
RTC_DCHECK_RUN_ON(&process_thread_checker_);
return owner_->LastProcessedRttFromProcessThread();
// This call path shouldn't be used anymore. This impl is only for
// propagating the rtt from the RtpRtcp module, which does not call
// LastProcessedRtt(). Down the line we should consider removing
// LastProcessedRtt() and use the interface for event notifications only.
RTC_NOTREACHED() << "Legacy call path";
return 0;
}
CallStats* const owner_;
SequenceChecker process_thread_checker_;
} rtcp_rtt_stats_impl_{this};
Clock* const clock_;
@ -109,14 +112,8 @@ class CallStats {
// The last RTT in the statistics update (zero if there is no valid estimate).
int64_t max_rtt_ms_ RTC_GUARDED_BY(construction_thread_checker_);
// Accessed from two separate threads.
// |avg_rtt_ms_| may be read on the construction thread without a lock.
// |avg_rtt_ms_lock_| must be held elsewhere for reading.
// |avg_rtt_ms_lock_| must be held on the construction thread for writing.
int64_t avg_rtt_ms_;
// Protects |avg_rtt_ms_|.
rtc::CriticalSection avg_rtt_ms_lock_;
// Last reported average RTT value.
int64_t avg_rtt_ms_ RTC_GUARDED_BY(construction_thread_checker_);
// |sum_avg_rtt_ms_|, |num_avg_rtt_| and |time_of_first_rtt_ms_| are only used
// on the ProcessThread when running. When the Process Thread is not running,