Added cluster id to PacedSender::Callback::TimeToSendPacket.

Also added cluster id to paced_sender::Packet and set the cluster id of
the probing packet that is about to be sent.

BUG=webrtc:5859
R=danilchap@webrtc.org, stefan@webrtc.org

Review URL: https://codereview.webrtc.org/1962303002 .

Cr-Commit-Position: refs/heads/master@{#12718}
This commit is contained in:
philipel 2016-05-13 11:13:05 +02:00
parent 1a830c2c66
commit 29dca2ce95
10 changed files with 109 additions and 63 deletions

View File

@ -35,7 +35,7 @@ BitrateProber::BitrateProber()
: probing_state_(kDisabled),
packet_size_last_send_(0),
time_last_send_ms_(-1),
cluster_id_(0) {}
next_cluster_id_(0) {}
void BitrateProber::SetEnabled(bool enable) {
if (enable) {
@ -67,16 +67,16 @@ void BitrateProber::OnIncomingPacket(uint32_t bitrate_bps,
const int kPacketsPerProbe = 5;
const float kProbeBitrateMultipliers[kMaxNumProbes] = {3, 6};
std::stringstream bitrate_log;
bitrate_log << "Start probing for bandwidth, bitrates:";
bitrate_log << "Start probing for bandwidth, (bitrate:packets): ";
for (int i = 0; i < kMaxNumProbes; ++i) {
ProbeCluster cluster;
// We need one extra to get 5 deltas for the first probe, therefore (i == 0)
cluster.max_probe_packets = kPacketsPerProbe + (i == 0 ? 1 : 0);
cluster.probe_bitrate_bps = kProbeBitrateMultipliers[i] * bitrate_bps;
cluster.id = cluster_id_++;
cluster.id = next_cluster_id_++;
bitrate_log << " " << cluster.probe_bitrate_bps;
bitrate_log << ", num packets: " << cluster.max_probe_packets;
bitrate_log << "(" << cluster.probe_bitrate_bps << ":"
<< cluster.max_probe_packets << ") ";
clusters_.push(cluster);
}

View File

@ -71,7 +71,7 @@ class BitrateProber {
std::queue<ProbeCluster> clusters_;
size_t packet_size_last_send_;
int64_t time_last_send_ms_;
int cluster_id_;
int next_cluster_id_;
};
} // namespace webrtc
#endif // WEBRTC_MODULES_PACING_BITRATE_PROBER_H_

View File

@ -399,8 +399,10 @@ void PacedSender::Process() {
// element from the priority queue but keep it in storage, so that we can
// reinsert it if send fails.
const paced_sender::Packet& packet = packets_->BeginPop();
int probe_cluster_id =
prober_->IsProbing() ? prober_->CurrentClusterId() : -1;
if (SendPacket(packet)) {
if (SendPacket(packet, probe_cluster_id)) {
// Send succeeded, remove it from the queue.
packets_->FinalizePop(packet);
if (prober_->IsProbing())
@ -427,7 +429,8 @@ void PacedSender::Process() {
SendPadding(static_cast<size_t>(padding_needed));
}
bool PacedSender::SendPacket(const paced_sender::Packet& packet) {
bool PacedSender::SendPacket(const paced_sender::Packet& packet,
int probe_cluster_id) {
// TODO(holmer): Because of this bug issue 5307 we have to send audio
// packets even when the pacer is paused. Here we assume audio packets are
// always high priority and that they are the only high priority packets.
@ -436,7 +439,7 @@ bool PacedSender::SendPacket(const paced_sender::Packet& packet) {
critsect_->Leave();
const bool success = packet_sender_->TimeToSendPacket(
packet.ssrc, packet.sequence_number, packet.capture_time_ms,
packet.retransmission);
packet.retransmission, probe_cluster_id);
critsect_->Enter();
if (success) {

View File

@ -42,7 +42,8 @@ class PacedSender : public Module, public RtpPacketSender {
virtual bool TimeToSendPacket(uint32_t ssrc,
uint16_t sequence_number,
int64_t capture_time_ms,
bool retransmission) = 0;
bool retransmission,
int probe_cluster_id) = 0;
// Called when it's a good time to send a padding data.
// Returns the number of bytes sent.
virtual size_t TimeToSendPadding(size_t bytes) = 0;
@ -130,7 +131,7 @@ class PacedSender : public Module, public RtpPacketSender {
void UpdateBytesPerInterval(int64_t delta_time_in_ms)
EXCLUSIVE_LOCKS_REQUIRED(critsect_);
bool SendPacket(const paced_sender::Packet& packet)
bool SendPacket(const paced_sender::Packet& packet, int probe_cluster_id)
EXCLUSIVE_LOCKS_REQUIRED(critsect_);
void SendPadding(size_t padding_needed) EXCLUSIVE_LOCKS_REQUIRED(critsect_);

View File

@ -26,11 +26,12 @@ static const int kTargetBitrateBps = 800000;
class MockPacedSenderCallback : public PacedSender::PacketSender {
public:
MOCK_METHOD4(TimeToSendPacket,
MOCK_METHOD5(TimeToSendPacket,
bool(uint32_t ssrc,
uint16_t sequence_number,
int64_t capture_time_ms,
bool retransmission));
bool retransmission,
int probe_cluster_id));
MOCK_METHOD1(TimeToSendPadding,
size_t(size_t bytes));
};
@ -42,11 +43,12 @@ class PacedSenderPadding : public PacedSender::PacketSender {
bool TimeToSendPacket(uint32_t ssrc,
uint16_t sequence_number,
int64_t capture_time_ms,
bool retransmission) {
bool retransmission,
int probe_cluster_id) override {
return true;
}
size_t TimeToSendPadding(size_t bytes) {
size_t TimeToSendPadding(size_t bytes) override {
const size_t kPaddingPacketSize = 224;
size_t num_packets = (bytes + kPaddingPacketSize - 1) / kPaddingPacketSize;
padding_sent_ += kPaddingPacketSize * num_packets;
@ -70,12 +72,13 @@ class PacedSenderProbing : public PacedSender::PacketSender {
bool TimeToSendPacket(uint32_t ssrc,
uint16_t sequence_number,
int64_t capture_time_ms,
bool retransmission) {
bool retransmission,
int probe_cluster_id) override {
ExpectAndCountPacket();
return true;
}
size_t TimeToSendPadding(size_t bytes) {
size_t TimeToSendPadding(size_t bytes) override {
ExpectAndCountPacket();
return bytes;
}
@ -125,8 +128,8 @@ class PacedSenderTest : public ::testing::Test {
bool retransmission) {
send_bucket_->InsertPacket(priority, ssrc, sequence_number, capture_time_ms,
size, retransmission);
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, sequence_number, capture_time_ms, false))
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number,
capture_time_ms, false, _))
.Times(1)
.WillRepeatedly(Return(true));
}
@ -162,8 +165,8 @@ TEST_F(PacedSenderTest, QueuePacket) {
clock_.AdvanceTimeMilliseconds(1);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(1u, send_bucket_->QueueSizePackets());
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number,
queued_packet_timestamp, false))
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number++,
queued_packet_timestamp, false, _))
.Times(1)
.WillRepeatedly(Return(true));
send_bucket_->Process();
@ -212,7 +215,7 @@ TEST_F(PacedSenderTest, PaceQueuedPackets) {
for (int k = 0; k < 10; ++k) {
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
clock_.AdvanceTimeMilliseconds(5);
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, _, false))
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, _, false, _))
.Times(packets_to_send_per_interval)
.WillRepeatedly(Return(true));
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
@ -269,7 +272,7 @@ TEST_F(PacedSenderTest, PaceQueuedPacketsWithDuplicates) {
for (size_t i = 0; i < packets_to_send_per_interval; ++i) {
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, queued_sequence_number++, _, false))
TimeToSendPacket(ssrc, queued_sequence_number++, _, false, _))
.Times(1)
.WillRepeatedly(Return(true));
}
@ -441,7 +444,7 @@ TEST_F(PacedSenderTest, Priority) {
// Expect all high and normal priority to be sent out first.
EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, capture_time_ms, false))
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, capture_time_ms, false, _))
.Times(packets_to_send_per_interval + 1)
.WillRepeatedly(Return(true));
@ -452,8 +455,8 @@ TEST_F(PacedSenderTest, Priority) {
EXPECT_EQ(1u, send_bucket_->QueueSizePackets());
EXPECT_CALL(callback_,
TimeToSendPacket(
ssrc_low_priority, _, capture_time_ms_low_priority, false))
TimeToSendPacket(ssrc_low_priority, _,
capture_time_ms_low_priority, false, _))
.Times(1)
.WillRepeatedly(Return(true));
@ -491,8 +494,8 @@ TEST_F(PacedSenderTest, HighPrioDoesntAffectBudget) {
clock_.AdvanceTimeMilliseconds(5);
send_bucket_->Process();
EXPECT_EQ(1u, send_bucket_->QueueSizePackets());
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, sequence_number++, capture_time_ms, false))
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number++,
capture_time_ms, false, _))
.Times(1)
.WillRepeatedly(Return(true));
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
@ -543,7 +546,7 @@ TEST_F(PacedSenderTest, Pause) {
// Expect no packet to come out while paused.
EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
EXPECT_CALL(callback_, TimeToSendPacket(_, _, _, _)).Times(0);
EXPECT_CALL(callback_, TimeToSendPacket(_, _, _, _, _)).Times(0);
for (int i = 0; i < 10; ++i) {
clock_.AdvanceTimeMilliseconds(5);
@ -552,10 +555,11 @@ TEST_F(PacedSenderTest, Pause) {
}
// Expect high prio packets to come out first followed by all packets in the
// way they were added.
EXPECT_CALL(callback_, TimeToSendPacket(_, _, capture_time_ms, false))
EXPECT_CALL(callback_, TimeToSendPacket(_, _, capture_time_ms, false, _))
.Times(3)
.WillRepeatedly(Return(true));
EXPECT_CALL(callback_, TimeToSendPacket(_, _, second_capture_time_ms, false))
EXPECT_CALL(callback_,
TimeToSendPacket(_, _, second_capture_time_ms, false, _))
.Times(1)
.WillRepeatedly(Return(true));
send_bucket_->Resume();
@ -584,8 +588,8 @@ TEST_F(PacedSenderTest, ResendPacket) {
EXPECT_EQ(clock_.TimeInMilliseconds() - capture_time_ms,
send_bucket_->QueueInMs());
// Fails to send first packet so only one call.
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, sequence_number, capture_time_ms, false))
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number,
capture_time_ms, false, _))
.Times(1)
.WillOnce(Return(false));
clock_.AdvanceTimeMilliseconds(10000);
@ -596,13 +600,12 @@ TEST_F(PacedSenderTest, ResendPacket) {
send_bucket_->QueueInMs());
// Fails to send second packet.
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, sequence_number, capture_time_ms, false))
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number,
capture_time_ms, false, _))
.Times(1)
.WillOnce(Return(true));
EXPECT_CALL(
callback_,
TimeToSendPacket(ssrc, sequence_number + 1, capture_time_ms + 1, false))
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 1,
capture_time_ms + 1, false, _))
.Times(1)
.WillOnce(Return(false));
clock_.AdvanceTimeMilliseconds(10000);
@ -613,9 +616,8 @@ TEST_F(PacedSenderTest, ResendPacket) {
send_bucket_->QueueInMs());
// Send second packet and queue becomes empty.
EXPECT_CALL(
callback_,
TimeToSendPacket(ssrc, sequence_number + 1, capture_time_ms + 1, false))
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 1,
capture_time_ms + 1, false, _))
.Times(1)
.WillOnce(Return(true));
clock_.AdvanceTimeMilliseconds(10000);
@ -768,18 +770,22 @@ TEST_F(PacedSenderTest, PriorityInversion) {
// Packets from earlier frames should be sent first.
{
::testing::InSequence sequence;
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number,
clock_.TimeInMilliseconds(), true))
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, sequence_number,
clock_.TimeInMilliseconds(), true, _))
.WillOnce(Return(true));
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 1,
clock_.TimeInMilliseconds(), true))
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, sequence_number + 1,
clock_.TimeInMilliseconds(), true, _))
.WillOnce(Return(true));
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, sequence_number + 3,
clock_.TimeInMilliseconds() + 33, true, _))
.WillOnce(Return(true));
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, sequence_number + 2,
clock_.TimeInMilliseconds() + 33, true, _))
.WillOnce(Return(true));
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 3,
clock_.TimeInMilliseconds() + 33,
true)).WillOnce(Return(true));
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 2,
clock_.TimeInMilliseconds() + 33,
true)).WillOnce(Return(true));
while (send_bucket_->QueueSizePackets() > 0) {
int time_until_process = send_bucket_->TimeUntilNextProcess();
@ -843,7 +849,7 @@ TEST_F(PacedSenderTest, AverageQueueTime) {
// Only first packet (queued for 20ms) should be removed, leave the second
// packet (queued for 10ms) alone in the queue.
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number,
first_capture_time, false))
first_capture_time, false, _))
.Times(1)
.WillRepeatedly(Return(true));
send_bucket_->Process();
@ -852,7 +858,7 @@ TEST_F(PacedSenderTest, AverageQueueTime) {
clock_.AdvanceTimeMilliseconds(10);
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 1,
first_capture_time + 10, false))
first_capture_time + 10, false, _))
.Times(1)
.WillRepeatedly(Return(true));
for (int i = 0; i < 3; ++i) {
@ -863,5 +869,37 @@ TEST_F(PacedSenderTest, AverageQueueTime) {
EXPECT_EQ(0, send_bucket_->AverageQueueTimeMs());
}
TEST_F(PacedSenderTest, ProbeClusterId) {
uint32_t ssrc = 12346;
uint16_t sequence_number = 1234;
const size_t kPacketSize = 1200;
send_bucket_->SetProbingEnabled(true);
for (int i = 0; i < 11; ++i) {
send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
sequence_number + i, clock_.TimeInMilliseconds(),
kPacketSize, false);
}
// First probing cluster.
EXPECT_CALL(callback_, TimeToSendPacket(_, _, _, _, 0))
.Times(6)
.WillRepeatedly(Return(true));
for (int i = 0; i < 6; ++i)
send_bucket_->Process();
// Second probing cluster.
EXPECT_CALL(callback_, TimeToSendPacket(_, _, _, _, 1))
.Times(5)
.WillRepeatedly(Return(true));
for (int i = 0; i < 5; ++i)
send_bucket_->Process();
// No more probing packets.
EXPECT_CALL(callback_, TimeToSendPadding(_))
.Times(1);
send_bucket_->Process();
}
} // namespace test
} // namespace webrtc

View File

@ -43,7 +43,8 @@ void PacketRouter::RemoveRtpModule(RtpRtcp* rtp_module) {
bool PacketRouter::TimeToSendPacket(uint32_t ssrc,
uint16_t sequence_number,
int64_t capture_timestamp,
bool retransmission) {
bool retransmission,
int probe_cluster_id) {
RTC_DCHECK(pacer_thread_checker_.CalledOnValidThread());
rtc::CritScope cs(&modules_crit_);
for (auto* rtp_module : rtp_modules_) {

View File

@ -43,7 +43,8 @@ class PacketRouter : public PacedSender::PacketSender,
bool TimeToSendPacket(uint32_t ssrc,
uint16_t sequence_number,
int64_t capture_timestamp,
bool retransmission) override;
bool retransmission,
int probe_cluster_id) override;
size_t TimeToSendPadding(size_t bytes) override;

View File

@ -53,7 +53,7 @@ TEST_F(PacketRouterTest, TimeToSendPacket) {
.WillOnce(Return(true));
EXPECT_CALL(rtp_2, TimeToSendPacket(_, _, _, _)).Times(0);
EXPECT_TRUE(packet_router_->TimeToSendPacket(kSsrc1, sequence_number,
timestamp, retransmission));
timestamp, retransmission, -1));
// Send on the second module by letting rtp_2 be sending, but not rtp_1.
++sequence_number;
@ -69,7 +69,7 @@ TEST_F(PacketRouterTest, TimeToSendPacket) {
.Times(1)
.WillOnce(Return(true));
EXPECT_TRUE(packet_router_->TimeToSendPacket(kSsrc2, sequence_number,
timestamp, retransmission));
timestamp, retransmission, -1));
// No module is sending, hence no packet should be sent.
EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(false));
@ -77,7 +77,7 @@ TEST_F(PacketRouterTest, TimeToSendPacket) {
EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(false));
EXPECT_CALL(rtp_2, TimeToSendPacket(_, _, _, _)).Times(0);
EXPECT_TRUE(packet_router_->TimeToSendPacket(kSsrc1, sequence_number,
timestamp, retransmission));
timestamp, retransmission, -1));
// Add a packet with incorrect ssrc and test it's dropped in the router.
EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(true));
@ -87,7 +87,7 @@ TEST_F(PacketRouterTest, TimeToSendPacket) {
EXPECT_CALL(rtp_1, TimeToSendPacket(_, _, _, _)).Times(0);
EXPECT_CALL(rtp_2, TimeToSendPacket(_, _, _, _)).Times(0);
EXPECT_TRUE(packet_router_->TimeToSendPacket(kSsrc1 + kSsrc2, sequence_number,
timestamp, retransmission));
timestamp, retransmission, -1));
packet_router_->RemoveRtpModule(&rtp_1);
@ -97,7 +97,7 @@ TEST_F(PacketRouterTest, TimeToSendPacket) {
EXPECT_CALL(rtp_2, SSRC()).Times(1).WillOnce(Return(kSsrc2));
EXPECT_CALL(rtp_2, TimeToSendPacket(_, _, _, _)).Times(0);
EXPECT_TRUE(packet_router_->TimeToSendPacket(kSsrc1, sequence_number,
timestamp, retransmission));
timestamp, retransmission, -1));
packet_router_->RemoveRtpModule(&rtp_2);
}
@ -167,7 +167,7 @@ TEST_F(PacketRouterTest, SenderOnlyFunctionsRespectSendingMedia) {
// Verify that TimeToSendPacket does not end up in a receiver.
EXPECT_CALL(rtp, TimeToSendPacket(_, _, _, _)).Times(0);
EXPECT_TRUE(packet_router_->TimeToSendPacket(kSsrc, 1, 1, false));
EXPECT_TRUE(packet_router_->TimeToSendPacket(kSsrc, 1, 1, false, -1));
// Verify that TimeToSendPadding does not end up in a receiver.
EXPECT_CALL(rtp, TimeToSendPadding(_)).Times(0);
EXPECT_EQ(0u, packet_router_->TimeToSendPadding(200));

View File

@ -276,7 +276,8 @@ void PacedVideoSender::QueuePackets(Packets* batch,
bool PacedVideoSender::TimeToSendPacket(uint32_t ssrc,
uint16_t sequence_number,
int64_t capture_time_ms,
bool retransmission) {
bool retransmission,
int probe_cluster_id) {
for (Packets::iterator it = pacer_queue_.begin(); it != pacer_queue_.end();
++it) {
MediaPacket* media_packet = static_cast<MediaPacket*>(*it);

View File

@ -113,7 +113,8 @@ class PacedVideoSender : public VideoSender, public PacedSender::PacketSender {
bool TimeToSendPacket(uint32_t ssrc,
uint16_t sequence_number,
int64_t capture_time_ms,
bool retransmission) override;
bool retransmission,
int probe_cluster_id) override;
size_t TimeToSendPadding(size_t bytes) override;
// Implements BitrateObserver.