From 4d715385e11161ff002b44d582b7d4cc274883a3 Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Fri, 19 Aug 2022 10:28:40 +0200 Subject: [PATCH] Remove rtc::MessageHandler usage in pseudo tcp unittests Bug: webrtc:11988 Change-Id: Iac41f18410828333b40012d4876db23673d198d8 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/272283 Commit-Queue: Danil Chapovalov Reviewed-by: Tomas Gunnarsson Cr-Commit-Position: refs/heads/main@{#37834} --- p2p/BUILD.gn | 3 +- p2p/base/pseudo_tcp_unittest.cc | 120 ++++++++++++-------------------- 2 files changed, 48 insertions(+), 75 deletions(-) diff --git a/p2p/BUILD.gn b/p2p/BUILD.gn index d7b099d0c7..7c78774b14 100644 --- a/p2p/BUILD.gn +++ b/p2p/BUILD.gn @@ -271,6 +271,8 @@ if (rtc_include_tests) { "../api:mock_async_dns_resolver", "../api:packet_socket_factory", "../api:scoped_refptr", + "../api/task_queue", + "../api/task_queue:pending_task_safety_flag", "../api/transport:stun_types", "../api/units:time_delta", "../rtc_base", @@ -280,7 +282,6 @@ if (rtc_include_tests) { "../rtc_base:copy_on_write_buffer", "../rtc_base:gunit_helpers", "../rtc_base:ip_address", - "../rtc_base:location", "../rtc_base:logging", "../rtc_base:macromagic", "../rtc_base:net_helpers", diff --git a/p2p/base/pseudo_tcp_unittest.cc b/p2p/base/pseudo_tcp_unittest.cc index 25a7397516..debddb217e 100644 --- a/p2p/base/pseudo_tcp_unittest.cc +++ b/p2p/base/pseudo_tcp_unittest.cc @@ -15,19 +15,23 @@ #include #include #include +#include #include +#include "api/task_queue/pending_task_safety_flag.h" +#include "api/task_queue/task_queue_base.h" +#include "api/units/time_delta.h" #include "rtc_base/gunit.h" #include "rtc_base/helpers.h" -#include "rtc_base/location.h" #include "rtc_base/logging.h" #include "rtc_base/memory_stream.h" -#include "rtc_base/message_handler.h" -#include "rtc_base/thread.h" #include "rtc_base/time_utils.h" #include "test/gtest.h" -using cricket::PseudoTcp; +using ::cricket::PseudoTcp; +using ::webrtc::ScopedTaskSafety; +using ::webrtc::TaskQueueBase; +using ::webrtc::TimeDelta; static const int kConnectTimeoutMs = 10000; // ~3 * default RTO of 3000ms static const int kTransferTimeoutMs = 15000; @@ -44,7 +48,6 @@ class PseudoTcpForTest : public cricket::PseudoTcp { }; class PseudoTcpTestBase : public ::testing::Test, - public rtc::MessageHandlerAutoCleanup, public cricket::IPseudoTcpNotify { public: PseudoTcpTestBase() @@ -121,14 +124,6 @@ class PseudoTcpTestBase : public ::testing::Test, UpdateLocalClock(); } - enum { - MSG_LPACKET, - MSG_RPACKET, - MSG_LCLOCK, - MSG_RCLOCK, - MSG_IOCOMPLETE, - MSG_WRITE - }; virtual void OnTcpOpen(PseudoTcp* tcp) { // Consider ourselves connected when the local side gets OnTcpOpen. // OnTcpWriteable isn't fired at open, so we trigger it now. @@ -173,54 +168,48 @@ class PseudoTcpTestBase : public ::testing::Test, << len; return WR_SUCCESS; } - int id = (tcp == &local_) ? MSG_RPACKET : MSG_LPACKET; + PseudoTcp* other; + ScopedTaskSafety* timer; + if (tcp == &local_) { + other = &remote_; + timer = &remote_timer_; + } else { + other = &local_; + timer = &local_timer_; + } std::string packet(buffer, len); - rtc::Thread::Current()->PostDelayed(RTC_FROM_HERE, delay_, this, id, - rtc::WrapMessageData(packet)); + ++packets_in_flight_; + TaskQueueBase::Current()->PostDelayedTask( + [other, timer, packet = std::move(packet), this] { + --packets_in_flight_; + other->NotifyPacket(packet.c_str(), packet.size()); + UpdateClock(*other, *timer); + }, + TimeDelta::Millis(delay_)); return WR_SUCCESS; } - void UpdateLocalClock() { UpdateClock(&local_, MSG_LCLOCK); } - void UpdateRemoteClock() { UpdateClock(&remote_, MSG_RCLOCK); } - void UpdateClock(PseudoTcp* tcp, uint32_t message) { + void UpdateLocalClock() { UpdateClock(local_, local_timer_); } + void UpdateRemoteClock() { UpdateClock(remote_, remote_timer_); } + static void UpdateClock(PseudoTcp& tcp, ScopedTaskSafety& timer) { long interval = 0; // NOLINT - tcp->GetNextClock(PseudoTcp::Now(), interval); + tcp.GetNextClock(PseudoTcp::Now(), interval); interval = std::max(interval, 0L); // sometimes interval is < 0 - rtc::Thread::Current()->Clear(this, message); - rtc::Thread::Current()->PostDelayed(RTC_FROM_HERE, interval, this, message); - } - - virtual void OnMessage(rtc::Message* message) { - switch (message->message_id) { - case MSG_LPACKET: { - const std::string& s(rtc::UseMessageData(message->pdata)); - local_.NotifyPacket(s.c_str(), s.size()); - UpdateLocalClock(); - break; - } - case MSG_RPACKET: { - const std::string& s(rtc::UseMessageData(message->pdata)); - remote_.NotifyPacket(s.c_str(), s.size()); - UpdateRemoteClock(); - break; - } - case MSG_LCLOCK: - local_.NotifyClock(PseudoTcp::Now()); - UpdateLocalClock(); - break; - case MSG_RCLOCK: - remote_.NotifyClock(PseudoTcp::Now()); - UpdateRemoteClock(); - break; - default: - break; - } - delete message->pdata; + timer.reset(); + TaskQueueBase::Current()->PostDelayedTask( + SafeTask(timer.flag(), + [&tcp, &timer] { + tcp.NotifyClock(PseudoTcp::Now()); + UpdateClock(tcp, timer); + }), + TimeDelta::Millis(interval)); } rtc::AutoThread main_thread_; PseudoTcpForTest local_; PseudoTcpForTest remote_; + ScopedTaskSafety local_timer_; + ScopedTaskSafety remote_timer_; rtc::MemoryStream send_stream_; rtc::MemoryStream recv_stream_; bool have_connected_; @@ -231,6 +220,7 @@ class PseudoTcpTestBase : public ::testing::Test, int loss_; bool drop_next_packet_ = false; bool simultaneous_open_ = false; + int packets_in_flight_ = 0; }; class PseudoTcpTest : public PseudoTcpTestBase { @@ -480,7 +470,7 @@ class PseudoTcpTestReceiveWindow : public PseudoTcpTestBase { EXPECT_EQ(0, Connect()); EXPECT_TRUE_WAIT(have_connected_, kConnectTimeoutMs); - rtc::Thread::Current()->Post(RTC_FROM_HERE, this, MSG_WRITE); + TaskQueueBase::Current()->PostTask([this] { WriteData(); }); EXPECT_TRUE_WAIT(have_disconnected_, kTransferTimeoutMs); ASSERT_EQ(2u, send_position_.size()); @@ -498,20 +488,6 @@ class PseudoTcpTestReceiveWindow : public PseudoTcpTestBase { EXPECT_EQ(2 * estimated_recv_window, recv_position_[1]); } - virtual void OnMessage(rtc::Message* message) { - int message_id = message->message_id; - PseudoTcpTestBase::OnMessage(message); - - switch (message_id) { - case MSG_WRITE: { - WriteData(); - break; - } - default: - break; - } - } - uint32_t EstimateReceiveWindowSize() const { return static_cast(recv_position_[0]); } @@ -575,15 +551,11 @@ class PseudoTcpTestReceiveWindow : public PseudoTcpTestBase { } while (sent > 0); // At this point, we've filled up the available space in the send queue. - int message_queue_size = static_cast(rtc::Thread::Current()->size()); - // The message queue will always have at least 2 messages, an RCLOCK and - // an LCLOCK, since they are added back on the delay queue at the same time - // they are pulled off and therefore are never really removed. - if (message_queue_size > 2) { - // If there are non-clock messages remaining, attempt to continue sending - // after giving those messages time to process, which should free up the - // send buffer. - rtc::Thread::Current()->PostDelayed(RTC_FROM_HERE, 10, this, MSG_WRITE); + if (packets_in_flight_ > 0) { + // If there are packet tasks, attempt to continue sending after giving + // those packets time to process, which should free up the send buffer. + rtc::Thread::Current()->PostDelayedTask([this] { WriteData(); }, + TimeDelta::Millis(10)); } else { if (!remote_.isReceiveBufferFull()) { RTC_LOG(LS_ERROR) << "This shouldn't happen - the send buffer is full, "