From 4da304407c993ccfc829bfe3f338b66a1f6c1a31 Mon Sep 17 00:00:00 2001 From: michaelt Date: Thu, 17 Nov 2016 01:38:43 -0800 Subject: [PATCH] Add overhead per packet observer to the rtp_sender. BUG=webrtc:6638 Review-Url: https://codereview.webrtc.org/2495553002 Cr-Commit-Position: refs/heads/master@{#15124} --- webrtc/common_types.h | 8 ++ webrtc/modules/rtp_rtcp/include/rtp_rtcp.h | 2 + .../modules/rtp_rtcp/source/rtp_rtcp_impl.cc | 4 +- webrtc/modules/rtp_rtcp/source/rtp_sender.cc | 82 ++++++++++++----- webrtc/modules/rtp_rtcp/source/rtp_sender.h | 15 ++- .../rtp_rtcp/source/rtp_sender_unittest.cc | 91 ++++++++++++++----- 6 files changed, 158 insertions(+), 44 deletions(-) diff --git a/webrtc/common_types.h b/webrtc/common_types.h index 803f2c2632..303c596943 100644 --- a/webrtc/common_types.h +++ b/webrtc/common_types.h @@ -299,6 +299,14 @@ class SendPacketObserver { uint32_t ssrc) = 0; }; +// Callback, used to notify an observer when the overhead per packet +// has changed. +class OverheadObserver { + public: + virtual ~OverheadObserver() = default; + virtual void OnOverheadChanged(size_t overhead_bytes_per_packet) = 0; +}; + // ================================================================== // Voice specific types // ================================================================== diff --git a/webrtc/modules/rtp_rtcp/include/rtp_rtcp.h b/webrtc/modules/rtp_rtcp/include/rtp_rtcp.h index 830d895774..2812ad9890 100644 --- a/webrtc/modules/rtp_rtcp/include/rtp_rtcp.h +++ b/webrtc/modules/rtp_rtcp/include/rtp_rtcp.h @@ -27,6 +27,7 @@ namespace webrtc { // Forward declarations. +class OverheadObserver; class RateLimiter; class ReceiveStatistics; class RemoteBitrateEstimator; @@ -89,6 +90,7 @@ class RtpRtcp : public Module { RtcEventLog* event_log = nullptr; SendPacketObserver* send_packet_observer = nullptr; RateLimiter* retransmission_rate_limiter = nullptr; + OverheadObserver* overhead_observer = nullptr; private: RTC_DISALLOW_COPY_AND_ASSIGN(Configuration); diff --git a/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.cc b/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.cc index 0a99f839d3..339e4f0d56 100644 --- a/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.cc +++ b/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.cc @@ -82,7 +82,8 @@ ModuleRtpRtcpImpl::ModuleRtpRtcpImpl(const Configuration& configuration) configuration.send_side_delay_observer, configuration.event_log, configuration.send_packet_observer, - configuration.retransmission_rate_limiter), + configuration.retransmission_rate_limiter, + configuration.overhead_observer), rtcp_sender_(configuration.audio, configuration.clock, configuration.receive_statistics, @@ -460,6 +461,7 @@ void ModuleRtpRtcpImpl::SetTransportOverhead( RTC_DCHECK_LT(transport_overhead_per_packet, mtu); size_t max_payload_length = mtu - transport_overhead_per_packet; packet_overhead_ = transport_overhead_per_packet; + rtp_sender_.SetTransportOverhead(packet_overhead_); rtcp_sender_.SetMaxPayloadLength(max_payload_length); rtp_sender_.SetMaxPayloadLength(max_payload_length); } diff --git a/webrtc/modules/rtp_rtcp/source/rtp_sender.cc b/webrtc/modules/rtp_rtcp/source/rtp_sender.cc index fbaa544ac8..71ee0b6002 100644 --- a/webrtc/modules/rtp_rtcp/source/rtp_sender.cc +++ b/webrtc/modules/rtp_rtcp/source/rtp_sender.cc @@ -77,7 +77,8 @@ RTPSender::RTPSender( SendSideDelayObserver* send_side_delay_observer, RtcEventLog* event_log, SendPacketObserver* send_packet_observer, - RateLimiter* retransmission_rate_limiter) + RateLimiter* retransmission_rate_limiter, + OverheadObserver* overhead_observer) : clock_(clock), // TODO(holmer): Remove this conversion? clock_delta_ms_(clock_->TimeInMilliseconds() - rtc::TimeMillis()), @@ -119,7 +120,10 @@ RTPSender::RTPSender( last_packet_marker_bit_(false), csrcs_(), rtx_(kRtxOff), - retransmission_rate_limiter_(retransmission_rate_limiter) { + transport_overhead_bytes_per_packet_(0), + rtp_overhead_bytes_per_packet_(0), + retransmission_rate_limiter_(retransmission_rate_limiter), + overhead_observer_(overhead_observer) { ssrc_ = ssrc_db_->CreateSSRC(); RTC_DCHECK(ssrc_ != 0); ssrc_rtx_ = ssrc_db_->CreateSSRC(); @@ -571,18 +575,15 @@ size_t RTPSender::DeprecatedSendPadData(size_t bytes, kTimestampTicksPerMs * (now_ms - capture_time_ms)); } padding_packet.SetExtension(now_ms); - PacketOptions options; - bool has_transport_seq_no = + bool has_transport_seq_num = UpdateTransportSequenceNumber(&padding_packet, &options.packet_id); - padding_packet.SetPadding(padding_bytes_in_packet, &random_); - if (has_transport_seq_no && transport_feedback_observer_) - transport_feedback_observer_->AddPacket( - options.packet_id, - padding_packet.payload_size() + padding_packet.padding_size(), - probe_cluster_id); + if (has_transport_seq_num) { + AddPacketToTransportFeedback(options.packet_id, padding_packet, + probe_cluster_id); + } if (!SendPacketToNetwork(padding_packet, options)) break; @@ -640,6 +641,7 @@ bool RTPSender::SendPacketToNetwork(const RtpPacketToSend& packet, const PacketOptions& options) { int bytes_sent = -1; if (transport_) { + UpdateRtpOverhead(packet); bytes_sent = transport_->SendRtp(packet.data(), packet.size(), options) ? static_cast(packet.size()) : -1; @@ -755,12 +757,9 @@ bool RTPSender::PrepareAndSendPacket(std::unique_ptr packet, packet_to_send->SetExtension(now_ms); PacketOptions options; - if (UpdateTransportSequenceNumber(packet_to_send, &options.packet_id) && - transport_feedback_observer_) { - transport_feedback_observer_->AddPacket( - options.packet_id, - packet_to_send->payload_size() + packet_to_send->padding_size(), - probe_cluster_id); + if (UpdateTransportSequenceNumber(packet_to_send, &options.packet_id)) { + AddPacketToTransportFeedback(options.packet_id, *packet_to_send, + probe_cluster_id); } if (!is_retransmit && !send_over_rtx) { @@ -889,11 +888,9 @@ bool RTPSender::SendToNetwork(std::unique_ptr packet, } PacketOptions options; - if (UpdateTransportSequenceNumber(packet.get(), &options.packet_id) && - transport_feedback_observer_) { - transport_feedback_observer_->AddPacket( - options.packet_id, packet->payload_size() + packet->padding_size(), - PacketInfo::kNotAProbe); + if (UpdateTransportSequenceNumber(packet.get(), &options.packet_id)) { + AddPacketToTransportFeedback(options.packet_id, *packet.get(), + PacketInfo::kNotAProbe); } UpdateDelayStatistics(packet->capture_time_ms(), now_ms); @@ -1280,4 +1277,47 @@ RtpState RTPSender::GetRtxRtpState() const { return state; } +void RTPSender::SetTransportOverhead(int transport_overhead) { + if (!overhead_observer_) + return; + size_t overhead_bytes_per_packet = 0; + { + rtc::CritScope lock(&send_critsect_); + if (transport_overhead_bytes_per_packet_ == + static_cast(transport_overhead)) { + return; + } + transport_overhead_bytes_per_packet_ = transport_overhead; + overhead_bytes_per_packet = + rtp_overhead_bytes_per_packet_ + transport_overhead_bytes_per_packet_; + } + overhead_observer_->OnOverheadChanged(overhead_bytes_per_packet); +} + +void RTPSender::AddPacketToTransportFeedback(uint16_t packet_id, + const RtpPacketToSend& packet, + int probe_cluster_id) { + if (transport_feedback_observer_) { + transport_feedback_observer_->AddPacket( + packet_id, packet.payload_size() + packet.padding_size(), + probe_cluster_id); + } +} + +void RTPSender::UpdateRtpOverhead(const RtpPacketToSend& packet) { + if (!overhead_observer_) + return; + size_t overhead_bytes_per_packet = 0; + { + rtc::CritScope lock(&send_critsect_); + if (rtp_overhead_bytes_per_packet_ == packet.headers_size()) { + return; + } + rtp_overhead_bytes_per_packet_ = packet.headers_size(); + overhead_bytes_per_packet = + rtp_overhead_bytes_per_packet_ + transport_overhead_bytes_per_packet_; + } + overhead_observer_->OnOverheadChanged(overhead_bytes_per_packet); +} + } // namespace webrtc diff --git a/webrtc/modules/rtp_rtcp/source/rtp_sender.h b/webrtc/modules/rtp_rtcp/source/rtp_sender.h index 6d83ae54dc..758e4d3238 100644 --- a/webrtc/modules/rtp_rtcp/source/rtp_sender.h +++ b/webrtc/modules/rtp_rtcp/source/rtp_sender.h @@ -36,6 +36,7 @@ namespace webrtc { +class OverheadObserver; class RateLimiter; class RtcEventLog; class RtpPacketToSend; @@ -58,7 +59,8 @@ class RTPSender { SendSideDelayObserver* send_side_delay_observer, RtcEventLog* event_log, SendPacketObserver* send_packet_observer, - RateLimiter* nack_rate_limiter); + RateLimiter* nack_rate_limiter, + OverheadObserver* overhead_observer); ~RTPSender(); @@ -214,6 +216,8 @@ class RTPSender { void SetRtxRtpState(const RtpState& rtp_state); RtpState GetRtxRtpState() const; + void SetTransportOverhead(int transport_overhead); + protected: int32_t CheckPayloadType(int8_t payload_type, RtpVideoCodecTypes* video_type); @@ -259,6 +263,12 @@ class RTPSender { bool is_retransmit); bool IsFecPacket(const RtpPacketToSend& packet) const; + void AddPacketToTransportFeedback(uint16_t packet_id, + const RtpPacketToSend& packet, + int probe_cluster_id); + + void UpdateRtpOverhead(const RtpPacketToSend& packet); + Clock* const clock_; const int64_t clock_delta_ms_; Random random_ GUARDED_BY(send_critsect_); @@ -327,8 +337,11 @@ class RTPSender { uint32_t ssrc_rtx_ GUARDED_BY(send_critsect_); // Mapping rtx_payload_type_map_[associated] = rtx. std::map rtx_payload_type_map_ GUARDED_BY(send_critsect_); + size_t transport_overhead_bytes_per_packet_ GUARDED_BY(send_critsect_); + size_t rtp_overhead_bytes_per_packet_ GUARDED_BY(send_critsect_); RateLimiter* const retransmission_rate_limiter_; + OverheadObserver* overhead_observer_; RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RTPSender); }; diff --git a/webrtc/modules/rtp_rtcp/source/rtp_sender_unittest.cc b/webrtc/modules/rtp_rtcp/source/rtp_sender_unittest.cc index cdac7a7210..c86d92b307 100644 --- a/webrtc/modules/rtp_rtcp/source/rtp_sender_unittest.cc +++ b/webrtc/modules/rtp_rtcp/source/rtp_sender_unittest.cc @@ -150,7 +150,7 @@ class RtpSenderTest : public ::testing::Test { false, &fake_clock_, &transport_, pacer ? &mock_paced_sender_ : nullptr, nullptr, &seq_num_allocator_, nullptr, nullptr, nullptr, nullptr, &mock_rtc_event_log_, &send_packet_observer_, - &retransmission_rate_limiter_)); + &retransmission_rate_limiter_, nullptr)); rtp_sender_->SetSendPayloadType(kPayload); rtp_sender_->SetSequenceNumber(kSeqNum); rtp_sender_->SetTimestampOffset(0); @@ -444,7 +444,7 @@ TEST_F(RtpSenderTestWithoutPacer, SendsPacketsWithTransportSequenceNumber) { rtp_sender_.reset(new RTPSender( false, &fake_clock_, &transport_, nullptr, nullptr, &seq_num_allocator_, &feedback_observer_, nullptr, nullptr, nullptr, &mock_rtc_event_log_, - &send_packet_observer_, &retransmission_rate_limiter_)); + &send_packet_observer_, &retransmission_rate_limiter_, nullptr)); EXPECT_EQ(0, rtp_sender_->RegisterRtpHeaderExtension( kRtpExtensionTransportSequenceNumber, kTransportSequenceNumberExtensionId)); @@ -487,11 +487,11 @@ TEST_F(RtpSenderTestWithoutPacer, OnSendPacketUpdated) { } TEST_F(RtpSenderTest, SendsPacketsWithTransportSequenceNumber) { - rtp_sender_.reset( - new RTPSender(false, &fake_clock_, &transport_, &mock_paced_sender_, - nullptr, &seq_num_allocator_, &feedback_observer_, nullptr, - nullptr, nullptr, &mock_rtc_event_log_, - &send_packet_observer_, &retransmission_rate_limiter_)); + rtp_sender_.reset(new RTPSender( + false, &fake_clock_, &transport_, &mock_paced_sender_, nullptr, + &seq_num_allocator_, &feedback_observer_, nullptr, nullptr, nullptr, + &mock_rtc_event_log_, &send_packet_observer_, + &retransmission_rate_limiter_, nullptr)); rtp_sender_->SetSequenceNumber(kSeqNum); rtp_sender_->SetSSRC(kSsrc); rtp_sender_->SetStorePacketsStatus(true, 10); @@ -775,7 +775,8 @@ TEST_F(RtpSenderTest, OnSendPacketNotUpdatedWithoutSeqNumAllocator) { rtp_sender_.reset(new RTPSender( false, &fake_clock_, &transport_, &mock_paced_sender_, nullptr, nullptr /* TransportSequenceNumberAllocator */, nullptr, nullptr, nullptr, - nullptr, nullptr, &send_packet_observer_, &retransmission_rate_limiter_)); + nullptr, nullptr, &send_packet_observer_, &retransmission_rate_limiter_, + nullptr)); rtp_sender_->SetSequenceNumber(kSeqNum); rtp_sender_->SetSSRC(kSsrc); EXPECT_EQ(0, rtp_sender_->RegisterRtpHeaderExtension( @@ -801,7 +802,7 @@ TEST_F(RtpSenderTest, SendRedundantPayloads) { rtp_sender_.reset(new RTPSender( false, &fake_clock_, &transport, &mock_paced_sender_, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, &mock_rtc_event_log_, nullptr, - &retransmission_rate_limiter_)); + &retransmission_rate_limiter_, nullptr)); rtp_sender_->SetSequenceNumber(kSeqNum); rtp_sender_->SetSSRC(kSsrc); rtp_sender_->SetRtxPayloadType(kRtxPayload, kPayload); @@ -919,11 +920,11 @@ TEST_F(RtpSenderTest, SendFlexfecPackets) { kNoRtpExtensions, &fake_clock_); // Reset |rtp_sender_| to use FlexFEC. - rtp_sender_.reset( - new RTPSender(false, &fake_clock_, &transport_, &mock_paced_sender_, - &flexfec_sender, &seq_num_allocator_, nullptr, nullptr, - nullptr, nullptr, &mock_rtc_event_log_, - &send_packet_observer_, &retransmission_rate_limiter_)); + rtp_sender_.reset(new RTPSender( + false, &fake_clock_, &transport_, &mock_paced_sender_, &flexfec_sender, + &seq_num_allocator_, nullptr, nullptr, nullptr, nullptr, + &mock_rtc_event_log_, &send_packet_observer_, + &retransmission_rate_limiter_, nullptr)); rtp_sender_->SetSSRC(kMediaSsrc); rtp_sender_->SetSequenceNumber(kSeqNum); rtp_sender_->SetSendPayloadType(kMediaPayloadType); @@ -977,7 +978,7 @@ TEST_F(RtpSenderTestWithoutPacer, SendFlexfecPackets) { &flexfec_sender, &seq_num_allocator_, nullptr, nullptr, nullptr, nullptr, &mock_rtc_event_log_, &send_packet_observer_, - &retransmission_rate_limiter_)); + &retransmission_rate_limiter_, nullptr)); rtp_sender_->SetSSRC(kMediaSsrc); rtp_sender_->SetSequenceNumber(kSeqNum); rtp_sender_->SetSendPayloadType(kMediaPayloadType); @@ -1020,10 +1021,10 @@ TEST_F(RtpSenderTest, FrameCountCallbacks) { FrameCounts frame_counts_; } callback; - rtp_sender_.reset(new RTPSender(false, &fake_clock_, &transport_, - &mock_paced_sender_, nullptr, nullptr, - nullptr, nullptr, &callback, nullptr, nullptr, - nullptr, &retransmission_rate_limiter_)); + rtp_sender_.reset( + new RTPSender(false, &fake_clock_, &transport_, &mock_paced_sender_, + nullptr, nullptr, nullptr, nullptr, &callback, nullptr, + nullptr, nullptr, &retransmission_rate_limiter_, nullptr)); char payload_name[RTP_PAYLOAD_NAME_SIZE] = "GENERIC"; const uint8_t payload_type = 127; @@ -1085,7 +1086,7 @@ TEST_F(RtpSenderTest, BitrateCallbacks) { rtp_sender_.reset(new RTPSender(false, &fake_clock_, &transport_, nullptr, nullptr, nullptr, nullptr, &callback, nullptr, nullptr, nullptr, nullptr, - &retransmission_rate_limiter_)); + &retransmission_rate_limiter_, nullptr)); // Simulate kNumPackets sent with kPacketInterval ms intervals, with the // number of packets selected so that we fill (but don't overflow) the one @@ -1143,7 +1144,7 @@ class RtpSenderAudioTest : public RtpSenderTest { rtp_sender_.reset(new RTPSender(true, &fake_clock_, &transport_, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, - &retransmission_rate_limiter_)); + &retransmission_rate_limiter_, nullptr)); rtp_sender_->SetSequenceNumber(kSeqNum); } }; @@ -1494,4 +1495,52 @@ TEST_F(RtpSenderVideoTest, SendVideoWithCameraAndFlipCVO) { ConvertCVOByteToVideoRotation(flip_bit | camera_bit | 3)); } +namespace { +class MockOverheadObserver : public OverheadObserver { + public: + MOCK_METHOD1(OnOverheadChanged, void(size_t overhead_bytes_per_packet)); +}; +} // namespace + +TEST_F(RtpSenderTest, OnOverheadChanged) { + MockOverheadObserver mock_overhead_observer; + rtp_sender_.reset( + new RTPSender(false, &fake_clock_, &transport_, nullptr, nullptr, nullptr, + nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, + &retransmission_rate_limiter_, &mock_overhead_observer)); + + // Transport overhead is set to 28B. + EXPECT_CALL(mock_overhead_observer, OnOverheadChanged(28)).Times(1); + rtp_sender_->SetTransportOverhead(28); + + // RTP overhead is 12B. + // 28B + 12B = 40B + EXPECT_CALL(mock_overhead_observer, OnOverheadChanged(40)).Times(1); + SendGenericPayload(); + + rtp_sender_->RegisterRtpHeaderExtension(kRtpExtensionTransmissionTimeOffset, + kTransmissionTimeOffsetExtensionId); + + // TransmissionTimeOffset extension has a size of 8B. + // 28B + 12B + 8B = 48B + EXPECT_CALL(mock_overhead_observer, OnOverheadChanged(48)).Times(1); + SendGenericPayload(); +} + +TEST_F(RtpSenderTest, DoesNotUpdateOverheadOnEqualSize) { + MockOverheadObserver mock_overhead_observer; + rtp_sender_.reset( + new RTPSender(false, &fake_clock_, &transport_, nullptr, nullptr, nullptr, + nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, + &retransmission_rate_limiter_, &mock_overhead_observer)); + + EXPECT_CALL(mock_overhead_observer, OnOverheadChanged(_)).Times(1); + rtp_sender_->SetTransportOverhead(28); + rtp_sender_->SetTransportOverhead(28); + + EXPECT_CALL(mock_overhead_observer, OnOverheadChanged(_)).Times(1); + SendGenericPayload(); + SendGenericPayload(); +} + } // namespace webrtc