In rtc::Thread hide MessageHandler handling as implementation details

Remote Peek function as unused
Move Get and Dispatch into private section to ensure they are not used
from outside.

Bug: webrtc:9702
Change-Id: Ibd0b236fe43543d60f97f988524526493bbeaaa7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/272804
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37889}
This commit is contained in:
Danil Chapovalov 2022-08-24 12:19:46 +02:00 committed by WebRTC LUCI CQ
parent 83db78e854
commit 207f8536b2
4 changed files with 46 additions and 68 deletions

View File

@ -380,8 +380,7 @@ Thread::Thread(std::unique_ptr<SocketServer> ss)
: Thread(std::move(ss), /*do_init=*/true) {}
Thread::Thread(SocketServer* ss, bool do_init)
: fPeekKeep_(false),
delayed_next_num_(0),
: delayed_next_num_(0),
fInitialized_(false),
fDestroyed_(false),
stop_(0),
@ -450,28 +449,7 @@ void Thread::Restart() {
stop_.store(0, std::memory_order_release);
}
bool Thread::Peek(Message* pmsg, int cmsWait) {
if (fPeekKeep_) {
*pmsg = msgPeek_;
return true;
}
if (!Get(pmsg, cmsWait))
return false;
msgPeek_ = *pmsg;
fPeekKeep_ = true;
return true;
}
bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
// Return and clear peek if present
// Always return the peek if it exists so there is Peek/Get symmetry
if (fPeekKeep_) {
*pmsg = msgPeek_;
fPeekKeep_ = false;
return true;
}
// Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
int64_t cmsTotal = cmsWait;
@ -650,17 +628,6 @@ int Thread::GetDelay() {
void Thread::ClearInternal(MessageHandler* phandler,
uint32_t id,
MessageList* removed) {
// Remove messages with phandler
if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
if (removed) {
removed->push_back(msgPeek_);
} else {
delete msgPeek_.pdata;
}
fPeekKeep_ = false;
}
// Remove from ordered message queue
for (auto it = messages_.begin(); it != messages_.end();) {

View File

@ -268,14 +268,6 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
// Processed. Normally, this would be true until IsQuitting() is true.
virtual bool IsProcessingMessagesForTesting();
// Get() will process I/O until:
// 1) A message is available (returns true)
// 2) cmsWait seconds have elapsed (returns false)
// 3) Stop() is called (returns false)
virtual bool Get(Message* pmsg,
int cmsWait = kForever,
bool process_io = true);
virtual bool Peek(Message* pmsg, int cmsWait = 0);
// `time_sensitive` is deprecated and should always be false.
virtual void Post(const Location& posted_from,
MessageHandler* phandler,
@ -295,7 +287,6 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
virtual void Clear(MessageHandler* phandler,
uint32_t id = MQID_ANY,
MessageList* removed = nullptr);
virtual void Dispatch(Message* pmsg);
// Amount of time until the next message can be retrieved
virtual int GetDelay();
@ -303,7 +294,7 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
bool empty() const { return size() == 0u; }
size_t size() const {
CritScope cs(&crit_);
return messages_.size() + delayed_messages_.size() + (fPeekKeep_ ? 1u : 0u);
return messages_.size() + delayed_messages_.size();
}
// Internally posts a message which causes the doomed object to be deleted
@ -522,6 +513,21 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
private:
static const int kSlowDispatchLoggingThreshold = 50; // 50 ms
// TODO(bugs.webrtc.org/9702): Delete when chromium stops overriding it.
// chromium's ThreadWrapper overrides it just to check it is never called.
virtual bool Peek(Message* pmsg, int cms_wait) {
RTC_DCHECK_NOTREACHED();
return false;
}
// Get() will process I/O until:
// 1) A message is available (returns true)
// 2) cmsWait seconds have elapsed (returns false)
// 3) Stop() is called (returns false)
virtual bool Get(Message* pmsg,
int cmsWait = kForever,
bool process_io = true);
virtual void Dispatch(Message* pmsg);
// Sets the per-thread allow-blocking-calls flag and returns the previous
// value. Must be called on this thread.
bool SetAllowBlockingCalls(bool allow);
@ -552,8 +558,6 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
// Called by the ThreadManager when being unset as the current thread.
void ClearCurrentTaskQueue();
bool fPeekKeep_;
Message msgPeek_;
MessageList messages_ RTC_GUARDED_BY(crit_);
PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_);
uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_);

View File

@ -20,6 +20,7 @@
#include "rtc_base/async_udp_socket.h"
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
#include "rtc_base/fake_clock.h"
#include "rtc_base/gunit.h"
#include "rtc_base/internal/default_socket_server.h"
#include "rtc_base/null_socket_server.h"
@ -27,6 +28,7 @@
#include "rtc_base/socket_address.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
#include "test/gmock.h"
#include "test/testsupport/rtc_expect_death.h"
#if defined(WEBRTC_WIN)
@ -37,6 +39,7 @@
namespace rtc {
namespace {
using ::testing::ElementsAre;
using ::webrtc::TimeDelta;
// Generates a sequence of numbers (collaboratively).
@ -585,32 +588,39 @@ struct DeletedLockChecker {
bool* deleted;
};
static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(Thread* q) {
EXPECT_TRUE(q != nullptr);
static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(
FakeClock& clock,
Thread& q) {
std::vector<int> run_order;
Event done;
int64_t now = TimeMillis();
q->PostAt(RTC_FROM_HERE, now, nullptr, 3);
q->PostAt(RTC_FROM_HERE, now - 2, nullptr, 0);
q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 1);
q->PostAt(RTC_FROM_HERE, now, nullptr, 4);
q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 2);
q.PostDelayedTask([&] { run_order.push_back(3); }, TimeDelta::Millis(3));
q.PostDelayedTask([&] { run_order.push_back(0); }, TimeDelta::Millis(1));
q.PostDelayedTask([&] { run_order.push_back(1); }, TimeDelta::Millis(2));
q.PostDelayedTask([&] { run_order.push_back(4); }, TimeDelta::Millis(3));
q.PostDelayedTask([&] { run_order.push_back(2); }, TimeDelta::Millis(2));
q.PostDelayedTask([&] { done.Set(); }, TimeDelta::Millis(4));
// Validate time was frozen while tasks were posted.
RTC_DCHECK_EQ(TimeMillis(), now);
Message msg;
for (size_t i = 0; i < 5; ++i) {
memset(&msg, 0, sizeof(msg));
EXPECT_TRUE(q->Get(&msg, 0));
EXPECT_EQ(i, msg.message_id);
}
// Change time to make all tasks ready to run and wait for them.
clock.AdvanceTime(TimeDelta::Millis(4));
ASSERT_TRUE(done.Wait(TimeDelta::Seconds(1)));
EXPECT_FALSE(q->Get(&msg, 0)); // No more messages
EXPECT_THAT(run_order, ElementsAre(0, 1, 2, 3, 4));
}
TEST_F(ThreadQueueTest, DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder) {
ScopedBaseFakeClock clock;
Thread q(CreateDefaultSocketServer(), true);
DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q);
q.Start();
DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(clock, q);
NullSocketServer nullss;
Thread q_nullss(&nullss, true);
DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q_nullss);
q_nullss.Start();
DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(clock, q_nullss);
}
TEST_F(ThreadQueueTest, DisposeNotLocked) {
@ -619,7 +629,7 @@ TEST_F(ThreadQueueTest, DisposeNotLocked) {
DeletedLockChecker* d = new DeletedLockChecker(this, &was_locked, &deleted);
Dispose(d);
Message msg;
EXPECT_FALSE(Get(&msg, 0));
ProcessMessages(0);
EXPECT_TRUE(deleted);
EXPECT_FALSE(was_locked);
}
@ -642,7 +652,7 @@ TEST_F(ThreadQueueTest, DiposeHandlerWithPostedMessagePending) {
// Now, post a message, which should *not* be returned by Get().
Post(RTC_FROM_HERE, handler, 1);
Message msg;
EXPECT_FALSE(Get(&msg, 0));
ProcessMessages(0);
EXPECT_TRUE(deleted);
}

View File

@ -647,10 +647,7 @@ bool VirtualSocketServer::ProcessMessagesUntilIdle() {
fake_clock_->AdvanceTime(webrtc::TimeDelta::Millis(1));
} else {
// Otherwise, run a normal message loop.
Message msg;
if (msg_queue_->Get(&msg, Thread::kForever)) {
msg_queue_->Dispatch(&msg);
}
msg_queue_->ProcessMessages(Thread::kForever);
}
}
stop_on_idle_ = false;