In virtual socket unittests replace MessageHandler with RepeatingTask

MessageHandlerAutoCleanup class is marked for removal

Bug: webrtc:11988
Change-Id: I44646b53e5a9520eb1d6c314b7bd580b1bdc0078
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/273940
Auto-Submit: Danil Chapovalov <danilchap@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37997}
This commit is contained in:
Danil Chapovalov 2022-09-02 15:40:26 +02:00 committed by WebRTC LUCI CQ
parent 16242931e3
commit f136165c54
2 changed files with 45 additions and 57 deletions

View File

@ -1627,6 +1627,7 @@ if (rtc_include_tests) {
"containers:unittests", "containers:unittests",
"memory:unittests", "memory:unittests",
"synchronization:mutex", "synchronization:mutex",
"task_utils:repeating_task",
"third_party/base64", "third_party/base64",
"third_party/sigslot", "third_party/sigslot",
] ]

View File

@ -22,17 +22,17 @@
#include <utility> #include <utility>
#include "absl/memory/memory.h" #include "absl/memory/memory.h"
#include "api/units/time_delta.h"
#include "rtc_base/arraysize.h" #include "rtc_base/arraysize.h"
#include "rtc_base/async_packet_socket.h" #include "rtc_base/async_packet_socket.h"
#include "rtc_base/async_udp_socket.h" #include "rtc_base/async_udp_socket.h"
#include "rtc_base/fake_clock.h" #include "rtc_base/fake_clock.h"
#include "rtc_base/gunit.h" #include "rtc_base/gunit.h"
#include "rtc_base/ip_address.h" #include "rtc_base/ip_address.h"
#include "rtc_base/location.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/message_handler.h"
#include "rtc_base/socket.h" #include "rtc_base/socket.h"
#include "rtc_base/socket_address.h" #include "rtc_base/socket_address.h"
#include "rtc_base/task_utils/repeating_task.h"
#include "rtc_base/test_client.h" #include "rtc_base/test_client.h"
#include "rtc_base/test_utils.h" #include "rtc_base/test_utils.h"
#include "rtc_base/third_party/sigslot/sigslot.h" #include "rtc_base/third_party/sigslot/sigslot.h"
@ -44,77 +44,77 @@
namespace rtc { namespace rtc {
namespace { namespace {
using webrtc::testing::SSE_CLOSE; using ::webrtc::RepeatingTaskHandle;
using webrtc::testing::SSE_ERROR; using ::webrtc::TimeDelta;
using webrtc::testing::SSE_OPEN; using ::webrtc::testing::SSE_CLOSE;
using webrtc::testing::SSE_READ; using ::webrtc::testing::SSE_ERROR;
using webrtc::testing::SSE_WRITE; using ::webrtc::testing::SSE_OPEN;
using webrtc::testing::StreamSink; using ::webrtc::testing::SSE_READ;
using ::webrtc::testing::SSE_WRITE;
using ::webrtc::testing::StreamSink;
// Sends at a constant rate but with random packet sizes. // Sends at a constant rate but with random packet sizes.
struct Sender : public MessageHandlerAutoCleanup { struct Sender {
Sender(Thread* th, Socket* s, uint32_t rt) Sender(Thread* th, Socket* s, uint32_t rt)
: thread(th), : thread(th),
socket(std::make_unique<AsyncUDPSocket>(s)), socket(std::make_unique<AsyncUDPSocket>(s)),
done(false),
rate(rt), rate(rt),
count(0) { count(0) {
last_send = rtc::TimeMillis(); last_send = rtc::TimeMillis();
thread->PostDelayed(RTC_FROM_HERE, NextDelay(), this, 1);
}
uint32_t NextDelay() {
uint32_t size = (rand() % 4096) + 1;
return 1000 * size / rate;
}
void OnMessage(Message* pmsg) override {
ASSERT_EQ(1u, pmsg->message_id);
if (done)
return;
periodic = RepeatingTaskHandle::DelayedStart(thread, NextDelay(), [this] {
int64_t cur_time = rtc::TimeMillis(); int64_t cur_time = rtc::TimeMillis();
int64_t delay = cur_time - last_send; int64_t delay = cur_time - last_send;
uint32_t size = static_cast<uint32_t>(rate * delay / 1000); uint32_t size =
size = std::min<uint32_t>(size, 4096); std::clamp<uint32_t>(rate * delay / 1000, sizeof(uint32_t), 4096);
size = std::max<uint32_t>(size, sizeof(uint32_t));
count += size; count += size;
memcpy(dummy, &cur_time, sizeof(cur_time)); memcpy(dummy, &cur_time, sizeof(cur_time));
socket->Send(dummy, size, options); socket->Send(dummy, size, options);
last_send = cur_time; last_send = cur_time;
thread->PostDelayed(RTC_FROM_HERE, NextDelay(), this, 1); return NextDelay();
});
}
TimeDelta NextDelay() {
int size = (rand() % 4096) + 1;
return TimeDelta::Seconds(1) * size / rate;
} }
Thread* thread; Thread* thread;
std::unique_ptr<AsyncUDPSocket> socket; std::unique_ptr<AsyncUDPSocket> socket;
rtc::PacketOptions options; rtc::PacketOptions options;
bool done; RepeatingTaskHandle periodic;
uint32_t rate; // bytes per second uint32_t rate; // bytes per second
uint32_t count; uint32_t count;
int64_t last_send; int64_t last_send;
char dummy[4096]; char dummy[4096];
}; };
struct Receiver : public MessageHandlerAutoCleanup, struct Receiver : public sigslot::has_slots<> {
public sigslot::has_slots<> {
Receiver(Thread* th, Socket* s, uint32_t bw) Receiver(Thread* th, Socket* s, uint32_t bw)
: thread(th), : thread(th),
socket(std::make_unique<AsyncUDPSocket>(s)), socket(std::make_unique<AsyncUDPSocket>(s)),
bandwidth(bw), bandwidth(bw),
done(false),
count(0), count(0),
sec_count(0), sec_count(0),
sum(0), sum(0),
sum_sq(0), sum_sq(0),
samples(0) { samples(0) {
socket->SignalReadPacket.connect(this, &Receiver::OnReadPacket); socket->SignalReadPacket.connect(this, &Receiver::OnReadPacket);
thread->PostDelayed(RTC_FROM_HERE, 1000, this, 1); periodic = RepeatingTaskHandle::DelayedStart(
thread, TimeDelta::Seconds(1), [this] {
// It is always possible for us to receive more than expected because
// packets can be further delayed in delivery.
if (bandwidth > 0) {
EXPECT_LE(sec_count, 5 * bandwidth / 4);
}
sec_count = 0;
return TimeDelta::Seconds(1);
});
} }
~Receiver() override { thread->Clear(this); } ~Receiver() override { periodic.Stop(); }
void OnReadPacket(AsyncPacketSocket* s, void OnReadPacket(AsyncPacketSocket* s,
const char* data, const char* data,
@ -135,24 +135,10 @@ struct Receiver : public MessageHandlerAutoCleanup,
samples += 1; samples += 1;
} }
void OnMessage(Message* pmsg) override {
ASSERT_EQ(1u, pmsg->message_id);
if (done)
return;
// It is always possible for us to receive more than expected because
// packets can be further delayed in delivery.
if (bandwidth > 0)
ASSERT_TRUE(sec_count <= 5 * bandwidth / 4);
sec_count = 0;
thread->PostDelayed(RTC_FROM_HERE, 1000, this, 1);
}
Thread* thread; Thread* thread;
std::unique_ptr<AsyncUDPSocket> socket; std::unique_ptr<AsyncUDPSocket> socket;
uint32_t bandwidth; uint32_t bandwidth;
bool done; RepeatingTaskHandle periodic;
size_t count; size_t count;
size_t sec_count; size_t sec_count;
double sum; double sum;
@ -705,7 +691,7 @@ class VirtualSocketServerTest : public ::testing::Test {
// Allow the sender to run for 5 (simulated) seconds, then be stopped for 5 // Allow the sender to run for 5 (simulated) seconds, then be stopped for 5
// seconds. // seconds.
SIMULATED_WAIT(false, 5000, fake_clock_); SIMULATED_WAIT(false, 5000, fake_clock_);
sender.done = true; sender.periodic.Stop();
SIMULATED_WAIT(false, 5000, fake_clock_); SIMULATED_WAIT(false, 5000, fake_clock_);
// Ensure the observed bandwidth fell within a reasonable margin of error. // Ensure the observed bandwidth fell within a reasonable margin of error.
@ -747,7 +733,8 @@ class VirtualSocketServerTest : public ::testing::Test {
// Simulate 10 seconds of packets being sent, then check the observed delay // Simulate 10 seconds of packets being sent, then check the observed delay
// distribution. // distribution.
SIMULATED_WAIT(false, 10000, fake_clock_); SIMULATED_WAIT(false, 10000, fake_clock_);
sender.done = receiver.done = true; sender.periodic.Stop();
receiver.periodic.Stop();
ss_.ProcessMessagesUntilIdle(); ss_.ProcessMessagesUntilIdle();
const double sample_mean = receiver.sum / receiver.samples; const double sample_mean = receiver.sum / receiver.samples;