diff --git a/modules/video_coding/video_coding_impl.h b/modules/video_coding/video_coding_impl.h index 8a442c8225..3b692d1bc6 100644 --- a/modules/video_coding/video_coding_impl.h +++ b/modules/video_coding/video_coding_impl.h @@ -143,7 +143,7 @@ class VideoReceiver : public Module { VCMTiming* timing, NackSender* nack_sender = nullptr, KeyFrameRequestSender* keyframe_request_sender = nullptr); - ~VideoReceiver(); + ~VideoReceiver() override; int32_t RegisterReceiveCodec(const VideoCodec* receiveCodec, int32_t numberOfCores, @@ -161,9 +161,6 @@ class VideoReceiver : public Module { int32_t Decode(const webrtc::VCMEncodedFrame* frame); - // Called on the decoder thread when thread is exiting. - void DecodingStopped(); - int32_t IncomingPacket(const uint8_t* incomingPayload, size_t payloadLength, const WebRtcRTPHeader& rtpInfo); @@ -188,41 +185,72 @@ class VideoReceiver : public Module { int64_t TimeUntilNextProcess() override; void Process() override; + void ProcessThreadAttached(ProcessThread* process_thread) override; void TriggerDecoderShutdown(); + // Notification methods that are used to check our internal state and validate + // threading assumptions. These are called by VideoReceiveStream. + // See |IsDecoderThreadRunning()| for more details. + void DecoderThreadStarting(); + void DecoderThreadStopped(); + protected: - int32_t Decode(const webrtc::VCMEncodedFrame& frame) - RTC_EXCLUSIVE_LOCKS_REQUIRED(receive_crit_); + int32_t Decode(const webrtc::VCMEncodedFrame& frame); int32_t RequestKeyFrame(); private: - rtc::ThreadChecker construction_thread_; + // Used for DCHECKing thread correctness. + // In build where DCHECKs are enabled, will return false before + // DecoderThreadStarting is called, then true until DecoderThreadStopped + // is called. + // In builds where DCHECKs aren't enabled, it will return true. + bool IsDecoderThreadRunning(); + + rtc::ThreadChecker construction_thread_checker_; + rtc::ThreadChecker decoder_thread_checker_; + rtc::ThreadChecker module_thread_checker_; Clock* const clock_; rtc::CriticalSection process_crit_; - rtc::CriticalSection receive_crit_; VCMTiming* _timing; VCMReceiver _receiver; VCMDecodedFrameCallback _decodedFrameCallback; - VCMFrameTypeCallback* _frameTypeCallback RTC_GUARDED_BY(process_crit_); - VCMReceiveStatisticsCallback* _receiveStatsCallback - RTC_GUARDED_BY(process_crit_); - VCMPacketRequestCallback* _packetRequestCallback - RTC_GUARDED_BY(process_crit_); - VCMFrameBuffer _frameFromFile; + // These callbacks are set on the construction thread before being attached + // to the module thread or decoding started, so a lock is not required. + VCMFrameTypeCallback* _frameTypeCallback; + VCMReceiveStatisticsCallback* _receiveStatsCallback; + VCMPacketRequestCallback* _packetRequestCallback; + + // Used on both the module and decoder thread. bool _scheduleKeyRequest RTC_GUARDED_BY(process_crit_); bool drop_frames_until_keyframe_ RTC_GUARDED_BY(process_crit_); - size_t max_nack_list_size_ RTC_GUARDED_BY(process_crit_); - VCMDecoderDataBase _codecDataBase RTC_GUARDED_BY(receive_crit_); - EncodedImageCallback* pre_decode_image_callback_; + // Modified on the construction thread while not attached to the process + // thread. Once attached to the process thread, its value is only read + // so a lock is not required. + size_t max_nack_list_size_; - VCMProcessTimer _receiveStatsTimer; - VCMProcessTimer _retransmissionTimer; - VCMProcessTimer _keyRequestTimer; - QpParser qp_parser_; - ThreadUnsafeOneTimeEvent first_frame_received_; + // Callbacks are set before the decoder thread starts. + // Once the decoder thread has been started, usage of |_codecDataBase| moves + // over to the decoder thread. + VCMDecoderDataBase _codecDataBase; + EncodedImageCallback* const pre_decode_image_callback_; + + VCMProcessTimer _receiveStatsTimer RTC_GUARDED_BY(module_thread_checker_); + VCMProcessTimer _retransmissionTimer RTC_GUARDED_BY(module_thread_checker_); + VCMProcessTimer _keyRequestTimer RTC_GUARDED_BY(module_thread_checker_); + QpParser qp_parser_ RTC_GUARDED_BY(decoder_thread_checker_); + ThreadUnsafeOneTimeEvent first_frame_received_ + RTC_GUARDED_BY(decoder_thread_checker_); + // Modified on the construction thread. Can be read without a lock and assumed + // to be non-null on the module and decoder threads. + ProcessThread* process_thread_ = nullptr; + bool is_attached_to_process_thread_ + RTC_GUARDED_BY(construction_thread_checker_) = false; +#if RTC_DCHECK_IS_ON + bool decoder_thread_is_running_ = false; +#endif }; } // namespace vcm diff --git a/modules/video_coding/video_receiver.cc b/modules/video_coding/video_receiver.cc index 038c523faa..25b2908f0e 100644 --- a/modules/video_coding/video_receiver.cc +++ b/modules/video_coding/video_receiver.cc @@ -10,12 +10,14 @@ #include "common_types.h" // NOLINT(build/include) #include "common_video/libyuv/include/webrtc_libyuv.h" +#include "modules/utility/include/process_thread.h" #include "modules/video_coding/encoded_frame.h" #include "modules/video_coding/include/video_codec_interface.h" #include "modules/video_coding/jitter_buffer.h" #include "modules/video_coding/packet.h" #include "modules/video_coding/video_coding_impl.h" #include "rtc_base/checks.h" +#include "rtc_base/location.h" #include "rtc_base/logging.h" #include "rtc_base/trace_event.h" #include "system_wrappers/include/clock.h" @@ -40,7 +42,6 @@ VideoReceiver::VideoReceiver(Clock* clock, _frameTypeCallback(nullptr), _receiveStatsCallback(nullptr), _packetRequestCallback(nullptr), - _frameFromFile(), _scheduleKeyRequest(false), drop_frames_until_keyframe_(false), max_nack_list_size_(0), @@ -48,18 +49,23 @@ VideoReceiver::VideoReceiver(Clock* clock, pre_decode_image_callback_(pre_decode_image_callback), _receiveStatsTimer(1000, clock_), _retransmissionTimer(10, clock_), - _keyRequestTimer(500, clock_) {} + _keyRequestTimer(500, clock_) { + decoder_thread_checker_.DetachFromThread(); + module_thread_checker_.DetachFromThread(); +} -VideoReceiver::~VideoReceiver() {} +VideoReceiver::~VideoReceiver() { + RTC_DCHECK_RUN_ON(&construction_thread_checker_); +} void VideoReceiver::Process() { + RTC_DCHECK_RUN_ON(&module_thread_checker_); // Receive-side statistics // TODO(philipel): Remove this if block when we know what to do with // ReceiveStatisticsProxy::QualitySample. if (_receiveStatsTimer.TimeUntilProcess() == 0) { _receiveStatsTimer.Processed(); - rtc::CritScope cs(&process_crit_); if (_receiveStatsCallback != nullptr) { _receiveStatsCallback->OnReceiveRatesUpdated(0, 0); } @@ -68,10 +74,10 @@ void VideoReceiver::Process() { // Key frame requests if (_keyRequestTimer.TimeUntilProcess() == 0) { _keyRequestTimer.Processed(); - bool request_key_frame = false; - { + bool request_key_frame = _frameTypeCallback != nullptr; + if (request_key_frame) { rtc::CritScope cs(&process_crit_); - request_key_frame = _scheduleKeyRequest && _frameTypeCallback != nullptr; + request_key_frame = _scheduleKeyRequest; } if (request_key_frame) RequestKeyFrame(); @@ -82,13 +88,8 @@ void VideoReceiver::Process() { // disabled when NACK is off. if (_retransmissionTimer.TimeUntilProcess() == 0) { _retransmissionTimer.Processed(); - bool callback_registered = false; - uint16_t length; - { - rtc::CritScope cs(&process_crit_); - length = max_nack_list_size_; - callback_registered = _packetRequestCallback != nullptr; - } + bool callback_registered = _packetRequestCallback != nullptr; + uint16_t length = max_nack_list_size_; if (callback_registered && length > 0) { // Collect sequence numbers from the default receiver. bool request_key_frame = false; @@ -107,7 +108,19 @@ void VideoReceiver::Process() { } } +void VideoReceiver::ProcessThreadAttached(ProcessThread* process_thread) { + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + if (process_thread) { + is_attached_to_process_thread_ = true; + RTC_DCHECK(!process_thread_ || process_thread_ == process_thread); + process_thread_ = process_thread; + } else { + is_attached_to_process_thread_ = false; + } +} + int64_t VideoReceiver::TimeUntilNextProcess() { + RTC_DCHECK_RUN_ON(&module_thread_checker_); int64_t timeUntilNextProcess = _receiveStatsTimer.TimeUntilProcess(); if (_receiver.NackMode() != kNoNack) { // We need a Process call more often if we are relying on @@ -122,7 +135,7 @@ int64_t VideoReceiver::TimeUntilNextProcess() { } int32_t VideoReceiver::SetReceiveChannelParameters(int64_t rtt) { - rtc::CritScope cs(&receive_crit_); + RTC_DCHECK_RUN_ON(&module_thread_checker_); _receiver.UpdateRtt(rtt); return 0; } @@ -142,7 +155,6 @@ int32_t VideoReceiver::SetVideoProtection(VCMVideoProtection videoProtection, } case kProtectionNackFEC: { - rtc::CritScope cs(&receive_crit_); RTC_DCHECK(enable); _receiver.SetNackMode(kNack, media_optimization::kLowRttNackMs, @@ -165,20 +177,22 @@ int32_t VideoReceiver::SetVideoProtection(VCMVideoProtection videoProtection, // ready for rendering. int32_t VideoReceiver::RegisterReceiveCallback( VCMReceiveCallback* receiveCallback) { - RTC_DCHECK(construction_thread_.CalledOnValidThread()); - // TODO(tommi): Callback may be null, but only after the decoder thread has - // been stopped. Use the signal we now get that tells us when the decoder - // thread isn't running, to DCHECK that the method is never called while it - // is. Once we're confident, we can remove the lock. - rtc::CritScope cs(&receive_crit_); + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + RTC_DCHECK(!IsDecoderThreadRunning()); + // This value is set before the decoder thread starts and unset after + // the decoder thread has been stopped. _decodedFrameCallback.SetUserReceiveCallback(receiveCallback); return VCM_OK; } int32_t VideoReceiver::RegisterReceiveStatisticsCallback( VCMReceiveStatisticsCallback* receiveStats) { - RTC_DCHECK(construction_thread_.CalledOnValidThread()); - rtc::CritScope cs(&process_crit_); + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + RTC_DCHECK(!IsDecoderThreadRunning() && !is_attached_to_process_thread_); + // |_receiver| is used on both the decoder and module threads. + // However, since we make sure that we never do anything on the module thread + // when the decoder thread is not running, we don't need a lock for the + // |_receiver| or |_receiveStatsCallback| here. _receiver.RegisterStatsCallback(receiveStats); _receiveStatsCallback = receiveStats; return VCM_OK; @@ -187,10 +201,8 @@ int32_t VideoReceiver::RegisterReceiveStatisticsCallback( // Register an externally defined decoder object. void VideoReceiver::RegisterExternalDecoder(VideoDecoder* externalDecoder, uint8_t payloadType) { - RTC_DCHECK(construction_thread_.CalledOnValidThread()); - // TODO(tommi): This method must be called when the decoder thread is not - // running. Do we need a lock in that case? - rtc::CritScope cs(&receive_crit_); + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + RTC_DCHECK(!IsDecoderThreadRunning()); if (externalDecoder == nullptr) { RTC_CHECK(_codecDataBase.DeregisterExternalDecoder(payloadType)); return; @@ -201,53 +213,91 @@ void VideoReceiver::RegisterExternalDecoder(VideoDecoder* externalDecoder, // Register a frame type request callback. int32_t VideoReceiver::RegisterFrameTypeCallback( VCMFrameTypeCallback* frameTypeCallback) { - rtc::CritScope cs(&process_crit_); + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + RTC_DCHECK(!IsDecoderThreadRunning() && !is_attached_to_process_thread_); + // This callback is used on the module thread, but since we don't get + // callbacks on the module thread while the decoder thread isn't running + // (and this function must not be called when the decoder is running), + // we don't need a lock here. _frameTypeCallback = frameTypeCallback; return VCM_OK; } int32_t VideoReceiver::RegisterPacketRequestCallback( VCMPacketRequestCallback* callback) { - rtc::CritScope cs(&process_crit_); + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + RTC_DCHECK(!IsDecoderThreadRunning() && !is_attached_to_process_thread_); + // This callback is used on the module thread, but since we don't get + // callbacks on the module thread while the decoder thread isn't running + // (and this function must not be called when the decoder is running), + // we don't need a lock here. _packetRequestCallback = callback; return VCM_OK; } void VideoReceiver::TriggerDecoderShutdown() { - RTC_DCHECK(construction_thread_.CalledOnValidThread()); + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + RTC_DCHECK(IsDecoderThreadRunning()); _receiver.TriggerDecoderShutdown(); } +void VideoReceiver::DecoderThreadStarting() { + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + RTC_DCHECK(!IsDecoderThreadRunning()); + if (process_thread_ && !is_attached_to_process_thread_) { + process_thread_->RegisterModule(this, RTC_FROM_HERE); + } +#if RTC_DCHECK_IS_ON + decoder_thread_is_running_ = true; +#endif +} + +void VideoReceiver::DecoderThreadStopped() { + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + RTC_DCHECK(IsDecoderThreadRunning()); + if (process_thread_ && is_attached_to_process_thread_) { + process_thread_->DeRegisterModule(this); + } +#if RTC_DCHECK_IS_ON + decoder_thread_is_running_ = false; + decoder_thread_checker_.DetachFromThread(); +#endif +} + // Decode next frame, blocking. // Should be called as often as possible to get the most out of the decoder. int32_t VideoReceiver::Decode(uint16_t maxWaitTimeMs) { - bool prefer_late_decoding = false; - { - // TODO(tommi): Chances are that this lock isn't required. - rtc::CritScope cs(&receive_crit_); - prefer_late_decoding = _codecDataBase.PrefersLateDecoding(); - } - - VCMEncodedFrame* frame = - _receiver.FrameForDecoding(maxWaitTimeMs, prefer_late_decoding); + RTC_DCHECK_RUN_ON(&decoder_thread_checker_); + VCMEncodedFrame* frame = _receiver.FrameForDecoding( + maxWaitTimeMs, _codecDataBase.PrefersLateDecoding()); if (!frame) return VCM_FRAME_NOT_READY; + bool drop_frame = false; { rtc::CritScope cs(&process_crit_); if (drop_frames_until_keyframe_) { // Still getting delta frames, schedule another keyframe request as if // decode failed. if (frame->FrameType() != kVideoFrameKey) { + drop_frame = true; _scheduleKeyRequest = true; - _receiver.ReleaseFrame(frame); - return VCM_FRAME_NOT_READY; + // TODO(tommi): Consider if we could instead post a task to the module + // thread and call RequestKeyFrame directly. Here we call WakeUp so that + // TimeUntilNextProcess() gets called straight away. + process_thread_->WakeUp(this); + } else { + drop_frames_until_keyframe_ = false; } - drop_frames_until_keyframe_ = false; } } + if (drop_frame) { + _receiver.ReleaseFrame(frame); + return VCM_FRAME_NOT_READY; + } + if (pre_decode_image_callback_) { EncodedImage encoded_image(frame->EncodedImage()); int qp = -1; @@ -258,7 +308,6 @@ int32_t VideoReceiver::Decode(uint16_t maxWaitTimeMs) { frame->CodecSpecific(), nullptr); } - rtc::CritScope cs(&receive_crit_); // If this frame was too late, we should adjust the delay accordingly _timing->UpdateCurrentDelay(frame->RenderTimeMs(), clock_->TimeInMilliseconds()); @@ -278,7 +327,7 @@ int32_t VideoReceiver::Decode(uint16_t maxWaitTimeMs) { // TODO(philipel): Clean up among the Decode functions as we replace // VCMEncodedFrame with FrameObject. int32_t VideoReceiver::Decode(const webrtc::VCMEncodedFrame* frame) { - rtc::CritScope lock(&receive_crit_); + RTC_DCHECK_RUN_ON(&decoder_thread_checker_); if (pre_decode_image_callback_) { EncodedImage encoded_image(frame->EncodedImage()); int qp = -1; @@ -291,19 +340,20 @@ int32_t VideoReceiver::Decode(const webrtc::VCMEncodedFrame* frame) { return Decode(*frame); } -void VideoReceiver::DecodingStopped() { - // No further calls to Decode() will be made after this point. - // TODO(tommi): Make use of this to clarify and check threading model. -} - int32_t VideoReceiver::RequestKeyFrame() { + RTC_DCHECK_RUN_ON(&module_thread_checker_); + + // Since we deregister from the module thread when the decoder thread isn't + // running, we should get no calls here if decoding isn't being done. + RTC_DCHECK(IsDecoderThreadRunning()); + TRACE_EVENT0("webrtc", "RequestKeyFrame"); - rtc::CritScope cs(&process_crit_); if (_frameTypeCallback != nullptr) { const int32_t ret = _frameTypeCallback->RequestKeyFrame(); if (ret < 0) { return ret; } + rtc::CritScope cs(&process_crit_); _scheduleKeyRequest = false; } else { return VCM_MISSING_CALLBACK; @@ -313,6 +363,7 @@ int32_t VideoReceiver::RequestKeyFrame() { // Must be called from inside the receive side critical section. int32_t VideoReceiver::Decode(const VCMEncodedFrame& frame) { + RTC_DCHECK_RUN_ON(&decoder_thread_checker_); TRACE_EVENT0("webrtc", "VideoReceiver::Decode"); // Change decoder if payload type has changed VCMGenericDecoder* decoder = @@ -327,11 +378,8 @@ int32_t VideoReceiver::Decode(const VCMEncodedFrame& frame) { int32_t VideoReceiver::RegisterReceiveCodec(const VideoCodec* receiveCodec, int32_t numberOfCores, bool requireKeyFrame) { - RTC_DCHECK(construction_thread_.CalledOnValidThread()); - // TODO(tommi): This method must only be called when the decoder thread - // is not running. Do we need a lock? If not, it looks like we might not need - // a lock at all for |_codecDataBase|. - rtc::CritScope cs(&receive_crit_); + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + RTC_DCHECK(!IsDecoderThreadRunning()); if (receiveCodec == nullptr) { return VCM_PARAMETER_ERROR; } @@ -346,6 +394,7 @@ int32_t VideoReceiver::RegisterReceiveCodec(const VideoCodec* receiveCodec, int32_t VideoReceiver::IncomingPacket(const uint8_t* incomingPayload, size_t payloadLength, const WebRtcRTPHeader& rtpInfo) { + RTC_DCHECK_RUN_ON(&module_thread_checker_); if (rtpInfo.frameType == kVideoFrameKey) { TRACE_EVENT1("webrtc", "VCM::PacketKeyFrame", "seqnum", rtpInfo.header.sequenceNumber); @@ -377,6 +426,7 @@ int32_t VideoReceiver::IncomingPacket(const uint8_t* incomingPayload, // to sync with audio. Not included in VideoCodingModule::Delay() // Defaults to 0 ms. int32_t VideoReceiver::SetMinimumPlayoutDelay(uint32_t minPlayoutDelayMs) { + RTC_DCHECK_RUN_ON(&module_thread_checker_); _timing->set_min_playout_delay(minPlayoutDelayMs); return VCM_OK; } @@ -384,22 +434,24 @@ int32_t VideoReceiver::SetMinimumPlayoutDelay(uint32_t minPlayoutDelayMs) { // The estimated delay caused by rendering, defaults to // kDefaultRenderDelayMs = 10 ms int32_t VideoReceiver::SetRenderDelay(uint32_t timeMS) { + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + RTC_DCHECK(!IsDecoderThreadRunning()); _timing->set_render_delay(timeMS); return VCM_OK; } // Current video delay int32_t VideoReceiver::Delay() const { + RTC_DCHECK_RUN_ON(&module_thread_checker_); return _timing->TargetVideoDelay(); } +// Only used by VCMRobustnessTest. int VideoReceiver::SetReceiverRobustnessMode( VideoCodingModule::ReceiverRobustness robustnessMode, VCMDecodeErrorMode decode_error_mode) { - RTC_DCHECK(construction_thread_.CalledOnValidThread()); - // TODO(tommi): This method must only be called when the decoder thread - // is not running and we don't need to hold this lock. - rtc::CritScope cs(&receive_crit_); + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + RTC_DCHECK(!IsDecoderThreadRunning()); switch (robustnessMode) { case VideoCodingModule::kNone: _receiver.SetNackMode(kNoNack, -1, -1); @@ -417,15 +469,17 @@ int VideoReceiver::SetReceiverRobustnessMode( } void VideoReceiver::SetDecodeErrorMode(VCMDecodeErrorMode decode_error_mode) { - rtc::CritScope cs(&receive_crit_); + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + RTC_DCHECK(!IsDecoderThreadRunning()); _receiver.SetDecodeErrorMode(decode_error_mode); } void VideoReceiver::SetNackSettings(size_t max_nack_list_size, int max_packet_age_to_nack, int max_incomplete_time_ms) { + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + RTC_DCHECK(!IsDecoderThreadRunning()); if (max_nack_list_size != 0) { - rtc::CritScope cs(&process_crit_); max_nack_list_size_ = max_nack_list_size; } _receiver.SetNackSettings(max_nack_list_size, max_packet_age_to_nack, @@ -433,8 +487,22 @@ void VideoReceiver::SetNackSettings(size_t max_nack_list_size, } int VideoReceiver::SetMinReceiverDelay(int desired_delay_ms) { + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + RTC_DCHECK(!IsDecoderThreadRunning()); + // TODO(tommi): Is the method only used by tests? Maybe could be offered + // via a test only subclass? + // Info from Stefan: If it is indeed only used by tests I think it's just that + // it hasn't been cleaned up when the calling code was cleaned up. return _receiver.SetMinReceiverDelay(desired_delay_ms); } +bool VideoReceiver::IsDecoderThreadRunning() { +#if RTC_DCHECK_IS_ON + return decoder_thread_is_running_; +#else + return true; +#endif +} + } // namespace vcm } // namespace webrtc diff --git a/video/video_receive_stream.cc b/video/video_receive_stream.cc index 9a572cae9f..2b191f2cf7 100644 --- a/video/video_receive_stream.cc +++ b/video/video_receive_stream.cc @@ -226,6 +226,7 @@ void VideoReceiveStream::Start() { process_thread_->RegisterModule(&video_receiver_, RTC_FROM_HERE); // Start the decode thread + video_receiver_.DecoderThreadStarting(); decode_thread_.Start(); rtp_video_stream_receiver_.StartReceive(); } @@ -249,6 +250,7 @@ void VideoReceiveStream::Stop() { video_receiver_.TriggerDecoderShutdown(); decode_thread_.Stop(); + video_receiver_.DecoderThreadStopped(); // Deregister external decoders so they are no longer running during // destruction. This effectively stops the VCM since the decoder thread is // stopped, the VCM is deregistered and no asynchronous decoder threads are @@ -423,7 +425,6 @@ bool VideoReceiveStream::Decode() { frame_buffer_->NextFrame(wait_ms, &frame); if (res == video_coding::FrameBuffer::ReturnReason::kStopped) { - video_receiver_.DecodingStopped(); return false; }