From 5b68ab50bbd6ada3725aa1aeac3e87242986b6be Mon Sep 17 00:00:00 2001 From: nisse Date: Thu, 7 Apr 2016 07:45:54 -0700 Subject: [PATCH] Extended proxy abstraction, to call certain methods to the worker thread. Extracted from cl https://codereview.webrtc.org/1766653002/, where AddOrUpdateSink results in a deadlock. BUG=webrtc:5426 Review URL: https://codereview.webrtc.org/1861633002 Cr-Commit-Position: refs/heads/master@{#12281} --- webrtc/api/mediastreamtrackproxy.h | 10 ++-- webrtc/api/peerconnectionfactory.cc | 8 ++- webrtc/api/peerconnectionfactoryproxy.h | 14 +++-- webrtc/api/proxy.h | 73 ++++++++++++++++++++----- webrtc/api/rtpreceiver.cc | 9 ++- webrtc/api/test/fakevideotracksource.h | 1 - webrtc/api/videocapturertracksource.cc | 9 +-- webrtc/api/videocapturertracksource.h | 1 + webrtc/api/videosourceproxy.h | 36 ++++++------ webrtc/api/videotrack.cc | 13 +++-- webrtc/api/videotrack.h | 3 +- webrtc/api/videotrack_unittest.cc | 2 +- webrtc/api/videotracksource.cc | 20 +++---- webrtc/api/videotracksource.h | 7 +-- 14 files changed, 132 insertions(+), 74 deletions(-) diff --git a/webrtc/api/mediastreamtrackproxy.h b/webrtc/api/mediastreamtrackproxy.h index f68773223b..6ca6dbbe80 100644 --- a/webrtc/api/mediastreamtrackproxy.h +++ b/webrtc/api/mediastreamtrackproxy.h @@ -35,22 +35,24 @@ BEGIN_PROXY_MAP(AudioTrack) PROXY_METHOD1(void, UnregisterObserver, ObserverInterface*) END_PROXY() -BEGIN_PROXY_MAP(VideoTrack) +BEGIN_WORKER_PROXY_MAP(VideoTrack) PROXY_CONSTMETHOD0(std::string, kind) PROXY_CONSTMETHOD0(std::string, id) PROXY_CONSTMETHOD0(TrackState, state) PROXY_CONSTMETHOD0(bool, enabled) PROXY_METHOD1(bool, set_enabled, bool) - PROXY_METHOD2(void, + PROXY_WORKER_METHOD2(void, AddOrUpdateSink, rtc::VideoSinkInterface*, const rtc::VideoSinkWants&) - PROXY_METHOD1(void, RemoveSink, rtc::VideoSinkInterface*) + PROXY_WORKER_METHOD1(void, + RemoveSink, + rtc::VideoSinkInterface*) PROXY_CONSTMETHOD0(VideoTrackSourceInterface*, GetSource) PROXY_METHOD1(void, RegisterObserver, ObserverInterface*) PROXY_METHOD1(void, UnregisterObserver, ObserverInterface*) -END_PROXY() +END_WORKER_PROXY() } // namespace webrtc diff --git a/webrtc/api/peerconnectionfactory.cc b/webrtc/api/peerconnectionfactory.cc index 852b7a8cce..c3c120ce7a 100644 --- a/webrtc/api/peerconnectionfactory.cc +++ b/webrtc/api/peerconnectionfactory.cc @@ -211,7 +211,8 @@ PeerConnectionFactory::CreateVideoSource( rtc::scoped_refptr source( VideoCapturerTrackSource::Create(worker_thread_, capturer, constraints, false)); - return VideoTrackSourceProxy::Create(signaling_thread_, source); + return VideoTrackSourceProxy::Create(signaling_thread_, worker_thread_, + source); } rtc::scoped_refptr @@ -219,7 +220,8 @@ PeerConnectionFactory::CreateVideoSource(cricket::VideoCapturer* capturer) { RTC_DCHECK(signaling_thread_->IsCurrent()); rtc::scoped_refptr source( VideoCapturerTrackSource::Create(worker_thread_, capturer, false)); - return VideoTrackSourceProxy::Create(signaling_thread_, source); + return VideoTrackSourceProxy::Create(signaling_thread_, worker_thread_, + source); } bool PeerConnectionFactory::StartAecDump(rtc::PlatformFile file, @@ -321,7 +323,7 @@ rtc::scoped_refptr PeerConnectionFactory::CreateVideoTrack( RTC_DCHECK(signaling_thread_->IsCurrent()); rtc::scoped_refptr track( VideoTrack::Create(id, source)); - return VideoTrackProxy::Create(signaling_thread_, track); + return VideoTrackProxy::Create(signaling_thread_, worker_thread_, track); } rtc::scoped_refptr diff --git a/webrtc/api/peerconnectionfactoryproxy.h b/webrtc/api/peerconnectionfactoryproxy.h index 829bf8177e..a25e93b4f5 100644 --- a/webrtc/api/peerconnectionfactoryproxy.h +++ b/webrtc/api/peerconnectionfactoryproxy.h @@ -30,18 +30,20 @@ BEGIN_PROXY_MAP(PeerConnectionFactory) rtc::scoped_ptr a3, rtc::scoped_ptr a4, PeerConnectionObserver* a5) override { - return owner_thread_->Invoke>( - rtc::Bind(&PeerConnectionFactoryProxy::CreatePeerConnection_ot, this, - a1, a2, a3.release(), a4.release(), a5)); + return signaling_thread_ + ->Invoke>( + rtc::Bind(&PeerConnectionFactoryProxy::CreatePeerConnection_ot, + this, a1, a2, a3.release(), a4.release(), a5)); } rtc::scoped_refptr CreatePeerConnection( const PeerConnectionInterface::RTCConfiguration& a1, rtc::scoped_ptr a3, rtc::scoped_ptr a4, PeerConnectionObserver* a5) override { - return owner_thread_->Invoke>( - rtc::Bind(&PeerConnectionFactoryProxy::CreatePeerConnection_ot, this, - a1, a3.release(), a4.release(), a5)); + return signaling_thread_ + ->Invoke>( + rtc::Bind(&PeerConnectionFactoryProxy::CreatePeerConnection_ot, + this, a1, a3.release(), a4.release(), a5)); } PROXY_METHOD1(rtc::scoped_refptr, CreateLocalMediaStream, const std::string&) diff --git a/webrtc/api/proxy.h b/webrtc/api/proxy.h index 1351a0427e..b5a8071bcf 100644 --- a/webrtc/api/proxy.h +++ b/webrtc/api/proxy.h @@ -295,69 +295,106 @@ class MethodCall5 : public rtc::Message, T5 a5_; }; +// TODO(nisse): Rename this to {BEGIN|END}_SIGNALLING_PROXY_MAP, and +// the below to {BEGIN|END}_PROXY_MAP. Also rename the class to +// c##SignallingProxy. #define BEGIN_PROXY_MAP(c) \ class c##Proxy : public c##Interface { \ protected: \ typedef c##Interface C; \ - c##Proxy(rtc::Thread* thread, C* c) : owner_thread_(thread), c_(c) {} \ + c##Proxy(rtc::Thread* signaling_thread, C* c) \ + : signaling_thread_(signaling_thread), c_(c) {} \ ~c##Proxy() { \ MethodCall0 call(this, &c##Proxy::Release_s); \ - call.Marshal(owner_thread_); \ + call.Marshal(signaling_thread_); \ } \ \ public: \ - static rtc::scoped_refptr Create(rtc::Thread* thread, C* c) { \ - return new rtc::RefCountedObject(thread, c); \ + static rtc::scoped_refptr Create(rtc::Thread* signaling_thread, C* c) { \ + return new rtc::RefCountedObject(signaling_thread, c); \ + } + +#define BEGIN_WORKER_PROXY_MAP(c) \ + class c##Proxy : public c##Interface { \ + protected: \ + typedef c##Interface C; \ + c##Proxy(rtc::Thread* signaling_thread, rtc::Thread* worker_thread, C* c) \ + : signaling_thread_(signaling_thread), \ + worker_thread_(worker_thread), \ + c_(c) {} \ + ~c##Proxy() { \ + MethodCall0 call(this, &c##Proxy::Release_s); \ + call.Marshal(signaling_thread_); \ + } \ + \ + public: \ + static rtc::scoped_refptr Create( \ + rtc::Thread* signaling_thread, rtc::Thread* worker_thread, C* c) { \ + return new rtc::RefCountedObject( \ + signaling_thread, worker_thread, c); \ } #define PROXY_METHOD0(r, method) \ r method() override { \ MethodCall0 call(c_.get(), &C::method); \ - return call.Marshal(owner_thread_); \ + return call.Marshal(signaling_thread_); \ } #define PROXY_CONSTMETHOD0(r, method) \ r method() const override { \ ConstMethodCall0 call(c_.get(), &C::method); \ - return call.Marshal(owner_thread_); \ + return call.Marshal(signaling_thread_); \ } #define PROXY_METHOD1(r, method, t1) \ r method(t1 a1) override { \ MethodCall1 call(c_.get(), &C::method, a1); \ - return call.Marshal(owner_thread_); \ + return call.Marshal(signaling_thread_); \ } #define PROXY_CONSTMETHOD1(r, method, t1) \ r method(t1 a1) const override { \ ConstMethodCall1 call(c_.get(), &C::method, a1); \ - return call.Marshal(owner_thread_); \ + return call.Marshal(signaling_thread_); \ } #define PROXY_METHOD2(r, method, t1, t2) \ r method(t1 a1, t2 a2) override { \ MethodCall2 call(c_.get(), &C::method, a1, a2); \ - return call.Marshal(owner_thread_); \ + return call.Marshal(signaling_thread_); \ } #define PROXY_METHOD3(r, method, t1, t2, t3) \ r method(t1 a1, t2 a2, t3 a3) override { \ MethodCall3 call(c_.get(), &C::method, a1, a2, a3); \ - return call.Marshal(owner_thread_); \ + return call.Marshal(signaling_thread_); \ } #define PROXY_METHOD4(r, method, t1, t2, t3, t4) \ r method(t1 a1, t2 a2, t3 a3, t4 a4) override { \ MethodCall4 call(c_.get(), &C::method, a1, a2, a3, \ a4); \ - return call.Marshal(owner_thread_); \ + return call.Marshal(signaling_thread_); \ } #define PROXY_METHOD5(r, method, t1, t2, t3, t4, t5) \ r method(t1 a1, t2 a2, t3 a3, t4 a4, t5 a5) override { \ MethodCall5 call(c_.get(), &C::method, a1, a2, \ a3, a4, a5); \ - return call.Marshal(owner_thread_); \ + return call.Marshal(signaling_thread_); \ + } + +// Define methods which should be invoked on the worker thread. +#define PROXY_WORKER_METHOD1(r, method, t1) \ + r method(t1 a1) override { \ + MethodCall1 call(c_.get(), &C::method, a1); \ + return call.Marshal(worker_thread_); \ + } + +#define PROXY_WORKER_METHOD2(r, method, t1, t2) \ + r method(t1 a1, t2 a2) override { \ + MethodCall2 call(c_.get(), &C::method, a1, a2); \ + return call.Marshal(worker_thread_); \ } #define END_PROXY() \ @@ -365,10 +402,20 @@ class MethodCall5 : public rtc::Message, void Release_s() {\ c_ = NULL;\ }\ - mutable rtc::Thread* owner_thread_;\ + mutable rtc::Thread* signaling_thread_;\ rtc::scoped_refptr c_;\ };\ +#define END_WORKER_PROXY() \ + private: \ + void Release_s() { \ + c_ = NULL; \ + } \ + mutable rtc::Thread* signaling_thread_; \ + mutable rtc::Thread* worker_thread_; \ + rtc::scoped_refptr c_; \ + }; \ + } // namespace webrtc #endif // WEBRTC_API_PROXY_H_ diff --git a/webrtc/api/rtpreceiver.cc b/webrtc/api/rtpreceiver.cc index 0150dfd546..38245dcccf 100644 --- a/webrtc/api/rtpreceiver.cc +++ b/webrtc/api/rtpreceiver.cc @@ -12,6 +12,7 @@ #include "webrtc/api/mediastreamtrackproxy.h" #include "webrtc/api/audiotrack.h" +#include "webrtc/api/videosourceproxy.h" #include "webrtc/api/videotrack.h" namespace webrtc { @@ -81,11 +82,15 @@ VideoRtpReceiver::VideoRtpReceiver(MediaStreamInterface* stream, ssrc_(ssrc), provider_(provider), source_(new RefCountedObject(&broadcaster_, - worker_thread, true /* remote */)), track_(VideoTrackProxy::Create( rtc::Thread::Current(), - VideoTrack::Create(track_id, source_.get()))) { + worker_thread, + VideoTrack::Create( + track_id, + VideoTrackSourceProxy::Create(rtc::Thread::Current(), + worker_thread, + source_)))) { source_->SetState(MediaSourceInterface::kLive); provider_->SetVideoPlayout(ssrc_, true, &broadcaster_); stream->AddTrack(track_); diff --git a/webrtc/api/test/fakevideotracksource.h b/webrtc/api/test/fakevideotracksource.h index 0b70a56b4e..1cb264b736 100644 --- a/webrtc/api/test/fakevideotracksource.h +++ b/webrtc/api/test/fakevideotracksource.h @@ -30,7 +30,6 @@ class FakeVideoTrackSource : public VideoTrackSource { protected: FakeVideoTrackSource() : VideoTrackSource(&fake_video_capturer_, - rtc::Thread::Current(), false /* remote */) {} virtual ~FakeVideoTrackSource() {} diff --git a/webrtc/api/videocapturertracksource.cc b/webrtc/api/videocapturertracksource.cc index cb539614be..b99a2d1eda 100644 --- a/webrtc/api/videocapturertracksource.cc +++ b/webrtc/api/videocapturertracksource.cc @@ -286,8 +286,9 @@ VideoCapturerTrackSource::VideoCapturerTrackSource( rtc::Thread* worker_thread, cricket::VideoCapturer* capturer, bool remote) - : VideoTrackSource(capturer, worker_thread, remote), + : VideoTrackSource(capturer, remote), signaling_thread_(rtc::Thread::Current()), + worker_thread_(worker_thread), video_capturer_(capturer), started_(false) { video_capturer_->SignalStateChange.connect( @@ -350,7 +351,7 @@ void VideoCapturerTrackSource::Initialize( format_ = GetBestCaptureFormat(formats); // Start the camera with our best guess. - if (!worker_thread()->Invoke( + if (!worker_thread_->Invoke( rtc::Bind(&cricket::VideoCapturer::StartCapturing, video_capturer_.get(), format_))) { SetState(kEnded); @@ -370,7 +371,7 @@ void VideoCapturerTrackSource::Stop() { return; } started_ = false; - worker_thread()->Invoke( + worker_thread_->Invoke( rtc::Bind(&cricket::VideoCapturer::Stop, video_capturer_.get())); } @@ -378,7 +379,7 @@ void VideoCapturerTrackSource::Restart() { if (started_) { return; } - if (!worker_thread()->Invoke( + if (!worker_thread_->Invoke( rtc::Bind(&cricket::VideoCapturer::StartCapturing, video_capturer_.get(), format_))) { SetState(kEnded); diff --git a/webrtc/api/videocapturertracksource.h b/webrtc/api/videocapturertracksource.h index 0d1142deba..96f8bffc23 100644 --- a/webrtc/api/videocapturertracksource.h +++ b/webrtc/api/videocapturertracksource.h @@ -75,6 +75,7 @@ class VideoCapturerTrackSource : public VideoTrackSource, cricket::CaptureState capture_state); rtc::Thread* signaling_thread_; + rtc::Thread* worker_thread_; rtc::AsyncInvoker invoker_; rtc::scoped_ptr video_capturer_; bool started_; diff --git a/webrtc/api/videosourceproxy.h b/webrtc/api/videosourceproxy.h index f43c0db69f..4d687d4625 100644 --- a/webrtc/api/videosourceproxy.h +++ b/webrtc/api/videosourceproxy.h @@ -20,23 +20,25 @@ namespace webrtc { // implementation is // destroyed on the signaling thread and marshals all method calls to the // signaling thread. -BEGIN_PROXY_MAP(VideoTrackSource) -PROXY_CONSTMETHOD0(SourceState, state) -PROXY_CONSTMETHOD0(bool, remote) -PROXY_METHOD0(cricket::VideoCapturer*, GetVideoCapturer) -PROXY_METHOD0(void, Stop) -PROXY_METHOD0(void, Restart) -PROXY_CONSTMETHOD0(bool, is_screencast) -PROXY_CONSTMETHOD0(rtc::Optional, needs_denoising) -PROXY_METHOD1(bool, GetStats, Stats*) -PROXY_METHOD2(void, - AddOrUpdateSink, - rtc::VideoSinkInterface*, - const rtc::VideoSinkWants&) -PROXY_METHOD1(void, RemoveSink, rtc::VideoSinkInterface*) -PROXY_METHOD1(void, RegisterObserver, ObserverInterface*) -PROXY_METHOD1(void, UnregisterObserver, ObserverInterface*) -END_PROXY() +BEGIN_WORKER_PROXY_MAP(VideoTrackSource) + PROXY_CONSTMETHOD0(SourceState, state) + PROXY_CONSTMETHOD0(bool, remote) + PROXY_METHOD0(cricket::VideoCapturer*, GetVideoCapturer) + PROXY_METHOD0(void, Stop) + PROXY_METHOD0(void, Restart) + PROXY_CONSTMETHOD0(bool, is_screencast) + PROXY_CONSTMETHOD0(rtc::Optional, needs_denoising) + PROXY_METHOD1(bool, GetStats, Stats*) + PROXY_WORKER_METHOD2(void, + AddOrUpdateSink, + rtc::VideoSinkInterface*, + const rtc::VideoSinkWants&) + PROXY_WORKER_METHOD1(void, + RemoveSink, + rtc::VideoSinkInterface*) + PROXY_METHOD1(void, RegisterObserver, ObserverInterface*) + PROXY_METHOD1(void, UnregisterObserver, ObserverInterface*) +END_WORKER_PROXY() } // namespace webrtc diff --git a/webrtc/api/videotrack.cc b/webrtc/api/videotrack.cc index bd380254b2..234b4cf7ae 100644 --- a/webrtc/api/videotrack.cc +++ b/webrtc/api/videotrack.cc @@ -20,6 +20,7 @@ VideoTrack::VideoTrack(const std::string& label, VideoTrackSourceInterface* video_source) : MediaStreamTrack(label), video_source_(video_source) { + worker_thread_checker_.DetachFromThread(); video_source_->RegisterObserver(this); } @@ -31,10 +32,12 @@ std::string VideoTrack::kind() const { return kVideoKind; } +// AddOrUpdateSink and RemoveSink should be called on the worker +// thread. void VideoTrack::AddOrUpdateSink( rtc::VideoSinkInterface* sink, const rtc::VideoSinkWants& wants) { - RTC_DCHECK(thread_checker_.CalledOnValidThread()); + RTC_DCHECK(worker_thread_checker_.CalledOnValidThread()); VideoSourceBase::AddOrUpdateSink(sink, wants); rtc::VideoSinkWants modified_wants = wants; modified_wants.black_frames = !enabled(); @@ -43,23 +46,25 @@ void VideoTrack::AddOrUpdateSink( void VideoTrack::RemoveSink( rtc::VideoSinkInterface* sink) { - RTC_DCHECK(thread_checker_.CalledOnValidThread()); + RTC_DCHECK(worker_thread_checker_.CalledOnValidThread()); VideoSourceBase::RemoveSink(sink); video_source_->RemoveSink(sink); } bool VideoTrack::set_enabled(bool enable) { - RTC_DCHECK(thread_checker_.CalledOnValidThread()); + RTC_DCHECK(signaling_thread_checker_.CalledOnValidThread()); for (auto& sink_pair : sink_pairs()) { rtc::VideoSinkWants modified_wants = sink_pair.wants; modified_wants.black_frames = !enable; + // video_source_ is a proxy object, marshalling the call to the + // worker thread. video_source_->AddOrUpdateSink(sink_pair.sink, modified_wants); } return MediaStreamTrack::set_enabled(enable); } void VideoTrack::OnChanged() { - RTC_DCHECK(thread_checker_.CalledOnValidThread()); + RTC_DCHECK(signaling_thread_checker_.CalledOnValidThread()); if (video_source_->state() == MediaSourceInterface::kEnded) { set_state(kEnded); } else { diff --git a/webrtc/api/videotrack.h b/webrtc/api/videotrack.h index 3835d2c936..2f875321c1 100644 --- a/webrtc/api/videotrack.h +++ b/webrtc/api/videotrack.h @@ -47,7 +47,8 @@ class VideoTrack : public MediaStreamTrack, // Implements ObserverInterface. Observes |video_source_| state. void OnChanged() override; - rtc::ThreadChecker thread_checker_; + rtc::ThreadChecker signaling_thread_checker_; + rtc::ThreadChecker worker_thread_checker_; rtc::scoped_refptr video_source_; }; diff --git a/webrtc/api/videotrack_unittest.cc b/webrtc/api/videotrack_unittest.cc index d35bcdb2a0..b1cd0a6739 100644 --- a/webrtc/api/videotrack_unittest.cc +++ b/webrtc/api/videotrack_unittest.cc @@ -31,7 +31,7 @@ class VideoTrackTest : public testing::Test { VideoTrackTest() { static const char kVideoTrackId[] = "track_id"; video_track_source_ = new rtc::RefCountedObject( - &capturer_, rtc::Thread::Current(), true /* remote */); + &capturer_, true /* remote */); video_track_ = VideoTrack::Create(kVideoTrackId, video_track_source_); capturer_.Start( cricket::VideoFormat(640, 480, cricket::VideoFormat::FpsToInterval(30), diff --git a/webrtc/api/videotracksource.cc b/webrtc/api/videotracksource.cc index f8212d7a70..17d32fbc71 100644 --- a/webrtc/api/videotracksource.cc +++ b/webrtc/api/videotracksource.cc @@ -12,18 +12,14 @@ #include -#include "webrtc/base/bind.h" - namespace webrtc { VideoTrackSource::VideoTrackSource( rtc::VideoSourceInterface* source, - rtc::Thread* worker_thread, bool remote) - : source_(source), - worker_thread_(worker_thread), - state_(kInitializing), - remote_(remote) {} + : source_(source), state_(kInitializing), remote_(remote) { + worker_thread_checker_.DetachFromThread(); +} void VideoTrackSource::SetState(SourceState new_state) { if (state_ != new_state) { @@ -39,22 +35,20 @@ void VideoTrackSource::OnSourceDestroyed() { void VideoTrackSource::AddOrUpdateSink( rtc::VideoSinkInterface* sink, const rtc::VideoSinkWants& wants) { + RTC_DCHECK(worker_thread_checker_.CalledOnValidThread()); if (!source_) { return; } - worker_thread_->Invoke(rtc::Bind( - &rtc::VideoSourceInterface::AddOrUpdateSink, source_, - sink, wants)); + source_->AddOrUpdateSink(sink, wants); } void VideoTrackSource::RemoveSink( rtc::VideoSinkInterface* sink) { + RTC_DCHECK(worker_thread_checker_.CalledOnValidThread()); if (!source_) { return; } - worker_thread_->Invoke( - rtc::Bind(&rtc::VideoSourceInterface::RemoveSink, - source_, sink)); + source_->RemoveSink(sink); } } // namespace webrtc diff --git a/webrtc/api/videotracksource.h b/webrtc/api/videotracksource.h index 108209dc2c..3d8d502c8f 100644 --- a/webrtc/api/videotracksource.h +++ b/webrtc/api/videotracksource.h @@ -13,6 +13,7 @@ #include "webrtc/api/mediastreaminterface.h" #include "webrtc/api/notifier.h" +#include "webrtc/base/thread_checker.h" #include "webrtc/media/base/mediachannel.h" #include "webrtc/media/base/videosinkinterface.h" @@ -22,7 +23,6 @@ namespace webrtc { class VideoTrackSource : public Notifier { public: VideoTrackSource(rtc::VideoSourceInterface* source, - rtc::Thread* worker_thread, bool remote); void SetState(SourceState new_state); // OnSourceDestroyed clears this instance pointer to |source_|. It is useful @@ -48,12 +48,9 @@ class VideoTrackSource : public Notifier { cricket::VideoCapturer* GetVideoCapturer() override { return nullptr; } - protected: - rtc::Thread* worker_thread() { return worker_thread_; } - private: + rtc::ThreadChecker worker_thread_checker_; rtc::VideoSourceInterface* source_; - rtc::Thread* worker_thread_; cricket::VideoOptions options_; SourceState state_; const bool remote_;