Post stats updates in RtpSenderEgress to the worker.

On the way remove need for lock for
rtp_sequence_number_map_ and timestamp_offset_.

Change-Id: I21a5cbf6208620435a1a16fff68c33c0cb84f51d
Bug: webrtc:11581
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/177424
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Magnus Flodman <mflodman@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31581}
This commit is contained in:
Tomas Gunnarsson 2020-06-29 13:56:15 +02:00 committed by Commit Bot
parent 6fb7004f33
commit 096c0b0921
7 changed files with 148 additions and 93 deletions

View File

@ -295,6 +295,7 @@ rtc_library("rtp_rtcp") {
"../../rtc_base:safe_minmax",
"../../rtc_base/experiments:field_trial_parser",
"../../rtc_base/synchronization:sequence_checker",
"../../rtc_base/task_utils:pending_task_safety_flag",
"../../rtc_base/task_utils:repeating_task",
"../../rtc_base/task_utils:to_queued_task",
"../../rtc_base/time:timestamp_extrapolator",

View File

@ -44,6 +44,12 @@ bool IsLegalRsidName(absl::string_view name) {
StreamDataCounters::StreamDataCounters() : first_packet_time_ms(-1) {}
RtpPacketCounter::RtpPacketCounter(const RtpPacket& packet)
: header_bytes(packet.headers_size()),
payload_bytes(packet.payload_size()),
padding_bytes(packet.padding_size()),
packets(1) {}
void RtpPacketCounter::AddPacket(const RtpPacket& packet) {
++packets;
header_bytes += packet.headers_size();

View File

@ -295,6 +295,8 @@ struct RtpPacketCounter {
RtpPacketCounter()
: header_bytes(0), payload_bytes(0), padding_bytes(0), packets(0) {}
explicit RtpPacketCounter(const RtpPacket& packet);
void Add(const RtpPacketCounter& other) {
header_bytes += other.header_bytes;
payload_bytes += other.payload_bytes;

View File

@ -26,6 +26,7 @@
#include "test/gtest.h"
#include "test/rtcp_packet_parser.h"
#include "test/rtp_header_parser.h"
#include "test/run_loop.h"
using ::testing::ElementsAre;
@ -198,6 +199,7 @@ class RtpRtcpImpl2Test : public ::testing::Test {
receiver_.transport_.SetRtpRtcpModule(sender_.impl_.get());
}
test::RunLoop loop_;
SimulatedClock clock_;
RtpRtcpModule sender_;
std::unique_ptr<RTPSenderVideo> sender_video_;
@ -586,6 +588,7 @@ TEST_F(RtpRtcpImpl2Test, StoresPacketInfoForSentPackets) {
packet.set_first_packet_of_frame(true);
packet.SetMarker(true);
sender_.impl_->TrySendPacket(&packet, pacing_info);
loop_.Flush();
std::vector<RtpSequenceNumberMap::Info> seqno_info =
sender_.impl_->GetSentRtpPacketInfos(std::vector<uint16_t>{1});
@ -610,6 +613,8 @@ TEST_F(RtpRtcpImpl2Test, StoresPacketInfoForSentPackets) {
packet.SetMarker(true);
sender_.impl_->TrySendPacket(&packet, pacing_info);
loop_.Flush();
seqno_info =
sender_.impl_->GetSentRtpPacketInfos(std::vector<uint16_t>{2, 3, 4});

View File

@ -19,6 +19,7 @@
#include "logging/rtc_event_log/events/rtc_event_rtp_packet_outgoing.h"
#include "modules/remote_bitrate_estimator/test/bwe_test_logging.h"
#include "rtc_base/logging.h"
#include "rtc_base/task_utils/to_queued_task.h"
namespace webrtc {
namespace {
@ -89,6 +90,7 @@ RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config,
kRtpSequenceNumberMapMaxEntries)
: nullptr) {
RTC_DCHECK(worker_queue_);
pacer_checker_.Detach();
if (bitrate_callback_) {
update_task_ = RepeatingTaskHandle::DelayedStart(worker_queue_,
kUpdateInterval, [this]() {
@ -105,6 +107,7 @@ RtpSenderEgress::~RtpSenderEgress() {
void RtpSenderEgress::SendPacket(RtpPacketToSend* packet,
const PacedPacketInfo& pacing_info) {
RTC_DCHECK_RUN_ON(&pacer_checker_);
RTC_DCHECK(packet);
const uint32_t packet_ssrc = packet->Ssrc();
@ -132,24 +135,22 @@ void RtpSenderEgress::SendPacket(RtpPacketToSend* packet,
#endif
}
PacketOptions options;
{
rtc::CritScope lock(&lock_);
options.included_in_allocation = force_part_of_allocation_;
if (need_rtp_packet_infos_ &&
packet->packet_type() == RtpPacketToSend::Type::kVideo) {
RTC_DCHECK(rtp_sequence_number_map_);
// Last packet of a frame, add it to sequence number info map.
const uint32_t timestamp = packet->Timestamp() - timestamp_offset_;
bool is_first_packet_of_frame = packet->is_first_packet_of_frame();
bool is_last_packet_of_frame = packet->Marker();
rtp_sequence_number_map_->InsertPacket(
packet->SequenceNumber(),
RtpSequenceNumberMap::Info(timestamp, is_first_packet_of_frame,
is_last_packet_of_frame));
}
if (need_rtp_packet_infos_ &&
packet->packet_type() == RtpPacketToSend::Type::kVideo) {
worker_queue_->PostTask(ToQueuedTask(
task_safety_,
[this, packet_timestamp = packet->Timestamp(),
is_first_packet_of_frame = packet->is_first_packet_of_frame(),
is_last_packet_of_frame = packet->Marker(),
sequence_number = packet->SequenceNumber()]() {
RTC_DCHECK_RUN_ON(worker_queue_);
// Last packet of a frame, add it to sequence number info map.
const uint32_t timestamp = packet_timestamp - timestamp_offset_;
rtp_sequence_number_map_->InsertPacket(
sequence_number,
RtpSequenceNumberMap::Info(timestamp, is_first_packet_of_frame,
is_last_packet_of_frame));
}));
}
// Bug webrtc:7859. While FEC is invoked from rtp_sender_video, and not after
@ -180,6 +181,12 @@ void RtpSenderEgress::SendPacket(RtpPacketToSend* packet,
const bool is_media = packet->packet_type() == RtpPacketMediaType::kAudio ||
packet->packet_type() == RtpPacketMediaType::kVideo;
PacketOptions options;
{
rtc::CritScope lock(&lock_);
options.included_in_allocation = force_part_of_allocation_;
}
// Downstream code actually uses this flag to distinguish between media and
// everything else.
options.is_retransmit = !is_media;
@ -212,11 +219,19 @@ void RtpSenderEgress::SendPacket(RtpPacketToSend* packet,
}
if (send_success) {
rtc::CritScope lock(&lock_);
// TODO(bugs.webrtc.org/11581): Update the stats on the worker thread
// (PostTask).
UpdateRtpStats(now_ms, *packet);
// TODO(tommi): Is this assuming is_media is true?
media_has_been_sent_ = true;
RTC_DCHECK(packet->packet_type().has_value());
RtpPacketMediaType packet_type = *packet->packet_type();
RtpPacketCounter counter(*packet);
size_t size = packet->size();
worker_queue_->PostTask(ToQueuedTask(
task_safety_, [this, now_ms, ssrc = packet->Ssrc(), packet_type,
counter = std::move(counter), size]() {
RTC_DCHECK_RUN_ON(worker_queue_);
UpdateRtpStats(now_ms, ssrc, packet_type, std::move(counter), size);
}));
}
}
@ -252,22 +267,23 @@ void RtpSenderEgress::ForceIncludeSendPacketsInAllocation(
}
bool RtpSenderEgress::MediaHasBeenSent() const {
rtc::CritScope lock(&lock_);
RTC_DCHECK_RUN_ON(&pacer_checker_);
return media_has_been_sent_;
}
void RtpSenderEgress::SetMediaHasBeenSent(bool media_sent) {
rtc::CritScope lock(&lock_);
RTC_DCHECK_RUN_ON(&pacer_checker_);
media_has_been_sent_ = media_sent;
}
void RtpSenderEgress::SetTimestampOffset(uint32_t timestamp) {
rtc::CritScope lock(&lock_);
RTC_DCHECK_RUN_ON(worker_queue_);
timestamp_offset_ = timestamp;
}
std::vector<RtpSequenceNumberMap::Info> RtpSenderEgress::GetSentRtpPacketInfos(
rtc::ArrayView<const uint16_t> sequence_numbers) const {
RTC_DCHECK_RUN_ON(worker_queue_);
RTC_DCHECK(!sequence_numbers.empty());
if (!need_rtp_packet_infos_) {
return std::vector<RtpSequenceNumberMap::Info>();
@ -276,7 +292,6 @@ std::vector<RtpSequenceNumberMap::Info> RtpSenderEgress::GetSentRtpPacketInfos(
std::vector<RtpSequenceNumberMap::Info> results;
results.reserve(sequence_numbers.size());
rtc::CritScope cs(&lock_);
for (uint16_t sequence_number : sequence_numbers) {
const auto& info = rtp_sequence_number_map_->Get(sequence_number);
if (!info) {
@ -445,41 +460,47 @@ bool RtpSenderEgress::SendPacketToNetwork(const RtpPacketToSend& packet,
}
void RtpSenderEgress::UpdateRtpStats(int64_t now_ms,
const RtpPacketToSend& packet) {
// TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are
// only touched on the worker thread.
StreamDataCounters* counters =
packet.Ssrc() == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_;
uint32_t packet_ssrc,
RtpPacketMediaType packet_type,
RtpPacketCounter counter,
size_t packet_size) {
RTC_DCHECK_RUN_ON(worker_queue_);
if (counters->first_packet_time_ms == -1) {
counters->first_packet_time_ms = now_ms;
}
if (packet.packet_type() == RtpPacketMediaType::kForwardErrorCorrection) {
counters->fec.AddPacket(packet);
}
if (packet.packet_type() == RtpPacketMediaType::kRetransmission) {
counters->retransmitted.AddPacket(packet);
}
counters->transmitted.AddPacket(packet);
RTC_DCHECK(packet.packet_type().has_value());
// TODO(bugs.webrtc.org/11581): send_rates_ should be touched only on the
// worker thread.
send_rates_[static_cast<size_t>(*packet.packet_type())].Update(packet.size(),
now_ms);
RtpSendRates send_rates;
{
rtc::CritScope lock(&lock_);
// TODO(bugs.webrtc.org/11581): These (stats related) stat callbacks should be
// issued on the worker thread.
if (rtp_stats_callback_) {
rtp_stats_callback_->DataCountersUpdated(*counters, packet.Ssrc());
// TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are
// only touched on the worker thread.
StreamDataCounters* counters =
packet_ssrc == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_;
if (counters->first_packet_time_ms == -1) {
counters->first_packet_time_ms = now_ms;
}
if (packet_type == RtpPacketMediaType::kForwardErrorCorrection) {
counters->fec.Add(counter);
} else if (packet_type == RtpPacketMediaType::kRetransmission) {
counters->retransmitted.Add(counter);
}
counters->transmitted.Add(counter);
send_rates_[static_cast<size_t>(packet_type)].Update(packet_size, now_ms);
if (bitrate_callback_) {
send_rates = GetSendRatesLocked(now_ms);
}
if (rtp_stats_callback_) {
rtp_stats_callback_->DataCountersUpdated(*counters, packet_ssrc);
}
}
// The bitrate_callback_ and rtp_stats_callback_ pointers in practice point
// to the same object, so these callbacks could be consolidated into one.
if (bitrate_callback_) {
RtpSendRates send_rates = GetSendRatesLocked(now_ms);
bitrate_callback_->Notify(
send_rates.Sum().bps(),
send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_);

View File

@ -28,6 +28,7 @@
#include "rtc_base/critical_section.h"
#include "rtc_base/rate_statistics.h"
#include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/task_utils/repeating_task.h"
#include "rtc_base/thread_annotations.h"
@ -103,13 +104,18 @@ class RtpSenderEgress {
bool SendPacketToNetwork(const RtpPacketToSend& packet,
const PacketOptions& options,
const PacedPacketInfo& pacing_info);
void UpdateRtpStats(int64_t now_ms, const RtpPacketToSend& packet)
RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
void UpdateRtpStats(int64_t now_ms,
uint32_t packet_ssrc,
RtpPacketMediaType packet_type,
RtpPacketCounter counter,
size_t packet_size);
// Called on a timer, once a second, on the worker_queue_.
void PeriodicUpdate();
TaskQueueBase* const worker_queue_;
SequenceChecker pacer_checker_;
const uint32_t ssrc_;
const absl::optional<uint32_t> rtx_ssrc_;
const absl::optional<uint32_t> flexfec_ssrc_;
@ -129,9 +135,9 @@ class RtpSenderEgress {
BitrateStatisticsObserver* const bitrate_callback_;
rtc::CriticalSection lock_;
bool media_has_been_sent_ RTC_GUARDED_BY(lock_);
bool media_has_been_sent_ RTC_GUARDED_BY(pacer_checker_);
bool force_part_of_allocation_ RTC_GUARDED_BY(lock_);
uint32_t timestamp_offset_ RTC_GUARDED_BY(lock_);
uint32_t timestamp_offset_ RTC_GUARDED_BY(worker_queue_);
SendDelayMap send_delays_ RTC_GUARDED_BY(lock_);
SendDelayMap::const_iterator max_delay_it_ RTC_GUARDED_BY(lock_);
@ -148,9 +154,9 @@ class RtpSenderEgress {
// 2. Whether the packet was the first in its frame.
// 3. Whether the packet was the last in its frame.
const std::unique_ptr<RtpSequenceNumberMap> rtp_sequence_number_map_
RTC_GUARDED_BY(lock_);
RTC_GUARDED_BY(worker_queue_);
RepeatingTaskHandle update_task_ RTC_GUARDED_BY(worker_queue_);
ScopedTaskSafety task_safety_;
};
} // namespace webrtc

View File

@ -40,6 +40,7 @@
#include "test/gtest.h"
#include "test/mock_transport.h"
#include "test/rtp_header_parser.h"
#include "test/run_loop.h"
namespace webrtc {
@ -305,6 +306,7 @@ class RtpSenderTest : public ::testing::TestWithParam<TestConfig> {
rtp_sender()->SetTimestampOffset(0);
}
test::RunLoop loop_;
SimulatedClock fake_clock_;
NiceMock<MockRtcEventLog> mock_rtc_event_log_;
MockRtpPacketPacer mock_paced_sender_;
@ -1274,46 +1276,46 @@ TEST_P(RtpSenderTest, SendFlexfecPackets) {
uint16_t flexfec_seq_num;
RTPVideoHeader video_header;
std::unique_ptr<RtpPacketToSend> media_packet;
std::unique_ptr<RtpPacketToSend> fec_packet;
std::unique_ptr<RtpPacketToSend> media_packet;
std::unique_ptr<RtpPacketToSend> fec_packet;
EXPECT_CALL(mock_paced_sender_, EnqueuePackets)
.WillOnce([&](std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
for (auto& packet : packets) {
if (packet->packet_type() == RtpPacketMediaType::kVideo) {
EXPECT_EQ(packet->Ssrc(), kSsrc);
EXPECT_EQ(packet->SequenceNumber(), kSeqNum);
media_packet = std::move(packet);
} else {
EXPECT_EQ(packet->packet_type(),
RtpPacketMediaType::kForwardErrorCorrection);
EXPECT_EQ(packet->Ssrc(), kFlexFecSsrc);
fec_packet = std::move(packet);
}
EXPECT_CALL(mock_paced_sender_, EnqueuePackets)
.WillOnce([&](std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
for (auto& packet : packets) {
if (packet->packet_type() == RtpPacketMediaType::kVideo) {
EXPECT_EQ(packet->Ssrc(), kSsrc);
EXPECT_EQ(packet->SequenceNumber(), kSeqNum);
media_packet = std::move(packet);
} else {
EXPECT_EQ(packet->packet_type(),
RtpPacketMediaType::kForwardErrorCorrection);
EXPECT_EQ(packet->Ssrc(), kFlexFecSsrc);
fec_packet = std::move(packet);
}
});
}
});
video_header.frame_type = VideoFrameType::kVideoFrameKey;
EXPECT_TRUE(rtp_sender_video.SendVideo(
kMediaPayloadType, kCodecType, kTimestamp,
fake_clock_.TimeInMilliseconds(), kPayloadData, nullptr, video_header,
kDefaultExpectedRetransmissionTimeMs));
ASSERT_TRUE(media_packet != nullptr);
ASSERT_TRUE(fec_packet != nullptr);
video_header.frame_type = VideoFrameType::kVideoFrameKey;
EXPECT_TRUE(rtp_sender_video.SendVideo(
kMediaPayloadType, kCodecType, kTimestamp,
fake_clock_.TimeInMilliseconds(), kPayloadData, nullptr, video_header,
kDefaultExpectedRetransmissionTimeMs));
ASSERT_TRUE(media_packet != nullptr);
ASSERT_TRUE(fec_packet != nullptr);
flexfec_seq_num = fec_packet->SequenceNumber();
rtp_egress()->SendPacket(media_packet.get(), PacedPacketInfo());
rtp_egress()->SendPacket(fec_packet.get(), PacedPacketInfo());
flexfec_seq_num = fec_packet->SequenceNumber();
rtp_egress()->SendPacket(media_packet.get(), PacedPacketInfo());
rtp_egress()->SendPacket(fec_packet.get(), PacedPacketInfo());
ASSERT_EQ(2, transport_.packets_sent());
const RtpPacketReceived& sent_media_packet = transport_.sent_packets_[0];
EXPECT_EQ(kMediaPayloadType, sent_media_packet.PayloadType());
EXPECT_EQ(kSeqNum, sent_media_packet.SequenceNumber());
EXPECT_EQ(kSsrc, sent_media_packet.Ssrc());
const RtpPacketReceived& sent_flexfec_packet = transport_.sent_packets_[1];
EXPECT_EQ(kFlexfecPayloadType, sent_flexfec_packet.PayloadType());
EXPECT_EQ(flexfec_seq_num, sent_flexfec_packet.SequenceNumber());
EXPECT_EQ(kFlexFecSsrc, sent_flexfec_packet.Ssrc());
ASSERT_EQ(2, transport_.packets_sent());
const RtpPacketReceived& sent_media_packet = transport_.sent_packets_[0];
EXPECT_EQ(kMediaPayloadType, sent_media_packet.PayloadType());
EXPECT_EQ(kSeqNum, sent_media_packet.SequenceNumber());
EXPECT_EQ(kSsrc, sent_media_packet.Ssrc());
const RtpPacketReceived& sent_flexfec_packet = transport_.sent_packets_[1];
EXPECT_EQ(kFlexfecPayloadType, sent_flexfec_packet.PayloadType());
EXPECT_EQ(flexfec_seq_num, sent_flexfec_packet.SequenceNumber());
EXPECT_EQ(kFlexFecSsrc, sent_flexfec_packet.Ssrc());
}
TEST_P(RtpSenderTestWithoutPacer, SendFlexfecPackets) {
@ -1782,6 +1784,7 @@ TEST_P(RtpSenderTest, BitrateCallbacks) {
kPayloadType, kCodecType, 1234, 4321, payload, nullptr, video_header,
kDefaultExpectedRetransmissionTimeMs));
fake_clock_.AdvanceTimeMilliseconds(kPacketInterval);
loop_.Flush();
}
// We get one call for every stats updated, thus two calls since both the
@ -1818,6 +1821,7 @@ TEST_P(RtpSenderTestWithoutPacer, StreamDataCountersCallbacks) {
ASSERT_TRUE(rtp_sender_video.SendVideo(kPayloadType, kCodecType, 1234, 4321,
payload, nullptr, video_header,
kDefaultExpectedRetransmissionTimeMs));
loop_.Flush();
StreamDataCounters expected;
expected.transmitted.payload_bytes = 6;
expected.transmitted.header_bytes = 12;
@ -1833,6 +1837,7 @@ TEST_P(RtpSenderTestWithoutPacer, StreamDataCountersCallbacks) {
// Retransmit a frame.
uint16_t seqno = rtp_sender()->SequenceNumber() - 1;
rtp_sender()->ReSendPacket(seqno);
loop_.Flush();
expected.transmitted.payload_bytes = 12;
expected.transmitted.header_bytes = 24;
expected.transmitted.packets = 2;
@ -1844,6 +1849,7 @@ TEST_P(RtpSenderTestWithoutPacer, StreamDataCountersCallbacks) {
// Send padding.
GenerateAndSendPadding(kMaxPaddingSize);
loop_.Flush();
expected.transmitted.payload_bytes = 12;
expected.transmitted.header_bytes = 36;
expected.transmitted.padding_bytes = kMaxPaddingSize;
@ -1886,6 +1892,7 @@ TEST_P(RtpSenderTestWithoutPacer, StreamDataCountersCallbacksUlpfec) {
ASSERT_TRUE(rtp_sender_video.SendVideo(kPayloadType, kCodecType, 1234, 4321,
payload, nullptr, video_header,
kDefaultExpectedRetransmissionTimeMs));
loop_.Flush();
expected.transmitted.payload_bytes = 28;
expected.transmitted.header_bytes = 24;
expected.transmitted.packets = 2;
@ -1904,6 +1911,8 @@ TEST_P(RtpSenderTestWithoutPacer, BytesReportedCorrectly) {
GenerateAndSendPadding(1);
GenerateAndSendPadding(1);
loop_.Flush();
StreamDataCounters rtp_stats;
StreamDataCounters rtx_stats;
rtp_egress()->GetDataCounters(&rtp_stats, &rtx_stats);
@ -2146,6 +2155,9 @@ TEST_P(RtpSenderTest, SendPacketHandlesRetransmissionHistory) {
rtp_sender_context_->packet_history_.SetStorePacketsStatus(
RtpPacketHistory::StorageMode::kStoreAndCull, 10);
// Ignore calls to EnqueuePackets() for this test.
EXPECT_CALL(mock_paced_sender_, EnqueuePackets).WillRepeatedly(Return());
// Build a media packet and send it.
std::unique_ptr<RtpPacketToSend> packet =
BuildRtpPacket(kPayload, true, 0, fake_clock_.TimeInMilliseconds());
@ -2302,6 +2314,8 @@ TEST_P(RtpSenderTest, SendPacketUpdatesStats) {
OnSendPacket(3, capture_time_ms, kFlexFecSsrc));
rtp_egress()->SendPacket(fec_packet.get(), PacedPacketInfo());
loop_.Flush();
StreamDataCounters rtp_stats;
StreamDataCounters rtx_stats;
rtp_egress()->GetDataCounters(&rtp_stats, &rtx_stats);