diff --git a/call/BUILD.gn b/call/BUILD.gn index b72afed446..e03f15df54 100644 --- a/call/BUILD.gn +++ b/call/BUILD.gn @@ -526,6 +526,7 @@ if (rtc_include_tests) { ] absl_deps = [ "//third_party/abseil-cpp/absl/container:inlined_vector", + "//third_party/abseil-cpp/absl/functional:any_invocable", "//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/strings", "//third_party/abseil-cpp/absl/types:optional", diff --git a/call/adaptation/resource_adaptation_processor.cc b/call/adaptation/resource_adaptation_processor.cc index 1f9b662490..54a683bb81 100644 --- a/call/adaptation/resource_adaptation_processor.cc +++ b/call/adaptation/resource_adaptation_processor.cc @@ -41,11 +41,11 @@ void ResourceAdaptationProcessor::ResourceListenerDelegate:: OnResourceUsageStateMeasured(rtc::scoped_refptr resource, ResourceUsageState usage_state) { if (!task_queue_->IsCurrent()) { - task_queue_->PostTask(ToQueuedTask( + task_queue_->PostTask( [this_ref = rtc::scoped_refptr(this), resource, usage_state] { this_ref->OnResourceUsageStateMeasured(resource, usage_state); - })); + }); return; } RTC_DCHECK_RUN_ON(task_queue_); @@ -142,8 +142,8 @@ void ResourceAdaptationProcessor::RemoveResource( void ResourceAdaptationProcessor::RemoveLimitationsImposedByResource( rtc::scoped_refptr resource) { if (!task_queue_->IsCurrent()) { - task_queue_->PostTask(ToQueuedTask( - [this, resource]() { RemoveLimitationsImposedByResource(resource); })); + task_queue_->PostTask( + [this, resource]() { RemoveLimitationsImposedByResource(resource); }); return; } RTC_DCHECK_RUN_ON(task_queue_); diff --git a/call/adaptation/resource_adaptation_processor_unittest.cc b/call/adaptation/resource_adaptation_processor_unittest.cc index 97c01b395e..7f09e22e6c 100644 --- a/call/adaptation/resource_adaptation_processor_unittest.cc +++ b/call/adaptation/resource_adaptation_processor_unittest.cc @@ -430,8 +430,8 @@ TEST_F(ResourceAdaptationProcessorTest, SetInputStates(true, kDefaultFrameRate, kDefaultFrameSize); TaskQueueForTest resource_task_queue("ResourceTaskQueue"); - resource_task_queue.PostTask(ToQueuedTask( - [&]() { resource_->SetUsageState(ResourceUsageState::kOveruse); })); + resource_task_queue.PostTask( + [&]() { resource_->SetUsageState(ResourceUsageState::kOveruse); }); EXPECT_EQ_WAIT(1u, restrictions_listener_.restrictions_updated_count(), kDefaultTimeoutMs); @@ -447,10 +447,10 @@ TEST_F(ResourceAdaptationProcessorTest, // has passed it on to the processor's task queue. rtc::Event resource_event; TaskQueueForTest resource_task_queue("ResourceTaskQueue"); - resource_task_queue.PostTask(ToQueuedTask([&]() { + resource_task_queue.PostTask([&]() { resource_->SetUsageState(ResourceUsageState::kOveruse); resource_event.Set(); - })); + }); EXPECT_TRUE(resource_event.Wait(kDefaultTimeoutMs)); // Now destroy the processor while handling the overuse is in flight. @@ -470,10 +470,10 @@ TEST_F(ResourceAdaptationProcessorTest, rtc::Event overuse_event; TaskQueueForTest resource_task_queue("ResourceTaskQueue"); // Queues task for `resource_` overuse while `processor_` is still listening. - resource_task_queue.PostTask(ToQueuedTask([&]() { + resource_task_queue.PostTask([&]() { resource_->SetUsageState(ResourceUsageState::kOveruse); overuse_event.Set(); - })); + }); EXPECT_TRUE(overuse_event.Wait(kDefaultTimeoutMs)); // Once we know the overuse task is queued, remove `resource_` so that // `processor_` is not listening to it. diff --git a/call/call.cc b/call/call.cc index ad08b8a6f3..9be095320e 100644 --- a/call/call.cc +++ b/call/call.cc @@ -1209,14 +1209,14 @@ void Call::SignalChannelNetworkState(MediaType media, NetworkState state) { } else { // TODO(bugs.webrtc.org/11993): Remove workaround when we no longer need to // post to the worker thread. - worker_thread_->PostTask(ToQueuedTask(task_safety_, std::move(closure))); + worker_thread_->PostTask(SafeTask(task_safety_.flag(), std::move(closure))); } } void Call::OnAudioTransportOverheadChanged(int transport_overhead_per_packet) { RTC_DCHECK_RUN_ON(network_thread_); worker_thread_->PostTask( - ToQueuedTask(task_safety_, [this, transport_overhead_per_packet]() { + SafeTask(task_safety_.flag(), [this, transport_overhead_per_packet]() { // TODO(bugs.webrtc.org/11993): Move this over to the network thread. RTC_DCHECK_RUN_ON(worker_thread_); for (auto& kv : audio_send_ssrcs_) { @@ -1408,7 +1408,7 @@ void Call::DeliverRtcp(MediaType media_type, rtc::CopyOnWriteBuffer packet) { // TODO(bugs.webrtc.org/11993): This should execute directly on the network // thread. worker_thread_->PostTask( - ToQueuedTask(task_safety_, [this, packet = std::move(packet)]() { + SafeTask(task_safety_.flag(), [this, packet = std::move(packet)]() { RTC_DCHECK_RUN_ON(worker_thread_); receive_stats_.AddReceivedRtcpBytes(static_cast(packet.size())); diff --git a/call/call_perf_tests.cc b/call/call_perf_tests.cc index 5be6385d5e..54892f0e49 100644 --- a/call/call_perf_tests.cc +++ b/call/call_perf_tests.cc @@ -113,7 +113,7 @@ class VideoRtcpAndSyncObserver : public test::RtpRtcpObserver, task_queue_(task_queue) {} void OnFrame(const VideoFrame& video_frame) override { - task_queue_->PostTask(ToQueuedTask([this]() { CheckStats(); })); + task_queue_->PostTask([this]() { CheckStats(); }); } void CheckStats() { @@ -343,7 +343,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec, } task_queue()->PostTask( - ToQueuedTask([to_delete = observer.release()]() { delete to_delete; })); + [to_delete = observer.release()]() { delete to_delete; }); } TEST_F(CallPerfTest, Synchronization_PlaysOutAudioAndVideoWithoutClockDrift) { @@ -680,7 +680,7 @@ void CallPerfTest::TestMinTransmitBitrate(bool pad_to_min_bitrate) { private: // TODO(holmer): Run this with a timer instead of once per packet. Action OnSendRtp(const uint8_t* packet, size_t length) override { - task_queue_->PostTask(ToQueuedTask(task_safety_flag_, [this]() { + task_queue_->PostTask(SafeTask(task_safety_flag_, [this]() { VideoSendStream::Stats stats = send_stream_->GetStats(); if (!stats.substreams.empty()) { @@ -1146,7 +1146,7 @@ void CallPerfTest::TestEncodeFramerate(VideoEncoderFactory* encoder_factory, const Timestamp now = clock_->CurrentTime(); if (now - last_getstats_time_ > kMinGetStatsInterval) { last_getstats_time_ = now; - task_queue_->PostTask(ToQueuedTask([this, now]() { + task_queue_->PostTask([this, now]() { VideoSendStream::Stats stats = send_stream_->GetStats(); for (const auto& stat : stats.substreams) { encode_frame_rate_lists_[stat.first].push_back( @@ -1156,7 +1156,7 @@ void CallPerfTest::TestEncodeFramerate(VideoEncoderFactory* encoder_factory, VerifyStats(); observation_complete_.Set(); } - })); + }); } return SEND_PACKET; } diff --git a/call/degraded_call.cc b/call/degraded_call.cc index 2c01c997c4..bc3f587dbc 100644 --- a/call/degraded_call.cc +++ b/call/degraded_call.cc @@ -62,20 +62,20 @@ bool DegradedCall::FakeNetworkPipeOnTaskQueue::Process() { return false; } - task_queue_->PostTask(ToQueuedTask(task_safety_, [this, time_to_next] { + task_queue_->PostTask(SafeTask(task_safety_.flag(), [this, time_to_next] { RTC_DCHECK_RUN_ON(task_queue_); int64_t next_process_time = *time_to_next + clock_->TimeInMilliseconds(); if (!next_process_ms_ || next_process_time < *next_process_ms_) { next_process_ms_ = next_process_time; task_queue_->PostDelayedHighPrecisionTask( - ToQueuedTask(task_safety_, - [this] { - RTC_DCHECK_RUN_ON(task_queue_); - if (!Process()) { - next_process_ms_.reset(); - } - }), - *time_to_next); + SafeTask(task_safety_.flag(), + [this] { + RTC_DCHECK_RUN_ON(task_queue_); + if (!Process()) { + next_process_ms_.reset(); + } + }), + TimeDelta::Millis(*time_to_next)); } })); @@ -146,8 +146,9 @@ DegradedCall::DegradedCall( receive_pipe_->SetReceiver(call_->Receiver()); if (receive_configs_.size() > 1) { call_->network_thread()->PostDelayedTask( - ToQueuedTask(task_safety_, [this] { UpdateReceiveNetworkConfig(); }), - receive_configs_[0].duration.ms()); + SafeTask(task_safety_.flag(), + [this] { UpdateReceiveNetworkConfig(); }), + receive_configs_[0].duration); } } if (!send_configs_.empty()) { @@ -157,8 +158,8 @@ DegradedCall::DegradedCall( call_->network_thread(), task_safety_, clock_, std::move(network)); if (send_configs_.size() > 1) { call_->network_thread()->PostDelayedTask( - ToQueuedTask(task_safety_, [this] { UpdateSendNetworkConfig(); }), - send_configs_[0].duration.ms()); + SafeTask(task_safety_.flag(), [this] { UpdateSendNetworkConfig(); }), + send_configs_[0].duration); } } } @@ -352,8 +353,8 @@ void DegradedCall::UpdateSendNetworkConfig() { send_config_index_ = (send_config_index_ + 1) % send_configs_.size(); send_simulated_network_->SetConfig(send_configs_[send_config_index_]); call_->network_thread()->PostDelayedTask( - ToQueuedTask(task_safety_, [this] { UpdateSendNetworkConfig(); }), - send_configs_[send_config_index_].duration.ms()); + SafeTask(task_safety_.flag(), [this] { UpdateSendNetworkConfig(); }), + send_configs_[send_config_index_].duration); } void DegradedCall::UpdateReceiveNetworkConfig() { @@ -361,7 +362,7 @@ void DegradedCall::UpdateReceiveNetworkConfig() { receive_simulated_network_->SetConfig( receive_configs_[receive_config_index_]); call_->network_thread()->PostDelayedTask( - ToQueuedTask(task_safety_, [this] { UpdateReceiveNetworkConfig(); }), - receive_configs_[receive_config_index_].duration.ms()); + SafeTask(task_safety_.flag(), [this] { UpdateReceiveNetworkConfig(); }), + receive_configs_[receive_config_index_].duration); } } // namespace webrtc diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc index e59ea745a0..b64cc7897f 100644 --- a/call/rtp_video_sender_unittest.cc +++ b/call/rtp_video_sender_unittest.cc @@ -15,6 +15,7 @@ #include #include +#include "absl/functional/any_invocable.h" #include "call/rtp_transport_controller_send.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "modules/rtp_rtcp/source/byte_io.h" @@ -203,10 +204,9 @@ class RtpVideoSenderTestFixture { // default thread as the transport queue, explicit checks for the transport // queue (not just using a SequenceChecker) aren't possible unless such a // queue is actually active. So RunOnTransportQueue is a convenience function - // that allow for running a closure on the transport queue, similar to + // that allow for running a `task` on the transport queue, similar to // SendTask(). - template - void RunOnTransportQueue(Closure&& task) { + void RunOnTransportQueue(absl::AnyInvocable task) { transport_controller_.GetWorkerQueue()->PostTask(std::move(task)); AdvanceTime(TimeDelta::Millis(0)); }