diff --git a/rtc_base/task_utils/pending_task_safety_flag.cc b/rtc_base/task_utils/pending_task_safety_flag.cc index 307d2d594c..4be2131f3f 100644 --- a/rtc_base/task_utils/pending_task_safety_flag.cc +++ b/rtc_base/task_utils/pending_task_safety_flag.cc @@ -15,7 +15,7 @@ namespace webrtc { // static -PendingTaskSafetyFlag::Pointer PendingTaskSafetyFlag::Create() { +rtc::scoped_refptr PendingTaskSafetyFlag::Create() { return new rtc::RefCountedObject(); } diff --git a/rtc_base/task_utils/pending_task_safety_flag.h b/rtc_base/task_utils/pending_task_safety_flag.h index 1b301c8034..580fb3f912 100644 --- a/rtc_base/task_utils/pending_task_safety_flag.h +++ b/rtc_base/task_utils/pending_task_safety_flag.h @@ -36,12 +36,17 @@ namespace webrtc { // MyMethod(); // })); // +// Or implicitly by letting ToQueuedTask do the checking: +// +// // Running outside of the main thread. +// my_task_queue_->PostTask(ToQueuedTask(pending_task_safety_flag_, +// [this]() { MyMethod(); })); +// // Note that checking the state only works on the construction/destruction // thread of the ReceiveStatisticsProxy instance. class PendingTaskSafetyFlag : public rtc::RefCountInterface { public: - using Pointer = rtc::scoped_refptr; - static Pointer Create(); + static rtc::scoped_refptr Create(); ~PendingTaskSafetyFlag() = default; @@ -56,6 +61,25 @@ class PendingTaskSafetyFlag : public rtc::RefCountInterface { SequenceChecker main_sequence_; }; +// Makes using PendingTaskSafetyFlag very simple. Automatic PTSF creation +// and signalling of destruction when the ScopedTaskSafety instance goes out +// of scope. +// Should be used by the class that wants tasks dropped after destruction. +// Requirements are that the instance be constructed and destructed on +// the same thread as the potentially dropped tasks would be running on. +class ScopedTaskSafety { + public: + ScopedTaskSafety() = default; + ~ScopedTaskSafety() { flag_->SetNotAlive(); } + + // Returns a new reference to the safety flag. + rtc::scoped_refptr flag() const { return flag_; } + + private: + rtc::scoped_refptr flag_ = + PendingTaskSafetyFlag::Create(); +}; + } // namespace webrtc #endif // RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_ diff --git a/rtc_base/task_utils/pending_task_safety_flag_unittest.cc b/rtc_base/task_utils/pending_task_safety_flag_unittest.cc index 0c1c3c8e52..6df2fe2ffb 100644 --- a/rtc_base/task_utils/pending_task_safety_flag_unittest.cc +++ b/rtc_base/task_utils/pending_task_safety_flag_unittest.cc @@ -29,7 +29,7 @@ using ::testing::Return; } // namespace TEST(PendingTaskSafetyFlagTest, Basic) { - PendingTaskSafetyFlag::Pointer safety_flag; + rtc::scoped_refptr safety_flag; { // Scope for the |owner| instance. class Owner { @@ -37,12 +37,27 @@ TEST(PendingTaskSafetyFlagTest, Basic) { Owner() = default; ~Owner() { flag_->SetNotAlive(); } - PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()}; + rtc::scoped_refptr flag_ = + PendingTaskSafetyFlag::Create(); } owner; EXPECT_TRUE(owner.flag_->alive()); safety_flag = owner.flag_; EXPECT_TRUE(safety_flag->alive()); } + // |owner| now out of scope. + EXPECT_FALSE(safety_flag->alive()); +} + +TEST(PendingTaskSafetyFlagTest, BasicScoped) { + rtc::scoped_refptr safety_flag; + { + struct Owner { + ScopedTaskSafety safety; + } owner; + safety_flag = owner.safety.flag(); + EXPECT_TRUE(safety_flag->alive()); + } + // |owner| now out of scope. EXPECT_FALSE(safety_flag->alive()); } @@ -72,7 +87,8 @@ TEST(PendingTaskSafetyFlagTest, PendingTaskSuccess) { private: TaskQueueBase* const tq_main_; bool stuff_done_ = false; - PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()}; + rtc::scoped_refptr flag_{ + PendingTaskSafetyFlag::Create()}; }; std::unique_ptr owner; @@ -106,22 +122,18 @@ TEST(PendingTaskSafetyFlagTest, PendingTaskDropped) { } ~Owner() { RTC_DCHECK(tq_main_->IsCurrent()); - flag_->SetNotAlive(); } void DoStuff() { RTC_DCHECK(!tq_main_->IsCurrent()); - tq_main_->PostTask(ToQueuedTask([safe = flag_, this]() { - if (!safe->alive()) - return; - *stuff_done_ = true; - })); + tq_main_->PostTask( + ToQueuedTask(safety_, [this]() { *stuff_done_ = true; })); } private: TaskQueueBase* const tq_main_; bool* const stuff_done_; - PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()}; + ScopedTaskSafety safety_; }; std::unique_ptr owner; diff --git a/rtc_base/task_utils/to_queued_task.h b/rtc_base/task_utils/to_queued_task.h index cc9325ebd6..07ab0ebe26 100644 --- a/rtc_base/task_utils/to_queued_task.h +++ b/rtc_base/task_utils/to_queued_task.h @@ -39,7 +39,7 @@ class ClosureTask : public QueuedTask { template class SafetyClosureTask : public QueuedTask { public: - explicit SafetyClosureTask(PendingTaskSafetyFlag::Pointer safety, + explicit SafetyClosureTask(rtc::scoped_refptr safety, Closure&& closure) : closure_(std::forward(closure)), safety_flag_(std::move(safety)) {} @@ -52,7 +52,7 @@ class SafetyClosureTask : public QueuedTask { } typename std::decay::type closure_; - PendingTaskSafetyFlag::Pointer safety_flag_; + rtc::scoped_refptr safety_flag_; }; // Extends ClosureTask to also allow specifying cleanup code. @@ -81,13 +81,25 @@ std::unique_ptr ToQueuedTask(Closure&& closure) { } template -std::unique_ptr ToQueuedTask(PendingTaskSafetyFlag::Pointer safety, - Closure&& closure) { +std::unique_ptr ToQueuedTask( + rtc::scoped_refptr safety, + Closure&& closure) { return std::make_unique>( std::move(safety), std::forward(closure)); } -template +template +std::unique_ptr ToQueuedTask(const ScopedTaskSafety& safety, + Closure&& closure) { + return ToQueuedTask(safety.flag(), std::forward(closure)); +} + +template ::type>::type, + ScopedTaskSafety>::value>::type* = nullptr> std::unique_ptr ToQueuedTask(Closure&& closure, Cleanup&& cleanup) { return std::make_unique< webrtc_new_closure_impl::ClosureTaskWithCleanup>( diff --git a/rtc_base/task_utils/to_queued_task_unittest.cc b/rtc_base/task_utils/to_queued_task_unittest.cc index e98c81e9ce..261b9e891b 100644 --- a/rtc_base/task_utils/to_queued_task_unittest.cc +++ b/rtc_base/task_utils/to_queued_task_unittest.cc @@ -127,7 +127,8 @@ TEST(ToQueuedTaskTest, AcceptsMoveOnlyCleanup) { } TEST(ToQueuedTaskTest, PendingTaskSafetyFlag) { - PendingTaskSafetyFlag::Pointer flag(PendingTaskSafetyFlag::Create()); + rtc::scoped_refptr flag = + PendingTaskSafetyFlag::Create(); int count = 0; // Create two identical tasks that increment the |count|. diff --git a/video/call_stats2.cc b/video/call_stats2.cc index af0da0f702..ce68127490 100644 --- a/video/call_stats2.cc +++ b/video/call_stats2.cc @@ -76,16 +76,13 @@ CallStats::CallStats(Clock* clock, TaskQueueBase* task_queue) RTC_DCHECK(task_queue_); process_thread_checker_.Detach(); task_queue_->PostDelayedTask( - ToQueuedTask(task_safety_flag_, [this]() { RunTimer(); }), - kUpdateIntervalMs); + ToQueuedTask(task_safety_, [this]() { RunTimer(); }), kUpdateIntervalMs); } CallStats::~CallStats() { RTC_DCHECK_RUN_ON(&construction_thread_checker_); RTC_DCHECK(observers_.empty()); - task_safety_flag_->SetNotAlive(); - UpdateHistograms(); } @@ -98,7 +95,7 @@ void CallStats::RunTimer() { last_process_time_ + kUpdateIntervalMs - clock_->TimeInMilliseconds(); task_queue_->PostDelayedTask( - ToQueuedTask(task_safety_flag_, [this]() { RunTimer(); }), interval); + ToQueuedTask(task_safety_, [this]() { RunTimer(); }), interval); } void CallStats::UpdateAndReport() { @@ -156,7 +153,7 @@ void CallStats::OnRttUpdate(int64_t rtt) { RTC_DCHECK_RUN_ON(&process_thread_checker_); int64_t now_ms = clock_->TimeInMilliseconds(); - task_queue_->PostTask(ToQueuedTask(task_safety_flag_, [this, rtt, now_ms]() { + task_queue_->PostTask(ToQueuedTask(task_safety_, [this, rtt, now_ms]() { RTC_DCHECK_RUN_ON(&construction_thread_checker_); reports_.push_back(RttTime(rtt, now_ms)); if (time_of_first_rtt_ms_ == -1) diff --git a/video/call_stats2.h b/video/call_stats2.h index f06d33daf7..49d2db7d31 100644 --- a/video/call_stats2.h +++ b/video/call_stats2.h @@ -139,8 +139,7 @@ class CallStats { TaskQueueBase* const task_queue_; // Used to signal destruction to potentially pending tasks. - PendingTaskSafetyFlag::Pointer task_safety_flag_ = - PendingTaskSafetyFlag::Create(); + ScopedTaskSafety task_safety_; RTC_DISALLOW_COPY_AND_ASSIGN(CallStats); }; diff --git a/video/receive_statistics_proxy2.cc b/video/receive_statistics_proxy2.cc index b818eae018..79684f21e6 100644 --- a/video/receive_statistics_proxy2.cc +++ b/video/receive_statistics_proxy2.cc @@ -129,7 +129,6 @@ ReceiveStatisticsProxy::ReceiveStatisticsProxy( ReceiveStatisticsProxy::~ReceiveStatisticsProxy() { RTC_DCHECK_RUN_ON(&main_thread_); - task_safety_flag_->SetNotAlive(); } void ReceiveStatisticsProxy::UpdateHistograms( @@ -689,18 +688,17 @@ VideoReceiveStream::Stats ReceiveStatisticsProxy::GetStats() const { void ReceiveStatisticsProxy::OnIncomingPayloadType(int payload_type) { RTC_DCHECK_RUN_ON(&decode_queue_); - worker_thread_->PostTask( - ToQueuedTask(task_safety_flag_, [payload_type, this]() { - RTC_DCHECK_RUN_ON(&main_thread_); - stats_.current_payload_type = payload_type; - })); + worker_thread_->PostTask(ToQueuedTask(task_safety_, [payload_type, this]() { + RTC_DCHECK_RUN_ON(&main_thread_); + stats_.current_payload_type = payload_type; + })); } void ReceiveStatisticsProxy::OnDecoderImplementationName( const char* implementation_name) { RTC_DCHECK_RUN_ON(&decode_queue_); worker_thread_->PostTask(ToQueuedTask( - task_safety_flag_, [name = std::string(implementation_name), this]() { + task_safety_, [name = std::string(implementation_name), this]() { RTC_DCHECK_RUN_ON(&main_thread_); stats_.decoder_implementation_name = name; })); @@ -715,7 +713,7 @@ void ReceiveStatisticsProxy::OnFrameBufferTimingsUpdated( int render_delay_ms) { RTC_DCHECK_RUN_ON(&decode_queue_); worker_thread_->PostTask(ToQueuedTask( - task_safety_flag_, + task_safety_, [max_decode_ms, current_delay_ms, target_delay_ms, jitter_buffer_ms, min_playout_delay_ms, render_delay_ms, this]() { RTC_DCHECK_RUN_ON(&main_thread_); @@ -742,7 +740,7 @@ void ReceiveStatisticsProxy::OnUniqueFramesCounted(int num_unique_frames) { void ReceiveStatisticsProxy::OnTimingFrameInfoUpdated( const TimingFrameInfo& info) { RTC_DCHECK_RUN_ON(&decode_queue_); - worker_thread_->PostTask(ToQueuedTask(task_safety_flag_, [info, this]() { + worker_thread_->PostTask(ToQueuedTask(task_safety_, [info, this]() { RTC_DCHECK_RUN_ON(&main_thread_); if (info.flags != VideoSendTiming::kInvalid) { int64_t now_ms = clock_->TimeInMilliseconds(); @@ -777,11 +775,11 @@ void ReceiveStatisticsProxy::RtcpPacketTypesCounterUpdated( // [main] worker thread. // So until the sender implementation has been updated, we work around this // here by posting the update to the expected thread. We make a by value - // copy of the |task_safety_flag_| to handle the case if the queued task + // copy of the |task_safety_| to handle the case if the queued task // runs after the |ReceiveStatisticsProxy| has been deleted. In such a // case the packet_counter update won't be recorded. worker_thread_->PostTask( - ToQueuedTask(task_safety_flag_, [ssrc, packet_counter, this]() { + ToQueuedTask(task_safety_, [ssrc, packet_counter, this]() { RtcpPacketTypesCounterUpdated(ssrc, packet_counter); })); return; @@ -810,7 +808,7 @@ void ReceiveStatisticsProxy::OnDecodedFrame(const VideoFrame& frame, // "com.apple.coremedia.decompressionsession.clientcallback" VideoFrameMetaData meta(frame, clock_->CurrentTime()); worker_thread_->PostTask(ToQueuedTask( - task_safety_flag_, [meta, qp, decode_time_ms, content_type, this]() { + task_safety_, [meta, qp, decode_time_ms, content_type, this]() { OnDecodedFrame(meta, qp, decode_time_ms, content_type); })); } @@ -936,8 +934,8 @@ void ReceiveStatisticsProxy::OnSyncOffsetUpdated(int64_t video_playout_ntp_ms, RTC_DCHECK_RUN_ON(&incoming_render_queue_); int64_t now_ms = clock_->TimeInMilliseconds(); worker_thread_->PostTask( - ToQueuedTask(task_safety_flag_, [video_playout_ntp_ms, sync_offset_ms, - estimated_freq_khz, now_ms, this]() { + ToQueuedTask(task_safety_, [video_playout_ntp_ms, sync_offset_ms, + estimated_freq_khz, now_ms, this]() { RTC_DCHECK_RUN_ON(&main_thread_); sync_offset_counter_.Add(std::abs(sync_offset_ms)); stats_.sync_offset_ms = sync_offset_ms; @@ -990,24 +988,22 @@ void ReceiveStatisticsProxy::OnCompleteFrame(bool is_keyframe, void ReceiveStatisticsProxy::OnDroppedFrames(uint32_t frames_dropped) { RTC_DCHECK_RUN_ON(&decode_queue_); - worker_thread_->PostTask( - ToQueuedTask(task_safety_flag_, [frames_dropped, this]() { - RTC_DCHECK_RUN_ON(&main_thread_); - stats_.frames_dropped += frames_dropped; - })); + worker_thread_->PostTask(ToQueuedTask(task_safety_, [frames_dropped, this]() { + RTC_DCHECK_RUN_ON(&main_thread_); + stats_.frames_dropped += frames_dropped; + })); } void ReceiveStatisticsProxy::OnPreDecode(VideoCodecType codec_type, int qp) { RTC_DCHECK_RUN_ON(&decode_queue_); - worker_thread_->PostTask( - ToQueuedTask(task_safety_flag_, [codec_type, qp, this]() { - RTC_DCHECK_RUN_ON(&main_thread_); - last_codec_type_ = codec_type; - if (last_codec_type_ == kVideoCodecVP8 && qp != -1) { - qp_counters_.vp8.Add(qp); - qp_sample_.Add(qp); - } - })); + worker_thread_->PostTask(ToQueuedTask(task_safety_, [codec_type, qp, this]() { + RTC_DCHECK_RUN_ON(&main_thread_); + last_codec_type_ = codec_type; + if (last_codec_type_ == kVideoCodecVP8 && qp != -1) { + qp_counters_.vp8.Add(qp); + qp_sample_.Add(qp); + } + })); } void ReceiveStatisticsProxy::OnStreamInactive() { diff --git a/video/receive_statistics_proxy2.h b/video/receive_statistics_proxy2.h index d6f6f1cc21..1357c407ad 100644 --- a/video/receive_statistics_proxy2.h +++ b/video/receive_statistics_proxy2.h @@ -211,8 +211,7 @@ class ReceiveStatisticsProxy : public VCMReceiveStatisticsCallback, // methods are invoked on such as GetStats(). TaskQueueBase* const worker_thread_; - PendingTaskSafetyFlag::Pointer task_safety_flag_ = - PendingTaskSafetyFlag::Create(); + ScopedTaskSafety task_safety_; SequenceChecker decode_queue_; rtc::ThreadChecker main_thread_; diff --git a/video/rtp_streams_synchronizer2.cc b/video/rtp_streams_synchronizer2.cc index 116cf2879b..7e3bed1467 100644 --- a/video/rtp_streams_synchronizer2.cc +++ b/video/rtp_streams_synchronizer2.cc @@ -47,7 +47,6 @@ RtpStreamsSynchronizer::RtpStreamsSynchronizer(TaskQueueBase* main_queue, RtpStreamsSynchronizer::~RtpStreamsSynchronizer() { RTC_DCHECK_RUN_ON(&main_checker_); - task_safety_flag_->SetNotAlive(); } void RtpStreamsSynchronizer::ConfigureSync(Syncable* syncable_audio) { @@ -85,13 +84,12 @@ void RtpStreamsSynchronizer::QueueTimer() { } RTC_DCHECK_LE(delay, kSyncIntervalMs); - task_queue_->PostDelayedTask(ToQueuedTask([this, safety = task_safety_flag_] { - if (!safety->alive()) - return; - RTC_DCHECK_RUN_ON(&main_checker_); - timer_running_ = false; - UpdateDelay(); - }), + task_queue_->PostDelayedTask(ToQueuedTask(task_safety_, + [this] { + RTC_DCHECK_RUN_ON(&main_checker_); + timer_running_ = false; + UpdateDelay(); + }), delay); } diff --git a/video/rtp_streams_synchronizer2.h b/video/rtp_streams_synchronizer2.h index 353434e6a9..83dd0fb6f2 100644 --- a/video/rtp_streams_synchronizer2.h +++ b/video/rtp_streams_synchronizer2.h @@ -70,8 +70,7 @@ class RtpStreamsSynchronizer { bool timer_running_ RTC_GUARDED_BY(main_checker_) = false; // Used to signal destruction to potentially pending tasks. - PendingTaskSafetyFlag::Pointer task_safety_flag_ = - PendingTaskSafetyFlag::Create(); + ScopedTaskSafety task_safety_; }; } // namespace internal diff --git a/video/video_receive_stream2.cc b/video/video_receive_stream2.cc index 510c2602c4..b1b482da29 100644 --- a/video/video_receive_stream2.cc +++ b/video/video_receive_stream2.cc @@ -269,7 +269,6 @@ VideoReceiveStream2::~VideoReceiveStream2() { RTC_DCHECK_RUN_ON(&worker_sequence_checker_); RTC_LOG(LS_INFO) << "~VideoReceiveStream2: " << config_.ToString(); Stop(); - task_safety_flag_->SetNotAlive(); } void VideoReceiveStream2::SignalNetworkState(NetworkState state) { @@ -491,7 +490,7 @@ void VideoReceiveStream2::OnFrame(const VideoFrame& video_frame) { VideoFrameMetaData frame_meta(video_frame, clock_->CurrentTime()); worker_thread_->PostTask( - ToQueuedTask(task_safety_flag_, [frame_meta, this]() { + ToQueuedTask(task_safety_, [frame_meta, this]() { RTC_DCHECK_RUN_ON(&worker_sequence_checker_); int64_t video_playout_ntp_ms; int64_t sync_offset_ms; @@ -703,7 +702,7 @@ void VideoReceiveStream2::HandleFrameBufferTimeout() { // check if we have received a packet within the last 5 seconds. bool stream_is_active = last_packet_ms && now_ms - *last_packet_ms < 5000; if (!stream_is_active) { - worker_thread_->PostTask(ToQueuedTask(task_safety_flag_, [this]() { + worker_thread_->PostTask(ToQueuedTask(task_safety_, [this]() { RTC_DCHECK_RUN_ON(&worker_sequence_checker_); stats_proxy_.OnStreamInactive(); })); diff --git a/video/video_receive_stream2.h b/video/video_receive_stream2.h index bbed08a7a6..f8cd65dc9d 100644 --- a/video/video_receive_stream2.h +++ b/video/video_receive_stream2.h @@ -255,8 +255,7 @@ class VideoReceiveStream2 : public webrtc::VideoReceiveStream, rtc::TaskQueue decode_queue_; // Used to signal destruction to potentially pending tasks. - PendingTaskSafetyFlag::Pointer task_safety_flag_ = - PendingTaskSafetyFlag::Create(); + ScopedTaskSafety task_safety_; }; } // namespace internal } // namespace webrtc