diff --git a/modules/rtp_rtcp/BUILD.gn b/modules/rtp_rtcp/BUILD.gn index 70350f842d..7878229666 100644 --- a/modules/rtp_rtcp/BUILD.gn +++ b/modules/rtp_rtcp/BUILD.gn @@ -295,6 +295,7 @@ rtc_library("rtp_rtcp") { "../../rtc_base:safe_minmax", "../../rtc_base/experiments:field_trial_parser", "../../rtc_base/synchronization:sequence_checker", + "../../rtc_base/task_utils:pending_task_safety_flag", "../../rtc_base/task_utils:repeating_task", "../../rtc_base/task_utils:to_queued_task", "../../rtc_base/time:timestamp_extrapolator", diff --git a/modules/rtp_rtcp/include/rtp_rtcp_defines.cc b/modules/rtp_rtcp/include/rtp_rtcp_defines.cc index ca128e708a..5aa41fccb3 100644 --- a/modules/rtp_rtcp/include/rtp_rtcp_defines.cc +++ b/modules/rtp_rtcp/include/rtp_rtcp_defines.cc @@ -44,6 +44,12 @@ bool IsLegalRsidName(absl::string_view name) { StreamDataCounters::StreamDataCounters() : first_packet_time_ms(-1) {} +RtpPacketCounter::RtpPacketCounter(const RtpPacket& packet) + : header_bytes(packet.headers_size()), + payload_bytes(packet.payload_size()), + padding_bytes(packet.padding_size()), + packets(1) {} + void RtpPacketCounter::AddPacket(const RtpPacket& packet) { ++packets; header_bytes += packet.headers_size(); diff --git a/modules/rtp_rtcp/include/rtp_rtcp_defines.h b/modules/rtp_rtcp/include/rtp_rtcp_defines.h index c860b3d476..869f3d218a 100644 --- a/modules/rtp_rtcp/include/rtp_rtcp_defines.h +++ b/modules/rtp_rtcp/include/rtp_rtcp_defines.h @@ -295,6 +295,8 @@ struct RtpPacketCounter { RtpPacketCounter() : header_bytes(0), payload_bytes(0), padding_bytes(0), packets(0) {} + explicit RtpPacketCounter(const RtpPacket& packet); + void Add(const RtpPacketCounter& other) { header_bytes += other.header_bytes; payload_bytes += other.payload_bytes; diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2_unittest.cc b/modules/rtp_rtcp/source/rtp_rtcp_impl2_unittest.cc index 5861ae99ca..bcc57b34e8 100644 --- a/modules/rtp_rtcp/source/rtp_rtcp_impl2_unittest.cc +++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2_unittest.cc @@ -26,6 +26,7 @@ #include "test/gtest.h" #include "test/rtcp_packet_parser.h" #include "test/rtp_header_parser.h" +#include "test/run_loop.h" using ::testing::ElementsAre; @@ -198,6 +199,7 @@ class RtpRtcpImpl2Test : public ::testing::Test { receiver_.transport_.SetRtpRtcpModule(sender_.impl_.get()); } + test::RunLoop loop_; SimulatedClock clock_; RtpRtcpModule sender_; std::unique_ptr sender_video_; @@ -586,6 +588,7 @@ TEST_F(RtpRtcpImpl2Test, StoresPacketInfoForSentPackets) { packet.set_first_packet_of_frame(true); packet.SetMarker(true); sender_.impl_->TrySendPacket(&packet, pacing_info); + loop_.Flush(); std::vector seqno_info = sender_.impl_->GetSentRtpPacketInfos(std::vector{1}); @@ -610,6 +613,8 @@ TEST_F(RtpRtcpImpl2Test, StoresPacketInfoForSentPackets) { packet.SetMarker(true); sender_.impl_->TrySendPacket(&packet, pacing_info); + loop_.Flush(); + seqno_info = sender_.impl_->GetSentRtpPacketInfos(std::vector{2, 3, 4}); diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.cc b/modules/rtp_rtcp/source/rtp_sender_egress.cc index eeb910401d..50a997af63 100644 --- a/modules/rtp_rtcp/source/rtp_sender_egress.cc +++ b/modules/rtp_rtcp/source/rtp_sender_egress.cc @@ -19,6 +19,7 @@ #include "logging/rtc_event_log/events/rtc_event_rtp_packet_outgoing.h" #include "modules/remote_bitrate_estimator/test/bwe_test_logging.h" #include "rtc_base/logging.h" +#include "rtc_base/task_utils/to_queued_task.h" namespace webrtc { namespace { @@ -89,6 +90,7 @@ RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config, kRtpSequenceNumberMapMaxEntries) : nullptr) { RTC_DCHECK(worker_queue_); + pacer_checker_.Detach(); if (bitrate_callback_) { update_task_ = RepeatingTaskHandle::DelayedStart(worker_queue_, kUpdateInterval, [this]() { @@ -105,6 +107,7 @@ RtpSenderEgress::~RtpSenderEgress() { void RtpSenderEgress::SendPacket(RtpPacketToSend* packet, const PacedPacketInfo& pacing_info) { + RTC_DCHECK_RUN_ON(&pacer_checker_); RTC_DCHECK(packet); const uint32_t packet_ssrc = packet->Ssrc(); @@ -132,24 +135,22 @@ void RtpSenderEgress::SendPacket(RtpPacketToSend* packet, #endif } - PacketOptions options; - { - rtc::CritScope lock(&lock_); - options.included_in_allocation = force_part_of_allocation_; - - if (need_rtp_packet_infos_ && - packet->packet_type() == RtpPacketToSend::Type::kVideo) { - RTC_DCHECK(rtp_sequence_number_map_); - // Last packet of a frame, add it to sequence number info map. - const uint32_t timestamp = packet->Timestamp() - timestamp_offset_; - bool is_first_packet_of_frame = packet->is_first_packet_of_frame(); - bool is_last_packet_of_frame = packet->Marker(); - - rtp_sequence_number_map_->InsertPacket( - packet->SequenceNumber(), - RtpSequenceNumberMap::Info(timestamp, is_first_packet_of_frame, - is_last_packet_of_frame)); - } + if (need_rtp_packet_infos_ && + packet->packet_type() == RtpPacketToSend::Type::kVideo) { + worker_queue_->PostTask(ToQueuedTask( + task_safety_, + [this, packet_timestamp = packet->Timestamp(), + is_first_packet_of_frame = packet->is_first_packet_of_frame(), + is_last_packet_of_frame = packet->Marker(), + sequence_number = packet->SequenceNumber()]() { + RTC_DCHECK_RUN_ON(worker_queue_); + // Last packet of a frame, add it to sequence number info map. + const uint32_t timestamp = packet_timestamp - timestamp_offset_; + rtp_sequence_number_map_->InsertPacket( + sequence_number, + RtpSequenceNumberMap::Info(timestamp, is_first_packet_of_frame, + is_last_packet_of_frame)); + })); } // Bug webrtc:7859. While FEC is invoked from rtp_sender_video, and not after @@ -180,6 +181,12 @@ void RtpSenderEgress::SendPacket(RtpPacketToSend* packet, const bool is_media = packet->packet_type() == RtpPacketMediaType::kAudio || packet->packet_type() == RtpPacketMediaType::kVideo; + PacketOptions options; + { + rtc::CritScope lock(&lock_); + options.included_in_allocation = force_part_of_allocation_; + } + // Downstream code actually uses this flag to distinguish between media and // everything else. options.is_retransmit = !is_media; @@ -212,11 +219,19 @@ void RtpSenderEgress::SendPacket(RtpPacketToSend* packet, } if (send_success) { - rtc::CritScope lock(&lock_); - // TODO(bugs.webrtc.org/11581): Update the stats on the worker thread - // (PostTask). - UpdateRtpStats(now_ms, *packet); + // TODO(tommi): Is this assuming is_media is true? media_has_been_sent_ = true; + + RTC_DCHECK(packet->packet_type().has_value()); + RtpPacketMediaType packet_type = *packet->packet_type(); + RtpPacketCounter counter(*packet); + size_t size = packet->size(); + worker_queue_->PostTask(ToQueuedTask( + task_safety_, [this, now_ms, ssrc = packet->Ssrc(), packet_type, + counter = std::move(counter), size]() { + RTC_DCHECK_RUN_ON(worker_queue_); + UpdateRtpStats(now_ms, ssrc, packet_type, std::move(counter), size); + })); } } @@ -252,22 +267,23 @@ void RtpSenderEgress::ForceIncludeSendPacketsInAllocation( } bool RtpSenderEgress::MediaHasBeenSent() const { - rtc::CritScope lock(&lock_); + RTC_DCHECK_RUN_ON(&pacer_checker_); return media_has_been_sent_; } void RtpSenderEgress::SetMediaHasBeenSent(bool media_sent) { - rtc::CritScope lock(&lock_); + RTC_DCHECK_RUN_ON(&pacer_checker_); media_has_been_sent_ = media_sent; } void RtpSenderEgress::SetTimestampOffset(uint32_t timestamp) { - rtc::CritScope lock(&lock_); + RTC_DCHECK_RUN_ON(worker_queue_); timestamp_offset_ = timestamp; } std::vector RtpSenderEgress::GetSentRtpPacketInfos( rtc::ArrayView sequence_numbers) const { + RTC_DCHECK_RUN_ON(worker_queue_); RTC_DCHECK(!sequence_numbers.empty()); if (!need_rtp_packet_infos_) { return std::vector(); @@ -276,7 +292,6 @@ std::vector RtpSenderEgress::GetSentRtpPacketInfos( std::vector results; results.reserve(sequence_numbers.size()); - rtc::CritScope cs(&lock_); for (uint16_t sequence_number : sequence_numbers) { const auto& info = rtp_sequence_number_map_->Get(sequence_number); if (!info) { @@ -445,41 +460,47 @@ bool RtpSenderEgress::SendPacketToNetwork(const RtpPacketToSend& packet, } void RtpSenderEgress::UpdateRtpStats(int64_t now_ms, - const RtpPacketToSend& packet) { - // TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are - // only touched on the worker thread. - StreamDataCounters* counters = - packet.Ssrc() == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_; + uint32_t packet_ssrc, + RtpPacketMediaType packet_type, + RtpPacketCounter counter, + size_t packet_size) { + RTC_DCHECK_RUN_ON(worker_queue_); - if (counters->first_packet_time_ms == -1) { - counters->first_packet_time_ms = now_ms; - } - - if (packet.packet_type() == RtpPacketMediaType::kForwardErrorCorrection) { - counters->fec.AddPacket(packet); - } - - if (packet.packet_type() == RtpPacketMediaType::kRetransmission) { - counters->retransmitted.AddPacket(packet); - } - counters->transmitted.AddPacket(packet); - - RTC_DCHECK(packet.packet_type().has_value()); // TODO(bugs.webrtc.org/11581): send_rates_ should be touched only on the // worker thread. - send_rates_[static_cast(*packet.packet_type())].Update(packet.size(), - now_ms); + RtpSendRates send_rates; + { + rtc::CritScope lock(&lock_); - // TODO(bugs.webrtc.org/11581): These (stats related) stat callbacks should be - // issued on the worker thread. - if (rtp_stats_callback_) { - rtp_stats_callback_->DataCountersUpdated(*counters, packet.Ssrc()); + // TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are + // only touched on the worker thread. + StreamDataCounters* counters = + packet_ssrc == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_; + + if (counters->first_packet_time_ms == -1) { + counters->first_packet_time_ms = now_ms; + } + + if (packet_type == RtpPacketMediaType::kForwardErrorCorrection) { + counters->fec.Add(counter); + } else if (packet_type == RtpPacketMediaType::kRetransmission) { + counters->retransmitted.Add(counter); + } + counters->transmitted.Add(counter); + + send_rates_[static_cast(packet_type)].Update(packet_size, now_ms); + if (bitrate_callback_) { + send_rates = GetSendRatesLocked(now_ms); + } + + if (rtp_stats_callback_) { + rtp_stats_callback_->DataCountersUpdated(*counters, packet_ssrc); + } } // The bitrate_callback_ and rtp_stats_callback_ pointers in practice point // to the same object, so these callbacks could be consolidated into one. if (bitrate_callback_) { - RtpSendRates send_rates = GetSendRatesLocked(now_ms); bitrate_callback_->Notify( send_rates.Sum().bps(), send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_); diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.h b/modules/rtp_rtcp/source/rtp_sender_egress.h index db29cd51da..1ef71d72a7 100644 --- a/modules/rtp_rtcp/source/rtp_sender_egress.h +++ b/modules/rtp_rtcp/source/rtp_sender_egress.h @@ -28,6 +28,7 @@ #include "rtc_base/critical_section.h" #include "rtc_base/rate_statistics.h" #include "rtc_base/synchronization/sequence_checker.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/task_utils/repeating_task.h" #include "rtc_base/thread_annotations.h" @@ -103,13 +104,18 @@ class RtpSenderEgress { bool SendPacketToNetwork(const RtpPacketToSend& packet, const PacketOptions& options, const PacedPacketInfo& pacing_info); - void UpdateRtpStats(int64_t now_ms, const RtpPacketToSend& packet) - RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); + + void UpdateRtpStats(int64_t now_ms, + uint32_t packet_ssrc, + RtpPacketMediaType packet_type, + RtpPacketCounter counter, + size_t packet_size); // Called on a timer, once a second, on the worker_queue_. void PeriodicUpdate(); TaskQueueBase* const worker_queue_; + SequenceChecker pacer_checker_; const uint32_t ssrc_; const absl::optional rtx_ssrc_; const absl::optional flexfec_ssrc_; @@ -129,9 +135,9 @@ class RtpSenderEgress { BitrateStatisticsObserver* const bitrate_callback_; rtc::CriticalSection lock_; - bool media_has_been_sent_ RTC_GUARDED_BY(lock_); + bool media_has_been_sent_ RTC_GUARDED_BY(pacer_checker_); bool force_part_of_allocation_ RTC_GUARDED_BY(lock_); - uint32_t timestamp_offset_ RTC_GUARDED_BY(lock_); + uint32_t timestamp_offset_ RTC_GUARDED_BY(worker_queue_); SendDelayMap send_delays_ RTC_GUARDED_BY(lock_); SendDelayMap::const_iterator max_delay_it_ RTC_GUARDED_BY(lock_); @@ -148,9 +154,9 @@ class RtpSenderEgress { // 2. Whether the packet was the first in its frame. // 3. Whether the packet was the last in its frame. const std::unique_ptr rtp_sequence_number_map_ - RTC_GUARDED_BY(lock_); - + RTC_GUARDED_BY(worker_queue_); RepeatingTaskHandle update_task_ RTC_GUARDED_BY(worker_queue_); + ScopedTaskSafety task_safety_; }; } // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtp_sender_unittest.cc b/modules/rtp_rtcp/source/rtp_sender_unittest.cc index 6f0bbbc26c..dbd474cbe1 100644 --- a/modules/rtp_rtcp/source/rtp_sender_unittest.cc +++ b/modules/rtp_rtcp/source/rtp_sender_unittest.cc @@ -40,6 +40,7 @@ #include "test/gtest.h" #include "test/mock_transport.h" #include "test/rtp_header_parser.h" +#include "test/run_loop.h" namespace webrtc { @@ -305,6 +306,7 @@ class RtpSenderTest : public ::testing::TestWithParam { rtp_sender()->SetTimestampOffset(0); } + test::RunLoop loop_; SimulatedClock fake_clock_; NiceMock mock_rtc_event_log_; MockRtpPacketPacer mock_paced_sender_; @@ -1274,46 +1276,46 @@ TEST_P(RtpSenderTest, SendFlexfecPackets) { uint16_t flexfec_seq_num; RTPVideoHeader video_header; - std::unique_ptr media_packet; - std::unique_ptr fec_packet; + std::unique_ptr media_packet; + std::unique_ptr fec_packet; - EXPECT_CALL(mock_paced_sender_, EnqueuePackets) - .WillOnce([&](std::vector> packets) { - for (auto& packet : packets) { - if (packet->packet_type() == RtpPacketMediaType::kVideo) { - EXPECT_EQ(packet->Ssrc(), kSsrc); - EXPECT_EQ(packet->SequenceNumber(), kSeqNum); - media_packet = std::move(packet); - } else { - EXPECT_EQ(packet->packet_type(), - RtpPacketMediaType::kForwardErrorCorrection); - EXPECT_EQ(packet->Ssrc(), kFlexFecSsrc); - fec_packet = std::move(packet); - } + EXPECT_CALL(mock_paced_sender_, EnqueuePackets) + .WillOnce([&](std::vector> packets) { + for (auto& packet : packets) { + if (packet->packet_type() == RtpPacketMediaType::kVideo) { + EXPECT_EQ(packet->Ssrc(), kSsrc); + EXPECT_EQ(packet->SequenceNumber(), kSeqNum); + media_packet = std::move(packet); + } else { + EXPECT_EQ(packet->packet_type(), + RtpPacketMediaType::kForwardErrorCorrection); + EXPECT_EQ(packet->Ssrc(), kFlexFecSsrc); + fec_packet = std::move(packet); } - }); + } + }); - video_header.frame_type = VideoFrameType::kVideoFrameKey; - EXPECT_TRUE(rtp_sender_video.SendVideo( - kMediaPayloadType, kCodecType, kTimestamp, - fake_clock_.TimeInMilliseconds(), kPayloadData, nullptr, video_header, - kDefaultExpectedRetransmissionTimeMs)); - ASSERT_TRUE(media_packet != nullptr); - ASSERT_TRUE(fec_packet != nullptr); + video_header.frame_type = VideoFrameType::kVideoFrameKey; + EXPECT_TRUE(rtp_sender_video.SendVideo( + kMediaPayloadType, kCodecType, kTimestamp, + fake_clock_.TimeInMilliseconds(), kPayloadData, nullptr, video_header, + kDefaultExpectedRetransmissionTimeMs)); + ASSERT_TRUE(media_packet != nullptr); + ASSERT_TRUE(fec_packet != nullptr); - flexfec_seq_num = fec_packet->SequenceNumber(); - rtp_egress()->SendPacket(media_packet.get(), PacedPacketInfo()); - rtp_egress()->SendPacket(fec_packet.get(), PacedPacketInfo()); + flexfec_seq_num = fec_packet->SequenceNumber(); + rtp_egress()->SendPacket(media_packet.get(), PacedPacketInfo()); + rtp_egress()->SendPacket(fec_packet.get(), PacedPacketInfo()); - ASSERT_EQ(2, transport_.packets_sent()); - const RtpPacketReceived& sent_media_packet = transport_.sent_packets_[0]; - EXPECT_EQ(kMediaPayloadType, sent_media_packet.PayloadType()); - EXPECT_EQ(kSeqNum, sent_media_packet.SequenceNumber()); - EXPECT_EQ(kSsrc, sent_media_packet.Ssrc()); - const RtpPacketReceived& sent_flexfec_packet = transport_.sent_packets_[1]; - EXPECT_EQ(kFlexfecPayloadType, sent_flexfec_packet.PayloadType()); - EXPECT_EQ(flexfec_seq_num, sent_flexfec_packet.SequenceNumber()); - EXPECT_EQ(kFlexFecSsrc, sent_flexfec_packet.Ssrc()); + ASSERT_EQ(2, transport_.packets_sent()); + const RtpPacketReceived& sent_media_packet = transport_.sent_packets_[0]; + EXPECT_EQ(kMediaPayloadType, sent_media_packet.PayloadType()); + EXPECT_EQ(kSeqNum, sent_media_packet.SequenceNumber()); + EXPECT_EQ(kSsrc, sent_media_packet.Ssrc()); + const RtpPacketReceived& sent_flexfec_packet = transport_.sent_packets_[1]; + EXPECT_EQ(kFlexfecPayloadType, sent_flexfec_packet.PayloadType()); + EXPECT_EQ(flexfec_seq_num, sent_flexfec_packet.SequenceNumber()); + EXPECT_EQ(kFlexFecSsrc, sent_flexfec_packet.Ssrc()); } TEST_P(RtpSenderTestWithoutPacer, SendFlexfecPackets) { @@ -1782,6 +1784,7 @@ TEST_P(RtpSenderTest, BitrateCallbacks) { kPayloadType, kCodecType, 1234, 4321, payload, nullptr, video_header, kDefaultExpectedRetransmissionTimeMs)); fake_clock_.AdvanceTimeMilliseconds(kPacketInterval); + loop_.Flush(); } // We get one call for every stats updated, thus two calls since both the @@ -1818,6 +1821,7 @@ TEST_P(RtpSenderTestWithoutPacer, StreamDataCountersCallbacks) { ASSERT_TRUE(rtp_sender_video.SendVideo(kPayloadType, kCodecType, 1234, 4321, payload, nullptr, video_header, kDefaultExpectedRetransmissionTimeMs)); + loop_.Flush(); StreamDataCounters expected; expected.transmitted.payload_bytes = 6; expected.transmitted.header_bytes = 12; @@ -1833,6 +1837,7 @@ TEST_P(RtpSenderTestWithoutPacer, StreamDataCountersCallbacks) { // Retransmit a frame. uint16_t seqno = rtp_sender()->SequenceNumber() - 1; rtp_sender()->ReSendPacket(seqno); + loop_.Flush(); expected.transmitted.payload_bytes = 12; expected.transmitted.header_bytes = 24; expected.transmitted.packets = 2; @@ -1844,6 +1849,7 @@ TEST_P(RtpSenderTestWithoutPacer, StreamDataCountersCallbacks) { // Send padding. GenerateAndSendPadding(kMaxPaddingSize); + loop_.Flush(); expected.transmitted.payload_bytes = 12; expected.transmitted.header_bytes = 36; expected.transmitted.padding_bytes = kMaxPaddingSize; @@ -1886,6 +1892,7 @@ TEST_P(RtpSenderTestWithoutPacer, StreamDataCountersCallbacksUlpfec) { ASSERT_TRUE(rtp_sender_video.SendVideo(kPayloadType, kCodecType, 1234, 4321, payload, nullptr, video_header, kDefaultExpectedRetransmissionTimeMs)); + loop_.Flush(); expected.transmitted.payload_bytes = 28; expected.transmitted.header_bytes = 24; expected.transmitted.packets = 2; @@ -1904,6 +1911,8 @@ TEST_P(RtpSenderTestWithoutPacer, BytesReportedCorrectly) { GenerateAndSendPadding(1); GenerateAndSendPadding(1); + loop_.Flush(); + StreamDataCounters rtp_stats; StreamDataCounters rtx_stats; rtp_egress()->GetDataCounters(&rtp_stats, &rtx_stats); @@ -2146,6 +2155,9 @@ TEST_P(RtpSenderTest, SendPacketHandlesRetransmissionHistory) { rtp_sender_context_->packet_history_.SetStorePacketsStatus( RtpPacketHistory::StorageMode::kStoreAndCull, 10); + // Ignore calls to EnqueuePackets() for this test. + EXPECT_CALL(mock_paced_sender_, EnqueuePackets).WillRepeatedly(Return()); + // Build a media packet and send it. std::unique_ptr packet = BuildRtpPacket(kPayload, true, 0, fake_clock_.TimeInMilliseconds()); @@ -2302,6 +2314,8 @@ TEST_P(RtpSenderTest, SendPacketUpdatesStats) { OnSendPacket(3, capture_time_ms, kFlexFecSsrc)); rtp_egress()->SendPacket(fec_packet.get(), PacedPacketInfo()); + loop_.Flush(); + StreamDataCounters rtp_stats; StreamDataCounters rtx_stats; rtp_egress()->GetDataCounters(&rtp_stats, &rtx_stats);