From 9a21c4933796926a8b745f4595db5d948d8fb9d0 Mon Sep 17 00:00:00 2001 From: Markus Handell Date: Thu, 25 Aug 2022 11:40:13 +0000 Subject: [PATCH] SocketServer: Migrate Wait/kForever to TimeDelta. Bug: webrtc:13756 Change-Id: Ie36ca38b1ab336742231b101ef7bb5ccf3735659 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/272102 Commit-Queue: Markus Handell Reviewed-by: Tomas Gunnarsson Cr-Commit-Position: refs/heads/main@{#37903} --- examples/peerconnection/client/linux/main.cc | 5 ++-- rtc_base/BUILD.gn | 7 ++++- rtc_base/firewall_socket_server.cc | 5 ++-- rtc_base/firewall_socket_server.h | 2 +- rtc_base/nat_socket_factory.cc | 5 ++-- rtc_base/nat_socket_factory.h | 2 +- rtc_base/null_socket_server.cc | 8 ++--- rtc_base/null_socket_server.h | 2 +- rtc_base/null_socket_server_unittest.cc | 3 +- rtc_base/physical_socket_server.cc | 31 +++++++++++++------- rtc_base/physical_socket_server.h | 12 +++++--- rtc_base/socket_server.h | 11 ++++--- rtc_base/thread.cc | 8 +++-- rtc_base/virtual_socket_server.cc | 6 ++-- rtc_base/virtual_socket_server.h | 2 +- test/network/fake_network_socket_server.cc | 10 +++---- test/network/fake_network_socket_server.h | 2 +- test/run_loop.cc | 3 +- test/run_loop.h | 2 +- test/time_controller/simulated_thread.cc | 4 +-- 20 files changed, 80 insertions(+), 50 deletions(-) diff --git a/examples/peerconnection/client/linux/main.cc b/examples/peerconnection/client/linux/main.cc index 051eda3dd2..ad3d671073 100644 --- a/examples/peerconnection/client/linux/main.cc +++ b/examples/peerconnection/client/linux/main.cc @@ -36,7 +36,8 @@ class CustomSocketServer : public rtc::PhysicalSocketServer { void set_conductor(Conductor* conductor) { conductor_ = conductor; } // Override so that we can also pump the GTK message loop. - bool Wait(int cms, bool process_io) override { + // This function never waits. + bool Wait(webrtc::TimeDelta max_wait_duration, bool process_io) override { // Pump GTK events. // TODO(henrike): We really should move either the socket server or UI to a // different thread. Alternatively we could look at merging the two loops @@ -49,7 +50,7 @@ class CustomSocketServer : public rtc::PhysicalSocketServer { client_ != NULL && !client_->is_connected()) { message_queue_->Quit(); } - return rtc::PhysicalSocketServer::Wait(0 /*cms == -1 ? 1 : cms*/, + return rtc::PhysicalSocketServer::Wait(webrtc::TimeDelta::Zero(), process_io); } diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index 311b9e01e3..9938be891e 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -912,7 +912,11 @@ rtc_library("null_socket_server") { rtc_source_set("socket_server") { sources = [ "socket_server.h" ] - deps = [ ":socket_factory" ] + deps = [ + ":rtc_event", + ":socket_factory", + "../api/units:time_delta", + ] } rtc_library("threading") { @@ -1504,6 +1508,7 @@ if (rtc_include_tests) { ":testclient", ":threading", ":timeutils", + "../api/units:time_delta", "../system_wrappers", "../test:fileutils", "../test:test_main", diff --git a/rtc_base/firewall_socket_server.cc b/rtc_base/firewall_socket_server.cc index edb0cd2398..db88d19a15 100644 --- a/rtc_base/firewall_socket_server.cc +++ b/rtc_base/firewall_socket_server.cc @@ -210,8 +210,9 @@ void FirewallSocketServer::SetMessageQueue(Thread* queue) { server_->SetMessageQueue(queue); } -bool FirewallSocketServer::Wait(int cms, bool process_io) { - return server_->Wait(cms, process_io); +bool FirewallSocketServer::Wait(webrtc::TimeDelta max_wait_duration, + bool process_io) { + return server_->Wait(max_wait_duration, process_io); } void FirewallSocketServer::WakeUp() { diff --git a/rtc_base/firewall_socket_server.h b/rtc_base/firewall_socket_server.h index 8a82f885c6..63f9e1ac6c 100644 --- a/rtc_base/firewall_socket_server.h +++ b/rtc_base/firewall_socket_server.h @@ -79,7 +79,7 @@ class FirewallSocketServer : public SocketServer { Socket* CreateSocket(int family, int type) override; void SetMessageQueue(Thread* queue) override; - bool Wait(int cms, bool process_io) override; + bool Wait(webrtc::TimeDelta max_wait_duration, bool process_io) override; void WakeUp() override; Socket* WrapSocket(Socket* sock, int type); diff --git a/rtc_base/nat_socket_factory.cc b/rtc_base/nat_socket_factory.cc index f6492a9305..fe021b95ff 100644 --- a/rtc_base/nat_socket_factory.cc +++ b/rtc_base/nat_socket_factory.cc @@ -384,8 +384,9 @@ void NATSocketServer::SetMessageQueue(Thread* queue) { server_->SetMessageQueue(queue); } -bool NATSocketServer::Wait(int cms, bool process_io) { - return server_->Wait(cms, process_io); +bool NATSocketServer::Wait(webrtc::TimeDelta max_wait_duration, + bool process_io) { + return server_->Wait(max_wait_duration, process_io); } void NATSocketServer::WakeUp() { diff --git a/rtc_base/nat_socket_factory.h b/rtc_base/nat_socket_factory.h index 70cb303def..0b301b5844 100644 --- a/rtc_base/nat_socket_factory.h +++ b/rtc_base/nat_socket_factory.h @@ -152,7 +152,7 @@ class NATSocketServer : public SocketServer, public NATInternalSocketFactory { Socket* CreateSocket(int family, int type) override; void SetMessageQueue(Thread* queue) override; - bool Wait(int cms, bool process_io) override; + bool Wait(webrtc::TimeDelta max_wait_duration, bool process_io) override; void WakeUp() override; // NATInternalSocketFactory implementation diff --git a/rtc_base/null_socket_server.cc b/rtc_base/null_socket_server.cc index 4f9e01d889..366349db3a 100644 --- a/rtc_base/null_socket_server.cc +++ b/rtc_base/null_socket_server.cc @@ -20,14 +20,12 @@ namespace rtc { NullSocketServer::NullSocketServer() = default; NullSocketServer::~NullSocketServer() {} -bool NullSocketServer::Wait(int cms, bool process_io) { +bool NullSocketServer::Wait(webrtc::TimeDelta max_wait_duration, + bool process_io) { // Wait with the given timeout. Do not log a warning if we end up waiting for // a long time; that just means no one has any work for us, which is perfectly // legitimate. - event_.Wait(/*give_up_after=*/cms == kForever - ? Event::kForever - : webrtc::TimeDelta::Millis(cms), - /*warn_after=*/Event::kForever); + event_.Wait(max_wait_duration, /*warn_after=*/Event::kForever); return true; } diff --git a/rtc_base/null_socket_server.h b/rtc_base/null_socket_server.h index 6d4ae848e5..87f49f436e 100644 --- a/rtc_base/null_socket_server.h +++ b/rtc_base/null_socket_server.h @@ -23,7 +23,7 @@ class RTC_EXPORT NullSocketServer : public SocketServer { NullSocketServer(); ~NullSocketServer() override; - bool Wait(int cms, bool process_io) override; + bool Wait(webrtc::TimeDelta max_wait_duration, bool process_io) override; void WakeUp() override; Socket* CreateSocket(int family, int type) override; diff --git a/rtc_base/null_socket_server_unittest.cc b/rtc_base/null_socket_server_unittest.cc index a875d6c284..70f7cf8ca3 100644 --- a/rtc_base/null_socket_server_unittest.cc +++ b/rtc_base/null_socket_server_unittest.cc @@ -14,6 +14,7 @@ #include +#include "api/units/time_delta.h" #include "rtc_base/gunit.h" #include "rtc_base/location.h" #include "rtc_base/message_handler.h" @@ -44,7 +45,7 @@ TEST_F(NullSocketServerTest, WaitAndSet) { TEST_F(NullSocketServerTest, TestWait) { int64_t start = TimeMillis(); - ss_.Wait(200, true); + ss_.Wait(webrtc::TimeDelta::Millis(200), true); // The actual wait time is dependent on the resolution of the timer used by // the Event class. Allow for the event to signal ~20ms early. EXPECT_GE(TimeSince(start), 180); diff --git a/rtc_base/physical_socket_server.cc b/rtc_base/physical_socket_server.cc index 33ebb69e2d..7c01815d30 100644 --- a/rtc_base/physical_socket_server.cc +++ b/rtc_base/physical_socket_server.cc @@ -9,6 +9,8 @@ */ #include "rtc_base/physical_socket_server.h" +#include + #if defined(_MSC_VER) && _MSC_VER < 1300 #pragma warning(disable : 4786) #endif @@ -1164,12 +1166,20 @@ void PhysicalSocketServer::Update(Dispatcher* pdispatcher) { #endif } +int PhysicalSocketServer::ToCmsWait(webrtc::TimeDelta max_wait_duration) { + return max_wait_duration == Event::kForever + ? kForeverMs + : max_wait_duration.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms(); +} + #if defined(WEBRTC_POSIX) -bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { +bool PhysicalSocketServer::Wait(webrtc::TimeDelta max_wait_duration, + bool process_io) { // We don't support reentrant waiting. RTC_DCHECK(!waiting_); ScopedSetTrue s(&waiting_); + const int cmsWait = ToCmsWait(max_wait_duration); #if defined(WEBRTC_USE_EPOLL) // We don't keep a dedicated "epoll" descriptor containing only the non-IO // (i.e. signaling) dispatcher, so "poll" will be used instead of the default @@ -1256,7 +1266,7 @@ bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) { struct timeval* ptvWait = nullptr; struct timeval tvWait; int64_t stop_us; - if (cmsWait != kForever) { + if (cmsWait != kForeverMs) { // Calculate wait timeval tvWait.tv_sec = cmsWait / 1000; tvWait.tv_usec = (cmsWait % 1000) * 1000; @@ -1266,7 +1276,6 @@ bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) { stop_us = rtc::TimeMicros() + cmsWait * 1000; } - fd_set fdsRead; fd_set fdsWrite; // Explicitly unpoison these FDs on MemorySanitizer which doesn't handle the @@ -1454,7 +1463,7 @@ bool PhysicalSocketServer::WaitEpoll(int cmsWait) { RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); int64_t tvWait = -1; int64_t tvStop = -1; - if (cmsWait != kForever) { + if (cmsWait != kForeverMs) { tvWait = cmsWait; tvStop = TimeAfter(cmsWait); } @@ -1499,7 +1508,7 @@ bool PhysicalSocketServer::WaitEpoll(int cmsWait) { } } - if (cmsWait != kForever) { + if (cmsWait != kForeverMs) { tvWait = TimeDiff(tvStop, TimeMillis()); if (tvWait <= 0) { // Return success on timeout. @@ -1515,7 +1524,7 @@ bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) { RTC_DCHECK(dispatcher); int64_t tvWait = -1; int64_t tvStop = -1; - if (cmsWait != kForever) { + if (cmsWait != kForeverMs) { tvWait = cmsWait; tvStop = TimeAfter(cmsWait); } @@ -1566,7 +1575,7 @@ bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) { ProcessEvents(dispatcher, readable, writable, error, error); } - if (cmsWait != kForever) { + if (cmsWait != kForeverMs) { tvWait = TimeDiff(tvStop, TimeMillis()); if (tvWait < 0) { // Return success on timeout. @@ -1583,11 +1592,13 @@ bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) { #endif // WEBRTC_POSIX #if defined(WEBRTC_WIN) -bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { +bool PhysicalSocketServer::Wait(webrtc::TimeDelta max_wait_duration, + bool process_io) { // We don't support reentrant waiting. RTC_DCHECK(!waiting_); ScopedSetTrue s(&waiting_); + int cmsWait = ToCmsWait(max_wait_duration); int64_t cmsTotal = cmsWait; int64_t cmsElapsed = 0; int64_t msStart = Time(); @@ -1634,7 +1645,7 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { // Which is shorter, the delay wait or the asked wait? int64_t cmsNext; - if (cmsWait == kForever) { + if (cmsWait == kForeverMs) { cmsNext = cmsWait; } else { cmsNext = std::max(0, cmsTotal - cmsElapsed); @@ -1750,7 +1761,7 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { if (!fWait_) break; cmsElapsed = TimeSince(msStart); - if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) { + if ((cmsWait != kForeverMs) && (cmsElapsed >= cmsWait)) { break; } } diff --git a/rtc_base/physical_socket_server.h b/rtc_base/physical_socket_server.h index a01229d593..f97271f422 100644 --- a/rtc_base/physical_socket_server.h +++ b/rtc_base/physical_socket_server.h @@ -11,6 +11,7 @@ #ifndef RTC_BASE_PHYSICAL_SOCKET_SERVER_H_ #define RTC_BASE_PHYSICAL_SOCKET_SERVER_H_ +#include "api/units/time_delta.h" #if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX) #include #define WEBRTC_USE_EPOLL 1 @@ -74,7 +75,7 @@ class RTC_EXPORT PhysicalSocketServer : public SocketServer { virtual Socket* WrapSocket(SOCKET s); // SocketServer: - bool Wait(int cms, bool process_io) override; + bool Wait(webrtc::TimeDelta max_wait_duration, bool process_io) override; void WakeUp() override; void Add(Dispatcher* dispatcher); @@ -84,16 +85,19 @@ class RTC_EXPORT PhysicalSocketServer : public SocketServer { private: // The number of events to process with one call to "epoll_wait". static constexpr size_t kNumEpollEvents = 128; + // A local historical definition of "foreverness", in milliseconds. + static constexpr int kForeverMs = -1; + static int ToCmsWait(webrtc::TimeDelta max_wait_duration); #if defined(WEBRTC_POSIX) - bool WaitSelect(int cms, bool process_io); + bool WaitSelect(int cmsWait, bool process_io); #endif // WEBRTC_POSIX #if defined(WEBRTC_USE_EPOLL) void AddEpoll(Dispatcher* dispatcher, uint64_t key); void RemoveEpoll(Dispatcher* dispatcher); void UpdateEpoll(Dispatcher* dispatcher, uint64_t key); - bool WaitEpoll(int cms); - bool WaitPoll(int cms, Dispatcher* dispatcher); + bool WaitEpoll(int cmsWait); + bool WaitPoll(int cmsWait, Dispatcher* dispatcher); // This array is accessed in isolation by a thread calling into Wait(). // It's useless to use a SequenceChecker to guard it because a socket diff --git a/rtc_base/socket_server.h b/rtc_base/socket_server.h index face04dbc2..bf1326dad9 100644 --- a/rtc_base/socket_server.h +++ b/rtc_base/socket_server.h @@ -13,6 +13,8 @@ #include +#include "api/units/time_delta.h" +#include "rtc_base/event.h" #include "rtc_base/socket_factory.h" namespace rtc { @@ -30,7 +32,7 @@ class NetworkBinderInterface; // notified of asynchronous I/O from this server's Wait method. class SocketServer : public SocketFactory { public: - static const int kForever = -1; + static constexpr webrtc::TimeDelta kForever = rtc::Event::kForever; static std::unique_ptr CreateDefault(); // When the socket server is installed into a Thread, this function is called @@ -40,10 +42,11 @@ class SocketServer : public SocketFactory { virtual void SetMessageQueue(Thread* queue) {} // Sleeps until: - // 1) cms milliseconds have elapsed (unless cms == kForever) - // 2) WakeUp() is called + // 1) `max_wait_duration` has elapsed (unless `max_wait_duration` == + // `kForever`) + // 2) WakeUp() is called // While sleeping, I/O is performed if process_io is true. - virtual bool Wait(int cms, bool process_io) = 0; + virtual bool Wait(webrtc::TimeDelta max_wait_duration, bool process_io) = 0; // Causes the current wait (if one is in progress) to wake up. virtual void WakeUp() = 0; diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc index 5246cbe231..1a43008030 100644 --- a/rtc_base/thread.cc +++ b/rtc_base/thread.cc @@ -11,6 +11,8 @@ #include "rtc_base/thread.h" #include "absl/strings/string_view.h" +#include "api/units/time_delta.h" +#include "rtc_base/socket_server.h" #if defined(WEBRTC_WIN) #include @@ -492,7 +494,9 @@ bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) { { // Wait and multiplex in the meantime - if (!ss_->Wait(static_cast(cmsNext), process_io)) + if (!ss_->Wait(cmsNext == kForever ? SocketServer::kForever + : webrtc::TimeDelta::Millis(cmsNext), + process_io)) return false; } @@ -912,7 +916,7 @@ void Thread::Send(const Location& posted_from, crit_.Enter(); while (!ready) { crit_.Leave(); - current_thread->socketserver()->Wait(kForever, false); + current_thread->socketserver()->Wait(SocketServer::kForever, false); waited = true; crit_.Enter(); } diff --git a/rtc_base/virtual_socket_server.cc b/rtc_base/virtual_socket_server.cc index a055e59517..6e5eeb28c7 100644 --- a/rtc_base/virtual_socket_server.cc +++ b/rtc_base/virtual_socket_server.cc @@ -613,7 +613,8 @@ void VirtualSocketServer::SetMessageQueue(Thread* msg_queue) { msg_queue_ = msg_queue; } -bool VirtualSocketServer::Wait(int cmsWait, bool process_io) { +bool VirtualSocketServer::Wait(webrtc::TimeDelta max_wait_duration, + bool process_io) { RTC_DCHECK_RUN_ON(msg_queue_); if (stop_on_idle_ && Thread::Current()->empty()) { return false; @@ -622,8 +623,7 @@ bool VirtualSocketServer::Wait(int cmsWait, bool process_io) { // any real I/O. Received packets come in the form of queued messages, so // Thread will ensure WakeUp is called if another thread sends a // packet. - wakeup_.Wait(cmsWait == kForever ? Event::kForever - : webrtc::TimeDelta::Millis(cmsWait)); + wakeup_.Wait(max_wait_duration); return true; } diff --git a/rtc_base/virtual_socket_server.h b/rtc_base/virtual_socket_server.h index b172567937..eb9cfc1d3c 100644 --- a/rtc_base/virtual_socket_server.h +++ b/rtc_base/virtual_socket_server.h @@ -223,7 +223,7 @@ class VirtualSocketServer : public SocketServer { // SocketServer: void SetMessageQueue(Thread* queue) override; - bool Wait(int cms, bool process_io) override; + bool Wait(webrtc::TimeDelta max_wait_duration, bool process_io) override; void WakeUp() override; void SetDelayOnAddress(const rtc::SocketAddress& address, int delay_ms) { diff --git a/test/network/fake_network_socket_server.cc b/test/network/fake_network_socket_server.cc index a2e082404d..c94c4e372a 100644 --- a/test/network/fake_network_socket_server.cc +++ b/test/network/fake_network_socket_server.cc @@ -304,12 +304,12 @@ void FakeNetworkSocketServer::SetMessageQueue(rtc::Thread* thread) { } // Always returns true (if return false, it won't be invoked again...) -bool FakeNetworkSocketServer::Wait(int cms, bool process_io) { +bool FakeNetworkSocketServer::Wait(webrtc::TimeDelta max_wait_duration, + bool process_io) { RTC_DCHECK(thread_ == rtc::Thread::Current()); - if (cms != 0) { - wakeup_.Wait(cms == kForever ? rtc::Event::kForever - : TimeDelta::Millis(cms)); - } + if (!max_wait_duration.IsZero()) + wakeup_.Wait(max_wait_duration); + return true; } diff --git a/test/network/fake_network_socket_server.h b/test/network/fake_network_socket_server.h index 5b23a01eee..25c85d048a 100644 --- a/test/network/fake_network_socket_server.h +++ b/test/network/fake_network_socket_server.h @@ -40,7 +40,7 @@ class FakeNetworkSocketServer : public rtc::SocketServer { // Called by the network thread when this server is installed, kicking off the // message handler loop. void SetMessageQueue(rtc::Thread* thread) override; - bool Wait(int cms, bool process_io) override; + bool Wait(webrtc::TimeDelta max_wait_duration, bool process_io) override; void WakeUp() override; protected: diff --git a/test/run_loop.cc b/test/run_loop.cc index d54b4a4137..7cc80ab481 100644 --- a/test/run_loop.cc +++ b/test/run_loop.cc @@ -51,7 +51,8 @@ void RunLoop::FakeSocketServer::FailNextWait() { fail_next_wait_ = true; } -bool RunLoop::FakeSocketServer::Wait(int cms, bool process_io) { +bool RunLoop::FakeSocketServer::Wait(webrtc::TimeDelta max_wait_duration, + bool process_io) { if (fail_next_wait_) { fail_next_wait_ = false; return false; diff --git a/test/run_loop.h b/test/run_loop.h index 1987a05785..8a2bf54402 100644 --- a/test/run_loop.h +++ b/test/run_loop.h @@ -47,7 +47,7 @@ class RunLoop { void FailNextWait(); private: - bool Wait(int cms, bool process_io) override; + bool Wait(webrtc::TimeDelta max_wait_duration, bool process_io) override; void WakeUp() override; rtc::Socket* CreateSocket(int family, int type) override; diff --git a/test/time_controller/simulated_thread.cc b/test/time_controller/simulated_thread.cc index 54493285fc..dc09783037 100644 --- a/test/time_controller/simulated_thread.cc +++ b/test/time_controller/simulated_thread.cc @@ -24,8 +24,8 @@ class DummySocketServer : public rtc::SocketServer { RTC_DCHECK_NOTREACHED(); return nullptr; } - bool Wait(int cms, bool process_io) override { - RTC_CHECK_EQ(cms, 0); + bool Wait(TimeDelta max_wait_duration, bool process_io) override { + RTC_CHECK(max_wait_duration.IsZero()); return true; } void WakeUp() override {}