Extended proxy abstraction, to call certain methods to the worker thread.

Extracted from cl https://codereview.webrtc.org/1766653002/, where
AddOrUpdateSink results in a deadlock.

BUG=webrtc:5426

Review URL: https://codereview.webrtc.org/1861633002

Cr-Commit-Position: refs/heads/master@{#12281}
This commit is contained in:
nisse 2016-04-07 07:45:54 -07:00 committed by Commit bot
parent eb13302e39
commit 5b68ab50bb
14 changed files with 132 additions and 74 deletions

View File

@ -35,22 +35,24 @@ BEGIN_PROXY_MAP(AudioTrack)
PROXY_METHOD1(void, UnregisterObserver, ObserverInterface*)
END_PROXY()
BEGIN_PROXY_MAP(VideoTrack)
BEGIN_WORKER_PROXY_MAP(VideoTrack)
PROXY_CONSTMETHOD0(std::string, kind)
PROXY_CONSTMETHOD0(std::string, id)
PROXY_CONSTMETHOD0(TrackState, state)
PROXY_CONSTMETHOD0(bool, enabled)
PROXY_METHOD1(bool, set_enabled, bool)
PROXY_METHOD2(void,
PROXY_WORKER_METHOD2(void,
AddOrUpdateSink,
rtc::VideoSinkInterface<cricket::VideoFrame>*,
const rtc::VideoSinkWants&)
PROXY_METHOD1(void, RemoveSink, rtc::VideoSinkInterface<cricket::VideoFrame>*)
PROXY_WORKER_METHOD1(void,
RemoveSink,
rtc::VideoSinkInterface<cricket::VideoFrame>*)
PROXY_CONSTMETHOD0(VideoTrackSourceInterface*, GetSource)
PROXY_METHOD1(void, RegisterObserver, ObserverInterface*)
PROXY_METHOD1(void, UnregisterObserver, ObserverInterface*)
END_PROXY()
END_WORKER_PROXY()
} // namespace webrtc

View File

@ -211,7 +211,8 @@ PeerConnectionFactory::CreateVideoSource(
rtc::scoped_refptr<VideoTrackSourceInterface> source(
VideoCapturerTrackSource::Create(worker_thread_, capturer, constraints,
false));
return VideoTrackSourceProxy::Create(signaling_thread_, source);
return VideoTrackSourceProxy::Create(signaling_thread_, worker_thread_,
source);
}
rtc::scoped_refptr<VideoTrackSourceInterface>
@ -219,7 +220,8 @@ PeerConnectionFactory::CreateVideoSource(cricket::VideoCapturer* capturer) {
RTC_DCHECK(signaling_thread_->IsCurrent());
rtc::scoped_refptr<VideoTrackSourceInterface> source(
VideoCapturerTrackSource::Create(worker_thread_, capturer, false));
return VideoTrackSourceProxy::Create(signaling_thread_, source);
return VideoTrackSourceProxy::Create(signaling_thread_, worker_thread_,
source);
}
bool PeerConnectionFactory::StartAecDump(rtc::PlatformFile file,
@ -321,7 +323,7 @@ rtc::scoped_refptr<VideoTrackInterface> PeerConnectionFactory::CreateVideoTrack(
RTC_DCHECK(signaling_thread_->IsCurrent());
rtc::scoped_refptr<VideoTrackInterface> track(
VideoTrack::Create(id, source));
return VideoTrackProxy::Create(signaling_thread_, track);
return VideoTrackProxy::Create(signaling_thread_, worker_thread_, track);
}
rtc::scoped_refptr<AudioTrackInterface>

View File

@ -30,18 +30,20 @@ BEGIN_PROXY_MAP(PeerConnectionFactory)
rtc::scoped_ptr<cricket::PortAllocator> a3,
rtc::scoped_ptr<DtlsIdentityStoreInterface> a4,
PeerConnectionObserver* a5) override {
return owner_thread_->Invoke<rtc::scoped_refptr<PeerConnectionInterface>>(
rtc::Bind(&PeerConnectionFactoryProxy::CreatePeerConnection_ot, this,
a1, a2, a3.release(), a4.release(), a5));
return signaling_thread_
->Invoke<rtc::scoped_refptr<PeerConnectionInterface>>(
rtc::Bind(&PeerConnectionFactoryProxy::CreatePeerConnection_ot,
this, a1, a2, a3.release(), a4.release(), a5));
}
rtc::scoped_refptr<PeerConnectionInterface> CreatePeerConnection(
const PeerConnectionInterface::RTCConfiguration& a1,
rtc::scoped_ptr<cricket::PortAllocator> a3,
rtc::scoped_ptr<DtlsIdentityStoreInterface> a4,
PeerConnectionObserver* a5) override {
return owner_thread_->Invoke<rtc::scoped_refptr<PeerConnectionInterface>>(
rtc::Bind(&PeerConnectionFactoryProxy::CreatePeerConnection_ot, this,
a1, a3.release(), a4.release(), a5));
return signaling_thread_
->Invoke<rtc::scoped_refptr<PeerConnectionInterface>>(
rtc::Bind(&PeerConnectionFactoryProxy::CreatePeerConnection_ot,
this, a1, a3.release(), a4.release(), a5));
}
PROXY_METHOD1(rtc::scoped_refptr<MediaStreamInterface>,
CreateLocalMediaStream, const std::string&)

View File

@ -295,69 +295,106 @@ class MethodCall5 : public rtc::Message,
T5 a5_;
};
// TODO(nisse): Rename this to {BEGIN|END}_SIGNALLING_PROXY_MAP, and
// the below to {BEGIN|END}_PROXY_MAP. Also rename the class to
// c##SignallingProxy.
#define BEGIN_PROXY_MAP(c) \
class c##Proxy : public c##Interface { \
protected: \
typedef c##Interface C; \
c##Proxy(rtc::Thread* thread, C* c) : owner_thread_(thread), c_(c) {} \
c##Proxy(rtc::Thread* signaling_thread, C* c) \
: signaling_thread_(signaling_thread), c_(c) {} \
~c##Proxy() { \
MethodCall0<c##Proxy, void> call(this, &c##Proxy::Release_s); \
call.Marshal(owner_thread_); \
call.Marshal(signaling_thread_); \
} \
\
public: \
static rtc::scoped_refptr<C> Create(rtc::Thread* thread, C* c) { \
return new rtc::RefCountedObject<c##Proxy>(thread, c); \
static rtc::scoped_refptr<C> Create(rtc::Thread* signaling_thread, C* c) { \
return new rtc::RefCountedObject<c##Proxy>(signaling_thread, c); \
}
#define BEGIN_WORKER_PROXY_MAP(c) \
class c##Proxy : public c##Interface { \
protected: \
typedef c##Interface C; \
c##Proxy(rtc::Thread* signaling_thread, rtc::Thread* worker_thread, C* c) \
: signaling_thread_(signaling_thread), \
worker_thread_(worker_thread), \
c_(c) {} \
~c##Proxy() { \
MethodCall0<c##Proxy, void> call(this, &c##Proxy::Release_s); \
call.Marshal(signaling_thread_); \
} \
\
public: \
static rtc::scoped_refptr<C> Create( \
rtc::Thread* signaling_thread, rtc::Thread* worker_thread, C* c) { \
return new rtc::RefCountedObject<c##Proxy>( \
signaling_thread, worker_thread, c); \
}
#define PROXY_METHOD0(r, method) \
r method() override { \
MethodCall0<C, r> call(c_.get(), &C::method); \
return call.Marshal(owner_thread_); \
return call.Marshal(signaling_thread_); \
}
#define PROXY_CONSTMETHOD0(r, method) \
r method() const override { \
ConstMethodCall0<C, r> call(c_.get(), &C::method); \
return call.Marshal(owner_thread_); \
return call.Marshal(signaling_thread_); \
}
#define PROXY_METHOD1(r, method, t1) \
r method(t1 a1) override { \
MethodCall1<C, r, t1> call(c_.get(), &C::method, a1); \
return call.Marshal(owner_thread_); \
return call.Marshal(signaling_thread_); \
}
#define PROXY_CONSTMETHOD1(r, method, t1) \
r method(t1 a1) const override { \
ConstMethodCall1<C, r, t1> call(c_.get(), &C::method, a1); \
return call.Marshal(owner_thread_); \
return call.Marshal(signaling_thread_); \
}
#define PROXY_METHOD2(r, method, t1, t2) \
r method(t1 a1, t2 a2) override { \
MethodCall2<C, r, t1, t2> call(c_.get(), &C::method, a1, a2); \
return call.Marshal(owner_thread_); \
return call.Marshal(signaling_thread_); \
}
#define PROXY_METHOD3(r, method, t1, t2, t3) \
r method(t1 a1, t2 a2, t3 a3) override { \
MethodCall3<C, r, t1, t2, t3> call(c_.get(), &C::method, a1, a2, a3); \
return call.Marshal(owner_thread_); \
return call.Marshal(signaling_thread_); \
}
#define PROXY_METHOD4(r, method, t1, t2, t3, t4) \
r method(t1 a1, t2 a2, t3 a3, t4 a4) override { \
MethodCall4<C, r, t1, t2, t3, t4> call(c_.get(), &C::method, a1, a2, a3, \
a4); \
return call.Marshal(owner_thread_); \
return call.Marshal(signaling_thread_); \
}
#define PROXY_METHOD5(r, method, t1, t2, t3, t4, t5) \
r method(t1 a1, t2 a2, t3 a3, t4 a4, t5 a5) override { \
MethodCall5<C, r, t1, t2, t3, t4, t5> call(c_.get(), &C::method, a1, a2, \
a3, a4, a5); \
return call.Marshal(owner_thread_); \
return call.Marshal(signaling_thread_); \
}
// Define methods which should be invoked on the worker thread.
#define PROXY_WORKER_METHOD1(r, method, t1) \
r method(t1 a1) override { \
MethodCall1<C, r, t1> call(c_.get(), &C::method, a1); \
return call.Marshal(worker_thread_); \
}
#define PROXY_WORKER_METHOD2(r, method, t1, t2) \
r method(t1 a1, t2 a2) override { \
MethodCall2<C, r, t1, t2> call(c_.get(), &C::method, a1, a2); \
return call.Marshal(worker_thread_); \
}
#define END_PROXY() \
@ -365,10 +402,20 @@ class MethodCall5 : public rtc::Message,
void Release_s() {\
c_ = NULL;\
}\
mutable rtc::Thread* owner_thread_;\
mutable rtc::Thread* signaling_thread_;\
rtc::scoped_refptr<C> c_;\
};\
#define END_WORKER_PROXY() \
private: \
void Release_s() { \
c_ = NULL; \
} \
mutable rtc::Thread* signaling_thread_; \
mutable rtc::Thread* worker_thread_; \
rtc::scoped_refptr<C> c_; \
}; \
} // namespace webrtc
#endif // WEBRTC_API_PROXY_H_

View File

@ -12,6 +12,7 @@
#include "webrtc/api/mediastreamtrackproxy.h"
#include "webrtc/api/audiotrack.h"
#include "webrtc/api/videosourceproxy.h"
#include "webrtc/api/videotrack.h"
namespace webrtc {
@ -81,11 +82,15 @@ VideoRtpReceiver::VideoRtpReceiver(MediaStreamInterface* stream,
ssrc_(ssrc),
provider_(provider),
source_(new RefCountedObject<VideoTrackSource>(&broadcaster_,
worker_thread,
true /* remote */)),
track_(VideoTrackProxy::Create(
rtc::Thread::Current(),
VideoTrack::Create(track_id, source_.get()))) {
worker_thread,
VideoTrack::Create(
track_id,
VideoTrackSourceProxy::Create(rtc::Thread::Current(),
worker_thread,
source_)))) {
source_->SetState(MediaSourceInterface::kLive);
provider_->SetVideoPlayout(ssrc_, true, &broadcaster_);
stream->AddTrack(track_);

View File

@ -30,7 +30,6 @@ class FakeVideoTrackSource : public VideoTrackSource {
protected:
FakeVideoTrackSource()
: VideoTrackSource(&fake_video_capturer_,
rtc::Thread::Current(),
false /* remote */) {}
virtual ~FakeVideoTrackSource() {}

View File

@ -286,8 +286,9 @@ VideoCapturerTrackSource::VideoCapturerTrackSource(
rtc::Thread* worker_thread,
cricket::VideoCapturer* capturer,
bool remote)
: VideoTrackSource(capturer, worker_thread, remote),
: VideoTrackSource(capturer, remote),
signaling_thread_(rtc::Thread::Current()),
worker_thread_(worker_thread),
video_capturer_(capturer),
started_(false) {
video_capturer_->SignalStateChange.connect(
@ -350,7 +351,7 @@ void VideoCapturerTrackSource::Initialize(
format_ = GetBestCaptureFormat(formats);
// Start the camera with our best guess.
if (!worker_thread()->Invoke<bool>(
if (!worker_thread_->Invoke<bool>(
rtc::Bind(&cricket::VideoCapturer::StartCapturing,
video_capturer_.get(), format_))) {
SetState(kEnded);
@ -370,7 +371,7 @@ void VideoCapturerTrackSource::Stop() {
return;
}
started_ = false;
worker_thread()->Invoke<void>(
worker_thread_->Invoke<void>(
rtc::Bind(&cricket::VideoCapturer::Stop, video_capturer_.get()));
}
@ -378,7 +379,7 @@ void VideoCapturerTrackSource::Restart() {
if (started_) {
return;
}
if (!worker_thread()->Invoke<bool>(
if (!worker_thread_->Invoke<bool>(
rtc::Bind(&cricket::VideoCapturer::StartCapturing,
video_capturer_.get(), format_))) {
SetState(kEnded);

View File

@ -75,6 +75,7 @@ class VideoCapturerTrackSource : public VideoTrackSource,
cricket::CaptureState capture_state);
rtc::Thread* signaling_thread_;
rtc::Thread* worker_thread_;
rtc::AsyncInvoker invoker_;
rtc::scoped_ptr<cricket::VideoCapturer> video_capturer_;
bool started_;

View File

@ -20,23 +20,25 @@ namespace webrtc {
// implementation is
// destroyed on the signaling thread and marshals all method calls to the
// signaling thread.
BEGIN_PROXY_MAP(VideoTrackSource)
PROXY_CONSTMETHOD0(SourceState, state)
PROXY_CONSTMETHOD0(bool, remote)
PROXY_METHOD0(cricket::VideoCapturer*, GetVideoCapturer)
PROXY_METHOD0(void, Stop)
PROXY_METHOD0(void, Restart)
PROXY_CONSTMETHOD0(bool, is_screencast)
PROXY_CONSTMETHOD0(rtc::Optional<bool>, needs_denoising)
PROXY_METHOD1(bool, GetStats, Stats*)
PROXY_METHOD2(void,
AddOrUpdateSink,
rtc::VideoSinkInterface<cricket::VideoFrame>*,
const rtc::VideoSinkWants&)
PROXY_METHOD1(void, RemoveSink, rtc::VideoSinkInterface<cricket::VideoFrame>*)
PROXY_METHOD1(void, RegisterObserver, ObserverInterface*)
PROXY_METHOD1(void, UnregisterObserver, ObserverInterface*)
END_PROXY()
BEGIN_WORKER_PROXY_MAP(VideoTrackSource)
PROXY_CONSTMETHOD0(SourceState, state)
PROXY_CONSTMETHOD0(bool, remote)
PROXY_METHOD0(cricket::VideoCapturer*, GetVideoCapturer)
PROXY_METHOD0(void, Stop)
PROXY_METHOD0(void, Restart)
PROXY_CONSTMETHOD0(bool, is_screencast)
PROXY_CONSTMETHOD0(rtc::Optional<bool>, needs_denoising)
PROXY_METHOD1(bool, GetStats, Stats*)
PROXY_WORKER_METHOD2(void,
AddOrUpdateSink,
rtc::VideoSinkInterface<cricket::VideoFrame>*,
const rtc::VideoSinkWants&)
PROXY_WORKER_METHOD1(void,
RemoveSink,
rtc::VideoSinkInterface<cricket::VideoFrame>*)
PROXY_METHOD1(void, RegisterObserver, ObserverInterface*)
PROXY_METHOD1(void, UnregisterObserver, ObserverInterface*)
END_WORKER_PROXY()
} // namespace webrtc

View File

@ -20,6 +20,7 @@ VideoTrack::VideoTrack(const std::string& label,
VideoTrackSourceInterface* video_source)
: MediaStreamTrack<VideoTrackInterface>(label),
video_source_(video_source) {
worker_thread_checker_.DetachFromThread();
video_source_->RegisterObserver(this);
}
@ -31,10 +32,12 @@ std::string VideoTrack::kind() const {
return kVideoKind;
}
// AddOrUpdateSink and RemoveSink should be called on the worker
// thread.
void VideoTrack::AddOrUpdateSink(
rtc::VideoSinkInterface<cricket::VideoFrame>* sink,
const rtc::VideoSinkWants& wants) {
RTC_DCHECK(thread_checker_.CalledOnValidThread());
RTC_DCHECK(worker_thread_checker_.CalledOnValidThread());
VideoSourceBase::AddOrUpdateSink(sink, wants);
rtc::VideoSinkWants modified_wants = wants;
modified_wants.black_frames = !enabled();
@ -43,23 +46,25 @@ void VideoTrack::AddOrUpdateSink(
void VideoTrack::RemoveSink(
rtc::VideoSinkInterface<cricket::VideoFrame>* sink) {
RTC_DCHECK(thread_checker_.CalledOnValidThread());
RTC_DCHECK(worker_thread_checker_.CalledOnValidThread());
VideoSourceBase::RemoveSink(sink);
video_source_->RemoveSink(sink);
}
bool VideoTrack::set_enabled(bool enable) {
RTC_DCHECK(thread_checker_.CalledOnValidThread());
RTC_DCHECK(signaling_thread_checker_.CalledOnValidThread());
for (auto& sink_pair : sink_pairs()) {
rtc::VideoSinkWants modified_wants = sink_pair.wants;
modified_wants.black_frames = !enable;
// video_source_ is a proxy object, marshalling the call to the
// worker thread.
video_source_->AddOrUpdateSink(sink_pair.sink, modified_wants);
}
return MediaStreamTrack<VideoTrackInterface>::set_enabled(enable);
}
void VideoTrack::OnChanged() {
RTC_DCHECK(thread_checker_.CalledOnValidThread());
RTC_DCHECK(signaling_thread_checker_.CalledOnValidThread());
if (video_source_->state() == MediaSourceInterface::kEnded) {
set_state(kEnded);
} else {

View File

@ -47,7 +47,8 @@ class VideoTrack : public MediaStreamTrack<VideoTrackInterface>,
// Implements ObserverInterface. Observes |video_source_| state.
void OnChanged() override;
rtc::ThreadChecker thread_checker_;
rtc::ThreadChecker signaling_thread_checker_;
rtc::ThreadChecker worker_thread_checker_;
rtc::scoped_refptr<VideoTrackSourceInterface> video_source_;
};

View File

@ -31,7 +31,7 @@ class VideoTrackTest : public testing::Test {
VideoTrackTest() {
static const char kVideoTrackId[] = "track_id";
video_track_source_ = new rtc::RefCountedObject<VideoTrackSource>(
&capturer_, rtc::Thread::Current(), true /* remote */);
&capturer_, true /* remote */);
video_track_ = VideoTrack::Create(kVideoTrackId, video_track_source_);
capturer_.Start(
cricket::VideoFormat(640, 480, cricket::VideoFormat::FpsToInterval(30),

View File

@ -12,18 +12,14 @@
#include <string>
#include "webrtc/base/bind.h"
namespace webrtc {
VideoTrackSource::VideoTrackSource(
rtc::VideoSourceInterface<cricket::VideoFrame>* source,
rtc::Thread* worker_thread,
bool remote)
: source_(source),
worker_thread_(worker_thread),
state_(kInitializing),
remote_(remote) {}
: source_(source), state_(kInitializing), remote_(remote) {
worker_thread_checker_.DetachFromThread();
}
void VideoTrackSource::SetState(SourceState new_state) {
if (state_ != new_state) {
@ -39,22 +35,20 @@ void VideoTrackSource::OnSourceDestroyed() {
void VideoTrackSource::AddOrUpdateSink(
rtc::VideoSinkInterface<cricket::VideoFrame>* sink,
const rtc::VideoSinkWants& wants) {
RTC_DCHECK(worker_thread_checker_.CalledOnValidThread());
if (!source_) {
return;
}
worker_thread_->Invoke<void>(rtc::Bind(
&rtc::VideoSourceInterface<cricket::VideoFrame>::AddOrUpdateSink, source_,
sink, wants));
source_->AddOrUpdateSink(sink, wants);
}
void VideoTrackSource::RemoveSink(
rtc::VideoSinkInterface<cricket::VideoFrame>* sink) {
RTC_DCHECK(worker_thread_checker_.CalledOnValidThread());
if (!source_) {
return;
}
worker_thread_->Invoke<void>(
rtc::Bind(&rtc::VideoSourceInterface<cricket::VideoFrame>::RemoveSink,
source_, sink));
source_->RemoveSink(sink);
}
} // namespace webrtc

View File

@ -13,6 +13,7 @@
#include "webrtc/api/mediastreaminterface.h"
#include "webrtc/api/notifier.h"
#include "webrtc/base/thread_checker.h"
#include "webrtc/media/base/mediachannel.h"
#include "webrtc/media/base/videosinkinterface.h"
@ -22,7 +23,6 @@ namespace webrtc {
class VideoTrackSource : public Notifier<VideoTrackSourceInterface> {
public:
VideoTrackSource(rtc::VideoSourceInterface<cricket::VideoFrame>* source,
rtc::Thread* worker_thread,
bool remote);
void SetState(SourceState new_state);
// OnSourceDestroyed clears this instance pointer to |source_|. It is useful
@ -48,12 +48,9 @@ class VideoTrackSource : public Notifier<VideoTrackSourceInterface> {
cricket::VideoCapturer* GetVideoCapturer() override { return nullptr; }
protected:
rtc::Thread* worker_thread() { return worker_thread_; }
private:
rtc::ThreadChecker worker_thread_checker_;
rtc::VideoSourceInterface<cricket::VideoFrame>* source_;
rtc::Thread* worker_thread_;
cricket::VideoOptions options_;
SourceState state_;
const bool remote_;