From 13bc8713af57b74b9c15b4f57756e9df672ed1d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Bostr=C3=B6m?= Date: Fri, 15 Feb 2019 10:14:22 +0100 Subject: [PATCH] PostMessageWithFunctor() added. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This function is used to post messages onto rtc::Threads. The thread invokes the functor without blocking the calling thread. Messages posted in this way are executed in the order that they were posted. This is meant to work as the equivalent of "thread->PostTask()" in Chromium. Note: AsyncInvoker currently does something similar but it is more cumbersome to use (somebody has to create it and own it and make sure not to destroy it while tasks are pending or else they're cancelled). It also comes with a fundamental flaw: You cannot destroy the AsyncInvoker from within the functor (this results in a neverending Wait). This makes the AsyncInvoker not suitable for implementing "destructor traits" amongst other things. This CL will allow us to easily add "PostTask()" to rtc::Thread or add support for DestructorTraits, which is especially useful when you have a reference counted object that is referenced from multiple threads but owns resources that has to be destroyed on a particular thread. Blocking invokes are forbidden in Chromium but WebRTC performs them frequently. Being able to perform the equivalent of PostTask() is a good thing. Bug: webrtc:10293 Change-Id: Ie2a612059a783f18ddf98cff6edb7fce447fb5be Reviewed-on: https://webrtc-review.googlesource.com/c/121408 Commit-Queue: Henrik Boström Reviewed-by: Karl Wiberg Reviewed-by: Harald Alvestrand Reviewed-by: Steve Anton Cr-Commit-Position: refs/heads/master@{#26704} --- BUILD.gn | 1 + rtc_base/BUILD.gn | 26 ++ rtc_base/post_message_with_functor.h | 77 ++++++ .../post_message_with_functor_unittest.cc | 229 ++++++++++++++++++ 4 files changed, 333 insertions(+) create mode 100644 rtc_base/post_message_with_functor.h create mode 100644 rtc_base/post_message_with_functor_unittest.cc diff --git a/BUILD.gn b/BUILD.gn index 0f68aacfd1..4c10d668ae 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -491,6 +491,7 @@ if (rtc_include_tests) { "rtc_base:rtc_base_unittests", "rtc_base:rtc_json_unittests", "rtc_base:rtc_numerics_unittests", + "rtc_base:rtc_post_message_with_functor_unittests", "rtc_base:rtc_task_queue_unittests", "rtc_base:sequenced_task_checker_unittests", "rtc_base:sigslot_unittest", diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index c152533469..9809a8ed5f 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -1063,6 +1063,17 @@ rtc_static_library("rtc_base") { } } +rtc_source_set("rtc_post_message_with_functor") { + sources = [ + "post_message_with_functor.h", + ] + deps = [ + ":checks", + ":rtc_base", + ":rtc_event", + ] +} + rtc_source_set("gtest_prod") { visibility = [ "*" ] sources = [ @@ -1398,6 +1409,21 @@ if (rtc_include_tests) { ] } + rtc_source_set("rtc_post_message_with_functor_unittests") { + testonly = true + + sources = [ + "post_message_with_functor_unittest.cc", + ] + deps = [ + ":checks", + ":gunit_helpers", + ":rtc_base", + ":rtc_post_message_with_functor", + "../test:test_support", + ] + } + rtc_source_set("rtc_base_unittests") { testonly = true defines = [] diff --git a/rtc_base/post_message_with_functor.h b/rtc_base/post_message_with_functor.h new file mode 100644 index 0000000000..389724d594 --- /dev/null +++ b/rtc_base/post_message_with_functor.h @@ -0,0 +1,77 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef RTC_BASE_POST_MESSAGE_WITH_FUNCTOR_H_ +#define RTC_BASE_POST_MESSAGE_WITH_FUNCTOR_H_ + +#include + +#include "rtc_base/checks.h" +#include "rtc_base/constructor_magic.h" +#include "rtc_base/location.h" +#include "rtc_base/message_handler.h" +#include "rtc_base/thread.h" + +namespace rtc { + +namespace post_message_with_functor_internal { + +template +class SingleMessageHandlerWithFunctor : public MessageHandler { + public: + template + explicit SingleMessageHandlerWithFunctor(F&& functor) + : functor_(std::forward(functor)) {} + + void OnMessage(Message* msg) override { + functor_(); + delete this; + } + + private: + ~SingleMessageHandlerWithFunctor() override {} + + typename std::remove_reference::type functor_; + + RTC_DISALLOW_COPY_AND_ASSIGN(SingleMessageHandlerWithFunctor); +}; + +} // namespace post_message_with_functor_internal + +// Asynchronously posts a message that will invoke |functor| on the target +// thread. Ownership is passed and |functor| is destroyed on the target thread. +// Requirements of FunctorT: +// - FunctorT is movable. +// - FunctorT implements "T operator()()" or "T operator()() const" for some T +// (if T is not void, the return value is discarded on the target thread). +// - FunctorT has a public destructor that can be invoked from the target +// thread after operation() has been invoked. +// - The functor must not cause the thread to quit before +// PostMessageWithFunctor() is done. +template +void PostMessageWithFunctor(const Location& posted_from, + Thread* thread, + FunctorT&& functor) { + thread->Post( + posted_from, + new post_message_with_functor_internal::SingleMessageHandlerWithFunctor< + FunctorT>(std::forward(functor))); + // This DCHECK guarantees that the post was successful. + // Post() doesn't say whether it succeeded, but it will only fail if the + // thread is quitting. DCHECKing that the thread is not quitting *after* + // posting might yield some false positives (where the thread did in fact + // quit, but only after posting), but if we have false positives here then we + // have a race condition anyway. + RTC_DCHECK(!thread->IsQuitting()); +} + +} // namespace rtc + +#endif // RTC_BASE_POST_MESSAGE_WITH_FUNCTOR_H_ diff --git a/rtc_base/post_message_with_functor_unittest.cc b/rtc_base/post_message_with_functor_unittest.cc new file mode 100644 index 0000000000..025bfd467c --- /dev/null +++ b/rtc_base/post_message_with_functor_unittest.cc @@ -0,0 +1,229 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "rtc_base/post_message_with_functor.h" + +#include + +#include "rtc_base/bind.h" +#include "rtc_base/checks.h" +#include "rtc_base/event.h" +#include "rtc_base/gunit.h" +#include "rtc_base/ref_counted_object.h" +#include "rtc_base/thread.h" +#include "test/gtest.h" + +namespace rtc { + +namespace { + +void ThreadIsCurrent(Thread* thread, bool* result, Event* event) { + *result = thread->IsCurrent(); + event->Set(); +} + +void WaitAndSetEvent(Event* wait_event, Event* set_event) { + wait_event->Wait(Event::kForever); + set_event->Set(); +} + +// A functor that keeps track of the number of copies and moves. +class LifeCycleFunctor { + public: + struct Stats { + size_t copy_count = 0; + size_t move_count = 0; + }; + + LifeCycleFunctor(Stats* stats, Event* event) : stats_(stats), event_(event) {} + LifeCycleFunctor(const LifeCycleFunctor& other) { *this = other; } + LifeCycleFunctor(LifeCycleFunctor&& other) { *this = std::move(other); } + + LifeCycleFunctor& operator=(const LifeCycleFunctor& other) { + stats_ = other.stats_; + event_ = other.event_; + ++stats_->copy_count; + return *this; + } + + LifeCycleFunctor& operator=(LifeCycleFunctor&& other) { + stats_ = other.stats_; + event_ = other.event_; + ++stats_->move_count; + return *this; + } + + void operator()() { event_->Set(); } + + private: + Stats* stats_; + Event* event_; +}; + +// A functor that verifies the thread it was destroyed on. +class DestructionFunctor { + public: + DestructionFunctor(Thread* thread, bool* thread_was_current, Event* event) + : thread_(thread), + thread_was_current_(thread_was_current), + event_(event) {} + ~DestructionFunctor() { + // Only signal the event if this was the functor that was invoked to avoid + // the event being signaled due to the destruction of temporary/moved + // versions of this object. + if (was_invoked_) { + *thread_was_current_ = thread_->IsCurrent(); + event_->Set(); + } + } + + void operator()() { was_invoked_ = true; } + + private: + Thread* thread_; + bool* thread_was_current_; + Event* event_; + bool was_invoked_ = false; +}; + +} // namespace + +TEST(PostMessageWithFunctorTest, InvokesWithBind) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + Event event; + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), + Bind(&Event::Set, &event)); + event.Wait(Event::kForever); +} + +TEST(PostMessageWithFunctorTest, InvokesWithLambda) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + Event event; + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), + [&event] { event.Set(); }); + event.Wait(Event::kForever); +} + +TEST(PostMessageWithFunctorTest, InvokesWithCopiedFunctor) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + LifeCycleFunctor::Stats stats; + Event event; + LifeCycleFunctor functor(&stats, &event); + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), functor); + event.Wait(Event::kForever); + + EXPECT_EQ(1u, stats.copy_count); + EXPECT_EQ(0u, stats.move_count); +} + +TEST(PostMessageWithFunctorTest, InvokesWithMovedFunctor) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + LifeCycleFunctor::Stats stats; + Event event; + LifeCycleFunctor functor(&stats, &event); + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), + std::move(functor)); + event.Wait(Event::kForever); + + EXPECT_EQ(0u, stats.copy_count); + EXPECT_EQ(1u, stats.move_count); +} + +TEST(PostMessageWithFunctorTest, + InvokesWithCopiedFunctorDestroyedOnTargetThread) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + Event event; + bool was_invoked_on_background_thread = false; + DestructionFunctor functor(background_thread.get(), + &was_invoked_on_background_thread, &event); + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), functor); + event.Wait(Event::kForever); + + EXPECT_TRUE(was_invoked_on_background_thread); +} + +TEST(PostMessageWithFunctorTest, + InvokesWithMovedFunctorDestroyedOnTargetThread) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + Event event; + bool was_invoked_on_background_thread = false; + DestructionFunctor functor(background_thread.get(), + &was_invoked_on_background_thread, &event); + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), + std::move(functor)); + event.Wait(Event::kForever); + + EXPECT_TRUE(was_invoked_on_background_thread); +} + +TEST(PostMessageWithFunctorTest, InvokesOnBackgroundThread) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + Event event; + bool was_invoked_on_background_thread = false; + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), + Bind(&ThreadIsCurrent, background_thread.get(), + &was_invoked_on_background_thread, &event)); + event.Wait(Event::kForever); + + EXPECT_TRUE(was_invoked_on_background_thread); +} + +TEST(PostMessageWithFunctorTest, InvokesAsynchronously) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + // The first event ensures that SendSingleMessage() is not blocking this + // thread. The second event ensures that the message is processed. + Event event_set_by_test_thread; + Event event_set_by_background_thread; + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), + Bind(&WaitAndSetEvent, &event_set_by_test_thread, + &event_set_by_background_thread)); + event_set_by_test_thread.Set(); + event_set_by_background_thread.Wait(Event::kForever); +} + +TEST(PostMessageWithFunctorTest, InvokesInPostedOrder) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + Event first; + Event second; + Event third; + Event fourth; + + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), + Bind(&WaitAndSetEvent, &first, &second)); + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), + Bind(&WaitAndSetEvent, &second, &third)); + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), + Bind(&WaitAndSetEvent, &third, &fourth)); + + // All tasks have been posted before the first one is unblocked. + first.Set(); + // Only if the chain is invoked in posted order will the last event be set. + fourth.Wait(Event::kForever); +} + +} // namespace rtc