BitrateProber::CreateProbeCluster now only accept one parameter (bitrate_bps).

Instead of having to specify a bitrate and how many packets to use,
the BitrateProber will now use the bitrate to calculate how many
bytes it will use to probe that bitrate instead.

For now, |kMinProbeDurationMs| is set to 15 ms which means that probing
at 1900 kbps will result in 1900/8*0.015 = 3.5 kB used. Since we can
expect packets to be smaller at the beginning of a stream (500 to 700
bytes) this will result in 7 to 5 packets sent for that bitrate, and
should work very similar to how the current initial probing works.

A minimum of 5 packets are always sent.

BUG=webrtc:6822

Review-Url: https://codereview.webrtc.org/2609113003
Cr-Commit-Position: refs/heads/master@{#15899}
This commit is contained in:
philipel 2017-01-04 07:05:25 -08:00 committed by Commit bot
parent b557b2169e
commit fd58b61068
9 changed files with 106 additions and 75 deletions

View File

@ -20,12 +20,6 @@
namespace webrtc {
namespace {
// Number of deltas between probes per cluster. On the very first cluster,
// we will need kProbeDeltasPerCluster + 1 probes, but on a cluster following
// another, we need kProbeDeltasPerCluster probes.
constexpr int kProbeDeltasPerCluster = 5;
// Maximum waiting time from the time of initiating probing to getting
// the measured results back.
constexpr int64_t kMaxWaitingTimeForProbingResultMs = 1000;
@ -200,7 +194,6 @@ void ProbeController::InitiateProbing(
int64_t now_ms,
std::initializer_list<int64_t> bitrates_to_probe,
bool probe_further) {
bool first_cluster = true;
for (int64_t bitrate : bitrates_to_probe) {
int64_t max_probe_bitrate_bps =
max_bitrate_bps_ > 0 ? max_bitrate_bps_ : kDefaultMaxProbingBitrateBps;
@ -208,14 +201,7 @@ void ProbeController::InitiateProbing(
bitrate = max_probe_bitrate_bps;
probe_further = false;
}
if (first_cluster) {
pacer_->CreateProbeCluster(rtc::checked_cast<int>(bitrate),
kProbeDeltasPerCluster + 1);
first_cluster = false;
} else {
pacer_->CreateProbeCluster(rtc::checked_cast<int>(bitrate),
kProbeDeltasPerCluster);
}
pacer_->CreateProbeCluster(rtc::checked_cast<int>(bitrate));
}
time_last_probing_initiated_ms_ = now_ms;
if (probe_further) {

View File

@ -49,24 +49,24 @@ class ProbeControllerTest : public ::testing::Test {
};
TEST_F(ProbeControllerTest, InitiatesProbingAtStart) {
EXPECT_CALL(pacer_, CreateProbeCluster(_, _)).Times(AtLeast(2));
EXPECT_CALL(pacer_, CreateProbeCluster(_)).Times(AtLeast(2));
probe_controller_->SetBitrates(kMinBitrateBps, kStartBitrateBps,
kMaxBitrateBps);
}
TEST_F(ProbeControllerTest, ProbeOnlyWhenNetworkIsUp) {
probe_controller_->OnNetworkStateChanged(kNetworkDown);
EXPECT_CALL(pacer_, CreateProbeCluster(_, _)).Times(0);
EXPECT_CALL(pacer_, CreateProbeCluster(_)).Times(0);
probe_controller_->SetBitrates(kMinBitrateBps, kStartBitrateBps,
kMaxBitrateBps);
testing::Mock::VerifyAndClearExpectations(&pacer_);
EXPECT_CALL(pacer_, CreateProbeCluster(_, _)).Times(AtLeast(2));
EXPECT_CALL(pacer_, CreateProbeCluster(_)).Times(AtLeast(2));
probe_controller_->OnNetworkStateChanged(kNetworkUp);
}
TEST_F(ProbeControllerTest, InitiatesProbingOnMaxBitrateIncrease) {
EXPECT_CALL(pacer_, CreateProbeCluster(_, _)).Times(AtLeast(2));
EXPECT_CALL(pacer_, CreateProbeCluster(_)).Times(AtLeast(2));
probe_controller_->SetBitrates(kMinBitrateBps, kStartBitrateBps,
kMaxBitrateBps);
// Long enough to time out exponential probing.
@ -74,7 +74,7 @@ TEST_F(ProbeControllerTest, InitiatesProbingOnMaxBitrateIncrease) {
probe_controller_->SetEstimatedBitrate(kStartBitrateBps);
probe_controller_->Process();
EXPECT_CALL(pacer_, CreateProbeCluster(kMaxBitrateBps + 100, _));
EXPECT_CALL(pacer_, CreateProbeCluster(kMaxBitrateBps + 100));
probe_controller_->SetBitrates(kMinBitrateBps, kStartBitrateBps,
kMaxBitrateBps + 100);
}
@ -85,11 +85,11 @@ TEST_F(ProbeControllerTest, TestExponentialProbing) {
// Repeated probe should only be sent when estimated bitrate climbs above
// 0.7 * 6 * kStartBitrateBps = 1260.
EXPECT_CALL(pacer_, CreateProbeCluster(_, _)).Times(0);
EXPECT_CALL(pacer_, CreateProbeCluster(_)).Times(0);
probe_controller_->SetEstimatedBitrate(1000);
testing::Mock::VerifyAndClearExpectations(&pacer_);
EXPECT_CALL(pacer_, CreateProbeCluster(2 * 1800, _));
EXPECT_CALL(pacer_, CreateProbeCluster(2 * 1800));
probe_controller_->SetEstimatedBitrate(1800);
}
@ -101,12 +101,12 @@ TEST_F(ProbeControllerTest, TestExponentialProbingTimeout) {
clock_.AdvanceTimeMilliseconds(kExponentialProbingTimeoutMs);
probe_controller_->Process();
EXPECT_CALL(pacer_, CreateProbeCluster(_, _)).Times(0);
EXPECT_CALL(pacer_, CreateProbeCluster(_)).Times(0);
probe_controller_->SetEstimatedBitrate(1800);
}
TEST_F(ProbeControllerTest, ProbeAfterEstimateDropInAlr) {
EXPECT_CALL(pacer_, CreateProbeCluster(_, _)).Times(2);
EXPECT_CALL(pacer_, CreateProbeCluster(_)).Times(2);
probe_controller_->SetBitrates(kMinBitrateBps, kStartBitrateBps,
kMaxBitrateBps);
probe_controller_->SetEstimatedBitrate(500);
@ -114,7 +114,7 @@ TEST_F(ProbeControllerTest, ProbeAfterEstimateDropInAlr) {
// When bandwidth estimate drops the controller should send a probe at the
// previous bitrate.
EXPECT_CALL(pacer_, CreateProbeCluster(500, _)).Times(1);
EXPECT_CALL(pacer_, CreateProbeCluster(500)).Times(1);
EXPECT_CALL(pacer_, GetApplicationLimitedRegionStartTime())
.WillRepeatedly(
Return(rtc::Optional<int64_t>(clock_.TimeInMilliseconds())));
@ -124,7 +124,7 @@ TEST_F(ProbeControllerTest, ProbeAfterEstimateDropInAlr) {
}
TEST_F(ProbeControllerTest, PeriodicProbing) {
EXPECT_CALL(pacer_, CreateProbeCluster(_, _)).Times(2);
EXPECT_CALL(pacer_, CreateProbeCluster(_)).Times(2);
probe_controller_->EnablePeriodicAlrProbing(true);
probe_controller_->SetBitrates(kMinBitrateBps, kStartBitrateBps,
kMaxBitrateBps);
@ -134,7 +134,7 @@ TEST_F(ProbeControllerTest, PeriodicProbing) {
int64_t start_time = clock_.TimeInMilliseconds();
// Expect the controller to send a new probe after 5s has passed.
EXPECT_CALL(pacer_, CreateProbeCluster(1000, _)).Times(1);
EXPECT_CALL(pacer_, CreateProbeCluster(1000)).Times(1);
EXPECT_CALL(pacer_, GetApplicationLimitedRegionStartTime())
.WillRepeatedly(Return(rtc::Optional<int64_t>(start_time)));
clock_.AdvanceTimeMilliseconds(5000);
@ -143,7 +143,7 @@ TEST_F(ProbeControllerTest, PeriodicProbing) {
testing::Mock::VerifyAndClearExpectations(&pacer_);
// The following probe should be sent at 10s into ALR.
EXPECT_CALL(pacer_, CreateProbeCluster(_, _)).Times(0);
EXPECT_CALL(pacer_, CreateProbeCluster(_)).Times(0);
EXPECT_CALL(pacer_, GetApplicationLimitedRegionStartTime())
.WillRepeatedly(Return(rtc::Optional<int64_t>(start_time)));
clock_.AdvanceTimeMilliseconds(4000);
@ -151,7 +151,7 @@ TEST_F(ProbeControllerTest, PeriodicProbing) {
probe_controller_->SetEstimatedBitrate(500);
testing::Mock::VerifyAndClearExpectations(&pacer_);
EXPECT_CALL(pacer_, CreateProbeCluster(_, _)).Times(1);
EXPECT_CALL(pacer_, CreateProbeCluster(_)).Times(1);
EXPECT_CALL(pacer_, GetApplicationLimitedRegionStartTime())
.WillRepeatedly(Return(rtc::Optional<int64_t>(start_time)));
clock_.AdvanceTimeMilliseconds(1000);
@ -166,12 +166,12 @@ TEST_F(ProbeControllerTest, TestExponentialProbingOverflow) {
100 * kMbpsMultiplier);
// Verify that probe bitrate is capped at the specified max bitrate
EXPECT_CALL(pacer_, CreateProbeCluster(100 * kMbpsMultiplier, _));
EXPECT_CALL(pacer_, CreateProbeCluster(100 * kMbpsMultiplier));
probe_controller_->SetEstimatedBitrate(60 * kMbpsMultiplier);
testing::Mock::VerifyAndClearExpectations(&pacer_);
// Verify that repeated probes aren't sent.
EXPECT_CALL(pacer_, CreateProbeCluster(_, _)).Times(0);
EXPECT_CALL(pacer_, CreateProbeCluster(_)).Times(0);
probe_controller_->SetEstimatedBitrate(100 * kMbpsMultiplier);
}

View File

@ -26,6 +26,12 @@ constexpr int kInactivityThresholdMs = 5000;
// A minimum interval between probes to allow scheduling to be feasible.
constexpr int kMinProbeDeltaMs = 1;
// The minimum number probing packets used.
constexpr int kMinProbePacketsSent = 5;
// The minimum probing duration in ms.
constexpr int kMinProbeDurationMs = 15;
int ComputeDeltaFromBitrate(size_t probe_size, uint32_t bitrate_bps) {
RTC_CHECK_GT(bitrate_bps, 0);
// Compute the time delta needed to send probe_size bytes at bitrate_bps
@ -68,16 +74,20 @@ void BitrateProber::OnIncomingPacket(size_t packet_size) {
}
}
void BitrateProber::CreateProbeCluster(int bitrate_bps, int num_probes) {
void BitrateProber::CreateProbeCluster(int bitrate_bps) {
RTC_DCHECK(probing_state_ != ProbingState::kDisabled);
ProbeCluster cluster;
cluster.max_probes = num_probes;
cluster.probe_bitrate_bps = bitrate_bps;
cluster.min_probes = kMinProbePacketsSent;
cluster.min_bytes = bitrate_bps * kMinProbeDurationMs / 8000;
cluster.bitrate_bps = bitrate_bps;
cluster.id = next_cluster_id_++;
clusters_.push(cluster);
LOG(LS_INFO) << "Probe cluster (bitrate:probes): ("
<< cluster.probe_bitrate_bps << ":" << cluster.max_probes
<< ") ";
LOG(LS_INFO) << "Probe cluster (bitrate:min bytes:min packets): ("
<< cluster.bitrate_bps << ":" << cluster.min_bytes << ":"
<< cluster.min_probes << ")";
// If we are already probing, continue to do so. Otherwise set it to
// kInactive and wait for OnIncomingPacket to start the probing.
if (probing_state_ != ProbingState::kActive)
probing_state_ = ProbingState::kInactive;
}
@ -90,8 +100,7 @@ void BitrateProber::ResetState() {
std::queue<ProbeCluster> clusters;
clusters.swap(clusters_);
while (!clusters.empty()) {
CreateProbeCluster(clusters.front().probe_bitrate_bps,
clusters.front().max_probes);
CreateProbeCluster(clusters.front().bitrate_bps);
clusters.pop();
}
// If its enabled, reset to inactive.
@ -120,8 +129,8 @@ int BitrateProber::TimeUntilNextProbe(int64_t now_ms) {
// sent before.
int time_until_probe_ms = 0;
if (probe_size_last_sent_ != 0 && probing_state_ == ProbingState::kActive) {
int next_delta_ms = ComputeDeltaFromBitrate(
probe_size_last_sent_, clusters_.front().probe_bitrate_bps);
int next_delta_ms = ComputeDeltaFromBitrate(probe_size_last_sent_,
clusters_.front().bitrate_bps);
time_until_probe_ms = next_delta_ms - elapsed_time_ms;
// If we have waited more than 3 ms for a new packet to probe with we will
// consider this probing session over.
@ -147,8 +156,7 @@ int BitrateProber::CurrentClusterId() const {
// feasible.
size_t BitrateProber::RecommendedMinProbeSize() const {
RTC_DCHECK(!clusters_.empty());
return clusters_.front().probe_bitrate_bps * 2 * kMinProbeDeltaMs /
(8 * 1000);
return clusters_.front().bitrate_bps * 2 * kMinProbeDeltaMs / (8 * 1000);
}
void BitrateProber::ProbeSent(int64_t now_ms, size_t bytes) {
@ -158,9 +166,12 @@ void BitrateProber::ProbeSent(int64_t now_ms, size_t bytes) {
time_last_probe_sent_ms_ = now_ms;
if (!clusters_.empty()) {
ProbeCluster* cluster = &clusters_.front();
++cluster->sent_probes;
if (cluster->sent_probes == cluster->max_probes)
cluster->sent_bytes += static_cast<int>(bytes);
cluster->sent_probes += 1;
if (cluster->sent_bytes >= cluster->min_bytes &&
cluster->sent_probes >= cluster->min_probes) {
clusters_.pop();
}
if (clusters_.empty())
probing_state_ = ProbingState::kSuspended;
}

View File

@ -38,7 +38,7 @@ class BitrateProber {
// Create a cluster used to probe for |bitrate_bps| with |num_probes| number
// of probes.
void CreateProbeCluster(int bitrate_bps, int num_probes);
void CreateProbeCluster(int bitrate_bps);
// Returns the number of milliseconds until the next probe should be sent to
// get accurate probing.
@ -74,9 +74,11 @@ class BitrateProber {
// A probe cluster consists of a set of probes. Each probe in turn can be
// divided into a number of packets to accomodate the MTU on the network.
struct ProbeCluster {
int max_probes = 0;
int min_probes = 0;
int sent_probes = 0;
int probe_bitrate_bps = 0;
int min_bytes = 0;
int sent_bytes = 0;
int bitrate_bps = 0;
int id = -1;
};

View File

@ -21,8 +21,8 @@ TEST(BitrateProberTest, VerifyStatesAndTimeBetweenProbes) {
int64_t now_ms = 0;
EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms));
prober.CreateProbeCluster(900000, 6);
prober.CreateProbeCluster(1800000, 5);
prober.CreateProbeCluster(900000);
prober.CreateProbeCluster(1800000);
EXPECT_FALSE(prober.IsProbing());
prober.OnIncomingPacket(1000);
@ -33,7 +33,7 @@ TEST(BitrateProberTest, VerifyStatesAndTimeBetweenProbes) {
EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms));
prober.ProbeSent(now_ms, 1000);
for (int i = 0; i < 5; ++i) {
for (int i = 0; i < 4; ++i) {
EXPECT_EQ(8, prober.TimeUntilNextProbe(now_ms));
now_ms += 4;
EXPECT_EQ(4, prober.TimeUntilNextProbe(now_ms));
@ -60,7 +60,7 @@ TEST(BitrateProberTest, DoesntProbeWithoutRecentPackets) {
int64_t now_ms = 0;
EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms));
prober.CreateProbeCluster(900000, 6);
prober.CreateProbeCluster(900000);
EXPECT_FALSE(prober.IsProbing());
prober.OnIncomingPacket(1000);
@ -97,9 +97,44 @@ TEST(BitrateProberTest, VerifyProbeSizeOnHighBitrate) {
BitrateProber prober;
constexpr unsigned kHighBitrateBps = 10000000; // 10 Mbps
prober.CreateProbeCluster(kHighBitrateBps, 6);
prober.CreateProbeCluster(kHighBitrateBps);
// Probe size should ensure a minimum of 1 ms interval.
EXPECT_GT(prober.RecommendedMinProbeSize(), kHighBitrateBps / 8000);
}
TEST(BitrateProberTest, MinumumNumberOfProbingPackets) {
BitrateProber prober;
// Even when probing at a low bitrate we expect a minimum number
// of packets to be sent.
constexpr int kBitrateBps = 100000; // 100 kbps
constexpr int kPacketSizeBytes = 1000;
prober.CreateProbeCluster(kBitrateBps);
prober.OnIncomingPacket(kPacketSizeBytes);
for (int i = 0; i < 5; ++i) {
EXPECT_TRUE(prober.IsProbing());
prober.ProbeSent(0, kPacketSizeBytes);
}
EXPECT_FALSE(prober.IsProbing());
}
TEST(BitrateProberTest, ScaleBytesUsedForProbing) {
BitrateProber prober;
constexpr int kBitrateBps = 10000000; // 10 Mbps
constexpr int kPacketSizeBytes = 1000;
constexpr int kExpectedBytesSent = kBitrateBps * 15 / 8000;
prober.CreateProbeCluster(kBitrateBps);
prober.OnIncomingPacket(kPacketSizeBytes);
int bytes_sent = 0;
while (bytes_sent < kExpectedBytesSent) {
EXPECT_TRUE(prober.IsProbing());
prober.ProbeSent(0, kPacketSizeBytes);
bytes_sent += kPacketSizeBytes;
}
EXPECT_FALSE(prober.IsProbing());
}
} // namespace webrtc

View File

@ -28,7 +28,7 @@ class MockPacedSender : public PacedSender {
int64_t capture_time_ms,
size_t bytes,
bool retransmission));
MOCK_METHOD2(CreateProbeCluster, void(int, int));
MOCK_METHOD1(CreateProbeCluster, void(int));
MOCK_METHOD1(SetEstimatedBitrate, void(uint32_t));
MOCK_CONST_METHOD0(QueueInMs, int64_t());
MOCK_CONST_METHOD0(QueueInPackets, int());

View File

@ -267,9 +267,9 @@ PacedSender::PacedSender(Clock* clock, PacketSender* packet_sender)
PacedSender::~PacedSender() {}
void PacedSender::CreateProbeCluster(int bitrate_bps, int num_packets) {
void PacedSender::CreateProbeCluster(int bitrate_bps) {
CriticalSectionScoped cs(critsect_.get());
prober_->CreateProbeCluster(bitrate_bps, num_packets);
prober_->CreateProbeCluster(bitrate_bps);
}
void PacedSender::Pause() {

View File

@ -26,6 +26,7 @@ class AlrDetector;
class BitrateProber;
class Clock;
class CriticalSectionWrapper;
class ProbeClusterCreatedObserver;
namespace paced_sender {
class IntervalBudget;
@ -68,12 +69,11 @@ class PacedSender : public Module, public RtpPacketSender {
static const size_t kMinProbePacketSize = 200;
PacedSender(Clock* clock,
PacketSender* packet_sender);
PacedSender(Clock* clock, PacketSender* packet_sender);
virtual ~PacedSender();
virtual void CreateProbeCluster(int bitrate_bps, int num_packets);
virtual void CreateProbeCluster(int bitrate_bps);
// Temporarily pause all sending.
void Pause();

View File

@ -21,9 +21,7 @@ using testing::Return;
namespace {
constexpr unsigned kFirstClusterBps = 900000;
constexpr int kFirstClusterCount = 6;
constexpr unsigned kSecondClusterBps = 1800000;
constexpr int kSecondClusterCount = 5;
// The error stems from truncating the time interval of probe packets to integer
// values. This results in probing slightly higher than the target bitrate.
@ -105,8 +103,8 @@ class PacedSenderTest : public ::testing::Test {
srand(0);
// Need to initialize PacedSender after we initialize clock.
send_bucket_.reset(new PacedSender(&clock_, &callback_));
send_bucket_->CreateProbeCluster(kFirstClusterBps, kFirstClusterCount);
send_bucket_->CreateProbeCluster(kSecondClusterBps, kSecondClusterCount);
send_bucket_->CreateProbeCluster(kFirstClusterBps);
send_bucket_->CreateProbeCluster(kSecondClusterBps);
// Default to bitrate probing disabled for testing purposes. Probing tests
// have to enable probing, either by creating a new PacedSender instance or
// by calling SetProbingEnabled(true).
@ -806,18 +804,18 @@ TEST_F(PacedSenderTest, ProbingWithInsertedPackets) {
PacedSenderProbing packet_sender;
send_bucket_.reset(new PacedSender(&clock_, &packet_sender));
send_bucket_->CreateProbeCluster(kFirstClusterBps, kFirstClusterCount);
send_bucket_->CreateProbeCluster(kSecondClusterBps, kSecondClusterCount);
send_bucket_->CreateProbeCluster(kFirstClusterBps);
send_bucket_->CreateProbeCluster(kSecondClusterBps);
send_bucket_->SetEstimatedBitrate(kInitialBitrateBps);
for (int i = 0; i < kFirstClusterCount + kSecondClusterCount; ++i) {
for (int i = 0; i < 10; ++i) {
send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, clock_.TimeInMilliseconds(),
kPacketSize, false);
}
int64_t start = clock_.TimeInMilliseconds();
while (packet_sender.packets_sent() < kFirstClusterCount) {
while (packet_sender.packets_sent() < 5) {
int time_until_process = send_bucket_->TimeUntilNextProcess();
clock_.AdvanceTimeMilliseconds(time_until_process);
send_bucket_->Process();
@ -831,8 +829,7 @@ TEST_F(PacedSenderTest, ProbingWithInsertedPackets) {
EXPECT_EQ(0, packet_sender.padding_sent());
start = clock_.TimeInMilliseconds();
while (packet_sender.packets_sent() <
kFirstClusterCount + kSecondClusterCount) {
while (packet_sender.packets_sent() < 10) {
int time_until_process = send_bucket_->TimeUntilNextProcess();
clock_.AdvanceTimeMilliseconds(time_until_process);
send_bucket_->Process();
@ -852,10 +849,10 @@ TEST_F(PacedSenderTest, ProbingWithPaddingSupport) {
PacedSenderProbing packet_sender;
send_bucket_.reset(new PacedSender(&clock_, &packet_sender));
send_bucket_->CreateProbeCluster(kFirstClusterBps, kFirstClusterCount);
send_bucket_->CreateProbeCluster(kFirstClusterBps);
send_bucket_->SetEstimatedBitrate(kInitialBitrateBps);
for (int i = 0; i < kFirstClusterCount - 2; ++i) {
for (int i = 0; i < 3; ++i) {
send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, clock_.TimeInMilliseconds(),
kPacketSize, false);
@ -863,7 +860,7 @@ TEST_F(PacedSenderTest, ProbingWithPaddingSupport) {
int64_t start = clock_.TimeInMilliseconds();
int process_count = 0;
while (process_count < kFirstClusterCount) {
while (process_count < 5) {
int time_until_process = send_bucket_->TimeUntilNextProcess();
clock_.AdvanceTimeMilliseconds(time_until_process);
send_bucket_->Process();
@ -1009,7 +1006,7 @@ TEST_F(PacedSenderTest, ProbeClusterId) {
send_bucket_->SetSendBitrateLimits(kTargetBitrateBps, kTargetBitrateBps);
send_bucket_->SetProbingEnabled(true);
for (int i = 0; i < 11; ++i) {
for (int i = 0; i < 10; ++i) {
send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
sequence_number + i, clock_.TimeInMilliseconds(),
kPacketSize, false);
@ -1017,9 +1014,9 @@ TEST_F(PacedSenderTest, ProbeClusterId) {
// First probing cluster.
EXPECT_CALL(callback_, TimeToSendPacket(_, _, _, _, 0))
.Times(6)
.Times(5)
.WillRepeatedly(Return(true));
for (int i = 0; i < 6; ++i) {
for (int i = 0; i < 5; ++i) {
clock_.AdvanceTimeMilliseconds(20);
send_bucket_->Process();
}