Reduce locking when collecting receive statistic

BUG=None

Review-Url: https://codereview.webrtc.org/2997803002
Cr-Commit-Position: refs/heads/master@{#19336}
This commit is contained in:
danilchap 2017-08-14 05:51:02 -07:00 committed by Commit Bot
parent 0ba43b5a20
commit ec86be0962
2 changed files with 34 additions and 47 deletions

View File

@ -29,13 +29,14 @@ const int64_t kStatisticsProcessIntervalMs = 1000;
StreamStatistician::~StreamStatistician() {}
StreamStatisticianImpl::StreamStatisticianImpl(
uint32_t ssrc,
Clock* clock,
RtcpStatisticsCallback* rtcp_callback,
StreamDataCountersCallback* rtp_callback)
: clock_(clock),
: ssrc_(ssrc),
clock_(clock),
incoming_bitrate_(kStatisticsProcessIntervalMs,
RateStatistics::kBpsScale),
ssrc_(0),
max_reordering_threshold_(kDefaultMaxReorderingThreshold),
jitter_q4_(0),
cumulative_loss_(0),
@ -56,16 +57,17 @@ StreamStatisticianImpl::StreamStatisticianImpl(
void StreamStatisticianImpl::IncomingPacket(const RTPHeader& header,
size_t packet_length,
bool retransmitted) {
UpdateCounters(header, packet_length, retransmitted);
NotifyRtpCallback();
auto counters = UpdateCounters(header, packet_length, retransmitted);
rtp_callback_->DataCountersUpdated(counters, ssrc_);
}
void StreamStatisticianImpl::UpdateCounters(const RTPHeader& header,
size_t packet_length,
bool retransmitted) {
StreamDataCounters StreamStatisticianImpl::UpdateCounters(
const RTPHeader& header,
size_t packet_length,
bool retransmitted) {
rtc::CritScope cs(&stream_lock_);
bool in_order = InOrderPacketInternal(header.sequenceNumber);
ssrc_ = header.ssrc;
RTC_DCHECK_EQ(ssrc_, header.ssrc);
incoming_bitrate_.Update(packet_length, clock_->TimeInMilliseconds());
receive_counters_.transmitted.AddPacket(packet_length, header);
if (!in_order && retransmitted) {
@ -109,6 +111,7 @@ void StreamStatisticianImpl::UpdateCounters(const RTPHeader& header,
// Our measured overhead. Filter from RFC 5104 4.2.1.2:
// avg_OH (new) = 15/16*avg_OH (old) + 1/16*pckt_OH,
received_packet_overhead_ = (15 * received_packet_overhead_ + packet_oh) >> 4;
return receive_counters_;
}
void StreamStatisticianImpl::UpdateJitter(const RTPHeader& header,
@ -150,35 +153,15 @@ void StreamStatisticianImpl::UpdateJitter(const RTPHeader& header,
}
}
void StreamStatisticianImpl::NotifyRtpCallback() {
StreamDataCounters data;
uint32_t ssrc;
{
rtc::CritScope cs(&stream_lock_);
data = receive_counters_;
ssrc = ssrc_;
}
rtp_callback_->DataCountersUpdated(data, ssrc);
}
void StreamStatisticianImpl::NotifyRtcpCallback() {
RtcpStatistics data;
uint32_t ssrc;
{
rtc::CritScope cs(&stream_lock_);
data = last_reported_statistics_;
ssrc = ssrc_;
}
rtcp_callback_->StatisticsUpdated(data, ssrc);
}
void StreamStatisticianImpl::FecPacketReceived(const RTPHeader& header,
size_t packet_length) {
StreamDataCounters counters;
{
rtc::CritScope cs(&stream_lock_);
receive_counters_.fec.AddPacket(packet_length, header);
counters = receive_counters_;
}
NotifyRtpCallback();
rtp_callback_->DataCountersUpdated(counters, ssrc_);
}
void StreamStatisticianImpl::SetMaxReorderingThreshold(
@ -210,8 +193,7 @@ bool StreamStatisticianImpl::GetStatistics(RtcpStatistics* statistics,
*statistics = CalculateRtcpStatistics();
}
NotifyRtcpCallback();
rtcp_callback_->StatisticsUpdated(*statistics, ssrc_);
return true;
}
@ -402,7 +384,7 @@ void ReceiveStatisticsImpl::IncomingPacket(const RTPHeader& header,
if (it != statisticians_.end()) {
impl = it->second;
} else {
impl = new StreamStatisticianImpl(clock_, this, this);
impl = new StreamStatisticianImpl(header.ssrc, clock_, this, this);
statisticians_[header.ssrc] = impl;
}
}
@ -415,12 +397,16 @@ void ReceiveStatisticsImpl::IncomingPacket(const RTPHeader& header,
void ReceiveStatisticsImpl::FecPacketReceived(const RTPHeader& header,
size_t packet_length) {
rtc::CritScope cs(&receive_statistics_lock_);
StatisticianImplMap::iterator it = statisticians_.find(header.ssrc);
// Ignore FEC if it is the first packet.
if (it != statisticians_.end()) {
it->second->FecPacketReceived(header, packet_length);
StreamStatisticianImpl* impl;
{
rtc::CritScope cs(&receive_statistics_lock_);
StatisticianImplMap::iterator it = statisticians_.find(header.ssrc);
// Ignore FEC if it is the first packet.
if (it == statisticians_.end())
return;
impl = it->second;
}
impl->FecPacketReceived(header, packet_length);
}
StatisticianMap ReceiveStatisticsImpl::GetActiveStatisticians() const {

View File

@ -25,7 +25,8 @@ namespace webrtc {
class StreamStatisticianImpl : public StreamStatistician {
public:
StreamStatisticianImpl(Clock* clock,
StreamStatisticianImpl(uint32_t ssrc,
Clock* clock,
RtcpStatisticsCallback* rtcp_callback,
StreamDataCountersCallback* rtp_callback);
virtual ~StreamStatisticianImpl() {}
@ -49,18 +50,17 @@ class StreamStatisticianImpl : public StreamStatistician {
private:
bool InOrderPacketInternal(uint16_t sequence_number) const;
RtcpStatistics CalculateRtcpStatistics();
RtcpStatistics CalculateRtcpStatistics()
EXCLUSIVE_LOCKS_REQUIRED(stream_lock_);
void UpdateJitter(const RTPHeader& header, NtpTime receive_time);
void UpdateCounters(const RTPHeader& rtp_header,
size_t packet_length,
bool retransmitted);
void NotifyRtpCallback() LOCKS_EXCLUDED(stream_lock_);
void NotifyRtcpCallback() LOCKS_EXCLUDED(stream_lock_);
StreamDataCounters UpdateCounters(const RTPHeader& rtp_header,
size_t packet_length,
bool retransmitted);
const uint32_t ssrc_;
Clock* const clock_;
rtc::CriticalSection stream_lock_;
RateStatistics incoming_bitrate_;
uint32_t ssrc_;
int max_reordering_threshold_; // In number of packets or sequence numbers.
// Stats on received RTP packets.
@ -86,6 +86,7 @@ class StreamStatisticianImpl : public StreamStatistician {
uint16_t last_report_seq_max_;
RtcpStatistics last_reported_statistics_;
// stream_lock_ shouldn't be held when calling callbacks.
RtcpStatisticsCallback* const rtcp_callback_;
StreamDataCountersCallback* const rtp_callback_;
};