From fe7d091f57707efdb43f9c83470203369e59cfea Mon Sep 17 00:00:00 2001 From: Taylor Brandstetter Date: Thu, 15 Sep 2016 17:47:42 -0700 Subject: [PATCH] Fixing a couple cases that cause ProcessAllMessageQueues to hang. The two situations are: 1. A thread is in the process of shutting down, so it won't handle any more messages. 2. A message queue is cleared before it has a chance to process pending messages. In both of those cases, we should consider processing done at that point. R=honghaiz@webrtc.org, pthatcher@webrtc.org Review URL: https://codereview.webrtc.org/2319303004 . Cr-Commit-Position: refs/heads/master@{#14245} --- webrtc/base/messagequeue.cc | 33 +++++++++--- webrtc/base/messagequeue_unittest.cc | 75 ++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 8 deletions(-) diff --git a/webrtc/base/messagequeue.cc b/webrtc/base/messagequeue.cc index ebf98f58a3..503d5aff96 100644 --- a/webrtc/base/messagequeue.cc +++ b/webrtc/base/messagequeue.cc @@ -126,17 +126,34 @@ void MessageQueueManager::ProcessAllMessageQueues() { } void MessageQueueManager::ProcessAllMessageQueuesInternal() { - // Post a delayed message at the current time and wait for it to be dispatched - // on all queues, which will ensure that all messages that came before it were - // also dispatched. - volatile int queues_not_done; - auto functor = [&queues_not_done] { AtomicOps::Decrement(&queues_not_done); }; - FunctorMessageHandler handler(functor); + // This works by posting a delayed message at the current time and waiting + // for it to be dispatched on all queues, which will ensure that all messages + // that came before it were also dispatched. + volatile int queues_not_done = 0; + + // This class is used so that whether the posted message is processed, or the + // message queue is simply cleared, queues_not_done gets decremented. + class ScopedIncrement : public MessageData { + public: + ScopedIncrement(volatile int* value) : value_(value) { + AtomicOps::Increment(value_); + } + ~ScopedIncrement() override { AtomicOps::Decrement(value_); } + + private: + volatile int* value_; + }; + { DebugNonReentrantCritScope cs(&crit_, &locked_); - queues_not_done = static_cast(message_queues_.size()); for (MessageQueue* queue : message_queues_) { - queue->PostDelayed(RTC_FROM_HERE, 0, &handler); + if (queue->IsQuitting()) { + // If the queue is quitting, it's done processing messages so it can + // be ignored. If we tried to post a message to it, it would be dropped. + continue; + } + queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, + new ScopedIncrement(&queues_not_done)); } } // Note: One of the message queues may have been on this thread, which is why diff --git a/webrtc/base/messagequeue_unittest.cc b/webrtc/base/messagequeue_unittest.cc index fc3a8f781d..8834ab57ef 100644 --- a/webrtc/base/messagequeue_unittest.cc +++ b/webrtc/base/messagequeue_unittest.cc @@ -10,7 +10,11 @@ #include "webrtc/base/messagequeue.h" +#include + +#include "webrtc/base/atomicops.h" #include "webrtc/base/bind.h" +#include "webrtc/base/event.h" #include "webrtc/base/gunit.h" #include "webrtc/base/logging.h" #include "webrtc/base/thread.h" @@ -140,3 +144,74 @@ TEST(MessageQueueManager, Clear) { EXPECT_TRUE(deleted); EXPECT_FALSE(MessageQueueManager::IsInitialized()); } + +// Ensure that ProcessAllMessageQueues does its essential function; process +// all messages (both delayed and non delayed) up until the current time, on +// all registered message queues. +TEST(MessageQueueManager, ProcessAllMessageQueues) { + Event entered_process_all_message_queues(true, false); + Thread a; + Thread b; + a.Start(); + b.Start(); + + volatile int messages_processed = 0; + FunctorMessageHandler> incrementer( + [&messages_processed, &entered_process_all_message_queues] { + // Wait for event as a means to ensure Increment doesn't occur outside + // of ProcessAllMessageQueues. The event is set by a message posted to + // the main thread, which is guaranteed to be handled inside + // ProcessAllMessageQueues. + entered_process_all_message_queues.Wait(Event::kForever); + AtomicOps::Increment(&messages_processed); + }); + FunctorMessageHandler> event_signaler( + [&entered_process_all_message_queues] { + entered_process_all_message_queues.Set(); + }); + + // Post messages (both delayed and non delayed) to both threads. + a.Post(RTC_FROM_HERE, &incrementer); + b.Post(RTC_FROM_HERE, &incrementer); + a.PostDelayed(RTC_FROM_HERE, 0, &incrementer); + b.PostDelayed(RTC_FROM_HERE, 0, &incrementer); + rtc::Thread::Current()->Post(RTC_FROM_HERE, &event_signaler); + + MessageQueueManager::ProcessAllMessageQueues(); + EXPECT_EQ(4, AtomicOps::AcquireLoad(&messages_processed)); +} + +// Test that ProcessAllMessageQueues doesn't hang if a thread is quitting. +TEST(MessageQueueManager, ProcessAllMessageQueuesWithQuittingThread) { + Thread t; + t.Start(); + t.Quit(); + MessageQueueManager::ProcessAllMessageQueues(); +} + +// Test that ProcessAllMessageQueues doesn't hang if a queue clears its +// messages. +TEST(MessageQueueManager, ProcessAllMessageQueuesWithClearedQueue) { + Event entered_process_all_message_queues(true, false); + Thread t; + t.Start(); + + FunctorMessageHandler> clearer( + [&entered_process_all_message_queues] { + // Wait for event as a means to ensure Clear doesn't occur outside of + // ProcessAllMessageQueues. The event is set by a message posted to the + // main thread, which is guaranteed to be handled inside + // ProcessAllMessageQueues. + entered_process_all_message_queues.Wait(Event::kForever); + rtc::Thread::Current()->Clear(nullptr); + }); + FunctorMessageHandler> event_signaler( + [&entered_process_all_message_queues] { + entered_process_all_message_queues.Set(); + }); + + // Post messages (both delayed and non delayed) to both threads. + t.Post(RTC_FROM_HERE, &clearer); + rtc::Thread::Current()->Post(RTC_FROM_HERE, &event_signaler); + MessageQueueManager::ProcessAllMessageQueues(); +}