diff --git a/modules/video_coding/BUILD.gn b/modules/video_coding/BUILD.gn index 12d02bf067..29a5d8c503 100644 --- a/modules/video_coding/BUILD.gn +++ b/modules/video_coding/BUILD.gn @@ -151,6 +151,7 @@ rtc_static_library("video_coding") { "..:module_api_public", "../../api:fec_controller_api", "../../api:rtp_headers", + "../../api/task_queue:global_task_queue_factory", "../../api/units:data_rate", "../../api/video:builtin_video_bitrate_allocator_factory", "../../api/video:encoded_frame", @@ -170,6 +171,7 @@ rtc_static_library("video_coding") { "../../rtc_base/experiments:rtt_mult_experiment", "../../rtc_base/synchronization:sequence_checker", "../../rtc_base/system:fallthrough", + "../../rtc_base/task_utils:repeating_task", "../../rtc_base/third_party/base64", "../../rtc_base/time:timestamp_extrapolator", "../../system_wrappers", diff --git a/modules/video_coding/frame_buffer2.cc b/modules/video_coding/frame_buffer2.cc index 5d427b01e1..20b680e170 100644 --- a/modules/video_coding/frame_buffer2.cc +++ b/modules/video_coding/frame_buffer2.cc @@ -17,6 +17,7 @@ #include #include +#include "absl/memory/memory.h" #include "api/video/encoded_image.h" #include "api/video/video_timing.h" #include "modules/video_coding/include/video_coding_defines.h" @@ -53,6 +54,7 @@ FrameBuffer::FrameBuffer(Clock* clock, VCMReceiveStatisticsCallback* stats_callback) : decoded_frames_history_(kMaxFramesHistory), clock_(clock), + callback_queue_(nullptr), jitter_estimator_(jitter_estimator), timing_(timing), inter_frame_delay_(clock_->TimeInMilliseconds()), @@ -65,6 +67,55 @@ FrameBuffer::FrameBuffer(Clock* clock, FrameBuffer::~FrameBuffer() {} +void FrameBuffer::NextFrame( + int64_t max_wait_time_ms, + bool keyframe_required, + rtc::TaskQueue* callback_queue, + std::function, ReturnReason)> handler) { + RTC_DCHECK_RUN_ON(callback_queue); + TRACE_EVENT0("webrtc", "FrameBuffer::NextFrame"); + int64_t latest_return_time_ms = + clock_->TimeInMilliseconds() + max_wait_time_ms; + rtc::CritScope lock(&crit_); + if (stopped_) { + return; + } + latest_return_time_ms_ = latest_return_time_ms; + keyframe_required_ = keyframe_required; + frame_handler_ = handler; + callback_queue_ = callback_queue; + StartWaitForNextFrameOnQueue(); +} + +void FrameBuffer::StartWaitForNextFrameOnQueue() { + RTC_DCHECK(callback_queue_); + RTC_DCHECK(!callback_task_.Running()); + int64_t wait_ms = FindNextFrame(clock_->TimeInMilliseconds()); + callback_task_ = RepeatingTaskHandle::DelayedStart( + callback_queue_->Get(), TimeDelta::ms(wait_ms), [this] { + // If this task has not been cancelled, we did not get any new frames + // while waiting. Continue with frame delivery. + rtc::CritScope lock(&crit_); + if (!frames_to_decode_.empty()) { + // We have frames, deliver! + frame_handler_(absl::WrapUnique(GetNextFrame()), kFrameFound); + CancelCallback(); + return TimeDelta::Zero(); // Ignored. + } else if (clock_->TimeInMilliseconds() >= latest_return_time_ms_) { + // We have timed out, signal this and stop repeating. + frame_handler_(nullptr, kTimeout); + CancelCallback(); + return TimeDelta::Zero(); // Ignored. + } else { + // If there's no frames to decode and there is still time left, it + // means that the frame buffer was cleared between creation and + // execution of this task. Continue waiting for the remaining time. + int64_t wait_ms = FindNextFrame(clock_->TimeInMilliseconds()); + return TimeDelta::ms(wait_ms); + } + }); +} + FrameBuffer::ReturnReason FrameBuffer::NextFrame( int64_t max_wait_time_ms, std::unique_ptr* frame_out, @@ -313,6 +364,7 @@ void FrameBuffer::Stop() { rtc::CritScope lock(&crit_); stopped_ = true; new_continuous_frame_event_.Set(); + CancelCallback(); } void FrameBuffer::Clear() { @@ -342,6 +394,12 @@ bool FrameBuffer::ValidReferences(const EncodedFrame& frame) const { return true; } +void FrameBuffer::CancelCallback() { + frame_handler_ = {}; + callback_task_.Stop(); + callback_queue_ = nullptr; +} + bool FrameBuffer::IsCompleteSuperFrame(const EncodedFrame& frame) { if (frame.inter_layer_predicted) { // Check that all previous spatial layers are already inserted. @@ -487,9 +545,19 @@ int64_t FrameBuffer::InsertFrame(std::unique_ptr frame) { last_continuous_picture_id = last_continuous_frame_->picture_id; // Since we now have new continuous frames there might be a better frame - // to return from NextFrame. Signal that thread so that it again can choose - // which frame to return. + // to return from NextFrame. new_continuous_frame_event_.Set(); + + if (callback_queue_) { + callback_queue_->PostTask([this] { + rtc::CritScope lock(&crit_); + if (!callback_task_.Running()) + return; + RTC_CHECK(frame_handler_); + callback_task_.Stop(); + StartWaitForNextFrameOnQueue(); + }); + } } return last_continuous_picture_id; diff --git a/modules/video_coding/frame_buffer2.h b/modules/video_coding/frame_buffer2.h index 78a617155a..57dbaa40e0 100644 --- a/modules/video_coding/frame_buffer2.h +++ b/modules/video_coding/frame_buffer2.h @@ -27,6 +27,8 @@ #include "rtc_base/event.h" #include "rtc_base/experiments/rtt_mult_experiment.h" #include "rtc_base/numerics/sequence_number_util.h" +#include "rtc_base/task_queue.h" +#include "rtc_base/task_utils/repeating_task.h" #include "rtc_base/thread_annotations.h" namespace webrtc { @@ -64,6 +66,11 @@ class FrameBuffer { ReturnReason NextFrame(int64_t max_wait_time_ms, std::unique_ptr* frame_out, bool keyframe_required = false); + void NextFrame( + int64_t max_wait_time_ms, + bool keyframe_required, + rtc::TaskQueue* callback_queue, + std::function, ReturnReason)> handler); // Tells the FrameBuffer which protection mode that is in use. Affects // the frame timing. @@ -121,6 +128,9 @@ class FrameBuffer { int64_t FindNextFrame(int64_t now_ms) RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_); EncodedFrame* GetNextFrame() RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_); + void StartWaitForNextFrameOnQueue() RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_); + void CancelCallback() RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_); + // Update all directly dependent and indirectly dependent frames and mark // them as continuous if all their references has been fulfilled. void PropagateContinuity(FrameMap::iterator start) @@ -163,6 +173,11 @@ class FrameBuffer { rtc::CriticalSection crit_; Clock* const clock_; + + rtc::TaskQueue* callback_queue_ RTC_GUARDED_BY(crit_); + RepeatingTaskHandle callback_task_ RTC_GUARDED_BY(crit_); + std::function, ReturnReason)> + frame_handler_ RTC_GUARDED_BY(crit_); int64_t latest_return_time_ms_ RTC_GUARDED_BY(crit_); bool keyframe_required_ RTC_GUARDED_BY(crit_); diff --git a/video/video_receive_stream.cc b/video/video_receive_stream.cc index 6e3a063559..29c88201d4 100644 --- a/video/video_receive_stream.cc +++ b/video/video_receive_stream.cc @@ -185,6 +185,8 @@ VideoReceiveStream::VideoReceiveStream( num_cpu_cores_(num_cpu_cores), process_thread_(process_thread), clock_(clock), + use_task_queue_( + !field_trial::IsDisabled("WebRTC-Video-DecodeOnTaskQueue")), decode_thread_(&DecodeThreadFunction, this, "DecodingThread", @@ -213,7 +215,10 @@ VideoReceiveStream::VideoReceiveStream( .value_or(kMaxWaitForKeyFrameMs)), max_wait_for_frame_ms_(KeyframeIntervalSettings::ParseFromFieldTrials() .MaxWaitForFrameMs() - .value_or(kMaxWaitForFrameMs)) { + .value_or(kMaxWaitForFrameMs)), + decode_queue_(task_queue_factory_->CreateTaskQueue( + "DecodingQueue", + TaskQueueFactory::Priority::HIGH)) { RTC_LOG(LS_INFO) << "VideoReceiveStream: " << config_.ToString(); RTC_DCHECK(config_.renderer); @@ -309,7 +314,7 @@ void VideoReceiveStream::SetSync(Syncable* audio_syncable) { void VideoReceiveStream::Start() { RTC_DCHECK_RUN_ON(&worker_sequence_checker_); - if (decode_thread_.IsRunning()) { + if (decoder_running_) { return; } @@ -388,7 +393,16 @@ void VideoReceiveStream::Start() { // Start the decode thread video_receiver_.DecoderThreadStarting(); stats_proxy_.DecoderThreadStarting(); - decode_thread_.Start(); + if (!use_task_queue_) { + decode_thread_.Start(); + } else { + decode_queue_.PostTask([this] { + RTC_DCHECK_RUN_ON(&decode_queue_); + decoder_stopped_ = false; + StartNextDecode(); + }); + } + decoder_running_ = true; rtp_video_stream_receiver_.StartReceive(); } @@ -399,16 +413,31 @@ void VideoReceiveStream::Stop() { stats_proxy_.OnUniqueFramesCounted( rtp_video_stream_receiver_.GetUniqueFramesSeen()); - frame_buffer_->Stop(); + if (!use_task_queue_) { + frame_buffer_->Stop(); + } else { + decode_queue_.PostTask([this] { frame_buffer_->Stop(); }); + } call_stats_->DeregisterStatsObserver(this); - if (decode_thread_.IsRunning()) { + if (decoder_running_) { // TriggerDecoderShutdown will release any waiting decoder thread and make // it stop immediately, instead of waiting for a timeout. Needs to be called // before joining the decoder thread. video_receiver_.TriggerDecoderShutdown(); - decode_thread_.Stop(); + if (!use_task_queue_) { + decode_thread_.Stop(); + } else { + rtc::Event done; + decode_queue_.PostTask([this, &done] { + RTC_DCHECK_RUN_ON(&decode_queue_); + decoder_stopped_ = true; + done.Set(); + }); + done.Wait(rtc::Event::kForever); + } + decoder_running_ = false; video_receiver_.DecoderThreadStopped(); stats_proxy_.DecoderThreadStopped(); // Deregister external decoders so they are no longer running during @@ -572,6 +601,38 @@ int64_t VideoReceiveStream::GetWaitMs() const { return keyframe_required_ ? max_wait_for_keyframe_ms_ : max_wait_for_frame_ms_; } + +void VideoReceiveStream::StartNextDecode() { + RTC_DCHECK(use_task_queue_); + TRACE_EVENT0("webrtc", "VideoReceiveStream::StartNextDecode"); + + struct DecodeTask { + void operator()() { + RTC_DCHECK_RUN_ON(&stream->decode_queue_); + if (stream->decoder_stopped_) + return; + if (frame) { + stream->HandleEncodedFrame(std::move(frame)); + } else { + stream->HandleFrameBufferTimeout(); + } + stream->StartNextDecode(); + } + VideoReceiveStream* stream; + std::unique_ptr frame; + }; + + // TODO(philipel): Call NextFrame with |keyframe_required| argument set when + // downstream project has been fixed. + frame_buffer_->NextFrame( + GetWaitMs(), /*keyframe_required*/ false, &decode_queue_, + [this](std::unique_ptr frame, ReturnReason res) { + RTC_DCHECK_EQ(frame == nullptr, res == ReturnReason::kTimeout); + RTC_DCHECK_EQ(frame != nullptr, res == ReturnReason::kFrameFound); + decode_queue_.PostTask(DecodeTask{this, std::move(frame)}); + }); +} + void VideoReceiveStream::DecodeThreadFunction(void* ptr) { ScopedRegisterThreadForDebugging thread_dbg(RTC_FROM_HERE); while (static_cast(ptr)->Decode()) { @@ -579,6 +640,7 @@ void VideoReceiveStream::DecodeThreadFunction(void* ptr) { } bool VideoReceiveStream::Decode() { + RTC_DCHECK(!use_task_queue_); TRACE_EVENT0("webrtc", "VideoReceiveStream::Decode"); std::unique_ptr frame; diff --git a/video/video_receive_stream.h b/video/video_receive_stream.h index 585a2cc308..dc4e4b7e73 100644 --- a/video/video_receive_stream.h +++ b/video/video_receive_stream.h @@ -23,6 +23,7 @@ #include "modules/video_coding/frame_buffer2.h" #include "modules/video_coding/video_coding_impl.h" #include "rtc_base/synchronization/sequence_checker.h" +#include "rtc_base/task_queue.h" #include "system_wrappers/include/clock.h" #include "video/receive_statistics_proxy.h" #include "video/rtp_streams_synchronizer.h" @@ -133,6 +134,7 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream, private: int64_t GetWaitMs() const; + void StartNextDecode() RTC_RUN_ON(decode_queue_); static void DecodeThreadFunction(void* ptr); bool Decode(); void HandleEncodedFrame(std::unique_ptr frame); @@ -153,10 +155,15 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream, ProcessThread* const process_thread_; Clock* const clock_; + const bool use_task_queue_; + rtc::PlatformThread decode_thread_; CallStats* const call_stats_; + bool decoder_running_ RTC_GUARDED_BY(worker_sequence_checker_) = false; + bool decoder_stopped_ RTC_GUARDED_BY(decode_queue_) = true; + ReceiveStatisticsProxy stats_proxy_; // Shared by media and rtx stream receivers, since the latter has no RtpRtcp // module of its own. @@ -211,6 +218,9 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream, // Maximum delay as decided by the RTP playout delay extension. int frame_maximum_playout_delay_ms_ RTC_GUARDED_BY(playout_delay_lock_) = -1; + + // Defined last so they are destroyed before all other members. + rtc::TaskQueue decode_queue_; }; } // namespace internal } // namespace webrtc