Implement an OperationsChain, to be used by PeerConnection in follow-up.

This allows asynchronous tasks to be queued to be executed in order.
The class is motivated by the "operations chain" in the spec:
https://w3c.github.io/webrtc-pc/#dfn-operations-chain

In a follow-up CL I intend to use this in PeerConnection's
CreateOffer(), CreateAnswer() SetLocalDescription() and
SetRemoteDescription() and unblock https://crbug.com/980885.

For background, motivation, requirements and implementation notes, see
https://docs.google.com/document/d/1XLwNN2kUIGGTwz9LQ0NwJNkcybi9oKnynUEZB1jGA14/edit?usp=sharing

Bug: webrtc:11019
Change-Id: I982e4a1c0e77fa62096c16deed459d9d9e9b63f0
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/156120
Reviewed-by: Steve Anton <steveanton@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Commit-Queue: Henrik Boström <hbos@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29582}
This commit is contained in:
Henrik Boström 2019-10-21 15:21:55 +02:00 committed by Commit Bot
parent fcf79cca7b
commit 27c293665e
5 changed files with 619 additions and 0 deletions

View File

@ -521,6 +521,7 @@ if (rtc_include_tests) {
"rtc_base:rtc_base_unittests",
"rtc_base:rtc_json_unittests",
"rtc_base:rtc_numerics_unittests",
"rtc_base:rtc_operations_chain_unittests",
"rtc_base:rtc_task_queue_unittests",
"rtc_base:sigslot_unittest",
"rtc_base:weak_ptr_unittests",

View File

@ -465,6 +465,21 @@ rtc_library("rtc_task_queue") {
]
}
rtc_source_set("rtc_operations_chain") {
visibility = [ "*" ]
sources = [
"operations_chain.cc",
"operations_chain.h",
]
deps = [
":checks",
":macromagic",
":refcount",
"../api:scoped_refptr",
"synchronization:sequence_checker",
]
}
if (rtc_enable_libevent) {
rtc_library("rtc_task_queue_libevent") {
visibility = [ "../api/task_queue:default_task_queue_factory" ]
@ -1233,6 +1248,21 @@ if (rtc_include_tests) {
]
}
rtc_library("rtc_operations_chain_unittests") {
testonly = true
sources = [
"operations_chain_unittest.cc",
]
deps = [
":rtc_base",
":rtc_base_approved",
":rtc_event",
":rtc_operations_chain",
"../test:test_support",
]
}
rtc_library("weak_ptr_unittests") {
testonly = true

View File

@ -0,0 +1,68 @@
/*
* Copyright 2019 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "rtc_base/operations_chain.h"
#include "rtc_base/checks.h"
namespace rtc {
OperationsChain::CallbackHandle::CallbackHandle(
scoped_refptr<OperationsChain> operations_chain)
: operations_chain_(std::move(operations_chain)) {}
OperationsChain::CallbackHandle::~CallbackHandle() {
RTC_DCHECK(has_run_);
}
void OperationsChain::CallbackHandle::OnOperationComplete() {
RTC_DCHECK(!has_run_);
#ifdef RTC_DCHECK_IS_ON
has_run_ = true;
#endif // RTC_DCHECK_IS_ON
operations_chain_->OnOperationComplete();
// We have no reason to keep the |operations_chain_| alive through reference
// counting anymore.
operations_chain_ = nullptr;
}
// static
scoped_refptr<OperationsChain> OperationsChain::Create() {
return new OperationsChain();
}
OperationsChain::OperationsChain() : RefCountedObject() {
RTC_DCHECK_RUN_ON(&sequence_checker_);
}
OperationsChain::~OperationsChain() {
// Operations keep the chain alive through reference counting so this should
// not be possible. The fact that the chain is empty makes it safe to
// destroy the OperationsChain on any sequence.
RTC_DCHECK(chained_operations_.empty());
}
std::function<void()> OperationsChain::CreateOperationsChainCallback() {
return [handle = rtc::scoped_refptr<CallbackHandle>(
new CallbackHandle(this))]() { handle->OnOperationComplete(); };
}
void OperationsChain::OnOperationComplete() {
RTC_DCHECK_RUN_ON(&sequence_checker_);
// The front element is the operation that just completed, remove it.
RTC_DCHECK(!chained_operations_.empty());
chained_operations_.pop();
// If there are any other operations chained, execute the next one.
if (!chained_operations_.empty()) {
chained_operations_.front()->Run();
}
}
} // namespace rtc

183
rtc_base/operations_chain.h Normal file
View File

@ -0,0 +1,183 @@
/*
* Copyright 2019 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef RTC_BASE_OPERATIONS_CHAIN_H_
#define RTC_BASE_OPERATIONS_CHAIN_H_
#include <functional>
#include <memory>
#include <queue>
#include <set>
#include <type_traits>
#include <utility>
#include "api/scoped_refptr.h"
#include "rtc_base/checks.h"
#include "rtc_base/constructor_magic.h"
#include "rtc_base/ref_count.h"
#include "rtc_base/ref_counted_object.h"
#include "rtc_base/synchronization/sequence_checker.h"
namespace rtc {
namespace rtc_operations_chain_internal {
// Abstract base class for operations on the OperationsChain. Run() must be
// invoked exactly once during the Operation's lifespan.
class Operation {
public:
virtual ~Operation() {}
virtual void Run() = 0;
};
// FunctorT is the same as in OperationsChain::ChainOperation(). |callback_| is
// passed on to the |functor_| and is used to inform the OperationsChain that
// the operation completed. The functor is responsible for invoking the
// callback when the operation has completed.
template <typename FunctorT>
class OperationWithFunctor final : public Operation {
public:
OperationWithFunctor(FunctorT&& functor, std::function<void()> callback)
: functor_(std::forward<FunctorT>(functor)),
callback_(std::move(callback)) {}
~OperationWithFunctor() override { RTC_DCHECK(has_run_); }
void Run() override {
RTC_DCHECK(!has_run_);
#ifdef RTC_DCHECK_IS_ON
has_run_ = true;
#endif // RTC_DCHECK_IS_ON
functor_(std::move(callback_));
}
private:
typename std::remove_reference<FunctorT>::type functor_;
std::function<void()> callback_;
#ifdef RTC_DCHECK_IS_ON
bool has_run_ = false;
#endif // RTC_DCHECK_IS_ON
};
} // namespace rtc_operations_chain_internal
// An implementation of an operations chain. An operations chain is used to
// ensure that asynchronous tasks are executed in-order with at most one task
// running at a time. The notion of an operation chain is defined in
// https://w3c.github.io/webrtc-pc/#dfn-operations-chain, though unlike this
// implementation, the referenced definition is coupled with a peer connection.
//
// An operation is an asynchronous task. The operation starts when its functor
// is invoked, and completes when the callback that is passed to functor is
// invoked by the operation. The operation must start and complete on the same
// sequence that the operation was "chained" on. As such, the OperationsChain
// operates in a "single-threaded" fashion, but the asynchronous operations may
// use any number of threads to achieve "in parallel" behavior.
//
// When an operation is chained onto the OperationsChain, it is enqueued to be
// executed. Operations are executed in FIFO order, where the next operation
// does not start until the previous operation has completed. OperationsChain
// guarantees that:
// - If the operations chain is empty when an operation is chained, the
// operation starts immediately, inside ChainOperation().
// - If the operations chain is not empty when an operation is chained, the
// operation starts upon the previous operation completing, inside the
// callback.
//
// An operation is contractually obligated to invoke the completion callback
// exactly once. Cancelling a chained operation is not supported by the
// OperationsChain; an operation that wants to be cancellable is responsible for
// aborting its own steps. The callback must still be invoked.
//
// The OperationsChain is kept-alive through reference counting if there are
// operations pending. This, together with the contract, guarantees that all
// operations that are chained get executed.
class OperationsChain final : public RefCountedObject<RefCountInterface> {
public:
static scoped_refptr<OperationsChain> Create();
~OperationsChain();
// Chains an operation. Chained operations are executed in FIFO order. The
// operation starts when |functor| is executed by the OperationsChain and is
// contractually obligated to invoke the callback passed to it when the
// operation is complete. Operations must start and complete on the same
// sequence that this method was invoked on.
//
// If the OperationsChain is empty, the operation starts immediately.
// Otherwise it starts upon the previous operation completing.
//
// Requirements of FunctorT:
// - FunctorT is movable.
// - FunctorT implements "T operator()(std::function<void()> callback)" or
// "T operator()(std::function<void()> callback) const" for some T (if T is
// not void, the return value is discarded in the invoking sequence). The
// operator starts the operation; when the operation is complete, "callback"
// MUST be invoked, and it MUST be so on the sequence that ChainOperation()
// was invoked on.
//
// Lambda expressions are valid functors.
template <typename FunctorT>
void ChainOperation(FunctorT&& functor) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
chained_operations_.push(
std::make_unique<
rtc_operations_chain_internal::OperationWithFunctor<FunctorT>>(
std::forward<FunctorT>(functor), CreateOperationsChainCallback()));
// If this is the only operation in the chain we execute it immediately.
// Otherwise the callback will get invoked when the pending operation
// completes which will trigger the next operation to execute.
if (chained_operations_.size() == 1) {
chained_operations_.front()->Run();
}
}
private:
friend class CallbackHandle;
// The callback that is passed to an operation's functor (that is used to
// inform the OperationsChain that the operation has completed) is of type
// std::function<void()>, which is a copyable type. To allow the callback to
// be copyable, it is backed up by this reference counted handle. See
// CreateOperationsChainCallback().
class CallbackHandle final : public RefCountedObject<RefCountInterface> {
public:
explicit CallbackHandle(scoped_refptr<OperationsChain> operations_chain);
~CallbackHandle();
void OnOperationComplete();
private:
scoped_refptr<OperationsChain> operations_chain_;
#ifdef RTC_DCHECK_IS_ON
bool has_run_ = false;
#endif // RTC_DCHECK_IS_ON
RTC_DISALLOW_COPY_AND_ASSIGN(CallbackHandle);
};
OperationsChain();
std::function<void()> CreateOperationsChainCallback();
void OnOperationComplete();
webrtc::SequenceChecker sequence_checker_;
// FIFO-list of operations that are chained. An operation that is executing
// remains on this list until it has completed by invoking the callback passed
// to it.
std::queue<std::unique_ptr<rtc_operations_chain_internal::Operation>>
chained_operations_ RTC_GUARDED_BY(sequence_checker_);
RTC_DISALLOW_COPY_AND_ASSIGN(OperationsChain);
};
} // namespace rtc
#endif // RTC_BASE_OPERATIONS_CHAIN_H_

View File

@ -0,0 +1,337 @@
/*
* Copyright 2019 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "rtc_base/operations_chain.h"
#include <functional>
#include <memory>
#include <utility>
#include <vector>
#include "rtc_base/bind.h"
#include "rtc_base/event.h"
#include "rtc_base/thread.h"
#include "test/gmock.h"
#include "test/gtest.h"
namespace rtc {
using ::testing::ElementsAre;
class OperationTracker {
public:
OperationTracker() : background_thread_(Thread::Create()) {
background_thread_->Start();
}
// The caller is responsible for ensuring that no operations are pending.
~OperationTracker() {}
// Creates a binding for the synchronous operation (see
// StartSynchronousOperation() below).
std::function<void(std::function<void()>)> BindSynchronousOperation(
Event* operation_complete_event) {
return [this, operation_complete_event](std::function<void()> callback) {
StartSynchronousOperation(operation_complete_event, std::move(callback));
};
}
// Creates a binding for the asynchronous operation (see
// StartAsynchronousOperation() below).
std::function<void(std::function<void()>)> BindAsynchronousOperation(
Event* unblock_operation_event,
Event* operation_complete_event) {
return [this, unblock_operation_event,
operation_complete_event](std::function<void()> callback) {
StartAsynchronousOperation(unblock_operation_event,
operation_complete_event, std::move(callback));
};
}
// When an operation is completed, its associated Event* is added to this
// list, in chronological order. This allows you to verify the order that
// operations are executed.
const std::vector<Event*>& completed_operation_events() const {
return completed_operation_events_;
}
private:
// This operation is completed synchronously; the callback is invoked before
// the function returns.
void StartSynchronousOperation(Event* operation_complete_event,
std::function<void()> callback) {
completed_operation_events_.push_back(operation_complete_event);
operation_complete_event->Set();
callback();
}
// This operation is completed asynchronously; it pings |background_thread_|,
// blocking that thread until |unblock_operation_event| is signaled and then
// completes upon posting back to the thread that the operation started on.
// Note that this requires the starting thread to be executing tasks (handle
// messages), i.e. must not be blocked.
void StartAsynchronousOperation(Event* unblock_operation_event,
Event* operation_complete_event,
std::function<void()> callback) {
Thread* current_thread = Thread::Current();
background_thread_->PostTask(
RTC_FROM_HERE, [this, current_thread, unblock_operation_event,
operation_complete_event, callback]() {
unblock_operation_event->Wait(Event::kForever);
current_thread->PostTask(
RTC_FROM_HERE, [this, operation_complete_event, callback]() {
completed_operation_events_.push_back(operation_complete_event);
operation_complete_event->Set();
callback();
});
});
}
std::unique_ptr<Thread> background_thread_;
std::vector<Event*> completed_operation_events_;
};
// The OperationTrackerProxy ensures all operations are chained on a separate
// thread. This allows tests to block while chained operations are posting
// between threads.
class OperationTrackerProxy {
public:
OperationTrackerProxy()
: operations_chain_thread_(Thread::Create()),
operation_tracker_(nullptr),
operations_chain_(nullptr) {
operations_chain_thread_->Start();
}
std::unique_ptr<Event> Initialize() {
std::unique_ptr<Event> event = std::make_unique<Event>();
operations_chain_thread_->PostTask(
RTC_FROM_HERE, [this, event_ptr = event.get()]() {
operation_tracker_ = std::make_unique<OperationTracker>();
operations_chain_ = OperationsChain::Create();
event_ptr->Set();
});
return event;
}
std::unique_ptr<Event> ReleaseOperationChain() {
std::unique_ptr<Event> event = std::make_unique<Event>();
operations_chain_thread_->PostTask(RTC_FROM_HERE,
[this, event_ptr = event.get()]() {
operations_chain_ = nullptr;
event_ptr->Set();
});
return event;
}
// Chains a synchronous operation on the operation chain's thread.
std::unique_ptr<Event> PostSynchronousOperation() {
std::unique_ptr<Event> operation_complete_event = std::make_unique<Event>();
operations_chain_thread_->PostTask(
RTC_FROM_HERE, [this, operation_complete_event_ptr =
operation_complete_event.get()]() {
operations_chain_->ChainOperation(
operation_tracker_->BindSynchronousOperation(
operation_complete_event_ptr));
});
return operation_complete_event;
}
// Chains an asynchronous operation on the operation chain's thread. This
// involves the operation chain thread and an additional background thread.
std::unique_ptr<Event> PostAsynchronousOperation(
Event* unblock_operation_event) {
std::unique_ptr<Event> operation_complete_event = std::make_unique<Event>();
operations_chain_thread_->PostTask(
RTC_FROM_HERE,
[this, unblock_operation_event,
operation_complete_event_ptr = operation_complete_event.get()]() {
operations_chain_->ChainOperation(
operation_tracker_->BindAsynchronousOperation(
unblock_operation_event, operation_complete_event_ptr));
});
return operation_complete_event;
}
// The order of completed events. Touches the |operation_tracker_| on the
// calling thread, this is only thread safe if all chained operations have
// completed.
const std::vector<Event*>& completed_operation_events() const {
return operation_tracker_->completed_operation_events();
}
private:
std::unique_ptr<Thread> operations_chain_thread_;
std::unique_ptr<OperationTracker> operation_tracker_;
scoped_refptr<OperationsChain> operations_chain_;
};
TEST(OperationsChainTest, SynchronousOperation) {
OperationTrackerProxy operation_tracker_proxy;
operation_tracker_proxy.Initialize()->Wait(Event::kForever);
operation_tracker_proxy.PostSynchronousOperation()->Wait(Event::kForever);
}
TEST(OperationsChainTest, AsynchronousOperation) {
OperationTrackerProxy operation_tracker_proxy;
operation_tracker_proxy.Initialize()->Wait(Event::kForever);
Event unblock_async_operation_event;
auto async_operation_completed_event =
operation_tracker_proxy.PostAsynchronousOperation(
&unblock_async_operation_event);
// This should not be signaled until we unblock the operation.
EXPECT_FALSE(async_operation_completed_event->Wait(0));
// Unblock the operation and wait for it to complete.
unblock_async_operation_event.Set();
async_operation_completed_event->Wait(Event::kForever);
}
TEST(OperationsChainTest,
SynchronousOperationsAreExecutedImmediatelyWhenChainIsEmpty) {
// Testing synchonicity must be done without the OperationTrackerProxy to
// ensure messages are not processed in parallel. This test has no background
// threads.
scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create();
OperationTracker operation_tracker;
Event event0;
operations_chain->ChainOperation(
operation_tracker.BindSynchronousOperation(&event0));
// This should already be signaled. (If it wasn't, waiting wouldn't help,
// because we'd be blocking the only thread that exists.)
EXPECT_TRUE(event0.Wait(0));
// Chaining another operation should also execute immediately because the
// chain should already be empty.
Event event1;
operations_chain->ChainOperation(
operation_tracker.BindSynchronousOperation(&event1));
EXPECT_TRUE(event1.Wait(0));
}
TEST(OperationsChainTest, AsynchronousOperationBlocksSynchronousOperation) {
OperationTrackerProxy operation_tracker_proxy;
operation_tracker_proxy.Initialize()->Wait(Event::kForever);
Event unblock_async_operation_event;
auto async_operation_completed_event =
operation_tracker_proxy.PostAsynchronousOperation(
&unblock_async_operation_event);
auto sync_operation_completed_event =
operation_tracker_proxy.PostSynchronousOperation();
unblock_async_operation_event.Set();
sync_operation_completed_event->Wait(Event::kForever);
// The asynchronous avent should have blocked the synchronous event, meaning
// this should already be signaled.
EXPECT_TRUE(async_operation_completed_event->Wait(0));
}
TEST(OperationsChainTest, OperationsAreExecutedInOrder) {
OperationTrackerProxy operation_tracker_proxy;
operation_tracker_proxy.Initialize()->Wait(Event::kForever);
// Chain a mix of asynchronous and synchronous operations.
Event operation0_unblock_event;
auto operation0_completed_event =
operation_tracker_proxy.PostAsynchronousOperation(
&operation0_unblock_event);
Event operation1_unblock_event;
auto operation1_completed_event =
operation_tracker_proxy.PostAsynchronousOperation(
&operation1_unblock_event);
auto operation2_completed_event =
operation_tracker_proxy.PostSynchronousOperation();
auto operation3_completed_event =
operation_tracker_proxy.PostSynchronousOperation();
Event operation4_unblock_event;
auto operation4_completed_event =
operation_tracker_proxy.PostAsynchronousOperation(
&operation4_unblock_event);
auto operation5_completed_event =
operation_tracker_proxy.PostSynchronousOperation();
Event operation6_unblock_event;
auto operation6_completed_event =
operation_tracker_proxy.PostAsynchronousOperation(
&operation6_unblock_event);
// Unblock events in reverse order. Operations 5, 3 and 2 are synchronous and
// don't need to be unblocked.
operation6_unblock_event.Set();
operation4_unblock_event.Set();
operation1_unblock_event.Set();
operation0_unblock_event.Set();
// Await all operations. The await-order shouldn't matter since they all get
// executed eventually.
operation0_completed_event->Wait(Event::kForever);
operation1_completed_event->Wait(Event::kForever);
operation2_completed_event->Wait(Event::kForever);
operation3_completed_event->Wait(Event::kForever);
operation4_completed_event->Wait(Event::kForever);
operation5_completed_event->Wait(Event::kForever);
operation6_completed_event->Wait(Event::kForever);
EXPECT_THAT(
operation_tracker_proxy.completed_operation_events(),
ElementsAre(
operation0_completed_event.get(), operation1_completed_event.get(),
operation2_completed_event.get(), operation3_completed_event.get(),
operation4_completed_event.get(), operation5_completed_event.get(),
operation6_completed_event.get()));
}
TEST(OperationsChainTest,
SafeToReleaseReferenceToOperationChainWhileOperationIsPending) {
OperationTrackerProxy operation_tracker_proxy;
operation_tracker_proxy.Initialize()->Wait(Event::kForever);
Event unblock_async_operation_event;
auto async_operation_completed_event =
operation_tracker_proxy.PostAsynchronousOperation(
&unblock_async_operation_event);
// Pending operations keep the OperationChain alive, making it safe for the
// test to release any references before unblocking the async operation.
operation_tracker_proxy.ReleaseOperationChain()->Wait(Event::kForever);
unblock_async_operation_event.Set();
async_operation_completed_event->Wait(Event::kForever);
}
#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
TEST(OperationsChainTest, OperationNotInvokingCallbackShouldCrash) {
scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create();
EXPECT_DEATH(
operations_chain->ChainOperation([](std::function<void()> callback) {}),
"");
}
TEST(OperationsChainTest, OperationInvokingCallbackMultipleTimesShouldCrash) {
scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create();
EXPECT_DEATH(
operations_chain->ChainOperation([](std::function<void()> callback) {
// Signal that the operation has completed multiple times.
callback();
callback();
}),
"");
}
#endif // RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
} // namespace rtc