Allow packets to be reordered in the fake network pipe.

BUG=

Review URL: https://codereview.webrtc.org/1606183002

Cr-Commit-Position: refs/heads/master@{#11384}
This commit is contained in:
philipel 2016-01-26 08:41:53 -08:00 committed by Commit bot
parent 7fd881743c
commit a2c55235ca
5 changed files with 166 additions and 84 deletions

View File

@ -20,60 +20,16 @@
namespace webrtc {
const double kPi = 3.14159265;
static int GaussianRandom(int mean_delay_ms, int standard_deviation_ms) {
// Creating a Normal distribution variable from two independent uniform
// variables based on the Box-Muller transform.
double uniform1 = (rand() + 1.0) / (RAND_MAX + 1.0); // NOLINT
double uniform2 = (rand() + 1.0) / (RAND_MAX + 1.0); // NOLINT
return static_cast<int>(mean_delay_ms + standard_deviation_ms *
sqrt(-2 * log(uniform1)) * cos(2 * kPi * uniform2));
}
static bool UniformLoss(int loss_percent) {
int outcome = rand() % 100;
return outcome < loss_percent;
}
class NetworkPacket {
public:
NetworkPacket(const uint8_t* data, size_t length, int64_t send_time,
int64_t arrival_time)
: data_(NULL),
data_length_(length),
send_time_(send_time),
arrival_time_(arrival_time) {
data_ = new uint8_t[length];
memcpy(data_, data, length);
}
~NetworkPacket() {
delete [] data_;
}
uint8_t* data() const { return data_; }
size_t data_length() const { return data_length_; }
int64_t send_time() const { return send_time_; }
int64_t arrival_time() const { return arrival_time_; }
void IncrementArrivalTime(int64_t extra_delay) {
arrival_time_+= extra_delay;
}
private:
// The packet data.
uint8_t* data_;
// Length of data_.
size_t data_length_;
// The time the packet was sent out on the network.
const int64_t send_time_;
// The time the packet should arrive at the reciver.
int64_t arrival_time_;
};
FakeNetworkPipe::FakeNetworkPipe(Clock* clock,
const FakeNetworkPipe::Config& config)
: FakeNetworkPipe(clock, config, 1) {}
FakeNetworkPipe::FakeNetworkPipe(Clock* clock,
const FakeNetworkPipe::Config& config,
uint64_t seed)
: clock_(clock),
packet_receiver_(NULL),
random_(seed),
config_(config),
dropped_packets_(0),
sent_packets_(0),
@ -86,8 +42,8 @@ FakeNetworkPipe::~FakeNetworkPipe() {
capacity_link_.pop();
}
while (!delay_link_.empty()) {
delete delay_link_.front();
delay_link_.pop();
delete *delay_link_.begin();
delay_link_.erase(delay_link_.begin());
}
}
@ -123,7 +79,7 @@ void FakeNetworkPipe::SendPacket(const uint8_t* data, size_t data_length) {
// Check if there already are packets on the link and change network start
// time if there is.
if (capacity_link_.size() > 0)
if (!capacity_link_.empty())
network_start_time = capacity_link_.back()->arrival_time();
int64_t arrival_time = network_start_time + capacity_delay_ms;
@ -156,41 +112,42 @@ void FakeNetworkPipe::Process() {
{
rtc::CritScope crit(&lock_);
// Check the capacity link first.
while (capacity_link_.size() > 0 &&
while (!capacity_link_.empty() &&
time_now >= capacity_link_.front()->arrival_time()) {
// Time to get this packet.
NetworkPacket* packet = capacity_link_.front();
capacity_link_.pop();
// Packets are randomly dropped after being affected by the bottleneck.
if (UniformLoss(config_.loss_percent)) {
if (random_.Rand(100) < static_cast<uint32_t>(config_.loss_percent)) {
delete packet;
continue;
}
// Add extra delay and jitter, but make sure the arrival time is not
// earlier than the last packet in the queue.
int extra_delay = GaussianRandom(config_.queue_delay_ms,
config_.delay_standard_deviation_ms);
if (delay_link_.size() > 0 &&
packet->arrival_time() + extra_delay <
delay_link_.back()->arrival_time()) {
extra_delay = delay_link_.back()->arrival_time() -
packet->arrival_time();
int arrival_time_jitter = random_.Gaussian(
config_.queue_delay_ms, config_.delay_standard_deviation_ms);
// If reordering is not allowed then adjust arrival_time_jitter
// to make sure all packets are sent in order.
if (!config_.allow_reordering && !delay_link_.empty() &&
packet->arrival_time() + arrival_time_jitter <
(*delay_link_.rbegin())->arrival_time()) {
arrival_time_jitter =
(*delay_link_.rbegin())->arrival_time() - packet->arrival_time();
}
packet->IncrementArrivalTime(extra_delay);
packet->IncrementArrivalTime(arrival_time_jitter);
if (packet->arrival_time() < next_process_time_)
next_process_time_ = packet->arrival_time();
delay_link_.push(packet);
delay_link_.insert(packet);
}
// Check the extra delay queue.
while (delay_link_.size() > 0 &&
time_now >= delay_link_.front()->arrival_time()) {
while (!delay_link_.empty() &&
time_now >= (*delay_link_.begin())->arrival_time()) {
// Deliver this packet.
NetworkPacket* packet = delay_link_.front();
NetworkPacket* packet = *delay_link_.begin();
packets_to_deliver.push(packet);
delay_link_.pop();
delay_link_.erase(delay_link_.begin());
// |time_now| might be later than when the packet should have arrived, due
// to NetworkProcess being called too late. For stats, use the time it
// should have been on the link.
@ -210,7 +167,7 @@ void FakeNetworkPipe::Process() {
int64_t FakeNetworkPipe::TimeUntilNextProcess() const {
rtc::CritScope crit(&lock_);
const int64_t kDefaultProcessIntervalMs = 30;
if (capacity_link_.size() == 0 || delay_link_.size() == 0)
if (capacity_link_.empty() || delay_link_.empty())
return kDefaultProcessIntervalMs;
return std::max<int64_t>(next_process_time_ - clock_->TimeInMilliseconds(),
0);

View File

@ -11,10 +11,13 @@
#ifndef WEBRTC_TEST_FAKE_NETWORK_PIPE_H_
#define WEBRTC_TEST_FAKE_NETWORK_PIPE_H_
#include <set>
#include <string.h>
#include <queue>
#include "webrtc/base/constructormagic.h"
#include "webrtc/base/criticalsection.h"
#include "webrtc/base/random.h"
#include "webrtc/base/scoped_ptr.h"
#include "webrtc/typedefs.h"
@ -22,9 +25,40 @@ namespace webrtc {
class Clock;
class CriticalSectionWrapper;
class NetworkPacket;
class PacketReceiver;
class NetworkPacket {
public:
NetworkPacket(const uint8_t* data,
size_t length,
int64_t send_time,
int64_t arrival_time)
: data_(new uint8_t[length]),
data_length_(length),
send_time_(send_time),
arrival_time_(arrival_time) {
memcpy(data_.get(), data, length);
}
uint8_t* data() const { return data_.get(); }
size_t data_length() const { return data_length_; }
int64_t send_time() const { return send_time_; }
int64_t arrival_time() const { return arrival_time_; }
void IncrementArrivalTime(int64_t extra_delay) {
arrival_time_ += extra_delay;
}
private:
// The packet data.
rtc::scoped_ptr<uint8_t[]> data_;
// Length of data_.
size_t data_length_;
// The time the packet was sent out on the network.
const int64_t send_time_;
// The time the packet should arrive at the receiver.
int64_t arrival_time_;
};
// Class faking a network link. This is a simple and naive solution just faking
// capacity and adding an extra transport delay in addition to the capacity
// introduced delay.
@ -44,9 +78,14 @@ class FakeNetworkPipe {
int link_capacity_kbps = 0;
// Random packet loss.
int loss_percent = 0;
// If packets are allowed to be reordered.
bool allow_reordering = false;
};
FakeNetworkPipe(Clock* clock, const FakeNetworkPipe::Config& config);
FakeNetworkPipe(Clock* clock,
const FakeNetworkPipe::Config& config,
uint64_t seed);
~FakeNetworkPipe();
// Must not be called in parallel with SendPacket or Process.
@ -74,7 +113,17 @@ class FakeNetworkPipe {
rtc::CriticalSection lock_;
PacketReceiver* packet_receiver_;
std::queue<NetworkPacket*> capacity_link_;
std::queue<NetworkPacket*> delay_link_;
Random random_;
// Since we need to access both the packet with the earliest and latest
// arrival time we need to use a multiset to keep all packets sorted,
// hence, we cannot use a priority queue.
struct PacketArrivalTimeComparator {
bool operator()(const NetworkPacket* p1, const NetworkPacket* p2) {
return p1->arrival_time() < p2->arrival_time();
}
};
std::multiset<NetworkPacket*, PacketArrivalTimeComparator> delay_link_;
// Link configuration.
Config config_;

View File

@ -23,28 +23,45 @@ using ::testing::Invoke;
namespace webrtc {
class MockReceiver : public PacketReceiver {
class TestReceiver : public PacketReceiver {
public:
MockReceiver() {}
virtual ~MockReceiver() {}
TestReceiver() {}
virtual ~TestReceiver() {}
void IncomingPacket(const uint8_t* data, size_t length) {
DeliverPacket(MediaType::ANY, data, length, PacketTime());
delete [] data;
}
MOCK_METHOD4(
virtual MOCK_METHOD4(
DeliverPacket,
DeliveryStatus(MediaType, const uint8_t*, size_t, const PacketTime&));
};
class ReorderTestReceiver : public TestReceiver {
public:
ReorderTestReceiver() {}
virtual ~ReorderTestReceiver() {}
DeliveryStatus DeliverPacket(MediaType media_type,
const uint8_t* packet,
size_t length,
const PacketTime& packet_time) override {
int seq_num;
memcpy(&seq_num, packet, sizeof(int));
delivered_sequence_numbers_.push_back(seq_num);
return PacketReceiver::DELIVERY_OK;
}
std::vector<int> delivered_sequence_numbers_;
};
class FakeNetworkPipeTest : public ::testing::Test {
public:
FakeNetworkPipeTest() : fake_clock_(12345) {}
protected:
virtual void SetUp() {
receiver_.reset(new MockReceiver());
receiver_.reset(new TestReceiver());
ON_CALL(*receiver_, DeliverPacket(_, _, _, _))
.WillByDefault(Return(PacketReceiver::DELIVERY_OK));
}
@ -52,19 +69,23 @@ class FakeNetworkPipeTest : public ::testing::Test {
virtual void TearDown() {
}
void SendPackets(FakeNetworkPipe* pipe, int number_packets, int kPacketSize) {
rtc::scoped_ptr<uint8_t[]> packet(new uint8_t[kPacketSize]);
void SendPackets(FakeNetworkPipe* pipe, int number_packets, int packet_size) {
RTC_DCHECK_GE(packet_size, static_cast<int>(sizeof(int)));
rtc::scoped_ptr<uint8_t[]> packet(new uint8_t[packet_size]);
for (int i = 0; i < number_packets; ++i) {
pipe->SendPacket(packet.get(), kPacketSize);
// Set a sequence number for the packets by
// using the first bytes in the packet.
memcpy(packet.get(), &i, sizeof(int));
pipe->SendPacket(packet.get(), packet_size);
}
}
int PacketTimeMs(int capacity_kbps, int kPacketSize) const {
return 8 * kPacketSize / capacity_kbps;
int PacketTimeMs(int capacity_kbps, int packet_size) const {
return 8 * packet_size / capacity_kbps;
}
SimulatedClock fake_clock_;
rtc::scoped_ptr<MockReceiver> receiver_;
rtc::scoped_ptr<TestReceiver> receiver_;
};
void DeleteMemory(uint8_t* data, int length) { delete [] data; }
@ -308,4 +329,53 @@ TEST_F(FakeNetworkPipeTest, ChangingCapacityWithPacketsInPipeTest) {
EXPECT_CALL(*receiver_, DeliverPacket(_, _, _, _)).Times(0);
pipe->Process();
}
// At first disallow reordering and then allow reordering.
TEST_F(FakeNetworkPipeTest, DisallowReorderingThenAllowReordering) {
FakeNetworkPipe::Config config;
config.queue_length_packets = 1000;
config.link_capacity_kbps = 800;
config.queue_delay_ms = 100;
config.delay_standard_deviation_ms = 10;
rtc::scoped_ptr<FakeNetworkPipe> pipe(
new FakeNetworkPipe(&fake_clock_, config));
ReorderTestReceiver* receiver = new ReorderTestReceiver();
receiver_.reset(receiver);
pipe->SetReceiver(receiver_.get());
const uint32_t kNumPackets = 100;
const int kPacketSize = 10;
SendPackets(pipe.get(), kNumPackets, kPacketSize);
fake_clock_.AdvanceTimeMilliseconds(1000);
pipe->Process();
// Confirm that all packets have been delivered in order.
EXPECT_EQ(kNumPackets, receiver->delivered_sequence_numbers_.size());
int last_seq_num = -1;
for (int seq_num : receiver->delivered_sequence_numbers_) {
EXPECT_GT(seq_num, last_seq_num);
last_seq_num = seq_num;
}
config.allow_reordering = true;
pipe->SetConfig(config);
SendPackets(pipe.get(), kNumPackets, kPacketSize);
fake_clock_.AdvanceTimeMilliseconds(1000);
receiver->delivered_sequence_numbers_.clear();
pipe->Process();
// Confirm that all packets have been delivered
// and that reordering has occured.
EXPECT_EQ(kNumPackets, receiver->delivered_sequence_numbers_.size());
bool reordering_has_occured = false;
last_seq_num = -1;
for (int seq_num : receiver->delivered_sequence_numbers_) {
if (last_seq_num > seq_num) {
reordering_has_occured = true;
break;
}
last_seq_num = seq_num;
}
EXPECT_TRUE(reordering_has_occured);
}
} // namespace webrtc

View File

@ -174,6 +174,8 @@ DEFINE_bool(logs, false, "print logs to stderr");
DEFINE_bool(send_side_bwe, true, "Use send-side bandwidth estimation");
DEFINE_bool(allow_reordering, false, "Allow packet reordering to occur");
DEFINE_string(
force_fieldtrials,
"",
@ -212,6 +214,7 @@ void Loopback() {
pipe_config.queue_length_packets = flags::QueueSize();
pipe_config.queue_delay_ms = flags::AvgPropagationDelayMs();
pipe_config.delay_standard_deviation_ms = flags::StdPropagationDelayMs();
pipe_config.allow_reordering = flags::FLAGS_allow_reordering;
Call::Config::BitrateConfig call_bitrate_config;
call_bitrate_config.min_bitrate_bps = flags::MinBitrateKbps() * 1000;

View File

@ -176,6 +176,8 @@ DEFINE_bool(logs, false, "print logs to stderr");
DEFINE_bool(send_side_bwe, true, "Use send-side bandwidth estimation");
DEFINE_bool(allow_reordering, false, "Allow packet reordering to occur");
DEFINE_string(
force_fieldtrials,
"",
@ -201,6 +203,7 @@ void Loopback() {
pipe_config.queue_length_packets = flags::QueueSize();
pipe_config.queue_delay_ms = flags::AvgPropagationDelayMs();
pipe_config.delay_standard_deviation_ms = flags::StdPropagationDelayMs();
pipe_config.allow_reordering = flags::FLAGS_allow_reordering;
Call::Config::BitrateConfig call_bitrate_config;
call_bitrate_config.min_bitrate_bps = flags::MinBitrateKbps() * 1000;