Fixing a couple cases that cause ProcessAllMessageQueues to hang.
The two situations are: 1. A thread is in the process of shutting down, so it won't handle any more messages. 2. A message queue is cleared before it has a chance to process pending messages. In both of those cases, we should consider processing done at that point. R=honghaiz@webrtc.org, pthatcher@webrtc.org Review URL: https://codereview.webrtc.org/2319303004 . Cr-Commit-Position: refs/heads/master@{#14245}
This commit is contained in:
parent
9ecb08576e
commit
fe7d091f57
@ -126,17 +126,34 @@ void MessageQueueManager::ProcessAllMessageQueues() {
|
||||
}
|
||||
|
||||
void MessageQueueManager::ProcessAllMessageQueuesInternal() {
|
||||
// Post a delayed message at the current time and wait for it to be dispatched
|
||||
// on all queues, which will ensure that all messages that came before it were
|
||||
// also dispatched.
|
||||
volatile int queues_not_done;
|
||||
auto functor = [&queues_not_done] { AtomicOps::Decrement(&queues_not_done); };
|
||||
FunctorMessageHandler<void, decltype(functor)> handler(functor);
|
||||
// This works by posting a delayed message at the current time and waiting
|
||||
// for it to be dispatched on all queues, which will ensure that all messages
|
||||
// that came before it were also dispatched.
|
||||
volatile int queues_not_done = 0;
|
||||
|
||||
// This class is used so that whether the posted message is processed, or the
|
||||
// message queue is simply cleared, queues_not_done gets decremented.
|
||||
class ScopedIncrement : public MessageData {
|
||||
public:
|
||||
ScopedIncrement(volatile int* value) : value_(value) {
|
||||
AtomicOps::Increment(value_);
|
||||
}
|
||||
~ScopedIncrement() override { AtomicOps::Decrement(value_); }
|
||||
|
||||
private:
|
||||
volatile int* value_;
|
||||
};
|
||||
|
||||
{
|
||||
DebugNonReentrantCritScope cs(&crit_, &locked_);
|
||||
queues_not_done = static_cast<int>(message_queues_.size());
|
||||
for (MessageQueue* queue : message_queues_) {
|
||||
queue->PostDelayed(RTC_FROM_HERE, 0, &handler);
|
||||
if (queue->IsQuitting()) {
|
||||
// If the queue is quitting, it's done processing messages so it can
|
||||
// be ignored. If we tried to post a message to it, it would be dropped.
|
||||
continue;
|
||||
}
|
||||
queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE,
|
||||
new ScopedIncrement(&queues_not_done));
|
||||
}
|
||||
}
|
||||
// Note: One of the message queues may have been on this thread, which is why
|
||||
|
||||
@ -10,7 +10,11 @@
|
||||
|
||||
#include "webrtc/base/messagequeue.h"
|
||||
|
||||
#include <functional>
|
||||
|
||||
#include "webrtc/base/atomicops.h"
|
||||
#include "webrtc/base/bind.h"
|
||||
#include "webrtc/base/event.h"
|
||||
#include "webrtc/base/gunit.h"
|
||||
#include "webrtc/base/logging.h"
|
||||
#include "webrtc/base/thread.h"
|
||||
@ -140,3 +144,74 @@ TEST(MessageQueueManager, Clear) {
|
||||
EXPECT_TRUE(deleted);
|
||||
EXPECT_FALSE(MessageQueueManager::IsInitialized());
|
||||
}
|
||||
|
||||
// Ensure that ProcessAllMessageQueues does its essential function; process
|
||||
// all messages (both delayed and non delayed) up until the current time, on
|
||||
// all registered message queues.
|
||||
TEST(MessageQueueManager, ProcessAllMessageQueues) {
|
||||
Event entered_process_all_message_queues(true, false);
|
||||
Thread a;
|
||||
Thread b;
|
||||
a.Start();
|
||||
b.Start();
|
||||
|
||||
volatile int messages_processed = 0;
|
||||
FunctorMessageHandler<void, std::function<void()>> incrementer(
|
||||
[&messages_processed, &entered_process_all_message_queues] {
|
||||
// Wait for event as a means to ensure Increment doesn't occur outside
|
||||
// of ProcessAllMessageQueues. The event is set by a message posted to
|
||||
// the main thread, which is guaranteed to be handled inside
|
||||
// ProcessAllMessageQueues.
|
||||
entered_process_all_message_queues.Wait(Event::kForever);
|
||||
AtomicOps::Increment(&messages_processed);
|
||||
});
|
||||
FunctorMessageHandler<void, std::function<void()>> event_signaler(
|
||||
[&entered_process_all_message_queues] {
|
||||
entered_process_all_message_queues.Set();
|
||||
});
|
||||
|
||||
// Post messages (both delayed and non delayed) to both threads.
|
||||
a.Post(RTC_FROM_HERE, &incrementer);
|
||||
b.Post(RTC_FROM_HERE, &incrementer);
|
||||
a.PostDelayed(RTC_FROM_HERE, 0, &incrementer);
|
||||
b.PostDelayed(RTC_FROM_HERE, 0, &incrementer);
|
||||
rtc::Thread::Current()->Post(RTC_FROM_HERE, &event_signaler);
|
||||
|
||||
MessageQueueManager::ProcessAllMessageQueues();
|
||||
EXPECT_EQ(4, AtomicOps::AcquireLoad(&messages_processed));
|
||||
}
|
||||
|
||||
// Test that ProcessAllMessageQueues doesn't hang if a thread is quitting.
|
||||
TEST(MessageQueueManager, ProcessAllMessageQueuesWithQuittingThread) {
|
||||
Thread t;
|
||||
t.Start();
|
||||
t.Quit();
|
||||
MessageQueueManager::ProcessAllMessageQueues();
|
||||
}
|
||||
|
||||
// Test that ProcessAllMessageQueues doesn't hang if a queue clears its
|
||||
// messages.
|
||||
TEST(MessageQueueManager, ProcessAllMessageQueuesWithClearedQueue) {
|
||||
Event entered_process_all_message_queues(true, false);
|
||||
Thread t;
|
||||
t.Start();
|
||||
|
||||
FunctorMessageHandler<void, std::function<void()>> clearer(
|
||||
[&entered_process_all_message_queues] {
|
||||
// Wait for event as a means to ensure Clear doesn't occur outside of
|
||||
// ProcessAllMessageQueues. The event is set by a message posted to the
|
||||
// main thread, which is guaranteed to be handled inside
|
||||
// ProcessAllMessageQueues.
|
||||
entered_process_all_message_queues.Wait(Event::kForever);
|
||||
rtc::Thread::Current()->Clear(nullptr);
|
||||
});
|
||||
FunctorMessageHandler<void, std::function<void()>> event_signaler(
|
||||
[&entered_process_all_message_queues] {
|
||||
entered_process_all_message_queues.Set();
|
||||
});
|
||||
|
||||
// Post messages (both delayed and non delayed) to both threads.
|
||||
t.Post(RTC_FROM_HERE, &clearer);
|
||||
rtc::Thread::Current()->Post(RTC_FROM_HERE, &event_signaler);
|
||||
MessageQueueManager::ProcessAllMessageQueues();
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user