diff --git a/webrtc/modules/pacing/include/mock/mock_paced_sender.h b/webrtc/modules/pacing/include/mock/mock_paced_sender.h new file mode 100644 index 0000000000..13b414deaf --- /dev/null +++ b/webrtc/modules/pacing/include/mock/mock_paced_sender.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef WEBRTC_MODULES_PACING_INCLUDE_MOCK_MOCK_PACED_SENDER_H_ +#define WEBRTC_MODULES_PACING_INCLUDE_MOCK_MOCK_PACED_SENDER_H_ + +#include + +#include + +#include "webrtc/modules/pacing/include/paced_sender.h" + +namespace webrtc { + +class MockPacedSender : public PacedSender { + public: + MockPacedSender() : PacedSender(NULL, 0) {} + MOCK_METHOD5(SendPacket, bool(Priority priority, + uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + int bytes)); + MOCK_CONST_METHOD0(QueueInMs, int()); + MOCK_CONST_METHOD0(QueueInPackets, int()); +}; + +} // namespace webrtc + +#endif // WEBRTC_MODULES_PACING_INCLUDE_MOCK_MOCK_PACED_SENDER_H_ diff --git a/webrtc/modules/pacing/include/paced_sender.h b/webrtc/modules/pacing/include/paced_sender.h index 4a0ade8012..bd4880de47 100644 --- a/webrtc/modules/pacing/include/paced_sender.h +++ b/webrtc/modules/pacing/include/paced_sender.h @@ -62,11 +62,14 @@ class PacedSender : public Module { // Returns true if we send the packet now, else it will add the packet // information to the queue and call TimeToSendPacket when it's time to send. - bool SendPacket(Priority priority, uint32_t ssrc, uint16_t sequence_number, - int64_t capture_time_ms, int bytes); + virtual bool SendPacket(Priority priority, + uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + int bytes); // Returns the time since the oldest queued packet was captured. - int QueueInMs() const; + virtual int QueueInMs() const; // Returns the number of milliseconds until the module want a worker thread // to call Process. diff --git a/webrtc/modules/rtp_rtcp/source/rtp_sender.cc b/webrtc/modules/rtp_rtcp/source/rtp_sender.cc index 54c242d05d..e74465f89a 100644 --- a/webrtc/modules/rtp_rtcp/source/rtp_sender.cc +++ b/webrtc/modules/rtp_rtcp/source/rtp_sender.cc @@ -448,19 +448,18 @@ void RTPSender::SetStorePacketsStatus(const bool enable, packet_history_->SetStorePacketsStatus(enable, number_to_store); } -bool RTPSender::StorePackets() const { return packet_history_->StorePackets(); } +bool RTPSender::StorePackets() const { + return packet_history_->StorePackets(); +} int32_t RTPSender::ReSendPacket(uint16_t packet_id, uint32_t min_resend_time) { uint16_t length = IP_PACKET_SIZE; uint8_t data_buffer[IP_PACKET_SIZE]; uint8_t *buffer_to_send_ptr = data_buffer; - - int64_t stored_time_in_ms; + int64_t capture_time_ms; StorageType type; - bool found = packet_history_->GetRTPPacket(packet_id, min_resend_time, - data_buffer, &length, - &stored_time_in_ms, &type); - if (!found) { + if (!packet_history_->GetRTPPacket(packet_id, min_resend_time, data_buffer, + &length, &capture_time_ms, &type)) { // Packet not found. return 0; } @@ -469,44 +468,63 @@ int32_t RTPSender::ReSendPacket(uint16_t packet_id, uint32_t min_resend_time) { // packet should not be retransmitted. return 0; } + uint8_t data_buffer_rtx[IP_PACKET_SIZE]; if (rtx_ != kRtxOff) { BuildRtxPacket(data_buffer, &length, data_buffer_rtx); buffer_to_send_ptr = data_buffer_rtx; } - int32_t bytes_sent = ReSendToNetwork(buffer_to_send_ptr, length); ModuleRTPUtility::RTPHeaderParser rtp_parser(data_buffer, length); WebRtcRTPHeader rtp_header; rtp_parser.Parse(rtp_header); + + // Store the time when the packet was last sent or added to pacer. + packet_history_->UpdateResendTime(packet_id); + + { + // Update send statistics prior to pacer. + CriticalSectionScoped cs(send_critsect_); + Bitrate::Update(length); + packets_sent_++; + // We on purpose don't add to payload_bytes_sent_ since this is a + // re-transmit and not new payload data. + } + + if (paced_sender_) { + if (!paced_sender_->SendPacket(PacedSender::kHighPriority, + rtp_header.header.ssrc, + rtp_header.header.sequenceNumber, + capture_time_ms, + length)) { + // We can't send the packet right now. + // We will be called when it is time. + return 0; + } + } + TRACE_EVENT_INSTANT2("webrtc_rtp", "RTPSender::ReSendPacket", "timestamp", rtp_header.header.timestamp, "seqnum", rtp_header.header.sequenceNumber); - if (bytes_sent <= 0) { - WEBRTC_TRACE(kTraceWarning, kTraceRtpRtcp, id_, - "Transport failed to resend packet_id %u", packet_id); - return -1; + + if (SendPacketToNetwork(buffer_to_send_ptr, length)) { + return 0; } - // Store the time when the packet was last resent. - packet_history_->UpdateResendTime(packet_id); - return bytes_sent; + return -1; } -int32_t RTPSender::ReSendToNetwork(const uint8_t *packet, const uint32_t size) { - int32_t bytes_sent = -1; +bool RTPSender::SendPacketToNetwork(const uint8_t *packet, uint32_t size) { + int bytes_sent = -1; if (transport_) { bytes_sent = transport_->SendPacket(id_, packet, size); } + // TODO(pwesin): Add a separate bitrate for sent bitrate after pacer. if (bytes_sent <= 0) { - return -1; + WEBRTC_TRACE(kTraceWarning, kTraceRtpRtcp, id_, + "Transport failed to send packet"); + return false; } - // Update send statistics. - CriticalSectionScoped cs(send_critsect_); - Bitrate::Update(bytes_sent); - packets_sent_++; - // We on purpose don't add to payload_bytes_sent_ since this is a - // re-transmit and not new payload data. - return bytes_sent; + return true; } int RTPSender::SelectiveRetransmissions() const { @@ -625,12 +643,13 @@ void RTPSender::UpdateNACKBitRate(const uint32_t bytes, } } +// Called from pacer when we can send the packet. void RTPSender::TimeToSendPacket(uint16_t sequence_number, int64_t capture_time_ms) { StorageType type; uint16_t length = IP_PACKET_SIZE; uint8_t data_buffer[IP_PACKET_SIZE]; - int64_t stored_time_ms; // TODO(pwestin) can we deprecate this? + int64_t stored_time_ms; if (packet_history_ == NULL) { return; @@ -655,20 +674,7 @@ void RTPSender::TimeToSendPacket(uint16_t sequence_number, rtp_header.header.sequenceNumber, rtp_header.header.headerLength); } - int bytes_sent = -1; - if (transport_) { - bytes_sent = transport_->SendPacket(id_, data_buffer, length); - } - if (bytes_sent <= 0) { - return; - } - // Update send statistics. - CriticalSectionScoped cs(send_critsect_); - Bitrate::Update(bytes_sent); - packets_sent_++; - if (bytes_sent > rtp_header.header.headerLength) { - payload_bytes_sent_ += bytes_sent - rtp_header.header.headerLength; - } + SendPacketToNetwork(data_buffer, length); } // TODO(pwestin): send in the RTPHeaderParser to avoid parsing it again. @@ -695,17 +701,26 @@ int32_t RTPSender::SendToNetwork( return -1; } - int32_t bytes_sent = -1; // Create and send RTX Packet. + // TODO(pwesin): This should be moved to its own code path triggered by pacer. + bool rtx_sent = false; if (rtx_ == kRtxAll && storage == kAllowRetransmission) { uint16_t length_rtx = payload_length + rtp_header_length; uint8_t data_buffer_rtx[IP_PACKET_SIZE]; BuildRtxPacket(buffer, &length_rtx, data_buffer_rtx); - if (transport_) { - bytes_sent += transport_->SendPacket(id_, data_buffer_rtx, length_rtx); - if (bytes_sent <= 0) { - return -1; - } + if (!SendPacketToNetwork(data_buffer_rtx, length_rtx)) return -1; + rtx_sent = true; + } + { + // Update send statistics prior to pacer. + CriticalSectionScoped cs(send_critsect_); + Bitrate::Update(payload_length + rtp_header_length); + ++packets_sent_; + payload_bytes_sent_ += payload_length; + if (rtx_sent) { + // The RTX packet. + ++packets_sent_; + payload_bytes_sent_ += payload_length; } } @@ -716,26 +731,13 @@ int32_t RTPSender::SendToNetwork( payload_length + rtp_header_length)) { // We can't send the packet right now. // We will be called when it is time. - return payload_length + rtp_header_length; + return 0; } } - // Send data packet. - bytes_sent = -1; - if (transport_) { - bytes_sent = transport_->SendPacket(id_, buffer, - payload_length + rtp_header_length); + if (SendPacketToNetwork(buffer, payload_length + rtp_header_length)) { + return 0; } - if (bytes_sent <= 0) { - return -1; - } - // Update send statistics. - CriticalSectionScoped cs(send_critsect_); - Bitrate::Update(bytes_sent); - packets_sent_++; - if (bytes_sent > rtp_header_length) { - payload_bytes_sent_ += bytes_sent - rtp_header_length; - } - return 0; + return -1; } void RTPSender::ProcessBitrate() { diff --git a/webrtc/modules/rtp_rtcp/source/rtp_sender.h b/webrtc/modules/rtp_rtcp/source/rtp_sender.h index 0f41632798..2e207b52ce 100644 --- a/webrtc/modules/rtp_rtcp/source/rtp_sender.h +++ b/webrtc/modules/rtp_rtcp/source/rtp_sender.h @@ -169,8 +169,6 @@ class RTPSender : public Bitrate, public RTPSenderInterface { int32_t ReSendPacket(uint16_t packet_id, uint32_t min_resend_time = 0); - int32_t ReSendToNetwork(const uint8_t *packet, const uint32_t size); - bool ProcessNACKBitRate(const uint32_t now); // RTX. @@ -263,6 +261,8 @@ class RTPSender : public Bitrate, public RTPSenderInterface { void BuildRtxPacket(uint8_t* buffer, uint16_t* length, uint8_t* buffer_rtx); + bool SendPacketToNetwork(const uint8_t *packet, uint32_t size); + int32_t id_; const bool audio_configured_; RTPSenderAudio *audio_; diff --git a/webrtc/modules/rtp_rtcp/source/rtp_sender_unittest.cc b/webrtc/modules/rtp_rtcp/source/rtp_sender_unittest.cc index 2c2220167b..7b7f6ac315 100644 --- a/webrtc/modules/rtp_rtcp/source/rtp_sender_unittest.cc +++ b/webrtc/modules/rtp_rtcp/source/rtp_sender_unittest.cc @@ -14,6 +14,7 @@ #include +#include "webrtc/modules/pacing/include/mock/mock_paced_sender.h" #include "webrtc/modules/rtp_rtcp/interface/rtp_rtcp_defines.h" #include "webrtc/modules/rtp_rtcp/source/rtp_format_video_generic.h" #include "webrtc/modules/rtp_rtcp/source/rtp_header_extension.h" @@ -34,6 +35,8 @@ const int kTimeOffset = 22222; const int kMaxPacketLength = 1500; } // namespace +using testing::_; + class LoopbackTransportTest : public webrtc::Transport { public: LoopbackTransportTest() @@ -58,13 +61,17 @@ class RtpSenderTest : public ::testing::Test { protected: RtpSenderTest() : fake_clock_(123456), + mock_paced_sender_(), rtp_sender_(new RTPSender(0, false, &fake_clock_, &transport_, NULL, - NULL)), + &mock_paced_sender_)), kMarkerBit(true), kType(kRtpExtensionTransmissionTimeOffset) { rtp_sender_->SetSequenceNumber(kSeqNum); + EXPECT_CALL(mock_paced_sender_, + SendPacket(_, _, _, _, _)).WillRepeatedly(testing::Return(true)); } SimulatedClock fake_clock_; + MockPacedSender mock_paced_sender_; scoped_ptr rtp_sender_; LoopbackTransportTest transport_; const bool kMarkerBit; @@ -173,24 +180,11 @@ TEST_F(RtpSenderTest, BuildRTPPacketWithNegativeTransmissionOffsetExtension) { EXPECT_EQ(kNegTimeOffset, rtp_header.extension.transmissionTimeOffset); } -TEST_F(RtpSenderTest, NoTrafficSmoothing) { - int32_t rtp_length = rtp_sender_->BuildRTPheader(packet_, - kPayload, - kMarkerBit, - kTimestamp); +TEST_F(RtpSenderTest, TrafficSmoothingWithTimeOffset) { + EXPECT_CALL(mock_paced_sender_, + SendPacket(PacedSender::kNormalPriority, _, kSeqNum, _, _)). + WillOnce(testing::Return(false)); - // Packet should be sent immediately. - EXPECT_EQ(0, rtp_sender_->SendToNetwork(packet_, - 0, - rtp_length, - kTimestamp / 90, - kAllowRetransmission)); - EXPECT_EQ(1, transport_.packets_sent_); - EXPECT_EQ(rtp_length, transport_.last_sent_packet_len_); -} - -TEST_F(RtpSenderTest, DISABLED_TrafficSmoothing) { - // TODO(pwestin) we need to send in a pacer object. rtp_sender_->SetStorePacketsStatus(true, 10); EXPECT_EQ(0, rtp_sender_->RegisterRtpHeaderExtension(kType, kId)); rtp_sender_->SetTargetSendBitrate(300000); @@ -198,15 +192,23 @@ TEST_F(RtpSenderTest, DISABLED_TrafficSmoothing) { kPayload, kMarkerBit, kTimestamp); + + int64_t capture_time_ms = fake_clock_.TimeInMilliseconds(); + // Packet should be stored in a send bucket. EXPECT_EQ(0, rtp_sender_->SendToNetwork(packet_, 0, rtp_length, - fake_clock_.TimeInMilliseconds(), + capture_time_ms, kAllowRetransmission)); + EXPECT_EQ(0, transport_.packets_sent_); + const int kStoredTimeInMs = 100; fake_clock_.AdvanceTimeMilliseconds(kStoredTimeInMs); + + rtp_sender_->TimeToSendPacket(kSeqNum, capture_time_ms); + // Process send bucket. Packet should now be sent. EXPECT_EQ(1, transport_.packets_sent_); EXPECT_EQ(rtp_length, transport_.last_sent_packet_len_); @@ -218,6 +220,60 @@ TEST_F(RtpSenderTest, DISABLED_TrafficSmoothing) { map.Register(kType, kId); const bool valid_rtp_header = rtp_parser.Parse(rtp_header, &map); ASSERT_TRUE(valid_rtp_header); + + // Verify transmission time offset. + EXPECT_EQ(kStoredTimeInMs * 90, rtp_header.extension.transmissionTimeOffset); +} + +TEST_F(RtpSenderTest, TrafficSmoothingRetransmits) { + EXPECT_CALL(mock_paced_sender_, + SendPacket(PacedSender::kNormalPriority, _, kSeqNum, _, _)). + WillOnce(testing::Return(false)); + + rtp_sender_->SetStorePacketsStatus(true, 10); + EXPECT_EQ(0, rtp_sender_->RegisterRtpHeaderExtension(kType, kId)); + rtp_sender_->SetTargetSendBitrate(300000); + int32_t rtp_length = rtp_sender_->BuildRTPheader(packet_, + kPayload, + kMarkerBit, + kTimestamp); + + int64_t capture_time_ms = fake_clock_.TimeInMilliseconds(); + + // Packet should be stored in a send bucket. + EXPECT_EQ(0, rtp_sender_->SendToNetwork(packet_, + 0, + rtp_length, + capture_time_ms, + kAllowRetransmission)); + + EXPECT_EQ(0, transport_.packets_sent_); + + EXPECT_CALL(mock_paced_sender_, + SendPacket(PacedSender::kHighPriority, _, kSeqNum, _, _)). + WillOnce(testing::Return(false)); + + const int kStoredTimeInMs = 100; + fake_clock_.AdvanceTimeMilliseconds(kStoredTimeInMs); + + EXPECT_EQ(0, rtp_sender_->ReSendPacket(kSeqNum)); + EXPECT_EQ(0, transport_.packets_sent_); + + rtp_sender_->TimeToSendPacket(kSeqNum, capture_time_ms); + + // Process send bucket. Packet should now be sent. + EXPECT_EQ(1, transport_.packets_sent_); + EXPECT_EQ(rtp_length, transport_.last_sent_packet_len_); + + // Parse sent packet. + webrtc::ModuleRTPUtility::RTPHeaderParser rtp_parser( + transport_.last_sent_packet_, rtp_length); + webrtc::WebRtcRTPHeader rtp_header; + RtpHeaderExtensionMap map; + map.Register(kType, kId); + const bool valid_rtp_header = rtp_parser.Parse(rtp_header, &map); + ASSERT_TRUE(valid_rtp_header); + // Verify transmission time offset. EXPECT_EQ(kStoredTimeInMs * 90, rtp_header.extension.transmissionTimeOffset); }