diff --git a/call/call.cc b/call/call.cc index fba783f35e..b885e3722f 100644 --- a/call/call.cc +++ b/call/call.cc @@ -61,7 +61,7 @@ #include "video/call_stats.h" #include "video/send_delay_stats.h" #include "video/stats_counter.h" -#include "video/video_receive_stream.h" +#include "video/video_receive_stream2.h" #include "video/video_send_stream.h" namespace webrtc { @@ -279,7 +279,7 @@ class Call final : public webrtc::Call, // creates them. std::set audio_receive_streams_ RTC_GUARDED_BY(receive_crit_); - std::set video_receive_streams_ + std::set video_receive_streams_ RTC_GUARDED_BY(receive_crit_); std::map sync_stream_mapping_ @@ -837,10 +837,15 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream( RegisterRateObserver(); - VideoReceiveStream* receive_stream = new VideoReceiveStream( - task_queue_factory_, &video_receiver_controller_, num_cpu_cores_, + TaskQueueBase* current = TaskQueueBase::Current(); + if (!current) + current = rtc::ThreadManager::Instance()->CurrentThread(); + RTC_CHECK(current); + VideoReceiveStream2* receive_stream = new VideoReceiveStream2( + task_queue_factory_, current, &video_receiver_controller_, num_cpu_cores_, transport_send_ptr_->packet_router(), std::move(configuration), - module_process_thread_.get(), call_stats_.get(), clock_); + module_process_thread_.get(), call_stats_.get(), clock_, + new VCMTiming(clock_)); const webrtc::VideoReceiveStream::Config& config = receive_stream->config(); { @@ -870,8 +875,8 @@ void Call::DestroyVideoReceiveStream( TRACE_EVENT0("webrtc", "Call::DestroyVideoReceiveStream"); RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); RTC_DCHECK(receive_stream != nullptr); - VideoReceiveStream* receive_stream_impl = - static_cast(receive_stream); + VideoReceiveStream2* receive_stream_impl = + static_cast(receive_stream); const VideoReceiveStream::Config& config = receive_stream_impl->config(); { WriteLockScoped write_lock(*receive_crit_); @@ -1007,7 +1012,7 @@ void Call::SignalChannelNetworkState(MediaType media, NetworkState state) { UpdateAggregateNetworkState(); { ReadLockScoped read_lock(*receive_crit_); - for (VideoReceiveStream* video_receive_stream : video_receive_streams_) { + for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) { video_receive_stream->SignalNetworkState(video_network_state_); } } @@ -1150,7 +1155,7 @@ void Call::ConfigureSync(const std::string& sync_group) { if (sync_audio_stream) sync_stream_mapping_[sync_group] = sync_audio_stream; size_t num_synced_streams = 0; - for (VideoReceiveStream* video_stream : video_receive_streams_) { + for (VideoReceiveStream2* video_stream : video_receive_streams_) { if (video_stream->config().sync_group != sync_group) continue; ++num_synced_streams; @@ -1187,7 +1192,7 @@ PacketReceiver::DeliveryStatus Call::DeliverRtcp(MediaType media_type, bool rtcp_delivered = false; if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) { ReadLockScoped read_lock(*receive_crit_); - for (VideoReceiveStream* stream : video_receive_streams_) { + for (VideoReceiveStream2* stream : video_receive_streams_) { if (stream->DeliverRtcp(packet, length)) rtcp_delivered = true; } diff --git a/call/call_unittest.cc b/call/call_unittest.cc index cf727d4044..8afcf25121 100644 --- a/call/call_unittest.cc +++ b/call/call_unittest.cc @@ -31,6 +31,7 @@ #include "test/gtest.h" #include "test/mock_audio_decoder_factory.h" #include "test/mock_transport.h" +#include "test/run_loop.h" namespace { @@ -56,6 +57,7 @@ struct CallHelper { webrtc::Call* operator->() { return call_.get(); } private: + webrtc::test::RunLoop loop_; webrtc::RtcEventLogNull event_log_; webrtc::FieldTrialBasedConfig field_trials_; std::unique_ptr task_queue_factory_; diff --git a/modules/video_coding/generic_decoder.cc b/modules/video_coding/generic_decoder.cc index 100686d336..ca9b5e2d47 100644 --- a/modules/video_coding/generic_decoder.cc +++ b/modules/video_coding/generic_decoder.cc @@ -57,6 +57,8 @@ VCMReceiveCallback* VCMDecodedFrameCallback::UserReceiveCallback() { } int32_t VCMDecodedFrameCallback::Decoded(VideoFrame& decodedImage) { + // This function may be called on the decode TaskQueue, but may also be called + // on an OS provided queue such as on iOS (see e.g. b/153465112). return Decoded(decodedImage, -1); } diff --git a/test/call_test.h b/test/call_test.h index 3f4aa072e7..4b26097b6c 100644 --- a/test/call_test.h +++ b/test/call_test.h @@ -31,6 +31,7 @@ #include "test/fake_vp8_encoder.h" #include "test/frame_generator_capturer.h" #include "test/rtp_rtcp_observer.h" +#include "test/run_loop.h" namespace webrtc { namespace test { @@ -176,6 +177,8 @@ class CallTest : public ::testing::Test { FlexfecReceiveStream::Config* GetFlexFecConfig(); TaskQueueBase* task_queue() { return task_queue_.get(); } + test::RunLoop loop_; + Clock* const clock_; const FieldTrialBasedConfig field_trials_; diff --git a/video/receive_statistics_proxy2.cc b/video/receive_statistics_proxy2.cc index 50b1ea05ea..3e1bcbc4a4 100644 --- a/video/receive_statistics_proxy2.cc +++ b/video/receive_statistics_proxy2.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved. + * Copyright (c) 2020 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source @@ -18,6 +18,7 @@ #include "rtc_base/checks.h" #include "rtc_base/logging.h" #include "rtc_base/strings/string_builder.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/time_utils.h" #include "system_wrappers/include/clock.h" #include "system_wrappers/include/field_trial.h" @@ -83,9 +84,9 @@ std::string UmaSuffixForContentType(VideoContentType content_type) { ReceiveStatisticsProxy::ReceiveStatisticsProxy( const VideoReceiveStream::Config* config, - Clock* clock) + Clock* clock, + TaskQueueBase* worker_thread) : clock_(clock), - config_(*config), start_ms_(clock->TimeInMilliseconds()), enable_decode_time_histograms_( !field_trial::IsEnabled("WebRTC-DecodeTimeHistogramsKillSwitch")), @@ -104,13 +105,13 @@ ReceiveStatisticsProxy::ReceiveStatisticsProxy( kNumMeasurementsVariance), num_bad_states_(0), num_certain_states_(0), + remote_ssrc_(config->rtp.remote_ssrc), // 1000ms window, scale 1000 for ms to s. decode_fps_estimator_(1000, 1000), renders_fps_estimator_(1000, 1000), render_fps_tracker_(100, 10u), render_pixel_tracker_(100, 10u), - video_quality_observer_( - new VideoQualityObserver(VideoContentType::UNSPECIFIED)), + video_quality_observer_(new VideoQualityObserver()), interframe_delay_max_moving_(kMovingMaxWindowMs), freq_offset_counter_(clock, nullptr, kFreqOffsetProcessIntervalMs), avg_rtt_ms_(0), @@ -118,27 +119,48 @@ ReceiveStatisticsProxy::ReceiveStatisticsProxy( last_codec_type_(kVideoCodecVP8), num_delayed_frames_rendered_(0), sum_missed_render_deadline_ms_(0), - timing_frame_info_counter_(kMovingMaxWindowMs) { - decode_thread_.Detach(); - network_thread_.Detach(); - stats_.ssrc = config_.rtp.remote_ssrc; + timing_frame_info_counter_(kMovingMaxWindowMs), + worker_thread_(worker_thread) { + RTC_DCHECK(worker_thread); + decode_queue_.Detach(); + incoming_render_queue_.Detach(); + stats_.ssrc = config->rtp.remote_ssrc; +} + +ReceiveStatisticsProxy::~ReceiveStatisticsProxy() { + RTC_DCHECK_RUN_ON(&main_thread_); + task_safety_flag_->SetNotAlive(); } void ReceiveStatisticsProxy::UpdateHistograms( absl::optional fraction_lost, const StreamDataCounters& rtp_stats, const StreamDataCounters* rtx_stats) { - // Not actually running on the decoder thread, but must be called after + { + // TODO(bugs.webrtc.org/11489): Delete this scope after refactoring. + // We're actually on the main thread here, below is the explanation for + // why we use another thread checker. Once refactored, we can clean this + // up and not use the decode_queue_ checker here. + RTC_DCHECK_RUN_ON(&main_thread_); + } + + // We're not actually running on the decoder thread, but must be called after // DecoderThreadStopped, which detaches the thread checker. It is therefore // safe to access |qp_counters_|, which were updated on the decode thread // earlier. - RTC_DCHECK_RUN_ON(&decode_thread_); + RTC_DCHECK_RUN_ON(&decode_queue_); rtc::CritScope lock(&crit_); + // TODO(bugs.webrtc.org/11489): Many of these variables don't need to be + // inside the scope of a lock. Also consider grabbing the lock only to copy + // the state that histograms need to be reported for, then report histograms + // while not holding the lock. char log_stream_buf[8 * 1024]; rtc::SimpleStringBuilder log_stream(log_stream_buf); + int stream_duration_sec = (clock_->TimeInMilliseconds() - start_ms_) / 1000; + if (stats_.frame_counts.key_frames > 0 || stats_.frame_counts.delta_frames > 0) { RTC_HISTOGRAM_COUNTS_100000("WebRTC.Video.ReceiveStreamLifetimeInSeconds", @@ -408,6 +430,7 @@ void ReceiveStatisticsProxy::UpdateHistograms( StreamDataCounters rtp_rtx_stats = rtp_stats; if (rtx_stats) rtp_rtx_stats.Add(*rtx_stats); + int64_t elapsed_sec = rtp_rtx_stats.TimeSinceFirstPacketInMs(clock_->TimeInMilliseconds()) / 1000; @@ -473,10 +496,13 @@ void ReceiveStatisticsProxy::UpdateHistograms( } RTC_LOG(LS_INFO) << log_stream.str(); - video_quality_observer_->UpdateHistograms(); + video_quality_observer_->UpdateHistograms( + videocontenttypehelpers::IsScreenshare(last_content_type_)); } void ReceiveStatisticsProxy::QualitySample() { + RTC_DCHECK_RUN_ON(&incoming_render_queue_); + int64_t now = clock_->TimeInMilliseconds(); if (last_sample_time_ + kMinSampleLengthMs > now) return; @@ -546,6 +572,8 @@ void ReceiveStatisticsProxy::QualitySample() { } void ReceiveStatisticsProxy::UpdateFramerate(int64_t now_ms) const { + // TODO(bugs.webrtc.org/11489): Currently seems to be called from two threads, + // main and decode. Consider moving both to main. int64_t old_frames_ms = now_ms - kRateStatisticsWindowSizeMs; while (!frame_window_.empty() && frame_window_.begin()->first < old_frames_ms) { @@ -561,6 +589,10 @@ void ReceiveStatisticsProxy::UpdateDecodeTimeHistograms( int width, int height, int decode_time_ms) const { + RTC_DCHECK_RUN_ON(&decode_queue_); + // TODO(bugs.webrtc.org/11489): Consider posting the work to the worker + // thread. + bool is_4k = (width == 3840 || width == 4096) && height == 2160; bool is_hd = width == 1920 && height == 1080; // Only update histograms for 4k/HD and VP9/H264. @@ -615,6 +647,7 @@ void ReceiveStatisticsProxy::UpdateDecodeTimeHistograms( absl::optional ReceiveStatisticsProxy::GetCurrentEstimatedPlayoutNtpTimestampMs( int64_t now_ms) const { + RTC_DCHECK_RUN_ON(&main_thread_); if (!last_estimated_playout_ntp_timestamp_ms_ || !last_estimated_playout_time_ms_) { return absl::nullopt; @@ -624,6 +657,12 @@ ReceiveStatisticsProxy::GetCurrentEstimatedPlayoutNtpTimestampMs( } VideoReceiveStream::Stats ReceiveStatisticsProxy::GetStats() const { + RTC_DCHECK_RUN_ON(&main_thread_); + + // Like VideoReceiveStream::GetStats, called on the worker thread from + // StatsCollector::ExtractMediaInfo via worker_thread()->Invoke(). + // WebRtcVideoChannel::GetStats(), GetVideoReceiverInfo. + rtc::CritScope lock(&crit_); // Get current frame rates here, as only updating them on new frames prevents // us from ever correctly displaying frame rate of 0. @@ -655,12 +694,16 @@ VideoReceiveStream::Stats ReceiveStatisticsProxy::GetStats() const { } void ReceiveStatisticsProxy::OnIncomingPayloadType(int payload_type) { + RTC_DCHECK_RUN_ON(&decode_queue_); rtc::CritScope lock(&crit_); stats_.current_payload_type = payload_type; } void ReceiveStatisticsProxy::OnDecoderImplementationName( const char* implementation_name) { + RTC_DCHECK_RUN_ON(&decode_queue_); + // TODO(bugs.webrtc.org/11489): is a lock needed for this variable? Currently + // seems to be only touched on the decoder queue. rtc::CritScope lock(&crit_); stats_.decoder_implementation_name = implementation_name; } @@ -672,6 +715,7 @@ void ReceiveStatisticsProxy::OnFrameBufferTimingsUpdated( int jitter_buffer_ms, int min_playout_delay_ms, int render_delay_ms) { + RTC_DCHECK_RUN_ON(&decode_queue_); rtc::CritScope lock(&crit_); stats_.max_decode_ms = max_decode_ms; stats_.current_delay_ms = current_delay_ms; @@ -688,12 +732,14 @@ void ReceiveStatisticsProxy::OnFrameBufferTimingsUpdated( } void ReceiveStatisticsProxy::OnUniqueFramesCounted(int num_unique_frames) { + RTC_DCHECK_RUN_ON(&main_thread_); rtc::CritScope lock(&crit_); num_unique_frames_.emplace(num_unique_frames); } void ReceiveStatisticsProxy::OnTimingFrameInfoUpdated( const TimingFrameInfo& info) { + RTC_DCHECK_RUN_ON(&decode_queue_); rtc::CritScope lock(&crit_); if (info.flags != VideoSendTiming::kInvalid) { int64_t now_ms = clock_->TimeInMilliseconds(); @@ -715,18 +761,41 @@ void ReceiveStatisticsProxy::OnTimingFrameInfoUpdated( void ReceiveStatisticsProxy::RtcpPacketTypesCounterUpdated( uint32_t ssrc, const RtcpPacketTypeCounter& packet_counter) { - rtc::CritScope lock(&crit_); - if (stats_.ssrc != ssrc) + if (ssrc != remote_ssrc_) return; + + if (!worker_thread_->IsCurrent()) { + // RtpRtcp::Configuration has a single RtcpPacketTypeCounterObserver and + // that same configuration may be used for both receiver and sender + // (see ModuleRtpRtcpImpl::ModuleRtpRtcpImpl). + // The RTCPSender implementation currently makes calls to this function on a + // process thread whereas the RTCPReceiver implementation calls back on the + // [main] worker thread. + // So until the sender implementation has been updated, we work around this + // here by posting the update to the expected thread. We make a by value + // copy of the |task_safety_flag_| to handle the case if the queued task + // runs after the |ReceiveStatisticsProxy| has been deleted. In such a + // case the packet_counter update won't be recorded. + worker_thread_->PostTask( + ToQueuedTask(task_safety_flag_, [ssrc, packet_counter, this]() { + RtcpPacketTypesCounterUpdated(ssrc, packet_counter); + })); + return; + } + + RTC_DCHECK_RUN_ON(&main_thread_); + rtc::CritScope lock(&crit_); stats_.rtcp_packet_type_counts = packet_counter; } void ReceiveStatisticsProxy::OnCname(uint32_t ssrc, absl::string_view cname) { - rtc::CritScope lock(&crit_); + RTC_DCHECK_RUN_ON(&main_thread_); // TODO(pbos): Handle both local and remote ssrcs here and RTC_DCHECK that we // receive stats from one of them. - if (stats_.ssrc != ssrc) + if (remote_ssrc_ != ssrc) return; + + rtc::CritScope lock(&crit_); stats_.c_name = std::string(cname); } @@ -734,16 +803,28 @@ void ReceiveStatisticsProxy::OnDecodedFrame(const VideoFrame& frame, absl::optional qp, int32_t decode_time_ms, VideoContentType content_type) { + // TODO(bugs.webrtc.org/11489): On iOS this gets called on + // "com.apple.coremedia.decompressionsession.clientcallback" + // See VCMDecodedFrameCallback::Decoded for info on what thread/queue we may + // be on. + // RTC_DCHECK_RUN_ON(&decode_queue_); + // TODO(bugs.webrtc.org/11489): - Same as OnRenderedFrame. Both called from + // within VideoStreamDecoder::FrameToRender + rtc::CritScope lock(&crit_); - uint64_t now_ms = clock_->TimeInMilliseconds(); + const uint64_t now_ms = clock_->TimeInMilliseconds(); - if (videocontenttypehelpers::IsScreenshare(content_type) != - videocontenttypehelpers::IsScreenshare(last_content_type_)) { + const bool is_screenshare = + videocontenttypehelpers::IsScreenshare(content_type); + const bool was_screenshare = + videocontenttypehelpers::IsScreenshare(last_content_type_); + + if (is_screenshare != was_screenshare) { // Reset the quality observer if content type is switched. But first report // stats for the previous part of the call. - video_quality_observer_->UpdateHistograms(); - video_quality_observer_.reset(new VideoQualityObserver(content_type)); + video_quality_observer_->UpdateHistograms(was_screenshare); + video_quality_observer_.reset(new VideoQualityObserver()); } video_quality_observer_->OnDecodedFrame(frame, qp, last_codec_type_); @@ -795,6 +876,11 @@ void ReceiveStatisticsProxy::OnDecodedFrame(const VideoFrame& frame, } void ReceiveStatisticsProxy::OnRenderedFrame(const VideoFrame& frame) { + // See information in OnDecodedFrame for calling context. + // TODO(bugs.webrtc.org/11489): Consider posting the work to the worker + // thread. + // - Called from VideoReceiveStream::OnFrame. + int width = frame.width(); int height = frame.height(); RTC_DCHECK_GT(width, 0); @@ -802,6 +888,9 @@ void ReceiveStatisticsProxy::OnRenderedFrame(const VideoFrame& frame) { int64_t now_ms = clock_->TimeInMilliseconds(); rtc::CritScope lock(&crit_); + // TODO(bugs.webrtc.org/11489): Lose the dependency on |frame| here, just + // include the frame metadata so that this can be done asynchronously without + // blocking the decoder thread. video_quality_observer_->OnRenderedFrame(frame, now_ms); ContentSpecificStats* content_specific_stats = @@ -834,7 +923,10 @@ void ReceiveStatisticsProxy::OnRenderedFrame(const VideoFrame& frame) { void ReceiveStatisticsProxy::OnSyncOffsetUpdated(int64_t video_playout_ntp_ms, int64_t sync_offset_ms, double estimated_freq_khz) { + RTC_DCHECK_RUN_ON(&incoming_render_queue_); rtc::CritScope lock(&crit_); + // TODO(bugs.webrtc.org/11489): Lock possibly not needed for + // sync_offset_counter_ if it's only touched on the decoder thread. sync_offset_counter_.Add(std::abs(sync_offset_ms)); stats_.sync_offset_ms = sync_offset_ms; last_estimated_playout_ntp_timestamp_ms_ = video_playout_ntp_ms; @@ -887,7 +979,7 @@ void ReceiveStatisticsProxy::OnDroppedFrames(uint32_t frames_dropped) { } void ReceiveStatisticsProxy::OnPreDecode(VideoCodecType codec_type, int qp) { - RTC_DCHECK_RUN_ON(&decode_thread_); + RTC_DCHECK_RUN_ON(&decode_queue_); rtc::CritScope lock(&crit_); last_codec_type_ = codec_type; if (last_codec_type_ == kVideoCodecVP8 && qp != -1) { @@ -897,6 +989,8 @@ void ReceiveStatisticsProxy::OnPreDecode(VideoCodecType codec_type, int qp) { } void ReceiveStatisticsProxy::OnStreamInactive() { + RTC_DCHECK_RUN_ON(&decode_queue_); + // TODO(sprang): Figure out any other state that should be reset. rtc::CritScope lock(&crit_); @@ -907,6 +1001,14 @@ void ReceiveStatisticsProxy::OnStreamInactive() { void ReceiveStatisticsProxy::OnRttUpdate(int64_t avg_rtt_ms, int64_t max_rtt_ms) { + // TODO(bugs.webrtc.org/11489): Is this a duplicate of + // VideoReceiveStream::OnRttUpdate? + // - looks like that runs on a/the module process thread. + // + + // BUGBUG + // Actually, it looks like this method is never called except from a unit + // test, GetStatsReportsDecodeTimingStats. rtc::CritScope lock(&crit_); avg_rtt_ms_ = avg_rtt_ms; } @@ -917,7 +1019,7 @@ void ReceiveStatisticsProxy::DecoderThreadStarting() { void ReceiveStatisticsProxy::DecoderThreadStopped() { RTC_DCHECK_RUN_ON(&main_thread_); - decode_thread_.Detach(); + decode_queue_.Detach(); } ReceiveStatisticsProxy::ContentSpecificStats::ContentSpecificStats() diff --git a/video/receive_statistics_proxy2.h b/video/receive_statistics_proxy2.h index 788bd617c4..86a015ecea 100644 --- a/video/receive_statistics_proxy2.h +++ b/video/receive_statistics_proxy2.h @@ -17,6 +17,7 @@ #include #include "absl/types/optional.h" +#include "api/task_queue/task_queue_base.h" #include "call/video_receive_stream.h" #include "modules/include/module_common_types.h" #include "modules/video_coding/include/video_coding_defines.h" @@ -26,6 +27,8 @@ #include "rtc_base/numerics/sample_counter.h" #include "rtc_base/rate_statistics.h" #include "rtc_base/rate_tracker.h" +#include "rtc_base/synchronization/sequence_checker.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/thread_annotations.h" #include "rtc_base/thread_checker.h" #include "video/quality_threshold.h" @@ -45,8 +48,9 @@ class ReceiveStatisticsProxy : public VCMReceiveStatisticsCallback, public CallStatsObserver { public: ReceiveStatisticsProxy(const VideoReceiveStream::Config* config, - Clock* clock); - ~ReceiveStatisticsProxy() = default; + Clock* clock, + TaskQueueBase* worker_thread); + ~ReceiveStatisticsProxy() override; VideoReceiveStream::Stats GetStats() const; @@ -141,14 +145,6 @@ class ReceiveStatisticsProxy : public VCMReceiveStatisticsCallback, int64_t now_ms) const RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_); Clock* const clock_; - // Ownership of this object lies with the owner of the ReceiveStatisticsProxy - // instance. Lifetime is guaranteed to outlive |this|. - // TODO(tommi): In practice the config_ reference is only used for accessing - // config_.rtp.ulpfec.ulpfec_payload_type. Instead of holding a pointer back, - // we could just store the value of ulpfec_payload_type and change the - // ReceiveStatisticsProxy() ctor to accept a const& of Config (since we'll - // then no longer store a pointer to the object). - const VideoReceiveStream::Config& config_; const int64_t start_ms_; const bool enable_decode_time_histograms_; @@ -162,6 +158,8 @@ class ReceiveStatisticsProxy : public VCMReceiveStatisticsCallback, int num_certain_states_ RTC_GUARDED_BY(crit_); // Note: The |stats_.rtp_stats| member is not used or populated by this class. mutable VideoReceiveStream::Stats stats_ RTC_GUARDED_BY(crit_); + // Same as stats_.ssrc, but const (no lock required). + const uint32_t remote_ssrc_; RateStatistics decode_fps_estimator_ RTC_GUARDED_BY(crit_); RateStatistics renders_fps_estimator_ RTC_GUARDED_BY(crit_); rtc::RateTracker render_fps_tracker_ RTC_GUARDED_BY(crit_); @@ -179,7 +177,7 @@ class ReceiveStatisticsProxy : public VCMReceiveStatisticsCallback, std::map content_specific_stats_ RTC_GUARDED_BY(crit_); MaxCounter freq_offset_counter_ RTC_GUARDED_BY(crit_); - QpCounters qp_counters_ RTC_GUARDED_BY(decode_thread_); + QpCounters qp_counters_ RTC_GUARDED_BY(decode_queue_); int64_t avg_rtt_ms_ RTC_GUARDED_BY(crit_); mutable std::map frame_window_ RTC_GUARDED_BY(&crit_); VideoContentType last_content_type_ RTC_GUARDED_BY(&crit_); @@ -198,9 +196,17 @@ class ReceiveStatisticsProxy : public VCMReceiveStatisticsCallback, RTC_GUARDED_BY(&crit_); absl::optional last_estimated_playout_time_ms_ RTC_GUARDED_BY(&crit_); - rtc::ThreadChecker decode_thread_; - rtc::ThreadChecker network_thread_; + + // The thread on which this instance is constructed and some of its main + // methods are invoked on such as GetStats(). + TaskQueueBase* const worker_thread_; + + PendingTaskSafetyFlag::Pointer task_safety_flag_ = + PendingTaskSafetyFlag::Create(); + + SequenceChecker decode_queue_; rtc::ThreadChecker main_thread_; + SequenceChecker incoming_render_queue_; }; } // namespace internal diff --git a/video/receive_statistics_proxy2_unittest.cc b/video/receive_statistics_proxy2_unittest.cc index 5574d44a0f..bcc96cd76c 100644 --- a/video/receive_statistics_proxy2_unittest.cc +++ b/video/receive_statistics_proxy2_unittest.cc @@ -1,5 +1,5 @@ /* - * Copyright 2016 The WebRTC project authors. All Rights Reserved. + * Copyright 2020 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source @@ -22,9 +22,12 @@ #include "api/video/video_frame.h" #include "api/video/video_frame_buffer.h" #include "api/video/video_rotation.h" +#include "rtc_base/task_utils/to_queued_task.h" +#include "rtc_base/thread.h" #include "system_wrappers/include/metrics.h" #include "test/field_trial.h" #include "test/gtest.h" +#include "test/run_loop.h" namespace webrtc { namespace internal { @@ -40,15 +43,15 @@ const int kHeight = 720; // TODO(sakal): ReceiveStatisticsProxy is lacking unittesting. class ReceiveStatisticsProxy2Test : public ::testing::Test { public: - ReceiveStatisticsProxy2Test() : fake_clock_(1234), config_(GetTestConfig()) {} - virtual ~ReceiveStatisticsProxy2Test() {} - - protected: - virtual void SetUp() { + ReceiveStatisticsProxy2Test() : fake_clock_(1234), config_(GetTestConfig()) { metrics::Reset(); - statistics_proxy_.reset(new ReceiveStatisticsProxy(&config_, &fake_clock_)); + statistics_proxy_.reset( + new ReceiveStatisticsProxy(&config_, &fake_clock_, loop_.task_queue())); } + ~ReceiveStatisticsProxy2Test() override { statistics_proxy_.reset(); } + + protected: VideoReceiveStream::Config GetTestConfig() { VideoReceiveStream::Config config(nullptr); config.rtp.local_ssrc = kLocalSsrc; @@ -79,6 +82,7 @@ class ReceiveStatisticsProxy2Test : public ::testing::Test { SimulatedClock fake_clock_; const VideoReceiveStream::Config config_; std::unique_ptr statistics_proxy_; + test::RunLoop loop_; }; TEST_F(ReceiveStatisticsProxy2Test, OnDecodedFrameIncreasesFramesDecoded) { diff --git a/video/video_quality_observer2.cc b/video/video_quality_observer2.cc index 5528815890..b1282c1ca0 100644 --- a/video/video_quality_observer2.cc +++ b/video/video_quality_observer2.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018 The WebRTC project authors. All Rights Reserved. + * Copyright (c) 2020 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source @@ -37,7 +37,7 @@ constexpr int kMaxNumCachedBlockyFrames = 100; // TODO(ilnik): Add H264/HEVC thresholds. } // namespace -VideoQualityObserver::VideoQualityObserver(VideoContentType content_type) +VideoQualityObserver::VideoQualityObserver() : last_frame_rendered_ms_(-1), num_frames_rendered_(0), first_frame_rendered_ms_(-1), @@ -50,10 +50,12 @@ VideoQualityObserver::VideoQualityObserver(VideoContentType content_type) current_resolution_(Resolution::Low), num_resolution_downgrades_(0), time_in_blocky_video_ms_(0), - content_type_(content_type), is_paused_(false) {} -void VideoQualityObserver::UpdateHistograms() { +void VideoQualityObserver::UpdateHistograms(bool screenshare) { + // TODO(bugs.webrtc.org/11489): Called on the decoder thread - which _might_ + // be the same as the construction thread. + // Don't report anything on an empty video stream. if (num_frames_rendered_ == 0) { return; @@ -67,9 +69,8 @@ void VideoQualityObserver::UpdateHistograms() { last_unfreeze_time_ms_); } - std::string uma_prefix = videocontenttypehelpers::IsScreenshare(content_type_) - ? "WebRTC.Video.Screenshare" - : "WebRTC.Video"; + std::string uma_prefix = + screenshare ? "WebRTC.Video.Screenshare" : "WebRTC.Video"; auto mean_time_between_freezes = smooth_playback_durations_.Avg(kMinRequiredSamples); diff --git a/video/video_quality_observer2.h b/video/video_quality_observer2.h index af71937e43..615e0d3c57 100644 --- a/video/video_quality_observer2.h +++ b/video/video_quality_observer2.h @@ -32,7 +32,7 @@ class VideoQualityObserver { public: // Use either VideoQualityObserver::kBlockyQpThresholdVp8 or // VideoQualityObserver::kBlockyQpThresholdVp9. - explicit VideoQualityObserver(VideoContentType content_type); + VideoQualityObserver(); ~VideoQualityObserver() = default; void OnDecodedFrame(const VideoFrame& frame, @@ -50,7 +50,8 @@ class VideoQualityObserver { uint32_t TotalFramesDurationMs() const; double SumSquaredFrameDurationsSec() const; - void UpdateHistograms(); + // Set |screenshare| to true if the last decoded frame was for screenshare. + void UpdateHistograms(bool screenshare); static const uint32_t kMinFrameSamplesToDetectFreeze; static const uint32_t kMinIncreaseForFreezeMs; @@ -87,8 +88,6 @@ class VideoQualityObserver { int num_resolution_downgrades_; // Similar to resolution, time spent in high-QP video. int64_t time_in_blocky_video_ms_; - // Content type of the last decoded frame. - VideoContentType content_type_; bool is_paused_; // Set of decoded frames with high QP value. diff --git a/video/video_receive_stream2.cc b/video/video_receive_stream2.cc index 899d9d5e65..0af17d5a45 100644 --- a/video/video_receive_stream2.cc +++ b/video/video_receive_stream2.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved. + * Copyright (c) 2020 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source @@ -181,6 +181,7 @@ constexpr int kInactiveStreamThresholdMs = 600000; // 10 minutes. VideoReceiveStream2::VideoReceiveStream2( TaskQueueFactory* task_queue_factory, + TaskQueueBase* current_queue, RtpStreamReceiverControllerInterface* receiver_controller, int num_cpu_cores, PacketRouter* packet_router, @@ -194,10 +195,11 @@ VideoReceiveStream2::VideoReceiveStream2( config_(std::move(config)), num_cpu_cores_(num_cpu_cores), process_thread_(process_thread), + worker_thread_(current_queue), clock_(clock), call_stats_(call_stats), source_tracker_(clock_), - stats_proxy_(&config_, clock_), + stats_proxy_(&config_, clock_, worker_thread_), rtp_receive_statistics_(ReceiveStatistics::Create(clock_)), timing_(timing), video_receiver_(clock_, timing_.get()), @@ -227,6 +229,7 @@ VideoReceiveStream2::VideoReceiveStream2( TaskQueueFactory::Priority::HIGH)) { RTC_LOG(LS_INFO) << "VideoReceiveStream2: " << config_.ToString(); + RTC_DCHECK(worker_thread_); RTC_DCHECK(config_.renderer); RTC_DCHECK(process_thread_); RTC_DCHECK(call_stats_); @@ -266,25 +269,6 @@ VideoReceiveStream2::VideoReceiveStream2( } } -VideoReceiveStream2::VideoReceiveStream2( - TaskQueueFactory* task_queue_factory, - RtpStreamReceiverControllerInterface* receiver_controller, - int num_cpu_cores, - PacketRouter* packet_router, - VideoReceiveStream::Config config, - ProcessThread* process_thread, - CallStats* call_stats, - Clock* clock) - : VideoReceiveStream2(task_queue_factory, - receiver_controller, - num_cpu_cores, - packet_router, - std::move(config), - process_thread, - call_stats, - clock, - new VCMTiming(clock)) {} - VideoReceiveStream2::~VideoReceiveStream2() { RTC_DCHECK_RUN_ON(&worker_sequence_checker_); RTC_LOG(LS_INFO) << "~VideoReceiveStream2: " << config_.ToString(); @@ -437,7 +421,8 @@ void VideoReceiveStream2::Stop() { } VideoReceiveStream::Stats VideoReceiveStream2::GetStats() const { - VideoReceiveStream::Stats stats = stats_proxy_.GetStats(); + RTC_DCHECK_RUN_ON(&worker_sequence_checker_); + VideoReceiveStream2::Stats stats = stats_proxy_.GetStats(); stats.total_bitrate_bps = 0; StreamStatistician* statistician = rtp_receive_statistics_->GetStatistician(stats.ssrc); @@ -455,6 +440,7 @@ VideoReceiveStream::Stats VideoReceiveStream2::GetStats() const { } void VideoReceiveStream2::UpdateHistograms() { + RTC_DCHECK_RUN_ON(&worker_sequence_checker_); absl::optional fraction_lost; StreamDataCounters rtp_stats; StreamStatistician* statistician = @@ -491,6 +477,7 @@ bool VideoReceiveStream2::SetBaseMinimumPlayoutDelayMs(int delay_ms) { return false; } + // TODO(bugs.webrtc.org/11489): Consider posting to worker. rtc::CritScope cs(&playout_delay_lock_); base_minimum_playout_delay_ms_ = delay_ms; UpdatePlayoutDelays(); @@ -504,19 +491,19 @@ int VideoReceiveStream2::GetBaseMinimumPlayoutDelayMs() const { return base_minimum_playout_delay_ms_; } -// TODO(tommi): This method grabs a lock 6 times. +// TODO(bugs.webrtc.org/11489): This method grabs a lock 6 times. void VideoReceiveStream2::OnFrame(const VideoFrame& video_frame) { int64_t video_playout_ntp_ms; int64_t sync_offset_ms; double estimated_freq_khz; - // TODO(tommi): GetStreamSyncOffsetInMs grabs three locks. One inside the - // function itself, another in GetChannel() and a third in + // TODO(bugs.webrtc.org/11489): GetStreamSyncOffsetInMs grabs three locks. One + // inside the function itself, another in GetChannel() and a third in // GetPlayoutTimestamp. Seems excessive. Anyhow, I'm assuming the function // succeeds most of the time, which leads to grabbing a fourth lock. if (rtp_stream_sync_.GetStreamSyncOffsetInMs( video_frame.timestamp(), video_frame.render_time_ms(), &video_playout_ntp_ms, &sync_offset_ms, &estimated_freq_khz)) { - // TODO(tommi): OnSyncOffsetUpdated grabs a lock. + // TODO(bugs.webrtc.org/11489): OnSyncOffsetUpdated grabs a lock. stats_proxy_.OnSyncOffsetUpdated(video_playout_ntp_ms, sync_offset_ms, estimated_freq_khz); } @@ -524,7 +511,7 @@ void VideoReceiveStream2::OnFrame(const VideoFrame& video_frame) { config_.renderer->OnFrame(video_frame); - // TODO(tommi): OnRenderFrame grabs a lock too. + // TODO(bugs.webrtc.org/11489): OnRenderFrame grabs a lock too. stats_proxy_.OnRenderedFrame(video_frame); } @@ -562,6 +549,10 @@ void VideoReceiveStream2::OnCompleteFrame( } last_complete_frame_time_ms_ = time_now_ms; + // TODO(bugs.webrtc.org/11489): We grab the playout_delay_lock_ lock + // potentially twice. Consider checking both min/max and posting to worker if + // there's a change. If we always update playout delays on the worker, we + // don't need a lock. const PlayoutDelay& playout_delay = frame->EncodedImage().playout_delay_; if (playout_delay.min_ms >= 0) { rtc::CritScope cs(&playout_delay_lock_); @@ -617,6 +608,7 @@ void VideoReceiveStream2::SetEstimatedPlayoutNtpTimestampMs( void VideoReceiveStream2::SetMinimumPlayoutDelay(int delay_ms) { RTC_DCHECK_RUN_ON(&module_process_sequence_checker_); + // TODO(bugs.webrtc.org/11489): Consider posting to worker. rtc::CritScope cs(&playout_delay_lock_); syncable_minimum_playout_delay_ms_ = delay_ms; UpdatePlayoutDelays(); @@ -651,6 +643,7 @@ void VideoReceiveStream2::StartNextDecode() { void VideoReceiveStream2::HandleEncodedFrame( std::unique_ptr frame) { + // Running on |decode_queue_|. int64_t now_ms = clock_->TimeInMilliseconds(); // Current OnPreDecode only cares about QP for VP8. @@ -705,6 +698,7 @@ void VideoReceiveStream2::HandleKeyFrameGeneration( } void VideoReceiveStream2::HandleFrameBufferTimeout() { + // Running on |decode_queue_|. int64_t now_ms = clock_->TimeInMilliseconds(); absl::optional last_packet_ms = rtp_video_stream_receiver_.LastReceivedPacketMs(); diff --git a/video/video_receive_stream2.h b/video/video_receive_stream2.h index 7fb940431a..2a0c07c879 100644 --- a/video/video_receive_stream2.h +++ b/video/video_receive_stream2.h @@ -57,6 +57,7 @@ class VideoReceiveStream2 : public webrtc::VideoReceiveStream, static constexpr int kMaxWaitForKeyFrameMs = 200; VideoReceiveStream2(TaskQueueFactory* task_queue_factory, + TaskQueueBase* current_queue, RtpStreamReceiverControllerInterface* receiver_controller, int num_cpu_cores, PacketRouter* packet_router, @@ -65,14 +66,6 @@ class VideoReceiveStream2 : public webrtc::VideoReceiveStream, CallStats* call_stats, Clock* clock, VCMTiming* timing); - VideoReceiveStream2(TaskQueueFactory* task_queue_factory, - RtpStreamReceiverControllerInterface* receiver_controller, - int num_cpu_cores, - PacketRouter* packet_router, - VideoReceiveStream::Config config, - ProcessThread* process_thread, - CallStats* call_stats, - Clock* clock); ~VideoReceiveStream2() override; const Config& config() const { return config_; } @@ -161,6 +154,7 @@ class VideoReceiveStream2 : public webrtc::VideoReceiveStream, const VideoReceiveStream::Config config_; const int num_cpu_cores_; ProcessThread* const process_thread_; + TaskQueueBase* const worker_thread_; Clock* const clock_; CallStats* const call_stats_;