Processing emulated network on task queue.
The has some benefits: * We no longer need locks to protect the emulated network node state. * We only process when there are packets in flight. * It makes Scenario more similar to network emulation manager. Bug: webrtc:10365 Change-Id: I8bd1ad1edfb54b047e8109dabd9846ae451cef17 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/127548 Commit-Queue: Sebastian Jansson <srte@webrtc.org> Reviewed-by: Artem Titov <titovartem@webrtc.org> Cr-Commit-Position: refs/heads/master@{#27393}
This commit is contained in:
parent
6c072efe9f
commit
4124dab7f3
@ -118,7 +118,7 @@ TEST_F(BbrNetworkControllerTest, SendsConfigurationOnNetworkRouteChanged) {
|
||||
TEST_F(BbrNetworkControllerTest, UpdatesTargetSendRate) {
|
||||
BbrNetworkControllerFactory factory;
|
||||
Scenario s("bbr_unit/updates_rate", false);
|
||||
SimulatedTimeClientConfig config;
|
||||
CallClientConfig config;
|
||||
config.transport.cc =
|
||||
TransportControllerConfig::CongestionController::kInjected;
|
||||
config.transport.cc_factory = &factory;
|
||||
@ -136,11 +136,14 @@ TEST_F(BbrNetworkControllerTest, UpdatesTargetSendRate) {
|
||||
c->simulation.delay = TimeDelta::ms(100);
|
||||
c->update_frequency = TimeDelta::ms(5);
|
||||
});
|
||||
SimulatedTimeClient* client = s.CreateSimulatedTimeClient(
|
||||
"send", config, {PacketStreamConfig()}, {send_net}, {ret_net});
|
||||
auto* client = s.CreateClient("send", config);
|
||||
auto routes =
|
||||
s.CreateRoutes(client, {send_net},
|
||||
s.CreateClient("recv", CallClientConfig()), {ret_net});
|
||||
s.CreateVideoStream(routes->forward(), VideoStreamConfig());
|
||||
|
||||
s.RunFor(TimeDelta::seconds(25));
|
||||
EXPECT_NEAR(client->target_rate_kbps(), 450, 100);
|
||||
EXPECT_NEAR(client->send_bandwidth().kbps(), 450, 100);
|
||||
|
||||
send_net->UpdateConfig([](NetworkNodeConfig* c) {
|
||||
c->simulation.bandwidth = DataRate::kbps(800);
|
||||
@ -148,7 +151,7 @@ TEST_F(BbrNetworkControllerTest, UpdatesTargetSendRate) {
|
||||
});
|
||||
|
||||
s.RunFor(TimeDelta::seconds(20));
|
||||
EXPECT_NEAR(client->target_rate_kbps(), 750, 150);
|
||||
EXPECT_NEAR(client->send_bandwidth().kbps(), 750, 150);
|
||||
|
||||
send_net->UpdateConfig([](NetworkNodeConfig* c) {
|
||||
c->simulation.bandwidth = DataRate::kbps(200);
|
||||
@ -158,7 +161,7 @@ TEST_F(BbrNetworkControllerTest, UpdatesTargetSendRate) {
|
||||
[](NetworkNodeConfig* c) { c->simulation.delay = TimeDelta::ms(200); });
|
||||
|
||||
s.RunFor(TimeDelta::seconds(40));
|
||||
EXPECT_NEAR(client->target_rate_kbps(), 200, 40);
|
||||
EXPECT_NEAR(client->send_bandwidth().kbps(), 200, 40);
|
||||
}
|
||||
|
||||
} // namespace test
|
||||
|
||||
@ -36,11 +36,13 @@ rtc_source_set("emulated_network") {
|
||||
"../../../api/units:timestamp",
|
||||
"../../../rtc_base",
|
||||
"../../../rtc_base:rtc_base_tests_utils",
|
||||
"../../../rtc_base:rtc_task_queue",
|
||||
"../../../rtc_base:safe_minmax",
|
||||
"../../../rtc_base:task_queue_for_test",
|
||||
"../../../rtc_base/task_utils:repeating_task",
|
||||
"../../../rtc_base/third_party/sigslot",
|
||||
"../../../system_wrappers",
|
||||
"../../time_controller",
|
||||
"//third_party/abseil-cpp/absl/memory",
|
||||
"//third_party/abseil-cpp/absl/types:optional",
|
||||
]
|
||||
|
||||
@ -21,81 +21,42 @@
|
||||
#include "test/gmock.h"
|
||||
#include "test/gtest.h"
|
||||
#include "test/scenario/network/cross_traffic.h"
|
||||
#include "test/scenario/network/network_emulation.h"
|
||||
#include "test/scenario/network/network_emulation_manager.h"
|
||||
|
||||
namespace webrtc {
|
||||
namespace test {
|
||||
namespace {
|
||||
|
||||
class CountingBehavior : public NetworkBehaviorInterface {
|
||||
class CountingReceiver : public EmulatedNetworkReceiverInterface {
|
||||
public:
|
||||
bool EnqueuePacket(PacketInFlightInfo packet_info) override {
|
||||
packets_to_send_.push_back(packet_info);
|
||||
return true;
|
||||
void OnPacketReceived(EmulatedIpPacket packet) override {
|
||||
packets_count_++;
|
||||
total_packets_size_ += packet.size();
|
||||
}
|
||||
|
||||
std::vector<PacketDeliveryInfo> DequeueDeliverablePackets(
|
||||
int64_t receive_time_us) override {
|
||||
std::vector<PacketDeliveryInfo> out;
|
||||
for (auto packet : packets_to_send_) {
|
||||
// we want to count packets, that went through this behavior.
|
||||
packets_count_++;
|
||||
total_packets_size_ += packet.size;
|
||||
out.push_back(PacketDeliveryInfo(packet, receive_time_us));
|
||||
}
|
||||
packets_to_send_.clear();
|
||||
return out;
|
||||
}
|
||||
|
||||
absl::optional<int64_t> NextDeliveryTimeUs() const override { return 1000; }
|
||||
|
||||
int packets_count() const { return packets_count_; }
|
||||
uint64_t total_packets_size() const { return total_packets_size_; }
|
||||
|
||||
private:
|
||||
std::vector<PacketInFlightInfo> packets_to_send_;
|
||||
|
||||
std::atomic<int> packets_count_{0};
|
||||
std::atomic<uint64_t> total_packets_size_{0};
|
||||
};
|
||||
struct TrafficCounterFixture {
|
||||
SimulatedClock clock{0};
|
||||
CountingReceiver counter;
|
||||
EmulatedEndpoint endpoint{1 /*id */, rtc::IPAddress(), true /*is_enabled*/,
|
||||
&clock};
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
TEST(CrossTrafficTest, TriggerPacketBurst) {
|
||||
NetworkEmulationManagerImpl network_manager;
|
||||
TrafficCounterFixture fixture;
|
||||
TrafficRoute traffic(&fixture.clock, &fixture.counter, &fixture.endpoint);
|
||||
traffic.TriggerPacketBurst(100, 1000);
|
||||
|
||||
std::unique_ptr<CountingBehavior> behavior =
|
||||
absl::make_unique<CountingBehavior>();
|
||||
CountingBehavior* counter = behavior.get();
|
||||
|
||||
EmulatedNetworkNode* node_a = network_manager.CreateEmulatedNode(
|
||||
absl::make_unique<SimulatedNetwork>(BuiltInNetworkBehaviorConfig()));
|
||||
EmulatedNetworkNode* node_b =
|
||||
network_manager.CreateEmulatedNode(std::move(behavior));
|
||||
|
||||
TrafficRoute* traffic = network_manager.CreateTrafficRoute({node_a, node_b});
|
||||
|
||||
traffic->TriggerPacketBurst(100, 1000);
|
||||
|
||||
rtc::Event event;
|
||||
event.Wait(1000);
|
||||
|
||||
EXPECT_EQ(counter->packets_count(), 100);
|
||||
EXPECT_EQ(counter->total_packets_size(), 100 * 1000ul);
|
||||
EXPECT_EQ(fixture.counter.packets_count_, 100);
|
||||
EXPECT_EQ(fixture.counter.total_packets_size_, 100 * 1000ul);
|
||||
}
|
||||
|
||||
TEST(CrossTrafficTest, PulsedPeaksCrossTraffic) {
|
||||
NetworkEmulationManagerImpl network_manager;
|
||||
|
||||
std::unique_ptr<CountingBehavior> behavior =
|
||||
absl::make_unique<CountingBehavior>();
|
||||
CountingBehavior* counter = behavior.get();
|
||||
|
||||
EmulatedNetworkNode* node_a = network_manager.CreateEmulatedNode(
|
||||
absl::make_unique<SimulatedNetwork>(BuiltInNetworkBehaviorConfig()));
|
||||
EmulatedNetworkNode* node_b =
|
||||
network_manager.CreateEmulatedNode(std::move(behavior));
|
||||
TrafficCounterFixture fixture;
|
||||
TrafficRoute traffic(&fixture.clock, &fixture.counter, &fixture.endpoint);
|
||||
|
||||
PulsedPeaksConfig config;
|
||||
config.peak_rate = DataRate::kbps(1000);
|
||||
@ -103,32 +64,24 @@ TEST(CrossTrafficTest, PulsedPeaksCrossTraffic) {
|
||||
config.min_packet_interval = TimeDelta::ms(25);
|
||||
config.send_duration = TimeDelta::ms(500);
|
||||
config.hold_duration = TimeDelta::ms(250);
|
||||
TrafficRoute* traffic = network_manager.CreateTrafficRoute({node_a, node_b});
|
||||
network_manager.CreatePulsedPeaksCrossTraffic(traffic, config);
|
||||
PulsedPeaksCrossTraffic pulsed_peaks(config, &traffic);
|
||||
const auto kRunTime = TimeDelta::seconds(1);
|
||||
while (fixture.clock.TimeInMilliseconds() < kRunTime.ms()) {
|
||||
pulsed_peaks.Process(Timestamp::ms(fixture.clock.TimeInMilliseconds()));
|
||||
fixture.clock.AdvanceTimeMilliseconds(1);
|
||||
}
|
||||
|
||||
rtc::Event event;
|
||||
event.Wait(kRunTime.ms());
|
||||
|
||||
RTC_LOG(INFO) << counter->packets_count() << " packets; "
|
||||
<< counter->total_packets_size() << " bytes";
|
||||
RTC_LOG(INFO) << fixture.counter.packets_count_ << " packets; "
|
||||
<< fixture.counter.total_packets_size_ << " bytes";
|
||||
// Using 50% duty cycle.
|
||||
const auto kExpectedDataSent = kRunTime * config.peak_rate * 0.5;
|
||||
EXPECT_NEAR(counter->total_packets_size(), kExpectedDataSent.bytes(),
|
||||
EXPECT_NEAR(fixture.counter.total_packets_size_, kExpectedDataSent.bytes(),
|
||||
kExpectedDataSent.bytes() * 0.1);
|
||||
}
|
||||
|
||||
TEST(CrossTrafficTest, RandomWalkCrossTraffic) {
|
||||
NetworkEmulationManagerImpl network_manager;
|
||||
|
||||
std::unique_ptr<CountingBehavior> behavior =
|
||||
absl::make_unique<CountingBehavior>();
|
||||
CountingBehavior* counter = behavior.get();
|
||||
|
||||
EmulatedNetworkNode* node_a = network_manager.CreateEmulatedNode(
|
||||
absl::make_unique<SimulatedNetwork>(BuiltInNetworkBehaviorConfig()));
|
||||
EmulatedNetworkNode* node_b =
|
||||
network_manager.CreateEmulatedNode(std::move(behavior));
|
||||
TrafficCounterFixture fixture;
|
||||
TrafficRoute traffic(&fixture.clock, &fixture.counter, &fixture.endpoint);
|
||||
|
||||
RandomWalkConfig config;
|
||||
config.peak_rate = DataRate::kbps(1000);
|
||||
@ -137,18 +90,19 @@ TEST(CrossTrafficTest, RandomWalkCrossTraffic) {
|
||||
config.update_interval = TimeDelta::ms(500);
|
||||
config.variance = 0.0;
|
||||
config.bias = 1.0;
|
||||
TrafficRoute* traffic = network_manager.CreateTrafficRoute({node_a, node_b});
|
||||
network_manager.CreateRandomWalkCrossTraffic(traffic, config);
|
||||
|
||||
RandomWalkCrossTraffic random_walk(config, &traffic);
|
||||
const auto kRunTime = TimeDelta::seconds(1);
|
||||
while (fixture.clock.TimeInMilliseconds() < kRunTime.ms()) {
|
||||
random_walk.Process(Timestamp::ms(fixture.clock.TimeInMilliseconds()));
|
||||
fixture.clock.AdvanceTimeMilliseconds(1);
|
||||
}
|
||||
|
||||
rtc::Event event;
|
||||
event.Wait(kRunTime.ms());
|
||||
|
||||
RTC_LOG(INFO) << counter->packets_count() << " packets; "
|
||||
<< counter->total_packets_size() << " bytes";
|
||||
RTC_LOG(INFO) << fixture.counter.packets_count_ << " packets; "
|
||||
<< fixture.counter.total_packets_size_ << " bytes";
|
||||
// Sending at peak rate since bias = 1.
|
||||
const auto kExpectedDataSent = kRunTime * config.peak_rate;
|
||||
EXPECT_NEAR(counter->total_packets_size(), kExpectedDataSent.bytes(),
|
||||
EXPECT_NEAR(fixture.counter.total_packets_size_, kExpectedDataSent.bytes(),
|
||||
kExpectedDataSent.bytes() * 0.1);
|
||||
}
|
||||
|
||||
|
||||
@ -11,6 +11,7 @@
|
||||
#include "test/scenario/network/fake_network_socket_server.h"
|
||||
|
||||
#include <utility>
|
||||
#include "rtc_base/thread.h"
|
||||
|
||||
namespace webrtc {
|
||||
namespace test {
|
||||
|
||||
@ -48,13 +48,28 @@ void EmulatedNetworkNode::ClearRoute(rtc::IPAddress receiver_ip,
|
||||
}
|
||||
|
||||
EmulatedNetworkNode::EmulatedNetworkNode(
|
||||
Clock* clock,
|
||||
rtc::TaskQueue* task_queue,
|
||||
std::unique_ptr<NetworkBehaviorInterface> network_behavior)
|
||||
: network_behavior_(std::move(network_behavior)) {}
|
||||
: clock_(clock),
|
||||
task_queue_(task_queue),
|
||||
network_behavior_(std::move(network_behavior)) {}
|
||||
|
||||
EmulatedNetworkNode::~EmulatedNetworkNode() = default;
|
||||
|
||||
void EmulatedNetworkNode::OnPacketReceived(EmulatedIpPacket packet) {
|
||||
rtc::CritScope crit(&lock_);
|
||||
struct Closure {
|
||||
void operator()() {
|
||||
RTC_DCHECK_RUN_ON(node->task_queue_);
|
||||
node->HandlePacketReceived(std::move(packet));
|
||||
}
|
||||
EmulatedNetworkNode* node;
|
||||
EmulatedIpPacket packet;
|
||||
};
|
||||
task_queue_->PostTask(Closure{this, std::move(packet)});
|
||||
}
|
||||
|
||||
void EmulatedNetworkNode::HandlePacketReceived(EmulatedIpPacket packet) {
|
||||
if (routing_.find(packet.to.ipaddr()) == routing_.end()) {
|
||||
return;
|
||||
}
|
||||
@ -64,48 +79,55 @@ void EmulatedNetworkNode::OnPacketReceived(EmulatedIpPacket packet) {
|
||||
if (sent) {
|
||||
packets_.emplace_back(StoredPacket{packet_id, std::move(packet), false});
|
||||
}
|
||||
if (process_task_.Running())
|
||||
return;
|
||||
absl::optional<int64_t> next_time_us =
|
||||
network_behavior_->NextDeliveryTimeUs();
|
||||
if (!next_time_us)
|
||||
return;
|
||||
Timestamp current_time = Timestamp::us(clock_->TimeInMicroseconds());
|
||||
process_task_ = RepeatingTaskHandle::DelayedStart(
|
||||
task_queue_->Get(),
|
||||
std::max(TimeDelta::Zero(), Timestamp::us(*next_time_us) - current_time),
|
||||
[this]() {
|
||||
RTC_DCHECK_RUN_ON(task_queue_);
|
||||
Timestamp current_time = Timestamp::us(clock_->TimeInMicroseconds());
|
||||
Process(current_time);
|
||||
absl::optional<int64_t> next_time_us =
|
||||
network_behavior_->NextDeliveryTimeUs();
|
||||
if (!next_time_us) {
|
||||
process_task_.Stop();
|
||||
return TimeDelta::Zero(); // This is ignored.
|
||||
}
|
||||
RTC_DCHECK_GE(*next_time_us, current_time.us());
|
||||
return Timestamp::us(*next_time_us) - current_time;
|
||||
});
|
||||
}
|
||||
|
||||
void EmulatedNetworkNode::Process(Timestamp at_time) {
|
||||
std::vector<PacketDeliveryInfo> delivery_infos;
|
||||
{
|
||||
rtc::CritScope crit(&lock_);
|
||||
absl::optional<int64_t> delivery_us =
|
||||
network_behavior_->NextDeliveryTimeUs();
|
||||
if (delivery_us && *delivery_us > at_time.us())
|
||||
return;
|
||||
|
||||
delivery_infos = network_behavior_->DequeueDeliverablePackets(at_time.us());
|
||||
}
|
||||
std::vector<PacketDeliveryInfo> delivery_infos =
|
||||
network_behavior_->DequeueDeliverablePackets(at_time.us());
|
||||
for (PacketDeliveryInfo& delivery_info : delivery_infos) {
|
||||
StoredPacket* packet = nullptr;
|
||||
EmulatedNetworkReceiverInterface* receiver = nullptr;
|
||||
{
|
||||
rtc::CritScope crit(&lock_);
|
||||
for (auto& stored_packet : packets_) {
|
||||
if (stored_packet.id == delivery_info.packet_id) {
|
||||
packet = &stored_packet;
|
||||
break;
|
||||
}
|
||||
for (auto& stored_packet : packets_) {
|
||||
if (stored_packet.id == delivery_info.packet_id) {
|
||||
packet = &stored_packet;
|
||||
break;
|
||||
}
|
||||
RTC_CHECK(packet);
|
||||
RTC_DCHECK(!packet->removed);
|
||||
receiver = routing_[packet->packet.to.ipaddr()];
|
||||
packet->removed = true;
|
||||
}
|
||||
RTC_CHECK(receiver);
|
||||
// We don't want to keep the lock here. Otherwise we would get a deadlock if
|
||||
// the receiver tries to push a new packet.
|
||||
RTC_CHECK(packet);
|
||||
RTC_DCHECK(!packet->removed);
|
||||
auto receiver_it = routing_.find(packet->packet.to.ipaddr());
|
||||
RTC_CHECK(receiver_it != routing_.end());
|
||||
packet->removed = true;
|
||||
|
||||
if (delivery_info.receive_time_us != PacketDeliveryInfo::kNotReceived) {
|
||||
packet->packet.arrival_time =
|
||||
Timestamp::us(delivery_info.receive_time_us);
|
||||
receiver->OnPacketReceived(std::move(packet->packet));
|
||||
receiver_it->second->OnPacketReceived(std::move(packet->packet));
|
||||
}
|
||||
{
|
||||
rtc::CritScope crit(&lock_);
|
||||
while (!packets_.empty() && packets_.front().removed) {
|
||||
packets_.pop_front();
|
||||
}
|
||||
while (!packets_.empty() && packets_.front().removed) {
|
||||
packets_.pop_front();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -113,15 +135,17 @@ void EmulatedNetworkNode::Process(Timestamp at_time) {
|
||||
void EmulatedNetworkNode::SetReceiver(
|
||||
rtc::IPAddress dest_ip,
|
||||
EmulatedNetworkReceiverInterface* receiver) {
|
||||
rtc::CritScope crit(&lock_);
|
||||
EmulatedNetworkReceiverInterface* cur_receiver = routing_[dest_ip];
|
||||
RTC_CHECK(cur_receiver == nullptr || cur_receiver == receiver)
|
||||
<< "Routing for dest_ip=" << dest_ip.ToString() << " already exists";
|
||||
routing_[dest_ip] = receiver;
|
||||
task_queue_->PostTask([=] {
|
||||
RTC_DCHECK_RUN_ON(task_queue_);
|
||||
EmulatedNetworkReceiverInterface* cur_receiver = routing_[dest_ip];
|
||||
RTC_CHECK(cur_receiver == nullptr || cur_receiver == receiver)
|
||||
<< "Routing for dest_ip=" << dest_ip.ToString() << " already exists";
|
||||
routing_[dest_ip] = receiver;
|
||||
});
|
||||
}
|
||||
|
||||
void EmulatedNetworkNode::RemoveReceiver(rtc::IPAddress dest_ip) {
|
||||
rtc::CritScope crit(&lock_);
|
||||
RTC_DCHECK_RUN_ON(task_queue_);
|
||||
routing_.erase(dest_ip);
|
||||
}
|
||||
|
||||
|
||||
@ -21,12 +21,11 @@
|
||||
#include "absl/types/optional.h"
|
||||
#include "api/test/simulated_network.h"
|
||||
#include "api/units/timestamp.h"
|
||||
#include "rtc_base/async_socket.h"
|
||||
#include "rtc_base/copy_on_write_buffer.h"
|
||||
#include "rtc_base/critical_section.h"
|
||||
#include "rtc_base/network.h"
|
||||
#include "rtc_base/socket_address.h"
|
||||
#include "rtc_base/thread.h"
|
||||
#include "rtc_base/task_queue_for_test.h"
|
||||
#include "rtc_base/task_utils/repeating_task.h"
|
||||
#include "rtc_base/thread_checker.h"
|
||||
#include "system_wrappers/include/clock.h"
|
||||
|
||||
@ -78,13 +77,16 @@ class EmulatedNetworkNode : public EmulatedNetworkReceiverInterface {
|
||||
// Creates node based on |network_behavior|. The specified |packet_overhead|
|
||||
// is added to the size of each packet in the information provided to
|
||||
// |network_behavior|.
|
||||
// |task_queue| is used to process packets and to forward the packets when
|
||||
// they are ready.
|
||||
explicit EmulatedNetworkNode(
|
||||
Clock* clock,
|
||||
rtc::TaskQueue* task_queue,
|
||||
std::unique_ptr<NetworkBehaviorInterface> network_behavior);
|
||||
~EmulatedNetworkNode() override;
|
||||
RTC_DISALLOW_COPY_AND_ASSIGN(EmulatedNetworkNode);
|
||||
|
||||
void OnPacketReceived(EmulatedIpPacket packet) override;
|
||||
void Process(Timestamp at_time);
|
||||
void SetReceiver(rtc::IPAddress dest_ip,
|
||||
EmulatedNetworkReceiverInterface* receiver);
|
||||
void RemoveReceiver(rtc::IPAddress dest_ip);
|
||||
@ -98,20 +100,23 @@ class EmulatedNetworkNode : public EmulatedNetworkReceiverInterface {
|
||||
std::vector<EmulatedNetworkNode*> nodes);
|
||||
|
||||
private:
|
||||
void Process(Timestamp at_time) RTC_RUN_ON(task_queue_);
|
||||
void HandlePacketReceived(EmulatedIpPacket packet) RTC_RUN_ON(task_queue_);
|
||||
struct StoredPacket {
|
||||
uint64_t id;
|
||||
EmulatedIpPacket packet;
|
||||
bool removed;
|
||||
};
|
||||
|
||||
rtc::CriticalSection lock_;
|
||||
Clock* const clock_;
|
||||
rtc::TaskQueue* const task_queue_;
|
||||
RepeatingTaskHandle process_task_;
|
||||
std::map<rtc::IPAddress, EmulatedNetworkReceiverInterface*> routing_
|
||||
RTC_GUARDED_BY(lock_);
|
||||
RTC_GUARDED_BY(task_queue_);
|
||||
const std::unique_ptr<NetworkBehaviorInterface> network_behavior_
|
||||
RTC_GUARDED_BY(lock_);
|
||||
std::deque<StoredPacket> packets_ RTC_GUARDED_BY(lock_);
|
||||
RTC_GUARDED_BY(task_queue_);
|
||||
std::deque<StoredPacket> packets_ RTC_GUARDED_BY(task_queue_);
|
||||
|
||||
uint64_t next_packet_id_ RTC_GUARDED_BY(lock_) = 1;
|
||||
uint64_t next_packet_id_ RTC_GUARDED_BY(task_queue_) = 1;
|
||||
};
|
||||
|
||||
// Represents single network interface on the device.
|
||||
|
||||
@ -48,8 +48,8 @@ NetworkEmulationManagerImpl::~NetworkEmulationManagerImpl() = default;
|
||||
|
||||
EmulatedNetworkNode* NetworkEmulationManagerImpl::CreateEmulatedNode(
|
||||
std::unique_ptr<NetworkBehaviorInterface> network_behavior) {
|
||||
auto node =
|
||||
absl::make_unique<EmulatedNetworkNode>(std::move(network_behavior));
|
||||
auto node = absl::make_unique<EmulatedNetworkNode>(
|
||||
clock_, &task_queue_, std::move(network_behavior));
|
||||
EmulatedNetworkNode* out = node.get();
|
||||
|
||||
struct Closure {
|
||||
@ -126,19 +126,20 @@ EmulatedRoute* NetworkEmulationManagerImpl::CreateRoute(
|
||||
|
||||
void NetworkEmulationManagerImpl::ClearRoute(EmulatedRoute* route) {
|
||||
RTC_CHECK(route->active) << "Route already cleared";
|
||||
task_queue_.SendTask([route]() {
|
||||
// Remove receiver from intermediate nodes.
|
||||
for (auto* node : route->via_nodes) {
|
||||
node->RemoveReceiver(route->to->GetPeerLocalAddress());
|
||||
}
|
||||
// Detach endpoint from current send node.
|
||||
if (route->from->GetSendNode()) {
|
||||
route->from->GetSendNode()->RemoveReceiver(
|
||||
route->to->GetPeerLocalAddress());
|
||||
route->from->SetSendNode(nullptr);
|
||||
}
|
||||
|
||||
// Remove receiver from intermediate nodes.
|
||||
for (auto* node : route->via_nodes) {
|
||||
node->RemoveReceiver(route->to->GetPeerLocalAddress());
|
||||
}
|
||||
// Detach endpoint from current send node.
|
||||
if (route->from->GetSendNode()) {
|
||||
route->from->GetSendNode()->RemoveReceiver(
|
||||
route->to->GetPeerLocalAddress());
|
||||
route->from->SetSendNode(nullptr);
|
||||
}
|
||||
|
||||
route->active = false;
|
||||
route->active = false;
|
||||
});
|
||||
}
|
||||
|
||||
TrafficRoute* NetworkEmulationManagerImpl::CreateTrafficRoute(
|
||||
@ -245,9 +246,6 @@ void NetworkEmulationManagerImpl::ProcessNetworkPackets() {
|
||||
for (auto& traffic : pulsed_cross_traffics_) {
|
||||
traffic->Process(current_time);
|
||||
}
|
||||
for (auto& node : network_nodes_) {
|
||||
node->Process(current_time);
|
||||
}
|
||||
}
|
||||
|
||||
Timestamp NetworkEmulationManagerImpl::Now() const {
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
namespace webrtc {
|
||||
namespace test {
|
||||
namespace {
|
||||
constexpr char kDummyTransportName[] = "dummy";
|
||||
SimulatedNetwork::Config CreateSimulationConfig(NetworkNodeConfig config) {
|
||||
SimulatedNetwork::Config sim_config;
|
||||
sim_config.link_capacity_kbps = config.simulation.bandwidth.kbps_or(0);
|
||||
@ -41,13 +42,15 @@ void ActionReceiver::OnPacketReceived(EmulatedIpPacket packet) {
|
||||
}
|
||||
|
||||
std::unique_ptr<SimulationNode> SimulationNode::Create(
|
||||
Clock* clock,
|
||||
rtc::TaskQueue* task_queue,
|
||||
NetworkNodeConfig config) {
|
||||
RTC_DCHECK(config.mode == NetworkNodeConfig::TrafficMode::kSimulation);
|
||||
SimulatedNetwork::Config sim_config = CreateSimulationConfig(config);
|
||||
auto network = absl::make_unique<SimulatedNetwork>(sim_config);
|
||||
SimulatedNetwork* simulation_ptr = network.get();
|
||||
return std::unique_ptr<SimulationNode>(
|
||||
new SimulationNode(config, std::move(network), simulation_ptr));
|
||||
return std::unique_ptr<SimulationNode>(new SimulationNode(
|
||||
clock, task_queue, config, std::move(network), simulation_ptr));
|
||||
}
|
||||
|
||||
void SimulationNode::UpdateConfig(
|
||||
@ -73,10 +76,12 @@ ColumnPrinter SimulationNode::ConfigPrinter() const {
|
||||
}
|
||||
|
||||
SimulationNode::SimulationNode(
|
||||
Clock* clock,
|
||||
rtc::TaskQueue* task_queue,
|
||||
NetworkNodeConfig config,
|
||||
std::unique_ptr<NetworkBehaviorInterface> behavior,
|
||||
SimulatedNetwork* simulation)
|
||||
: EmulatedNetworkNode(std::move(behavior)),
|
||||
: EmulatedNetworkNode(clock, task_queue, std::move(behavior)),
|
||||
simulated_network_(simulation),
|
||||
config_(config) {}
|
||||
|
||||
@ -126,26 +131,37 @@ bool NetworkNodeTransport::SendRtcp(const uint8_t* packet, size_t length) {
|
||||
void NetworkNodeTransport::Connect(EmulatedNetworkNode* send_node,
|
||||
rtc::IPAddress receiver_ip,
|
||||
DataSize packet_overhead) {
|
||||
// Only IPv4 address is supported. We don't use full range of IPs in scenario
|
||||
// framework and also we need a simple way to convert IP into network_id
|
||||
// to signal network route.
|
||||
RTC_CHECK_EQ(receiver_ip.family(), AF_INET);
|
||||
RTC_CHECK_LE(receiver_ip.v4AddressAsHostOrderInteger(),
|
||||
std::numeric_limits<uint16_t>::max());
|
||||
rtc::CritScope crit(&crit_sect_);
|
||||
send_net_ = send_node;
|
||||
receiver_address_ = rtc::SocketAddress(receiver_ip, 0);
|
||||
packet_overhead_ = packet_overhead;
|
||||
|
||||
rtc::NetworkRoute route;
|
||||
route.connected = true;
|
||||
route.local_network_id =
|
||||
static_cast<uint16_t>(receiver_ip.v4AddressAsHostOrderInteger());
|
||||
route.remote_network_id =
|
||||
static_cast<uint16_t>(receiver_ip.v4AddressAsHostOrderInteger());
|
||||
std::string transport_name = "dummy";
|
||||
{
|
||||
// Only IPv4 address is supported. We don't use full range of IPs in
|
||||
// scenario framework and also we need a simple way to convert IP into
|
||||
// network_id to signal network route.
|
||||
RTC_CHECK_EQ(receiver_ip.family(), AF_INET);
|
||||
RTC_CHECK_LE(receiver_ip.v4AddressAsHostOrderInteger(),
|
||||
std::numeric_limits<uint16_t>::max());
|
||||
rtc::CritScope crit(&crit_sect_);
|
||||
send_net_ = send_node;
|
||||
receiver_address_ = rtc::SocketAddress(receiver_ip, 0);
|
||||
packet_overhead_ = packet_overhead;
|
||||
current_network_route_ = route;
|
||||
}
|
||||
|
||||
sender_call_->GetTransportControllerSend()->OnNetworkRouteChanged(
|
||||
transport_name, route);
|
||||
kDummyTransportName, route);
|
||||
}
|
||||
|
||||
void NetworkNodeTransport::Disconnect() {
|
||||
rtc::CritScope crit(&crit_sect_);
|
||||
current_network_route_.connected = false;
|
||||
sender_call_->GetTransportControllerSend()->OnNetworkRouteChanged(
|
||||
kDummyTransportName, current_network_route_);
|
||||
current_network_route_ = {};
|
||||
send_net_ = nullptr;
|
||||
}
|
||||
|
||||
CrossTrafficSource::CrossTrafficSource(EmulatedNetworkReceiverInterface* target,
|
||||
|
||||
@ -22,6 +22,7 @@
|
||||
#include "call/simulated_network.h"
|
||||
#include "rtc_base/constructor_magic.h"
|
||||
#include "rtc_base/copy_on_write_buffer.h"
|
||||
#include "rtc_base/task_queue.h"
|
||||
#include "test/scenario/column_printer.h"
|
||||
#include "test/scenario/network/network_emulation.h"
|
||||
#include "test/scenario/scenario_config.h"
|
||||
@ -55,10 +56,14 @@ class SimulationNode : public EmulatedNetworkNode {
|
||||
private:
|
||||
friend class Scenario;
|
||||
|
||||
SimulationNode(NetworkNodeConfig config,
|
||||
SimulationNode(Clock* clock,
|
||||
rtc::TaskQueue* task_queue,
|
||||
NetworkNodeConfig config,
|
||||
std::unique_ptr<NetworkBehaviorInterface> behavior,
|
||||
SimulatedNetwork* simulation);
|
||||
static std::unique_ptr<SimulationNode> Create(NetworkNodeConfig config);
|
||||
static std::unique_ptr<SimulationNode> Create(Clock* clock,
|
||||
rtc::TaskQueue* task_queue,
|
||||
NetworkNodeConfig config);
|
||||
|
||||
SimulatedNetwork* const simulated_network_;
|
||||
NetworkNodeConfig config_;
|
||||
@ -77,6 +82,7 @@ class NetworkNodeTransport : public Transport {
|
||||
void Connect(EmulatedNetworkNode* send_node,
|
||||
rtc::IPAddress receiver_ip,
|
||||
DataSize packet_overhead);
|
||||
void Disconnect();
|
||||
|
||||
DataSize packet_overhead() {
|
||||
rtc::CritScope crit(&crit_sect_);
|
||||
@ -92,6 +98,7 @@ class NetworkNodeTransport : public Transport {
|
||||
EmulatedNetworkNode* send_net_ RTC_GUARDED_BY(crit_sect_) = nullptr;
|
||||
rtc::SocketAddress receiver_address_ RTC_GUARDED_BY(crit_sect_);
|
||||
DataSize packet_overhead_ RTC_GUARDED_BY(crit_sect_) = DataSize::Zero();
|
||||
rtc::NetworkRoute current_network_route_ RTC_GUARDED_BY(crit_sect_);
|
||||
};
|
||||
|
||||
// CrossTrafficSource is created by a Scenario and generates cross traffic. It
|
||||
|
||||
@ -78,6 +78,8 @@ Scenario::Scenario(
|
||||
Scenario::~Scenario() {
|
||||
if (start_time_.IsFinite())
|
||||
Stop();
|
||||
for (auto& call_client : clients_)
|
||||
call_client->transport_->Disconnect();
|
||||
}
|
||||
|
||||
ColumnPrinter Scenario::TimePrinter() {
|
||||
@ -198,21 +200,17 @@ SimulationNode* Scenario::CreateSimulationNode(
|
||||
|
||||
SimulationNode* Scenario::CreateSimulationNode(NetworkNodeConfig config) {
|
||||
RTC_DCHECK(config.mode == NetworkNodeConfig::TrafficMode::kSimulation);
|
||||
auto network_node = SimulationNode::Create(config);
|
||||
auto network_node = SimulationNode::Create(clock_, &task_queue_, config);
|
||||
SimulationNode* sim_node = network_node.get();
|
||||
network_nodes_.emplace_back(std::move(network_node));
|
||||
Every(config.update_frequency,
|
||||
[this, sim_node] { sim_node->Process(Now()); });
|
||||
return sim_node;
|
||||
}
|
||||
|
||||
EmulatedNetworkNode* Scenario::CreateNetworkNode(
|
||||
std::unique_ptr<NetworkBehaviorInterface> behavior) {
|
||||
network_nodes_.emplace_back(new EmulatedNetworkNode(std::move(behavior)));
|
||||
network_nodes_.emplace_back(
|
||||
new EmulatedNetworkNode(clock_, &task_queue_, std::move(behavior)));
|
||||
EmulatedNetworkNode* network_node = network_nodes_.back().get();
|
||||
// TODO(srte): Use the update interval as provided by |behavior|.
|
||||
Every(TimeDelta::ms(5),
|
||||
[this, network_node] { network_node->Process(Now()); });
|
||||
return network_node;
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user