diff --git a/BUILD.gn b/BUILD.gn index 0f68aacfd1..4c10d668ae 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -491,6 +491,7 @@ if (rtc_include_tests) { "rtc_base:rtc_base_unittests", "rtc_base:rtc_json_unittests", "rtc_base:rtc_numerics_unittests", + "rtc_base:rtc_post_message_with_functor_unittests", "rtc_base:rtc_task_queue_unittests", "rtc_base:sequenced_task_checker_unittests", "rtc_base:sigslot_unittest", diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index c152533469..9809a8ed5f 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -1063,6 +1063,17 @@ rtc_static_library("rtc_base") { } } +rtc_source_set("rtc_post_message_with_functor") { + sources = [ + "post_message_with_functor.h", + ] + deps = [ + ":checks", + ":rtc_base", + ":rtc_event", + ] +} + rtc_source_set("gtest_prod") { visibility = [ "*" ] sources = [ @@ -1398,6 +1409,21 @@ if (rtc_include_tests) { ] } + rtc_source_set("rtc_post_message_with_functor_unittests") { + testonly = true + + sources = [ + "post_message_with_functor_unittest.cc", + ] + deps = [ + ":checks", + ":gunit_helpers", + ":rtc_base", + ":rtc_post_message_with_functor", + "../test:test_support", + ] + } + rtc_source_set("rtc_base_unittests") { testonly = true defines = [] diff --git a/rtc_base/post_message_with_functor.h b/rtc_base/post_message_with_functor.h new file mode 100644 index 0000000000..389724d594 --- /dev/null +++ b/rtc_base/post_message_with_functor.h @@ -0,0 +1,77 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef RTC_BASE_POST_MESSAGE_WITH_FUNCTOR_H_ +#define RTC_BASE_POST_MESSAGE_WITH_FUNCTOR_H_ + +#include + +#include "rtc_base/checks.h" +#include "rtc_base/constructor_magic.h" +#include "rtc_base/location.h" +#include "rtc_base/message_handler.h" +#include "rtc_base/thread.h" + +namespace rtc { + +namespace post_message_with_functor_internal { + +template +class SingleMessageHandlerWithFunctor : public MessageHandler { + public: + template + explicit SingleMessageHandlerWithFunctor(F&& functor) + : functor_(std::forward(functor)) {} + + void OnMessage(Message* msg) override { + functor_(); + delete this; + } + + private: + ~SingleMessageHandlerWithFunctor() override {} + + typename std::remove_reference::type functor_; + + RTC_DISALLOW_COPY_AND_ASSIGN(SingleMessageHandlerWithFunctor); +}; + +} // namespace post_message_with_functor_internal + +// Asynchronously posts a message that will invoke |functor| on the target +// thread. Ownership is passed and |functor| is destroyed on the target thread. +// Requirements of FunctorT: +// - FunctorT is movable. +// - FunctorT implements "T operator()()" or "T operator()() const" for some T +// (if T is not void, the return value is discarded on the target thread). +// - FunctorT has a public destructor that can be invoked from the target +// thread after operation() has been invoked. +// - The functor must not cause the thread to quit before +// PostMessageWithFunctor() is done. +template +void PostMessageWithFunctor(const Location& posted_from, + Thread* thread, + FunctorT&& functor) { + thread->Post( + posted_from, + new post_message_with_functor_internal::SingleMessageHandlerWithFunctor< + FunctorT>(std::forward(functor))); + // This DCHECK guarantees that the post was successful. + // Post() doesn't say whether it succeeded, but it will only fail if the + // thread is quitting. DCHECKing that the thread is not quitting *after* + // posting might yield some false positives (where the thread did in fact + // quit, but only after posting), but if we have false positives here then we + // have a race condition anyway. + RTC_DCHECK(!thread->IsQuitting()); +} + +} // namespace rtc + +#endif // RTC_BASE_POST_MESSAGE_WITH_FUNCTOR_H_ diff --git a/rtc_base/post_message_with_functor_unittest.cc b/rtc_base/post_message_with_functor_unittest.cc new file mode 100644 index 0000000000..025bfd467c --- /dev/null +++ b/rtc_base/post_message_with_functor_unittest.cc @@ -0,0 +1,229 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "rtc_base/post_message_with_functor.h" + +#include + +#include "rtc_base/bind.h" +#include "rtc_base/checks.h" +#include "rtc_base/event.h" +#include "rtc_base/gunit.h" +#include "rtc_base/ref_counted_object.h" +#include "rtc_base/thread.h" +#include "test/gtest.h" + +namespace rtc { + +namespace { + +void ThreadIsCurrent(Thread* thread, bool* result, Event* event) { + *result = thread->IsCurrent(); + event->Set(); +} + +void WaitAndSetEvent(Event* wait_event, Event* set_event) { + wait_event->Wait(Event::kForever); + set_event->Set(); +} + +// A functor that keeps track of the number of copies and moves. +class LifeCycleFunctor { + public: + struct Stats { + size_t copy_count = 0; + size_t move_count = 0; + }; + + LifeCycleFunctor(Stats* stats, Event* event) : stats_(stats), event_(event) {} + LifeCycleFunctor(const LifeCycleFunctor& other) { *this = other; } + LifeCycleFunctor(LifeCycleFunctor&& other) { *this = std::move(other); } + + LifeCycleFunctor& operator=(const LifeCycleFunctor& other) { + stats_ = other.stats_; + event_ = other.event_; + ++stats_->copy_count; + return *this; + } + + LifeCycleFunctor& operator=(LifeCycleFunctor&& other) { + stats_ = other.stats_; + event_ = other.event_; + ++stats_->move_count; + return *this; + } + + void operator()() { event_->Set(); } + + private: + Stats* stats_; + Event* event_; +}; + +// A functor that verifies the thread it was destroyed on. +class DestructionFunctor { + public: + DestructionFunctor(Thread* thread, bool* thread_was_current, Event* event) + : thread_(thread), + thread_was_current_(thread_was_current), + event_(event) {} + ~DestructionFunctor() { + // Only signal the event if this was the functor that was invoked to avoid + // the event being signaled due to the destruction of temporary/moved + // versions of this object. + if (was_invoked_) { + *thread_was_current_ = thread_->IsCurrent(); + event_->Set(); + } + } + + void operator()() { was_invoked_ = true; } + + private: + Thread* thread_; + bool* thread_was_current_; + Event* event_; + bool was_invoked_ = false; +}; + +} // namespace + +TEST(PostMessageWithFunctorTest, InvokesWithBind) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + Event event; + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), + Bind(&Event::Set, &event)); + event.Wait(Event::kForever); +} + +TEST(PostMessageWithFunctorTest, InvokesWithLambda) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + Event event; + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), + [&event] { event.Set(); }); + event.Wait(Event::kForever); +} + +TEST(PostMessageWithFunctorTest, InvokesWithCopiedFunctor) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + LifeCycleFunctor::Stats stats; + Event event; + LifeCycleFunctor functor(&stats, &event); + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), functor); + event.Wait(Event::kForever); + + EXPECT_EQ(1u, stats.copy_count); + EXPECT_EQ(0u, stats.move_count); +} + +TEST(PostMessageWithFunctorTest, InvokesWithMovedFunctor) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + LifeCycleFunctor::Stats stats; + Event event; + LifeCycleFunctor functor(&stats, &event); + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), + std::move(functor)); + event.Wait(Event::kForever); + + EXPECT_EQ(0u, stats.copy_count); + EXPECT_EQ(1u, stats.move_count); +} + +TEST(PostMessageWithFunctorTest, + InvokesWithCopiedFunctorDestroyedOnTargetThread) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + Event event; + bool was_invoked_on_background_thread = false; + DestructionFunctor functor(background_thread.get(), + &was_invoked_on_background_thread, &event); + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), functor); + event.Wait(Event::kForever); + + EXPECT_TRUE(was_invoked_on_background_thread); +} + +TEST(PostMessageWithFunctorTest, + InvokesWithMovedFunctorDestroyedOnTargetThread) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + Event event; + bool was_invoked_on_background_thread = false; + DestructionFunctor functor(background_thread.get(), + &was_invoked_on_background_thread, &event); + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), + std::move(functor)); + event.Wait(Event::kForever); + + EXPECT_TRUE(was_invoked_on_background_thread); +} + +TEST(PostMessageWithFunctorTest, InvokesOnBackgroundThread) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + Event event; + bool was_invoked_on_background_thread = false; + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), + Bind(&ThreadIsCurrent, background_thread.get(), + &was_invoked_on_background_thread, &event)); + event.Wait(Event::kForever); + + EXPECT_TRUE(was_invoked_on_background_thread); +} + +TEST(PostMessageWithFunctorTest, InvokesAsynchronously) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + // The first event ensures that SendSingleMessage() is not blocking this + // thread. The second event ensures that the message is processed. + Event event_set_by_test_thread; + Event event_set_by_background_thread; + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), + Bind(&WaitAndSetEvent, &event_set_by_test_thread, + &event_set_by_background_thread)); + event_set_by_test_thread.Set(); + event_set_by_background_thread.Wait(Event::kForever); +} + +TEST(PostMessageWithFunctorTest, InvokesInPostedOrder) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + Event first; + Event second; + Event third; + Event fourth; + + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), + Bind(&WaitAndSetEvent, &first, &second)); + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), + Bind(&WaitAndSetEvent, &second, &third)); + PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), + Bind(&WaitAndSetEvent, &third, &fourth)); + + // All tasks have been posted before the first one is unblocked. + first.Set(); + // Only if the chain is invoked in posted order will the last event be set. + fourth.Wait(Event::kForever); +} + +} // namespace rtc