From 43f3982d6fcc21b27222bb941eed2e5bc4885225 Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Wed, 5 Dec 2018 15:46:58 +0100 Subject: [PATCH] Remove TaskQueue::PostAndReply as unused Bug: webrtc:10191, webrtc:9728 Change-Id: Iaaa7c88bbbbfdd6e3e9bf5ab683bbdb2962a5cab Reviewed-on: https://webrtc-review.googlesource.com/c/107202 Commit-Queue: Danil Chapovalov Reviewed-by: Karl Wiberg Cr-Commit-Position: refs/heads/master@{#26202} --- rtc_base/task_queue.h | 36 ++------ rtc_base/task_queue_gcd.cc | 60 ------------- rtc_base/task_queue_libevent.cc | 147 +------------------------------- rtc_base/task_queue_unittest.cc | 87 ------------------- rtc_base/task_queue_win.cc | 35 -------- 5 files changed, 9 insertions(+), 356 deletions(-) diff --git a/rtc_base/task_queue.h b/rtc_base/task_queue.h index ea89b255b3..fa84625da1 100644 --- a/rtc_base/task_queue.h +++ b/rtc_base/task_queue.h @@ -175,11 +175,6 @@ class RTC_LOCKABLE RTC_EXPORT TaskQueue { // Ownership of the task is passed to PostTask. void PostTask(std::unique_ptr task); - void PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue* reply_queue); - void PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply); // Schedules a task to execute a specified number of milliseconds from when // the call is made. The precision should be considered as "best effort" @@ -208,32 +203,15 @@ class RTC_LOCKABLE RTC_EXPORT TaskQueue { PostDelayedTask(NewClosure(std::forward(closure)), milliseconds); } - template - void PostTaskAndReply(Closure1&& task, - Closure2&& reply, - TaskQueue* reply_queue) { - PostTaskAndReply(NewClosure(std::forward(task)), - NewClosure(std::forward(reply)), reply_queue); - } - - template - void PostTaskAndReply(std::unique_ptr task, Closure&& reply) { - PostTaskAndReply(std::move(task), NewClosure(std::forward(reply))); - } - - template - void PostTaskAndReply(Closure&& task, std::unique_ptr reply) { - PostTaskAndReply(NewClosure(std::forward(task)), std::move(reply)); - } - - template - void PostTaskAndReply(Closure1&& task, Closure2&& reply) { - PostTaskAndReply(NewClosure(std::forward(task)), - NewClosure(std::forward(reply))); - } - private: class Impl; + // TODO(danilchap): Remove when external implementaions of TaskQueue remove + // these two functions. + void PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply, + TaskQueue* reply_queue); + void PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply); const scoped_refptr impl_; diff --git a/rtc_base/task_queue_gcd.cc b/rtc_base/task_queue_gcd.cc index c7731dda3e..f2f3412e38 100644 --- a/rtc_base/task_queue_gcd.cc +++ b/rtc_base/task_queue_gcd.cc @@ -55,10 +55,6 @@ class TaskQueue::Impl : public RefCountInterface { bool IsCurrent() const; void PostTask(std::unique_ptr task); - void PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue::Impl* reply_queue); - void PostDelayedTask(std::unique_ptr task, uint32_t milliseconds); private: @@ -97,41 +93,6 @@ class TaskQueue::Impl : public RefCountInterface { std::unique_ptr task; }; - // Special case context for holding two tasks, a |first_task| + the task - // that's owned by the parent struct, TaskContext, that then becomes the - // second (i.e. 'reply') task. - struct PostTaskAndReplyContext : public TaskContext { - explicit PostTaskAndReplyContext(QueueContext* first_queue_ctx, - std::unique_ptr first_task, - QueueContext* second_queue_ctx, - std::unique_ptr second_task) - : TaskContext(second_queue_ctx, std::move(second_task)), - first_queue_ctx(first_queue_ctx), - first_task(std::move(first_task)), - reply_queue_(second_queue_ctx->queue->impl_->queue_) { - // Retain the reply queue for as long as this object lives. - // If we don't, we may have memory leaks and/or failures. - dispatch_retain(reply_queue_); - } - ~PostTaskAndReplyContext() override { dispatch_release(reply_queue_); } - - static void RunTask(void* context) { - auto* rc = static_cast(context); - if (rc->first_queue_ctx->is_active) { - AutoSetCurrentQueuePtr set_current(rc->first_queue_ctx->queue); - if (!rc->first_task->Run()) - rc->first_task.release(); - } - // Post the reply task. This hands the work over to the parent struct. - // This task will eventually delete |this|. - dispatch_async_f(rc->reply_queue_, rc, &TaskContext::RunTask); - } - - QueueContext* const first_queue_ctx; - std::unique_ptr first_task; - dispatch_queue_t reply_queue_; - }; - dispatch_queue_t queue_; QueueContext* const context_; }; @@ -192,14 +153,6 @@ void TaskQueue::Impl::PostDelayedTask(std::unique_ptr task, context, &TaskContext::RunTask); } -void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue::Impl* reply_queue) { - auto* context = new PostTaskAndReplyContext( - context_, std::move(task), reply_queue->context_, std::move(reply)); - dispatch_async_f(queue_, context, &PostTaskAndReplyContext::RunTask); -} - // Boilerplate for the PIMPL pattern. TaskQueue::TaskQueue(const char* queue_name, Priority priority) : impl_(new RefCountedObject(queue_name, this, priority)) { @@ -221,19 +174,6 @@ void TaskQueue::PostTask(std::unique_ptr task) { return TaskQueue::impl_->PostTask(std::move(task)); } -void TaskQueue::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue* reply_queue) { - return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), - reply_queue->impl_.get()); -} - -void TaskQueue::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply) { - return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), - impl_.get()); -} - void TaskQueue::PostDelayedTask(std::unique_ptr task, uint32_t milliseconds) { return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds); diff --git a/rtc_base/task_queue_libevent.cc b/rtc_base/task_queue_libevent.cc index 905bbdac0e..d30c82df97 100644 --- a/rtc_base/task_queue_libevent.cc +++ b/rtc_base/task_queue_libevent.cc @@ -44,16 +44,14 @@ using internal::AutoSetCurrentQueuePtr; namespace { static const char kQuit = 1; static const char kRunTask = 2; -static const char kRunReplyTask = 3; using Priority = TaskQueue::Priority; // This ignores the SIGPIPE signal on the calling thread. // This signal can be fired when trying to write() to a pipe that's being // closed or while closing a pipe that's being written to. -// We can run into that situation (e.g. reply tasks that don't get a chance to -// run because the task queue is being deleted) so we ignore this signal and -// continue as normal. +// We can run into that situation so we ignore this signal and continue as +// normal. // As a side note for this implementation, it would be great if we could safely // restore the sigmask, but unfortunately the operation of restoring it, can // itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS) @@ -133,10 +131,6 @@ class TaskQueue::Impl : public RefCountInterface { bool IsCurrent() const; void PostTask(std::unique_ptr task); - void PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue::Impl* reply_queue); - void PostDelayedTask(std::unique_ptr task, uint32_t milliseconds); private: @@ -145,14 +139,8 @@ class TaskQueue::Impl : public RefCountInterface { static void RunTask(int fd, short flags, void* context); // NOLINT static void RunTimer(int fd, short flags, void* context); // NOLINT - class ReplyTaskOwner; - class PostAndReplyTask; class SetTimerTask; - typedef RefCountedObject ReplyTaskOwnerRef; - - void PrepareReplyTask(scoped_refptr reply_task); - struct QueueContext; TaskQueue* const queue_; int wakeup_pipe_in_ = -1; @@ -162,8 +150,6 @@ class TaskQueue::Impl : public RefCountInterface { PlatformThread thread_; rtc::CriticalSection pending_lock_; std::list> pending_ RTC_GUARDED_BY(pending_lock_); - std::list> pending_replies_ - RTC_GUARDED_BY(pending_lock_); }; struct TaskQueue::Impl::QueueContext { @@ -174,90 +160,6 @@ struct TaskQueue::Impl::QueueContext { std::list pending_timers_; }; -// Posting a reply task is tricky business. This class owns the reply task -// and a reference to it is held by both the reply queue and the first task. -// Here's an outline of what happens when dealing with a reply task. -// * The ReplyTaskOwner owns the |reply_| task. -// * One ref owned by PostAndReplyTask -// * One ref owned by the reply TaskQueue -// * ReplyTaskOwner has a flag |run_task_| initially set to false. -// * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject). -// * After successfully running the original |task_|, PostAndReplyTask() calls -// set_should_run_task(). This sets |run_task_| to true. -// * In PostAndReplyTask's dtor: -// * It releases its reference to ReplyTaskOwner (important to do this first). -// * Sends (write()) a kRunReplyTask message to the reply queue's pipe. -// * PostAndReplyTask doesn't care if write() fails, but when it does: -// * The reply queue is gone. -// * ReplyTaskOwner has already been deleted and the reply task too. -// * If write() succeeds: -// * ReplyQueue receives the kRunReplyTask message -// * Goes through all pending tasks, finding the first that HasOneRef() -// * Calls ReplyTaskOwner::Run() -// * if set_should_run_task() was called, the reply task will be run -// * Release the reference to ReplyTaskOwner -// * ReplyTaskOwner and associated |reply_| are deleted. -class TaskQueue::Impl::ReplyTaskOwner { - public: - ReplyTaskOwner(std::unique_ptr reply) - : reply_(std::move(reply)) {} - - void Run() { - RTC_DCHECK(reply_); - if (run_task_) { - if (!reply_->Run()) - reply_.release(); - } - reply_.reset(); - } - - void set_should_run_task() { - RTC_DCHECK(!run_task_); - run_task_ = true; - } - - private: - std::unique_ptr reply_; - bool run_task_ = false; -}; - -class TaskQueue::Impl::PostAndReplyTask : public QueuedTask { - public: - PostAndReplyTask(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue::Impl* reply_queue, - int reply_pipe) - : task_(std::move(task)), - reply_pipe_(reply_pipe), - reply_task_owner_( - new RefCountedObject(std::move(reply))) { - reply_queue->PrepareReplyTask(reply_task_owner_); - } - - ~PostAndReplyTask() override { - reply_task_owner_ = nullptr; - IgnoreSigPipeSignalOnCurrentThread(); - // Send a signal to the reply queue that the reply task can run now. - // Depending on whether |set_should_run_task()| was called by the - // PostAndReplyTask(), the reply task may or may not actually run. - // In either case, it will be deleted. - char message = kRunReplyTask; - RTC_UNUSED(write(reply_pipe_, &message, sizeof(message))); - } - - private: - bool Run() override { - if (!task_->Run()) - task_.release(); - reply_task_owner_->set_should_run_task(); - return true; - } - - std::unique_ptr task_; - int reply_pipe_; - scoped_refptr> reply_task_owner_; -}; - class TaskQueue::Impl::SetTimerTask : public QueuedTask { public: SetTimerTask(std::unique_ptr task, uint32_t milliseconds) @@ -396,15 +298,6 @@ void TaskQueue::Impl::PostDelayedTask(std::unique_ptr task, } } -void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue::Impl* reply_queue) { - std::unique_ptr wrapper_task( - new PostAndReplyTask(std::move(task), std::move(reply), reply_queue, - reply_queue->wakeup_pipe_in_)); - PostTask(std::move(wrapper_task)); -} - // static void TaskQueue::Impl::ThreadMain(void* context) { TaskQueue::Impl* me = static_cast(context); @@ -448,22 +341,6 @@ void TaskQueue::Impl::OnWakeup(int socket, task.release(); break; } - case kRunReplyTask: { - scoped_refptr reply_task; - { - CritScope lock(&ctx->queue->pending_lock_); - for (auto it = ctx->queue->pending_replies_.begin(); - it != ctx->queue->pending_replies_.end(); ++it) { - if ((*it)->HasOneRef()) { - reply_task = std::move(*it); - ctx->queue->pending_replies_.erase(it); - break; - } - } - } - reply_task->Run(); - break; - } default: RTC_NOTREACHED(); break; @@ -488,13 +365,6 @@ void TaskQueue::Impl::RunTimer(int fd, short flags, void* context) { // NOLINT delete timer; } -void TaskQueue::Impl::PrepareReplyTask( - scoped_refptr reply_task) { - RTC_DCHECK(reply_task); - CritScope lock(&pending_lock_); - pending_replies_.push_back(std::move(reply_task)); -} - TaskQueue::TaskQueue(const char* queue_name, Priority priority) : impl_(new RefCountedObject(queue_name, this, priority)) { } @@ -515,19 +385,6 @@ void TaskQueue::PostTask(std::unique_ptr task) { return TaskQueue::impl_->PostTask(std::move(task)); } -void TaskQueue::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue* reply_queue) { - return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), - reply_queue->impl_.get()); -} - -void TaskQueue::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply) { - return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), - impl_.get()); -} - void TaskQueue::PostDelayedTask(std::unique_ptr task, uint32_t milliseconds) { return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds); diff --git a/rtc_base/task_queue_unittest.cc b/rtc_base/task_queue_unittest.cc index 9825635a73..a76c09eba8 100644 --- a/rtc_base/task_queue_unittest.cc +++ b/rtc_base/task_queue_unittest.cc @@ -192,19 +192,6 @@ TEST(TaskQueueTest, PostDelayedAfterDestruct) { EXPECT_FALSE(run.Wait(0)); // and should not run. } -TEST(TaskQueueTest, PostAndReply) { - static const char kPostQueue[] = "PostQueue"; - static const char kReplyQueue[] = "ReplyQueue"; - Event event; - TaskQueue post_queue(kPostQueue); - TaskQueue reply_queue(kReplyQueue); - - post_queue.PostTaskAndReply(Bind(&CheckCurrent, nullptr, &post_queue), - Bind(&CheckCurrent, &event, &reply_queue), - &reply_queue); - EXPECT_TRUE(event.Wait(1000)); -} - TEST(TaskQueueTest, PostAndReuse) { static const char kPostQueue[] = "PostQueue"; static const char kReplyQueue[] = "ReplyQueue"; @@ -252,20 +239,6 @@ TEST(TaskQueueTest, PostAndReuse) { EXPECT_TRUE(event.Wait(1000)); } -TEST(TaskQueueTest, PostAndReplyLambda) { - static const char kPostQueue[] = "PostQueue"; - static const char kReplyQueue[] = "ReplyQueue"; - Event event; - TaskQueue post_queue(kPostQueue); - TaskQueue reply_queue(kReplyQueue); - - bool my_flag = false; - post_queue.PostTaskAndReply([&my_flag]() { my_flag = true; }, - [&event]() { event.Set(); }, &reply_queue); - EXPECT_TRUE(event.Wait(1000)); - EXPECT_TRUE(my_flag); -} - TEST(TaskQueueTest, PostCopyableClosure) { struct CopyableClosure { CopyableClosure(int* num_copies, int* num_moves, Event* event) @@ -365,66 +338,6 @@ TEST(TaskQueueTest, PostMoveOnlyCleanup) { EXPECT_TRUE(event_run.Wait(0)); } -// This test covers a particular bug that we had in the libevent implementation -// where we could hit a deadlock while trying to post a reply task to a queue -// that was being deleted. The test isn't guaranteed to hit that case but it's -// written in a way that makes it likely and by running with --gtest_repeat=1000 -// the bug would occur. Alas, now it should be fixed. -TEST(TaskQueueTest, PostAndReplyDeadlock) { - Event event; - TaskQueue post_queue("PostQueue"); - TaskQueue reply_queue("ReplyQueue"); - - post_queue.PostTaskAndReply([&event]() { event.Set(); }, []() {}, - &reply_queue); - EXPECT_TRUE(event.Wait(1000)); -} - -// http://bugs.webrtc.org/9728 -#if defined(WEBRTC_WIN) -#define MAYBE_DeleteTaskQueueAfterPostAndReply \ - DISABLED_DeleteTaskQueueAfterPostAndReply -#else -#define MAYBE_DeleteTaskQueueAfterPostAndReply DeleteTaskQueueAfterPostAndReply -#endif -TEST(TaskQueueTest, MAYBE_DeleteTaskQueueAfterPostAndReply) { - Event task_deleted; - Event reply_deleted; - auto* task_queue = new TaskQueue("Queue"); - - task_queue->PostTaskAndReply( - /*task=*/rtc::NewClosure( - /*closure=*/[] {}, - /*cleanup=*/[&task_deleted] { task_deleted.Set(); }), - /*reply=*/rtc::NewClosure( - /*closure=*/[] {}, - /*cleanup=*/[&reply_deleted] { reply_deleted.Set(); })); - - delete task_queue; - - EXPECT_TRUE(task_deleted.Wait(1000)); - EXPECT_TRUE(reply_deleted.Wait(1000)); -} - -void TestPostTaskAndReply(TaskQueue* work_queue, Event* event) { - ASSERT_FALSE(work_queue->IsCurrent()); - work_queue->PostTaskAndReply(Bind(&CheckCurrent, nullptr, work_queue), - NewClosure([event]() { event->Set(); })); -} - -// Does a PostTaskAndReply from within a task to post and reply to the current -// queue. All in all there will be 3 tasks posted and run. -TEST(TaskQueueTest, PostAndReply2) { - static const char kQueueName[] = "PostAndReply2"; - static const char kWorkQueueName[] = "PostAndReply2_Worker"; - Event event; - TaskQueue queue(kQueueName); - TaskQueue work_queue(kWorkQueueName); - - queue.PostTask(Bind(&TestPostTaskAndReply, &work_queue, &event)); - EXPECT_TRUE(event.Wait(1000)); -} - // Tests posting more messages than a queue can queue up. // In situations like that, tasks will get dropped. TEST(TaskQueueTest, PostALot) { diff --git a/rtc_base/task_queue_win.cc b/rtc_base/task_queue_win.cc index c1e7c4621c..565cc9b70f 100644 --- a/rtc_base/task_queue_win.cc +++ b/rtc_base/task_queue_win.cc @@ -190,10 +190,6 @@ class TaskQueue::Impl : public RefCountInterface { } void PostTask(std::unique_ptr task); - void PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue::Impl* reply_queue); - void PostDelayedTask(std::unique_ptr task, uint32_t milliseconds); void RunPendingTasks(); @@ -322,24 +318,6 @@ void TaskQueue::Impl::PostDelayedTask(std::unique_ptr task, } } -void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue::Impl* reply_queue) { - QueuedTask* task_ptr = task.release(); - QueuedTask* reply_task_ptr = reply.release(); - DWORD reply_thread_id = reply_queue->thread_.GetThreadRef(); - PostTask([task_ptr, reply_task_ptr, reply_thread_id]() { - if (task_ptr->Run()) - delete task_ptr; - // If the thread's message queue is full, we can't queue the task and will - // have to drop it (i.e. delete). - if (!::PostThreadMessage(reply_thread_id, WM_RUN_TASK, 0, - reinterpret_cast(reply_task_ptr))) { - delete reply_task_ptr; - } - }); -} - void TaskQueue::Impl::RunPendingTasks() { while (true) { std::unique_ptr task; @@ -500,19 +478,6 @@ void TaskQueue::PostTask(std::unique_ptr task) { return TaskQueue::impl_->PostTask(std::move(task)); } -void TaskQueue::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue* reply_queue) { - return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), - reply_queue->impl_.get()); -} - -void TaskQueue::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply) { - return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), - impl_.get()); -} - void TaskQueue::PostDelayedTask(std::unique_ptr task, uint32_t milliseconds) { return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds);