From b518017e71d7cc0eab031f6259e4d87aaeb5c9c5 Mon Sep 17 00:00:00 2001 From: "pwestin@webrtc.org" Date: Fri, 9 Nov 2012 20:56:23 +0000 Subject: [PATCH] Adding pacing module, will replace the transmission_bucket in the RTP module. TESTED=unittest Review URL: https://webrtc-codereview.appspot.com/930015 git-svn-id: http://webrtc.googlecode.com/svn/trunk@3073 4adac7df-926f-26a2-2b94-8c16560cd09d --- webrtc/modules/modules.gyp | 1 + webrtc/modules/pacing/OWNERS | 4 + webrtc/modules/pacing/include/paced_sender.h | 102 +++++++++ webrtc/modules/pacing/paced_sender.cc | 209 ++++++++++++++++++ .../modules/pacing/paced_sender_unittest.cc | 184 +++++++++++++++ webrtc/modules/pacing/pacing.gypi | 53 +++++ 6 files changed, 553 insertions(+) create mode 100644 webrtc/modules/pacing/OWNERS create mode 100644 webrtc/modules/pacing/include/paced_sender.h create mode 100644 webrtc/modules/pacing/paced_sender.cc create mode 100644 webrtc/modules/pacing/paced_sender_unittest.cc create mode 100644 webrtc/modules/pacing/pacing.gypi diff --git a/webrtc/modules/modules.gyp b/webrtc/modules/modules.gyp index f9d8765c11..3244357247 100644 --- a/webrtc/modules/modules.gyp +++ b/webrtc/modules/modules.gyp @@ -23,6 +23,7 @@ 'audio_processing/audio_processing.gypi', 'bitrate_controller/bitrate_controller.gypi', 'media_file/source/media_file.gypi', + 'pacing/pacing.gypi', 'remote_bitrate_estimator/remote_bitrate_estimator.gypi', 'rtp_rtcp/source/rtp_rtcp.gypi', 'udp_transport/source/udp_transport.gypi', diff --git a/webrtc/modules/pacing/OWNERS b/webrtc/modules/pacing/OWNERS new file mode 100644 index 0000000000..933a045009 --- /dev/null +++ b/webrtc/modules/pacing/OWNERS @@ -0,0 +1,4 @@ +pwestin@webrtc.org +stefan@webrtc.org +mflodman@webrtc.org +asapersson@webrtc.org diff --git a/webrtc/modules/pacing/include/paced_sender.h b/webrtc/modules/pacing/include/paced_sender.h new file mode 100644 index 0000000000..6d394c26eb --- /dev/null +++ b/webrtc/modules/pacing/include/paced_sender.h @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef WEBRTC_MODULES_PACED_SENDER_H_ +#define WEBRTC_MODULES_PACED_SENDER_H_ + +#include + +#include "webrtc/modules/interface/module.h" +#include "webrtc/system_wrappers/interface/scoped_ptr.h" +#include "webrtc/system_wrappers/interface/tick_util.h" +#include "webrtc/typedefs.h" + +namespace webrtc { +class CriticalSectionWrapper; + +class PacedSender : public Module { + public: + enum Priority { + kHighPriority = 0, // Pass through; will be sent immediately. + kNormalPriority = 2, // Put in back of the line. + kLowPriority = 3, // Put in back of the low priority line. + }; + class Callback { + public: + // Note: packets sent as a result of a callback should not pass by this + // module again. + // Called when it's time to send a queued packet. + virtual void TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number, + int64_t capture_time_ms) = 0; + // Called when it's a good time to send a padding data. + virtual void TimeToSendPadding(int bytes) = 0; + protected: + virtual ~Callback() {} + }; + PacedSender(Callback* callback, int target_bitrate_kbps); + + virtual ~PacedSender(); + + // Enable/disable pacing. + void SetStatus(bool enable); + + // Current total estimated bitrate. + void UpdateBitrate(int target_bitrate_kbps); + + // Returns true if we send the packet now, else it will add the packet + // information to the queue and call TimeToSendPacket when it's time to send. + bool SendPacket(Priority priority, uint32_t ssrc, uint16_t sequence_number, + int64_t capture_time_ms, int bytes); + + // Returns the number of milliseconds until the module want a worker thread + // to call Process. + virtual int32_t TimeUntilNextProcess(); + + // Process any pending packets in the queue(s). + virtual int32_t Process(); + + private: + struct Packet { + Packet(uint32_t ssrc, uint16_t seq_number, int64_t capture_time_ms, + int length_in_bytes) + : ssrc_(ssrc), + sequence_number_(seq_number), + capture_time_ms_(capture_time_ms), + bytes_(length_in_bytes) { + } + uint32_t ssrc_; + uint16_t sequence_number_; + int64_t capture_time_ms_; + int bytes_; + }; + // Checks if next packet in line can be transmitted. Returns true on success. + bool GetNextPacket(uint32_t* ssrc, uint16_t* sequence_number, + int64_t* capture_time_ms); + + // Updates the number of bytes that can be sent for the next time interval. + void UpdateBytesPerInterval(uint32_t delta_time_in_ms); + + // Updates the buffers with the number of bytes that we sent. + void UpdateState(int num_bytes); + + Callback* callback_; + bool enable_; + scoped_ptr critsect_; + int target_bitrate_kbytes_per_s_; + int bytes_remaining_interval_; + int padding_bytes_remaining_interval_; + TickTime time_last_update_; + TickTime time_last_send_; + + std::list normal_priority_packets_; + std::list low_priority_packets_; +}; +} // namespace webrtc +#endif // WEBRTC_MODULES_PACED_SENDER_H_ diff --git a/webrtc/modules/pacing/paced_sender.cc b/webrtc/modules/pacing/paced_sender.cc new file mode 100644 index 0000000000..0bd21cb366 --- /dev/null +++ b/webrtc/modules/pacing/paced_sender.cc @@ -0,0 +1,209 @@ +/* + * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/modules/pacing/include/paced_sender.h" + +#include + +#include "webrtc/system_wrappers/interface/critical_section_wrapper.h" + +namespace { +// Multiplicative factor that is applied to the target bitrate to calculate the +// number of bytes that can be transmitted per interval. +// Increasing this factor will result in lower delays in cases of bitrate +// overshoots from the encoder. +const float kBytesPerIntervalMargin = 1.5f; + +// Time limit in milliseconds between packet bursts. +const int kMinPacketLimitMs = 5; + +// Upper cap on process interval, in case process has not been called in a long +// time. +const int kMaxIntervalTimeMs = 30; + +// Max time that the first packet in the queue can sit in the queue if no +// packets are sent, regardless of buffer state. In practice only in effect at +// low bitrates (less than 320 kbits/s). +const int kMaxQueueTimeWithoutSendingMs = 30; +} // namespace + +namespace webrtc { + +PacedSender::PacedSender(Callback* callback, int target_bitrate_kbps) + : callback_(callback), + enable_(false), + critsect_(CriticalSectionWrapper::CreateCriticalSection()), + target_bitrate_kbytes_per_s_(target_bitrate_kbps >> 3), // Divide by 8. + bytes_remaining_interval_(0), + padding_bytes_remaining_interval_(0), + time_last_update_(TickTime::Now()) { + UpdateBytesPerInterval(kMinPacketLimitMs); +} + +PacedSender::~PacedSender() { + normal_priority_packets_.clear(); + low_priority_packets_.clear(); +} + +void PacedSender::SetStatus(bool enable) { + CriticalSectionScoped cs(critsect_.get()); + enable_ = enable; +} + +void PacedSender::UpdateBitrate(int target_bitrate_kbps) { + CriticalSectionScoped cs(critsect_.get()); + target_bitrate_kbytes_per_s_ = target_bitrate_kbps >> 3; // Divide by 8. +} + +bool PacedSender::SendPacket(Priority priority, uint32_t ssrc, + uint16_t sequence_number, int64_t capture_time_ms, int bytes) { + CriticalSectionScoped cs(critsect_.get()); + + if (!enable_) { + UpdateState(bytes); + return true; // We can send now. + } + switch (priority) { + case kHighPriority: + UpdateState(bytes); + return true; // We can send now. + case kNormalPriority: + if (normal_priority_packets_.empty() && bytes_remaining_interval_ > 0) { + UpdateState(bytes); + return true; // We can send now. + } + normal_priority_packets_.push_back( + Packet(ssrc, sequence_number, capture_time_ms, bytes)); + return false; + case kLowPriority: + if (normal_priority_packets_.empty() && + low_priority_packets_.empty() && + bytes_remaining_interval_ > 0) { + UpdateState(bytes); + return true; // We can send now. + } + low_priority_packets_.push_back( + Packet(ssrc, sequence_number, capture_time_ms, bytes)); + return false; + } + return false; +} + +int32_t PacedSender::TimeUntilNextProcess() { + CriticalSectionScoped cs(critsect_.get()); + int64_t elapsed_time_ms = + (TickTime::Now() - time_last_update_).Milliseconds(); + if (elapsed_time_ms <= 0) { + return kMinPacketLimitMs; + } + if (elapsed_time_ms >= kMinPacketLimitMs) { + return 0; + } + return kMinPacketLimitMs - elapsed_time_ms; +} + +int32_t PacedSender::Process() { + TickTime now = TickTime::Now(); + CriticalSectionScoped cs(critsect_.get()); + int elapsed_time_ms = (now - time_last_update_).Milliseconds(); + time_last_update_ = now; + if (elapsed_time_ms > 0) { + uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms); + UpdateBytesPerInterval(delta_time_ms); + uint32_t ssrc; + uint16_t sequence_number; + int64_t capture_time_ms; + while (GetNextPacket(&ssrc, &sequence_number, &capture_time_ms)) { + critsect_->Leave(); + callback_->TimeToSendPacket(ssrc, sequence_number, capture_time_ms); + critsect_->Enter(); + } + if (normal_priority_packets_.empty() && + low_priority_packets_.empty() && + padding_bytes_remaining_interval_ > 0) { + critsect_->Leave(); + callback_->TimeToSendPadding(padding_bytes_remaining_interval_); + critsect_->Enter(); + padding_bytes_remaining_interval_ = 0; + bytes_remaining_interval_ -= padding_bytes_remaining_interval_; + } + } + return 0; +} + +// MUST have critsect_ when calling. +void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) { + uint32_t bytes_per_interval = target_bitrate_kbytes_per_s_ * delta_time_ms; + + if (bytes_remaining_interval_ < 0) { + // We overused last interval, compensate this interval. + bytes_remaining_interval_ += kBytesPerIntervalMargin * bytes_per_interval; + } else { + // If we underused last interval we can't use it this interval. + bytes_remaining_interval_ = kBytesPerIntervalMargin * bytes_per_interval; + } + if (padding_bytes_remaining_interval_ < 0) { + // We overused last interval, compensate this interval. + padding_bytes_remaining_interval_ += bytes_per_interval; + } else { + // If we underused last interval we can't use it this interval. + padding_bytes_remaining_interval_ = bytes_per_interval; + } +} + +// MUST have critsect_ when calling. +bool PacedSender::GetNextPacket(uint32_t* ssrc, uint16_t* sequence_number, + int64_t* capture_time_ms) { + if (bytes_remaining_interval_ <= 0) { + // All bytes consumed for this interval. + // Check if we have not sent in a too long time. + if (!normal_priority_packets_.empty()) { + if ((TickTime::Now() - time_last_send_).Milliseconds() > + kMaxQueueTimeWithoutSendingMs) { + Packet packet = normal_priority_packets_.front(); + UpdateState(packet.bytes_); + *sequence_number = packet.sequence_number_; + *ssrc = packet.ssrc_; + *capture_time_ms = packet.capture_time_ms_; + normal_priority_packets_.pop_front(); + return true; + } + } + return false; + } + if (!normal_priority_packets_.empty()) { + Packet packet = normal_priority_packets_.front(); + UpdateState(packet.bytes_); + *sequence_number = packet.sequence_number_; + *ssrc = packet.ssrc_; + *capture_time_ms = packet.capture_time_ms_; + normal_priority_packets_.pop_front(); + return true; + } + if (!low_priority_packets_.empty()) { + Packet packet = low_priority_packets_.front(); + UpdateState(packet.bytes_); + *sequence_number = packet.sequence_number_; + *ssrc = packet.ssrc_; + *capture_time_ms = packet.capture_time_ms_; + low_priority_packets_.pop_front(); + return true; + } + return false; +} + +// MUST have critsect_ when calling. +void PacedSender::UpdateState(int num_bytes) { + time_last_send_ = TickTime::Now(); + bytes_remaining_interval_ -= num_bytes; + padding_bytes_remaining_interval_ -= num_bytes; +} + +} // namespace webrtc diff --git a/webrtc/modules/pacing/paced_sender_unittest.cc b/webrtc/modules/pacing/paced_sender_unittest.cc new file mode 100644 index 0000000000..845ac40218 --- /dev/null +++ b/webrtc/modules/pacing/paced_sender_unittest.cc @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include +#include + +#include "webrtc/modules/pacing/include/paced_sender.h" + +namespace { + const int kTargetBitrate = 800; +}; + +namespace webrtc { + +class MockPacedSenderCallback : public PacedSender::Callback { + public: + MOCK_METHOD3(TimeToSendPacket, + void(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms)); + MOCK_METHOD1(TimeToSendPadding, + void(int bytes)); +}; + +class PacedSenderTest : public ::testing::Test { + protected: + PacedSenderTest() { + TickTime::UseFakeClock(123456); + // Need to initialize PacedSender after we initialize clock. + send_bucket_.reset(new PacedSender(&callback_, kTargetBitrate)); + send_bucket_->SetStatus(true); + } + MockPacedSenderCallback callback_; + scoped_ptr send_bucket_; +}; + +TEST_F(PacedSenderTest, QueuePacket) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = 56789; + + // Due to the multiplicative factor we can send 3 packets not 2 packets. + EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250)); + EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250)); + EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250)); + EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, + sequence_number, capture_time_ms, 250)); + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + EXPECT_CALL(callback_, TimeToSendPadding(testing::_)).Times(0); + EXPECT_CALL(callback_, + TimeToSendPacket(ssrc, sequence_number, capture_time_ms)).Times(0); + TickTime::AdvanceFakeClock(4); + EXPECT_EQ(1, send_bucket_->TimeUntilNextProcess()); + EXPECT_CALL(callback_, + TimeToSendPacket(ssrc, sequence_number, capture_time_ms)).Times(1); + TickTime::AdvanceFakeClock(1); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); + sequence_number++; + EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250)); + EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250)); + EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250)); +} + +TEST_F(PacedSenderTest, PaceQueuedPackets) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = 56789; + + // Due to the multiplicative factor we can send 3 packets not 2 packets. + for (int i = 0; i < 3; ++i) { + EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250)); + } + for (int j = 0; j < 30; ++j) { + EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250)); + } + EXPECT_CALL(callback_, TimeToSendPadding(testing::_)).Times(0); + for (int k = 0; k < 10; ++k) { + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + TickTime::AdvanceFakeClock(5); + EXPECT_CALL(callback_, + TimeToSendPacket(ssrc, testing::_, capture_time_ms)).Times(3); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); + } + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + TickTime::AdvanceFakeClock(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); + EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250)); + EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250)); + EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250)); + EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250)); +} + +TEST_F(PacedSenderTest, Padding) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = 56789; + + // Due to the multiplicative factor we can send 3 packets not 2 packets. + EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250)); + EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250)); + EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250)); + EXPECT_CALL(callback_, TimeToSendPadding(250)).Times(1); + EXPECT_CALL(callback_, + TimeToSendPacket(ssrc, sequence_number, capture_time_ms)).Times(0); + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + TickTime::AdvanceFakeClock(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); + + EXPECT_CALL(callback_, TimeToSendPadding(500)).Times(1); + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + TickTime::AdvanceFakeClock(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); +} + +TEST_F(PacedSenderTest, Priority) { + uint32_t ssrc_low_priority = 12345; + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = 56789; + int64_t capture_time_ms_low_priority = 1234567; + + // Due to the multiplicative factor we can send 3 packets not 2 packets. + EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kLowPriority, + ssrc_low_priority, sequence_number++, capture_time_ms_low_priority, 250)); + EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, + ssrc, sequence_number++, capture_time_ms, 250)); + EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, + ssrc, sequence_number++, capture_time_ms, 250)); + + // Expect normal and low priority to be queued and high to pass through. + EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kLowPriority, + ssrc_low_priority, sequence_number++, capture_time_ms_low_priority, 250)); + EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, + ssrc, sequence_number++, capture_time_ms, 250)); + EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, + ssrc, sequence_number++, capture_time_ms, 250)); + EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kHighPriority, + ssrc, sequence_number++, capture_time_ms, 250)); + + // Expect all normal priority to be sent out first. + EXPECT_CALL(callback_, TimeToSendPadding(testing::_)).Times(0); + EXPECT_CALL(callback_, + TimeToSendPacket(ssrc, testing::_, capture_time_ms)).Times(2); + + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + TickTime::AdvanceFakeClock(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); + + EXPECT_CALL(callback_, TimeToSendPacket(ssrc_low_priority, + testing::_, capture_time_ms_low_priority)).Times(1); + + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + TickTime::AdvanceFakeClock(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); +} + +} // namespace webrtc diff --git a/webrtc/modules/pacing/pacing.gypi b/webrtc/modules/pacing/pacing.gypi new file mode 100644 index 0000000000..66b12ecafe --- /dev/null +++ b/webrtc/modules/pacing/pacing.gypi @@ -0,0 +1,53 @@ +# Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. +# +# Use of this source code is governed by a BSD-style license +# that can be found in the LICENSE file in the root of the source +# tree. An additional intellectual property rights grant can be found +# in the file PATENTS. All contributing project authors may +# be found in the AUTHORS file in the root of the source tree. + +{ + 'targets': [ + { + 'target_name': 'paced_sender', + 'type': '<(library)', + 'dependencies': [ + '<(webrtc_root)/system_wrappers/source/system_wrappers.gyp:system_wrappers', + ], + 'include_dirs': [ + 'include', + ], + 'sources': [ + 'include/paced_sender.h', + 'paced_sender.cc', + ], + }, + ], # targets + + 'conditions': [ + ['include_tests==1', { + 'targets' : [ + { + 'target_name': 'paced_sender_unittests', + 'type': 'executable', + 'dependencies': [ + 'paced_sender', + '<(webrtc_root)/test/test.gyp:test_support_main', + '<(DEPTH)/testing/gmock.gyp:gmock', + '<(DEPTH)/testing/gtest.gyp:gtest', + ], + 'sources': [ + 'paced_sender_unittest.cc', + ], + }, + ], # targets + }], # include_tests + ], # conditions + +} + +# Local Variables: +# tab-width:2 +# indent-tabs-mode:nil +# End: +# vim: set expandtab tabstop=2 shiftwidth=2