diff --git a/webrtc/modules/utility/interface/mock/mock_process_thread.h b/webrtc/modules/utility/interface/mock/mock_process_thread.h index a72218060d..fd108a8354 100644 --- a/webrtc/modules/utility/interface/mock/mock_process_thread.h +++ b/webrtc/modules/utility/interface/mock/mock_process_thread.h @@ -22,8 +22,16 @@ class MockProcessThread : public ProcessThread { MOCK_METHOD0(Start, void()); MOCK_METHOD0(Stop, void()); MOCK_METHOD1(WakeUp, void(Module* module)); + MOCK_METHOD1(PostTask, void(ProcessTask* task)); MOCK_METHOD1(RegisterModule, void(Module* module)); MOCK_METHOD1(DeRegisterModule, void(Module* module)); + + // MOCK_METHOD1 gets confused with mocking this method, so we work around it + // by overriding the method from the interface and forwarding the call to a + // mocked, simpler method. + void PostTask(rtc::scoped_ptr task) override { + PostTask(task.get()); + } }; } // namespace webrtc diff --git a/webrtc/modules/utility/interface/process_thread.h b/webrtc/modules/utility/interface/process_thread.h index e30325cd7a..0e84506f1a 100644 --- a/webrtc/modules/utility/interface/process_thread.h +++ b/webrtc/modules/utility/interface/process_thread.h @@ -17,6 +17,14 @@ namespace webrtc { class Module; +class ProcessTask { + public: + ProcessTask() {} + virtual ~ProcessTask() {} + + virtual void Run() = 0; +}; + class ProcessThread { public: virtual ~ProcessThread(); @@ -36,6 +44,14 @@ class ProcessThread { // Can be called on any thread. virtual void WakeUp(Module* module) = 0; + // Queues a task object to run on the worker thread. Ownership of the + // task object is transferred to the ProcessThread and the object will + // either be deleted after running on the worker thread, or on the + // construction thread of the ProcessThread instance, if the task did not + // get a chance to run (e.g. posting the task while shutting down or when + // the thread never runs). + virtual void PostTask(rtc::scoped_ptr task) = 0; + // Adds a module that will start to receive callbacks on the worker thread. // Can be called from any thread. virtual void RegisterModule(Module* module) = 0; diff --git a/webrtc/modules/utility/source/process_thread_impl.cc b/webrtc/modules/utility/source/process_thread_impl.cc index c408b2cc1f..8268fff802 100644 --- a/webrtc/modules/utility/source/process_thread_impl.cc +++ b/webrtc/modules/utility/source/process_thread_impl.cc @@ -51,6 +51,11 @@ ProcessThreadImpl::~ProcessThreadImpl() { DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(!thread_.get()); DCHECK(!stop_); + + while (!queue_.empty()) { + delete queue_.front(); + queue_.pop(); + } } void ProcessThreadImpl::Start() { @@ -102,6 +107,15 @@ void ProcessThreadImpl::WakeUp(Module* module) { wake_up_->Set(); } +void ProcessThreadImpl::PostTask(rtc::scoped_ptr task) { + // Allowed to be called on any thread. + { + rtc::CritScope lock(&lock_); + queue_.push(task.release()); + } + wake_up_->Set(); +} + void ProcessThreadImpl::RegisterModule(Module* module) { // Allowed to be called on any thread. DCHECK(module); @@ -155,6 +169,7 @@ bool ProcessThreadImpl::Run(void* obj) { bool ProcessThreadImpl::Process() { int64_t now = TickTime::MillisecondTimestamp(); int64_t next_checkpoint = now + (1000 * 60); + { rtc::CritScope lock(&lock_); if (stop_) @@ -180,6 +195,15 @@ bool ProcessThreadImpl::Process() { if (m.next_callback < next_checkpoint) next_checkpoint = m.next_callback; } + + while (!queue_.empty()) { + ProcessTask* task = queue_.front(); + queue_.pop(); + lock_.Leave(); + task->Run(); + delete task; + lock_.Enter(); + } } int64_t time_to_wait = next_checkpoint - TickTime::MillisecondTimestamp(); diff --git a/webrtc/modules/utility/source/process_thread_impl.h b/webrtc/modules/utility/source/process_thread_impl.h index 0ba6f1ffb8..1fd2bf3adc 100644 --- a/webrtc/modules/utility/source/process_thread_impl.h +++ b/webrtc/modules/utility/source/process_thread_impl.h @@ -12,6 +12,7 @@ #define WEBRTC_MODULES_UTILITY_SOURCE_PROCESS_THREAD_IMPL_H_ #include +#include #include "webrtc/base/criticalsection.h" #include "webrtc/base/thread_checker.h" @@ -31,6 +32,7 @@ class ProcessThreadImpl : public ProcessThread { void Stop() override; void WakeUp(Module* module) override; + void PostTask(rtc::scoped_ptr task) override; void RegisterModule(Module* module) override; void DeRegisterModule(Module* module) override; @@ -64,13 +66,15 @@ class ProcessThreadImpl : public ProcessThread { // issues, but I haven't figured out what they are, if there are alignment // requirements for mutexes on Mac or if there's something else to it. // So be careful with changing the layout. - rtc::CriticalSection lock_; // Used to guard modules_ and stop_. + rtc::CriticalSection lock_; // Used to guard modules_, tasks_ and stop_. rtc::ThreadChecker thread_checker_; const rtc::scoped_ptr wake_up_; rtc::scoped_ptr thread_; ModuleList modules_; + // TODO(tommi): Support delayed tasks. + std::queue queue_; bool stop_; }; diff --git a/webrtc/modules/utility/source/process_thread_impl_unittest.cc b/webrtc/modules/utility/source/process_thread_impl_unittest.cc index 47af4d876a..cd1f956dd5 100644 --- a/webrtc/modules/utility/source/process_thread_impl_unittest.cc +++ b/webrtc/modules/utility/source/process_thread_impl_unittest.cc @@ -30,6 +30,15 @@ class MockModule : public Module { MOCK_METHOD1(ProcessThreadAttached, void(ProcessThread*)); }; +class RaiseEventTask : public ProcessTask { + public: + RaiseEventTask(EventWrapper* event) : event_(event) {} + void Run() override { event_->Set(); } + + private: + EventWrapper* event_; +}; + ACTION_P(SetEvent, event) { event->Set(); } @@ -280,4 +289,16 @@ TEST(ProcessThreadImpl, WakeUp) { EXPECT_LE(diff, 100u); } +// Tests that we can post a task that gets run straight away on the worker +// thread. +TEST(ProcessThreadImpl, PostTask) { + ProcessThreadImpl thread; + rtc::scoped_ptr task_ran(EventWrapper::Create()); + rtc::scoped_ptr task(new RaiseEventTask(task_ran.get())); + thread.Start(); + thread.PostTask(task.Pass()); + EXPECT_EQ(kEventSignaled, task_ran->Wait(100)); + thread.Stop(); +} + } // namespace webrtc