Migrate video/ to absl::AnyInvocable based TaskQueueBase interface

Bug: webrtc:14245
Change-Id: Ibd98d3a0c548443578953ef3e25aee9919eea3d3
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/267980
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37465}
This commit is contained in:
Danil Chapovalov 2022-07-06 10:14:29 +02:00 committed by WebRTC LUCI CQ
parent f9f9d544a5
commit 95eeaa7aca
28 changed files with 158 additions and 170 deletions

View File

@ -74,7 +74,6 @@ rtc_library("video") {
"../api/rtc_event_log",
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:to_queued_task",
"../api/units:frequency",
"../api/units:time_delta",
"../api/units:timestamp",
@ -228,7 +227,6 @@ rtc_library("frame_cadence_adapter") {
"../api:sequence_checker",
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:to_queued_task",
"../api/units:time_delta",
"../api/units:timestamp",
"../api/video:video_frame",
@ -309,7 +307,6 @@ rtc_library("task_queue_frame_decode_scheduler") {
"../api:sequence_checker",
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:to_queued_task",
"../api/units:timestamp",
"../rtc_base:checks",
"../system_wrappers",
@ -395,7 +392,6 @@ rtc_library("video_stream_encoder_impl") {
"../api/adaptation:resource_adaptation_api",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:task_queue",
"../api/task_queue:to_queued_task",
"../api/units:data_rate",
"../api/video:encoded_image",
"../api/video:render_resolution",
@ -452,6 +448,7 @@ rtc_library("video_stream_encoder_impl") {
absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",
"//third_party/abseil-cpp/absl/base:core_headers",
"//third_party/abseil-cpp/absl/cleanup",
"//third_party/abseil-cpp/absl/types:optional",
]
}
@ -810,7 +807,6 @@ if (rtc_include_tests) {
"../api/rtc_event_log",
"../api/task_queue",
"../api/task_queue:default_task_queue_factory",
"../api/task_queue:to_queued_task",
"../api/test/video:function_video_factory",
"../api/units:data_rate",
"../api/units:frequency",
@ -917,6 +913,7 @@ if (rtc_include_tests) {
]
absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",
"//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/functional:bind_front",
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/strings",

View File

@ -39,7 +39,6 @@ rtc_library("video_adaptation") {
"../../api:sequence_checker",
"../../api/adaptation:resource_adaptation_api",
"../../api/task_queue:task_queue",
"../../api/task_queue:to_queued_task",
"../../api/units:data_rate",
"../../api/units:time_delta",
"../../api/video:video_adaptation",
@ -93,7 +92,6 @@ if (rtc_include_tests) {
"../../api:field_trials_view",
"../../api:scoped_refptr",
"../../api/task_queue:task_queue",
"../../api/task_queue:to_queued_task",
"../../api/units:time_delta",
"../../api/units:timestamp",
"../../api/video:encoded_image",
@ -119,6 +117,9 @@ if (rtc_include_tests) {
"../../test:test_support",
"../../test/time_controller:time_controller",
]
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
absl_deps = [
"//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/types:optional",
]
}
}

View File

@ -14,7 +14,6 @@
#include <utility>
#include "api/sequence_checker.h"
#include "api/task_queue/to_queued_task.h"
namespace webrtc {

View File

@ -12,7 +12,6 @@
#include <utility>
#include "api/task_queue/to_queued_task.h"
#include "rtc_base/checks.h"
#include "rtc_base/experiments/balanced_degradation_settings.h"
#include "rtc_base/logging.h"

View File

@ -13,7 +13,7 @@
#include <memory>
#include <utility>
#include "api/task_queue/to_queued_task.h"
#include "absl/functional/any_invocable.h"
#include "api/units/timestamp.h"
#include "call/adaptation/test/fake_video_stream_input_state_provider.h"
#include "call/adaptation/test/mock_resource_listener.h"
@ -46,7 +46,7 @@ class PixelLimitResourceTest : public ::testing::Test {
input_state_provider_.SetInputState(current_pixels, 30, current_pixels);
}
void RunTaskOnTaskQueue(std::unique_ptr<QueuedTask> task) {
void RunTaskOnTaskQueue(absl::AnyInvocable<void() &&> task) {
task_queue_->PostTask(std::move(task));
time_controller_.AdvanceTime(TimeDelta::Millis(0));
}
@ -63,7 +63,7 @@ TEST_F(PixelLimitResourceTest, ResourceIsSilentByDefault) {
// Because our mock is strick, the test would fail if
// OnResourceUsageStateMeasured() is invoked.
testing::StrictMock<MockResourceListener> resource_listener;
RunTaskOnTaskQueue(ToQueuedTask([&]() {
RunTaskOnTaskQueue([&]() {
rtc::scoped_refptr<PixelLimitResource> pixel_limit_resource =
PixelLimitResource::Create(task_queue_.get(), &input_state_provider_);
pixel_limit_resource->SetResourceListener(&resource_listener);
@ -72,14 +72,14 @@ TEST_F(PixelLimitResourceTest, ResourceIsSilentByDefault) {
// Advance a significant amount of time.
time_controller_.AdvanceTime(kResourceUsageCheckIntervalMs * 10);
pixel_limit_resource->SetResourceListener(nullptr);
}));
});
}
TEST_F(PixelLimitResourceTest,
OveruseIsReportedWhileCurrentPixelsIsGreaterThanMaxPixels) {
constexpr int kMaxPixels = 640 * 480;
testing::StrictMock<MockResourceListener> resource_listener;
RunTaskOnTaskQueue(ToQueuedTask([&]() {
RunTaskOnTaskQueue([&]() {
rtc::scoped_refptr<PixelLimitResource> pixel_limit_resource =
PixelLimitResource::Create(task_queue_.get(), &input_state_provider_);
pixel_limit_resource->SetResourceListener(&resource_listener);
@ -106,7 +106,7 @@ TEST_F(PixelLimitResourceTest,
time_controller_.AdvanceTime(kResourceUsageCheckIntervalMs * 3);
pixel_limit_resource->SetResourceListener(nullptr);
}));
});
}
TEST_F(PixelLimitResourceTest,
@ -114,7 +114,7 @@ TEST_F(PixelLimitResourceTest,
constexpr int kMaxPixels = 640 * 480;
const int kMinPixels = GetLowerResolutionThan(kMaxPixels);
testing::StrictMock<MockResourceListener> resource_listener;
RunTaskOnTaskQueue(ToQueuedTask([&]() {
RunTaskOnTaskQueue([&]() {
rtc::scoped_refptr<PixelLimitResource> pixel_limit_resource =
PixelLimitResource::Create(task_queue_.get(), &input_state_provider_);
pixel_limit_resource->SetResourceListener(&resource_listener);
@ -141,7 +141,7 @@ TEST_F(PixelLimitResourceTest,
time_controller_.AdvanceTime(kResourceUsageCheckIntervalMs * 3);
pixel_limit_resource->SetResourceListener(nullptr);
}));
});
}
} // namespace webrtc

View File

@ -12,7 +12,6 @@
#include <utility>
#include "api/task_queue/to_queued_task.h"
#include "rtc_base/checks.h"
#include "rtc_base/experiments/balanced_degradation_settings.h"
#include "rtc_base/time_utils.h"

View File

@ -15,7 +15,6 @@
#include <utility>
#include "absl/algorithm/container.h"
#include "api/task_queue/to_queued_task.h"
#include "rtc_base/checks.h"
#include "rtc_base/location.h"
#include "system_wrappers/include/metrics.h"
@ -147,7 +146,7 @@ void CallStats::OnRttUpdate(int64_t rtt) {
if (task_queue_->IsCurrent()) {
update();
} else {
task_queue_->PostTask(ToQueuedTask(task_safety_, std::move(update)));
task_queue_->PostTask(SafeTask(task_safety_.flag(), std::move(update)));
}
}

View File

@ -13,7 +13,6 @@
#include <memory>
#include "api/task_queue/default_task_queue_factory.h"
#include "api/task_queue/to_queued_task.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "rtc_base/thread.h"
#include "system_wrappers/include/metrics.h"
@ -43,14 +42,13 @@ class CallStats2Test : public ::testing::Test {
// Queues an rtt update call on the process thread.
void AsyncSimulateRttUpdate(int64_t rtt) {
RtcpRttStats* rtcp_rtt_stats = call_stats_.AsRtcpRttStats();
task_queue_.PostTask(ToQueuedTask(
[rtcp_rtt_stats, rtt] { rtcp_rtt_stats->OnRttUpdate(rtt); }));
task_queue_.PostTask(
[rtcp_rtt_stats, rtt] { rtcp_rtt_stats->OnRttUpdate(rtt); });
}
protected:
void FlushProcessAndWorker() {
task_queue_.PostTask(
ToQueuedTask([this] { loop_.PostTask([this]() { loop_.Quit(); }); }));
task_queue_.PostTask([this] { loop_.PostTask([this] { loop_.Quit(); }); });
loop_.Run();
}

View File

@ -11,8 +11,8 @@
#include <memory>
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/to_queued_task.h"
#include "api/test/simulated_network.h"
#include "api/units/time_delta.h"
#include "api/video/builtin_video_bitrate_allocator_factory.h"
#include "api/video/video_bitrate_allocation.h"
#include "call/fake_network_pipe.h"
@ -130,7 +130,7 @@ class BandwidthStatsTest : public test::EndToEndTest {
Action OnSendRtp(const uint8_t* packet, size_t length) override {
// Stats need to be fetched on the thread where the caller objects were
// constructed.
task_queue_->PostTask(ToQueuedTask([this]() {
task_queue_->PostTask([this]() {
if (!sender_call_ || !receiver_call_) {
return;
}
@ -146,7 +146,7 @@ class BandwidthStatsTest : public test::EndToEndTest {
observation_complete_.Set();
}
}
}));
});
return SEND_PACKET;
}
@ -248,7 +248,7 @@ TEST_F(BandwidthEndToEndTest, RembWithSendSideBwe) {
void OnCallsCreated(Call* sender_call, Call* receiver_call) override {
RTC_DCHECK(sender_call);
sender_call_ = sender_call;
task_queue_->PostTask(ToQueuedTask([this]() { PollStats(); }));
task_queue_->PostTask([this]() { PollStats(); });
}
void PollStats() {
@ -284,7 +284,8 @@ TEST_F(BandwidthEndToEndTest, RembWithSendSideBwe) {
break;
}
task_queue_->PostDelayedTask(ToQueuedTask([this] { PollStats(); }), 1000);
task_queue_->PostDelayedTask([this] { PollStats(); },
TimeDelta::Seconds(1));
}
void PerformTest() override {

View File

@ -14,7 +14,6 @@
#include "api/task_queue/default_task_queue_factory.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/task_queue/to_queued_task.h"
#include "api/test/simulated_network.h"
#include "api/video_codecs/video_encoder.h"
#include "call/fake_network_pipe.h"

View File

@ -14,6 +14,7 @@
#include "api/task_queue/task_queue_base.h"
#include "api/test/simulated_network.h"
#include "api/test/video/function_video_encoder_factory.h"
#include "api/units/time_delta.h"
#include "call/fake_network_pipe.h"
#include "call/simulated_network.h"
#include "modules/rtp_rtcp/source/rtp_packet.h"
@ -204,7 +205,7 @@ TEST_F(RetransmissionEndToEndTest, ReceivesNackAndRetransmitsAudio) {
TEST_F(RetransmissionEndToEndTest,
StopSendingKeyframeRequestsForInactiveStream) {
class KeyframeRequestObserver : public test::EndToEndTest, public QueuedTask {
class KeyframeRequestObserver : public test::EndToEndTest {
public:
explicit KeyframeRequestObserver(TaskQueueBase* task_queue)
: clock_(Clock::GetRealTimeClock()), task_queue_(task_queue) {}
@ -221,7 +222,7 @@ TEST_F(RetransmissionEndToEndTest,
test::RtcpPacketParser parser;
EXPECT_TRUE(parser.Parse(packet, length));
if (parser.pli()->num_packets() > 0)
task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
task_queue_->PostTask([this] { Run(); });
return SEND_PACKET;
}
@ -229,7 +230,7 @@ TEST_F(RetransmissionEndToEndTest,
if (receive_stream_->GetStats().frames_decoded > 0) {
frame_decoded_ = true;
} else if (clock_->TimeInMilliseconds() - start_time_ < 5000) {
task_queue_->PostDelayedTask(std::unique_ptr<QueuedTask>(this), 100);
task_queue_->PostDelayedTask([this] { Run(); }, TimeDelta::Millis(100));
return false;
}
return true;
@ -237,11 +238,11 @@ TEST_F(RetransmissionEndToEndTest,
void PerformTest() override {
start_time_ = clock_->TimeInMilliseconds();
task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
task_queue_->PostTask([this] { Run(); });
test_done_.Wait(rtc::Event::kForever);
}
bool Run() override {
void Run() {
if (!frame_decoded_) {
if (PollStats()) {
send_stream_->Stop();
@ -259,7 +260,6 @@ TEST_F(RetransmissionEndToEndTest,
receive_stream_->GetStats().rtcp_packet_type_counts.pli_packets);
test_done_.Set();
}
return false;
}
private:

View File

@ -416,7 +416,7 @@ TEST_F(StatsEndToEndTest, TestReceivedRtpPacketStats) {
Action OnSendRtp(const uint8_t* packet, size_t length) override {
if (sent_rtp_ >= kNumRtpPacketsToSend) {
// Need to check the stats on the correct thread.
task_queue_->PostTask(ToQueuedTask(task_safety_flag_, [this]() {
task_queue_->PostTask(SafeTask(task_safety_flag_, [this]() {
VideoReceiveStreamInterface::Stats stats =
receive_stream_->GetStats();
if (kNumRtpPacketsToSend == stats.rtp_stats.packet_counter.packets) {
@ -601,7 +601,7 @@ TEST_F(StatsEndToEndTest, VerifyNackStats) {
}
}
task_queue_->PostTask(
ToQueuedTask(task_safety_flag_, [this]() { VerifyStats(); }));
SafeTask(task_safety_flag_, [this]() { VerifyStats(); }));
return SEND_PACKET;
}

View File

@ -91,7 +91,7 @@ class FrameBuffer2Proxy : public FrameBufferProxy {
void StartNextDecode(bool keyframe_required) override {
if (!decode_queue_->IsCurrent()) {
decode_queue_->PostTask(ToQueuedTask(
decode_queue_->PostTask(SafeTask(
decode_safety_,
[this, keyframe_required] { StartNextDecode(keyframe_required); }));
return;
@ -272,8 +272,8 @@ class FrameBuffer3Proxy : public FrameBufferProxy {
void StartNextDecode(bool keyframe_required) override {
if (!worker_queue_->IsCurrent()) {
worker_queue_->PostTask(ToQueuedTask(
worker_safety_,
worker_queue_->PostTask(SafeTask(
worker_safety_.flag(),
[this, keyframe_required] { StartNextDecode(keyframe_required); }));
return;
}
@ -361,8 +361,8 @@ class FrameBuffer3Proxy : public FrameBufferProxy {
decoder_ready_for_new_frame_ = false;
// VideoReceiveStream2 wants frames on the decoder thread.
decode_queue_->PostTask(ToQueuedTask(
decode_safety_, [this, frame = std::move(frame)]() mutable {
decode_queue_->PostTask(
SafeTask(decode_safety_, [this, frame = std::move(frame)]() mutable {
receiver_->OnEncodedFrame(std::move(frame));
}));
}

View File

@ -21,7 +21,6 @@
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/to_queued_task.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "api/video/video_frame.h"
@ -378,12 +377,12 @@ void ZeroHertzAdapterMode::OnFrame(Timestamp post_time,
current_frame_id_++;
scheduled_repeat_ = absl::nullopt;
queue_->PostDelayedHighPrecisionTask(
ToQueuedTask(safety_,
[this] {
RTC_DCHECK_RUN_ON(&sequence_checker_);
ProcessOnDelayedCadence();
}),
frame_delay_.ms());
SafeTask(safety_.flag(),
[this] {
RTC_DCHECK_RUN_ON(&sequence_checker_);
ProcessOnDelayedCadence();
}),
frame_delay_);
}
void ZeroHertzAdapterMode::OnDiscardedFrame() {
@ -500,12 +499,12 @@ void ZeroHertzAdapterMode::ScheduleRepeat(int frame_id, bool idle_repeat) {
TimeDelta repeat_delay = RepeatDuration(idle_repeat);
queue_->PostDelayedHighPrecisionTask(
ToQueuedTask(safety_,
[this, frame_id] {
RTC_DCHECK_RUN_ON(&sequence_checker_);
ProcessRepeatedFrameOnDelayedCadence(frame_id);
}),
repeat_delay.ms());
SafeTask(safety_.flag(),
[this, frame_id] {
RTC_DCHECK_RUN_ON(&sequence_checker_);
ProcessRepeatedFrameOnDelayedCadence(frame_id);
}),
repeat_delay);
}
// RTC_RUN_ON(&sequence_checker_)
@ -654,7 +653,7 @@ void FrameCadenceAdapterImpl::OnFrame(const VideoFrame& frame) {
// Local time in webrtc time base.
Timestamp post_time = clock_->CurrentTime();
frames_scheduled_for_processing_.fetch_add(1, std::memory_order_relaxed);
queue_->PostTask(ToQueuedTask(safety_.flag(), [this, post_time, frame] {
queue_->PostTask(SafeTask(safety_.flag(), [this, post_time, frame] {
RTC_DCHECK_RUN_ON(queue_);
if (zero_hertz_adapter_created_timestamp_.has_value()) {
TimeDelta time_until_first_frame =
@ -676,7 +675,7 @@ void FrameCadenceAdapterImpl::OnFrame(const VideoFrame& frame) {
void FrameCadenceAdapterImpl::OnDiscardedFrame() {
callback_->OnDiscardedFrame();
queue_->PostTask(ToQueuedTask(safety_.flag(), [this] {
queue_->PostTask(SafeTask(safety_.flag(), [this] {
RTC_DCHECK_RUN_ON(queue_);
if (zero_hertz_adapter_) {
zero_hertz_adapter_->OnDiscardedFrame();
@ -689,7 +688,7 @@ void FrameCadenceAdapterImpl::OnConstraintsChanged(
RTC_LOG(LS_INFO) << __func__ << " this " << this << " min_fps "
<< constraints.min_fps.value_or(-1) << " max_fps "
<< constraints.max_fps.value_or(-1);
queue_->PostTask(ToQueuedTask(safety_.flag(), [this, constraints] {
queue_->PostTask(SafeTask(safety_.flag(), [this, constraints] {
RTC_DCHECK_RUN_ON(queue_);
bool was_zero_hertz_enabled = IsZeroHertzScreenshareEnabled();
source_constraints_ = constraints;

View File

@ -13,10 +13,11 @@
#include <utility>
#include <vector>
#include "absl/functional/any_invocable.h"
#include "api/task_queue/default_task_queue_factory.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/task_queue/to_queued_task.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "api/video/nv12_buffer.h"
#include "api/video/video_frame.h"
@ -643,10 +644,8 @@ class ZeroHertzLayerQualityConvergenceTest : public ::testing::Test {
}
}
void ScheduleDelayed(TimeDelta delay, std::function<void()> function) {
TaskQueueBase::Current()->PostDelayedTask(
ToQueuedTask([function = std::move(function)] { function(); }),
delay.ms());
void ScheduleDelayed(TimeDelta delay, absl::AnyInvocable<void() &&> task) {
TaskQueueBase::Current()->PostDelayedTask(std::move(task), delay);
}
protected:
@ -1042,7 +1041,7 @@ TEST(FrameCadenceAdapterRealTimeTest, TimestampsDoNotDrift) {
int64_t original_ntp_time_ms;
int64_t original_timestamp_us;
rtc::Event event;
queue->PostTask(ToQueuedTask([&] {
queue->PostTask([&] {
adapter = CreateAdapter(enabler, clock);
adapter->Initialize(&callback);
adapter->SetZeroHertzModeEnabled(
@ -1070,13 +1069,13 @@ TEST(FrameCadenceAdapterRealTimeTest, TimestampsDoNotDrift) {
}
}));
adapter->OnFrame(frame);
}));
});
event.Wait(rtc::Event::kForever);
rtc::Event finalized;
queue->PostTask(ToQueuedTask([&] {
queue->PostTask([&] {
adapter = nullptr;
finalized.Set();
}));
});
finalized.Wait(rtc::Event::kForever);
}

View File

@ -14,7 +14,6 @@
#include <cmath>
#include <utility>
#include "api/task_queue/to_queued_task.h"
#include "modules/video_coding/include/video_codec_interface.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
@ -642,7 +641,7 @@ VideoReceiveStreamInterface::Stats ReceiveStatisticsProxy::GetStats() const {
void ReceiveStatisticsProxy::OnIncomingPayloadType(int payload_type) {
RTC_DCHECK_RUN_ON(&decode_queue_);
worker_thread_->PostTask(ToQueuedTask(task_safety_, [payload_type, this]() {
worker_thread_->PostTask(SafeTask(task_safety_.flag(), [payload_type, this] {
RTC_DCHECK_RUN_ON(&main_thread_);
stats_.current_payload_type = payload_type;
}));
@ -651,8 +650,8 @@ void ReceiveStatisticsProxy::OnIncomingPayloadType(int payload_type) {
void ReceiveStatisticsProxy::OnDecoderImplementationName(
const char* implementation_name) {
RTC_DCHECK_RUN_ON(&decode_queue_);
worker_thread_->PostTask(ToQueuedTask(
task_safety_, [name = std::string(implementation_name), this]() {
worker_thread_->PostTask(SafeTask(
task_safety_.flag(), [name = std::string(implementation_name), this]() {
RTC_DCHECK_RUN_ON(&main_thread_);
stats_.decoder_implementation_name = name;
}));
@ -668,8 +667,8 @@ void ReceiveStatisticsProxy::OnFrameBufferTimingsUpdated(
// Only called on main_thread_ with FrameBuffer3
if (!worker_thread_->IsCurrent()) {
RTC_DCHECK_RUN_ON(&decode_queue_);
worker_thread_->PostTask(ToQueuedTask(
task_safety_,
worker_thread_->PostTask(SafeTask(
task_safety_.flag(),
[max_decode_ms, current_delay_ms, target_delay_ms, jitter_buffer_ms,
min_playout_delay_ms, render_delay_ms, this]() {
OnFrameBufferTimingsUpdated(max_decode_ms, current_delay_ms,
@ -704,8 +703,9 @@ void ReceiveStatisticsProxy::OnTimingFrameInfoUpdated(
// Only called on main_thread_ with FrameBuffer3
if (!worker_thread_->IsCurrent()) {
RTC_DCHECK_RUN_ON(&decode_queue_);
worker_thread_->PostTask(ToQueuedTask(
task_safety_, [info, this]() { OnTimingFrameInfoUpdated(info); }));
worker_thread_->PostTask(SafeTask(task_safety_.flag(), [info, this]() {
OnTimingFrameInfoUpdated(info);
}));
return;
}
RTC_DCHECK_RUN_ON(&main_thread_);
@ -745,7 +745,7 @@ void ReceiveStatisticsProxy::RtcpPacketTypesCounterUpdated(
// runs after the `ReceiveStatisticsProxy` has been deleted. In such a
// case the packet_counter update won't be recorded.
worker_thread_->PostTask(
ToQueuedTask(task_safety_, [ssrc, packet_counter, this]() {
SafeTask(task_safety_.flag(), [ssrc, packet_counter, this]() {
RtcpPacketTypesCounterUpdated(ssrc, packet_counter);
}));
return;
@ -792,8 +792,8 @@ void ReceiveStatisticsProxy::OnDecodedFrame(const VideoFrame& frame,
// "com.apple.coremedia.decompressionsession.clientcallback"
VideoFrameMetaData meta(frame, current_time);
worker_thread_->PostTask(
ToQueuedTask(task_safety_, [meta, qp, decode_time, processing_delay,
assembly_time, content_type, this]() {
SafeTask(task_safety_.flag(), [meta, qp, decode_time, processing_delay,
assembly_time, content_type, this]() {
OnDecodedFrame(meta, qp, decode_time, processing_delay, assembly_time,
content_type);
}));
@ -973,22 +973,24 @@ void ReceiveStatisticsProxy::OnCompleteFrame(bool is_keyframe,
void ReceiveStatisticsProxy::OnDroppedFrames(uint32_t frames_dropped) {
// Can be called on either the decode queue or the worker thread
// See FrameBuffer2 for more details.
worker_thread_->PostTask(ToQueuedTask(task_safety_, [frames_dropped, this]() {
RTC_DCHECK_RUN_ON(&main_thread_);
stats_.frames_dropped += frames_dropped;
}));
worker_thread_->PostTask(
SafeTask(task_safety_.flag(), [frames_dropped, this]() {
RTC_DCHECK_RUN_ON(&main_thread_);
stats_.frames_dropped += frames_dropped;
}));
}
void ReceiveStatisticsProxy::OnPreDecode(VideoCodecType codec_type, int qp) {
RTC_DCHECK_RUN_ON(&decode_queue_);
worker_thread_->PostTask(ToQueuedTask(task_safety_, [codec_type, qp, this]() {
RTC_DCHECK_RUN_ON(&main_thread_);
last_codec_type_ = codec_type;
if (last_codec_type_ == kVideoCodecVP8 && qp != -1) {
qp_counters_.vp8.Add(qp);
qp_sample_.Add(qp);
}
}));
worker_thread_->PostTask(
SafeTask(task_safety_.flag(), [codec_type, qp, this]() {
RTC_DCHECK_RUN_ON(&main_thread_);
last_codec_type_ = codec_type;
if (last_codec_type_ == kVideoCodecVP8 && qp != -1) {
qp_counters_.vp8.Add(qp);
qp_sample_.Add(qp);
}
}));
}
void ReceiveStatisticsProxy::OnStreamInactive() {

View File

@ -18,7 +18,6 @@
#include "absl/types/optional.h"
#include "api/scoped_refptr.h"
#include "api/task_queue/to_queued_task.h"
#include "api/video/i420_buffer.h"
#include "api/video/video_frame.h"
#include "api/video/video_frame_buffer.h"

View File

@ -14,7 +14,6 @@
#include <vector>
#include "absl/memory/memory.h"
#include "api/task_queue/to_queued_task.h"
#include "modules/rtp_rtcp/source/rtp_descriptor_authentication.h"
#include "rtc_base/thread.h"
@ -104,10 +103,10 @@ void RtpVideoStreamReceiverFrameTransformerDelegate::OnTransformedFrame(
std::unique_ptr<TransformableFrameInterface> frame) {
rtc::scoped_refptr<RtpVideoStreamReceiverFrameTransformerDelegate> delegate(
this);
network_thread_->PostTask(ToQueuedTask(
network_thread_->PostTask(
[delegate = std::move(delegate), frame = std::move(frame)]() mutable {
delegate->ManageFrame(std::move(frame));
}));
});
}
void RtpVideoStreamReceiverFrameTransformerDelegate::ManageFrame(

View File

@ -17,7 +17,6 @@
#include "absl/memory/memory.h"
#include "api/call/transport.h"
#include "api/task_queue/to_queued_task.h"
#include "call/video_receive_stream.h"
#include "modules/rtp_rtcp/source/rtp_descriptor_authentication.h"
#include "rtc_base/event.h"

View File

@ -14,7 +14,6 @@
#include <utility>
#include "api/sequence_checker.h"
#include "api/task_queue/to_queued_task.h"
#include "rtc_base/checks.h"
namespace webrtc {
@ -46,17 +45,17 @@ void TaskQueueFrameDecodeScheduler::ScheduleFrame(
TimeDelta wait = std::max(
TimeDelta::Zero(), schedule.latest_decode_time - clock_->CurrentTime());
bookkeeping_queue_->PostDelayedTask(
ToQueuedTask(task_safety_.flag(),
[this, rtp, schedule, cb = std::move(cb)] {
RTC_DCHECK_RUN_ON(bookkeeping_queue_);
// If the next frame rtp has changed since this task was
// this scheduled release should be skipped.
if (scheduled_rtp_ != rtp)
return;
scheduled_rtp_ = absl::nullopt;
cb(rtp, schedule.render_time);
}),
wait.ms());
SafeTask(task_safety_.flag(),
[this, rtp, schedule, cb = std::move(cb)] {
RTC_DCHECK_RUN_ON(bookkeeping_queue_);
// If the next frame rtp has changed since this task was
// this scheduled release should be skipped.
if (scheduled_rtp_ != rtp)
return;
scheduled_rtp_ = absl::nullopt;
cb(rtp, schedule.render_time);
}),
wait);
}
void TaskQueueFrameDecodeScheduler::CancelOutstanding() {

View File

@ -15,6 +15,7 @@
#include <memory>
#include <utility>
#include "absl/functional/any_invocable.h"
#include "absl/functional/bind_front.h"
#include "absl/types/optional.h"
#include "api/units/time_delta.h"
@ -55,9 +56,8 @@ class TaskQueueFrameDecodeSchedulerTest : public ::testing::Test {
}
protected:
template <class Task>
void OnQueue(Task&& t) {
task_queue_.PostTask(std::forward<Task>(t));
void OnQueue(absl::AnyInvocable<void() &&> t) {
task_queue_.PostTask(std::move(t));
time_controller_.AdvanceTime(TimeDelta::Zero());
}

View File

@ -29,7 +29,6 @@
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/to_queued_task.h"
#include "api/units/frequency.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
@ -609,7 +608,7 @@ void VideoReceiveStream2::OnFrame(const VideoFrame& video_frame) {
// `video_frame.packet_infos`. But VideoFrame is const qualified here.
call_->worker_thread()->PostTask(
ToQueuedTask(task_safety_, [frame_meta, this]() {
SafeTask(task_safety_.flag(), [frame_meta, this]() {
RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
int64_t video_playout_ntp_ms;
int64_t sync_offset_ms;
@ -771,9 +770,9 @@ void VideoReceiveStream2::OnEncodedFrame(std::unique_ptr<EncodedFrame> frame) {
void VideoReceiveStream2::OnDecodableFrameTimeout(TimeDelta wait_time) {
if (!call_->worker_thread()->IsCurrent()) {
call_->worker_thread()->PostTask(ToQueuedTask(
task_safety_,
[this, wait_time] { OnDecodableFrameTimeout(wait_time); }));
call_->worker_thread()->PostTask(
SafeTask(task_safety_.flag(),
[this, wait_time] { OnDecodableFrameTimeout(wait_time); }));
return;
}
@ -841,8 +840,8 @@ void VideoReceiveStream2::HandleEncodedFrame(
{
// TODO(bugs.webrtc.org/11993): Make this PostTask to the network thread.
call_->worker_thread()->PostTask(ToQueuedTask(
task_safety_,
call_->worker_thread()->PostTask(SafeTask(
task_safety_.flag(),
[this, now, received_frame_is_keyframe, force_request_key_frame,
decoded_frame_picture_id, keyframe_request_is_due]() {
RTC_DCHECK_RUN_ON(&packet_sequence_checker_);

View File

@ -13,7 +13,6 @@
#include "api/array_view.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/to_queued_task.h"
#include "api/video/video_stream_encoder_settings.h"
#include "modules/rtp_rtcp/include/rtp_header_extension_map.h"
#include "modules/rtp_rtcp/source/rtp_header_extension_size.h"
@ -238,7 +237,7 @@ void VideoSendStream::UpdateActiveSimulcastLayers(
<< active_layers_string.str();
rtp_transport_queue_->PostTask(
ToQueuedTask(transport_queue_safety_, [this, active_layers] {
SafeTask(transport_queue_safety_, [this, active_layers] {
send_stream_.UpdateActiveSimulcastLayers(active_layers);
}));
@ -253,11 +252,11 @@ void VideoSendStream::Start() {
running_ = true;
rtp_transport_queue_->PostTask(ToQueuedTask([this] {
rtp_transport_queue_->PostTask([this] {
transport_queue_safety_->SetAlive();
send_stream_.Start();
thread_sync_event_.Set();
}));
});
// It is expected that after VideoSendStream::Start has been called, incoming
// frames are not dropped in VideoStreamEncoder. To ensure this, Start has to
@ -272,7 +271,7 @@ void VideoSendStream::Stop() {
return;
RTC_DLOG(LS_INFO) << "VideoSendStream::Stop";
running_ = false;
rtp_transport_queue_->PostTask(ToQueuedTask(transport_queue_safety_, [this] {
rtp_transport_queue_->PostTask(SafeTask(transport_queue_safety_, [this] {
// As the stream can get re-used and implicitly restarted via changing
// the state of the active layers, we do not mark the
// `transport_queue_safety_` flag with `SetNotAlive()` here. That's only

View File

@ -21,7 +21,6 @@
#include "api/rtp_parameters.h"
#include "api/scoped_refptr.h"
#include "api/sequence_checker.h"
#include "api/task_queue/to_queued_task.h"
#include "api/video_codecs/video_codec.h"
#include "call/rtp_transport_controller_send_interface.h"
#include "call/video_send_stream.h"
@ -284,7 +283,7 @@ VideoSendStreamImpl::VideoSendStreamImpl(
transport->EnablePeriodicAlrProbing(*enable_alr_bw_probing);
}
rtp_transport_queue_->PostTask(ToQueuedTask(transport_queue_safety_, [this] {
rtp_transport_queue_->PostTask(SafeTask(transport_queue_safety_, [this] {
if (configured_pacing_factor_)
transport_->SetPacingFactor(*configured_pacing_factor_);
@ -398,7 +397,7 @@ void VideoSendStreamImpl::SignalEncoderTimedOut() {
void VideoSendStreamImpl::OnBitrateAllocationUpdated(
const VideoBitrateAllocation& allocation) {
if (!rtp_transport_queue_->IsCurrent()) {
rtp_transport_queue_->PostTask(ToQueuedTask(transport_queue_safety_, [=] {
rtp_transport_queue_->PostTask(SafeTask(transport_queue_safety_, [=] {
OnBitrateAllocationUpdated(allocation);
}));
return;
@ -472,7 +471,7 @@ void VideoSendStreamImpl::OnEncoderConfigurationChanged(
VideoEncoderConfig::ContentType content_type,
int min_transmit_bitrate_bps) {
if (!rtp_transport_queue_->IsCurrent()) {
rtp_transport_queue_->PostTask(ToQueuedTask(
rtp_transport_queue_->PostTask(SafeTask(
transport_queue_safety_,
[this, streams = std::move(streams), is_svc, content_type,
min_transmit_bitrate_bps]() mutable {
@ -555,7 +554,7 @@ EncodedImageCallback::Result VideoSendStreamImpl::OnEncodedImage(
};
if (!rtp_transport_queue_->IsCurrent()) {
rtp_transport_queue_->PostTask(
ToQueuedTask(transport_queue_safety_, std::move(enable_padding_task)));
SafeTask(transport_queue_safety_, std::move(enable_padding_task)));
} else {
enable_padding_task();
}
@ -574,7 +573,7 @@ EncodedImageCallback::Result VideoSendStreamImpl::OnEncodedImage(
};
if (!rtp_transport_queue_->IsCurrent()) {
rtp_transport_queue_->PostTask(
ToQueuedTask(transport_queue_safety_, std::move(update_task)));
SafeTask(transport_queue_safety_, std::move(update_task)));
} else {
update_task();
}

View File

@ -16,6 +16,7 @@
#include "absl/types/optional.h"
#include "api/rtc_event_log/rtc_event_log.h"
#include "api/units/time_delta.h"
#include "call/rtp_video_sender.h"
#include "call/test/mock_bitrate_allocator.h"
#include "call/test/mock_rtp_transport_controller_send.h"
@ -866,15 +867,15 @@ TEST_F(VideoSendStreamImplTest, DisablesPaddingOnPausedEncoder) {
RTC_FROM_HERE);
rtc::Event done;
test_queue_.PostDelayedTask(
ToQueuedTask([&] {
test_queue_.Get()->PostDelayedTask(
[&] {
// No padding supposed to be sent for paused observer
EXPECT_EQ(0, padding_bitrate);
testing::Mock::VerifyAndClearExpectations(&bitrate_allocator_);
vss_impl->Stop();
done.Set();
}),
5000);
},
TimeDelta::Seconds(5));
// Pause the test suite so that the last delayed task executes.
ASSERT_TRUE(done.Wait(10000));
@ -904,13 +905,13 @@ TEST_F(VideoSendStreamImplTest, KeepAliveOnDroppedFrame) {
RTC_FROM_HERE);
rtc::Event done;
test_queue_.PostDelayedTask(
ToQueuedTask([&] {
test_queue_.Get()->PostDelayedTask(
[&] {
testing::Mock::VerifyAndClearExpectations(&bitrate_allocator_);
vss_impl->Stop();
done.Set();
}),
2000);
},
TimeDelta::Seconds(2));
ASSERT_TRUE(done.Wait(5000));
}

View File

@ -16,7 +16,6 @@
#include "api/sequence_checker.h"
#include "api/task_queue/default_task_queue_factory.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/to_queued_task.h"
#include "api/test/simulated_network.h"
#include "api/video/builtin_video_bitrate_allocator_factory.h"
#include "api/video/encoded_image.h"
@ -1503,7 +1502,7 @@ TEST_F(VideoSendStreamTest, MinTransmitBitrateRespectsRemb) {
const uint32_t ssrc = rtp_packet.Ssrc();
RTC_DCHECK(stream_);
task_queue_->PostTask(ToQueuedTask(task_safety_flag_, [this, ssrc]() {
task_queue_->PostTask(SafeTask(task_safety_flag_, [this, ssrc]() {
VideoSendStream::Stats stats = stream_->GetStats();
if (!stats.substreams.empty()) {
EXPECT_EQ(1u, stats.substreams.size());
@ -1620,14 +1619,14 @@ TEST_F(VideoSendStreamTest, ChangingNetworkRoute) {
Action OnSendRtp(const uint8_t* packet, size_t length) override {
RTC_DCHECK_RUN_ON(&module_process_thread_);
task_queue_->PostTask(ToQueuedTask([this]() {
task_queue_->PostTask([this]() {
RTC_DCHECK_RUN_ON(&task_queue_thread_);
if (!call_)
return;
Call::Stats stats = call_->GetStats();
if (stats.send_bandwidth_bps > kStartBitrateBps)
observation_complete_.Set();
}));
});
return SEND_PACKET;
}
@ -1722,7 +1721,7 @@ TEST_F(VideoSendStreamTest, DISABLED_RelayToDirectRoute) {
Action OnSendRtp(const uint8_t* packet, size_t length) override {
RTC_DCHECK_RUN_ON(&module_process_thread_);
task_queue_->PostTask(ToQueuedTask([this]() {
task_queue_->PostTask([this]() {
RTC_DCHECK_RUN_ON(&task_queue_thread_);
if (!call_)
return;
@ -1732,7 +1731,7 @@ TEST_F(VideoSendStreamTest, DISABLED_RelayToDirectRoute) {
call_->GetStats().send_bandwidth_bps > kRelayBandwidthCapBps;
if (did_exceed_cap || had_time_to_exceed_cap_in_relayed_phase)
observation_complete_.Set();
}));
});
return SEND_PACKET;
}
@ -1918,7 +1917,7 @@ class MaxPaddingSetTest : public test::SendTest {
// Check the stats on the correct thread and signal the 'complete' flag
// once we detect that we're done.
task_queue_->PostTask(ToQueuedTask([this]() {
task_queue_->PostTask([this]() {
RTC_DCHECK_RUN_ON(&task_queue_thread_);
// In case we get a callback during teardown.
// When this happens, OnStreamsStopped() has been called already,
@ -1953,7 +1952,7 @@ class MaxPaddingSetTest : public test::SendTest {
observation_complete_.Set();
}
}
}));
});
return SEND_PACKET;
}
@ -3865,7 +3864,7 @@ class ContentSwitchTest : public test::SendTest {
}
Action OnSendRtp(const uint8_t* packet, size_t length) override {
task_queue_->PostTask(ToQueuedTask([this]() {
task_queue_->PostTask([this]() {
MutexLock lock(&mutex_);
if (done_)
return;
@ -3914,7 +3913,7 @@ class ContentSwitchTest : public test::SendTest {
return;
}
observation_complete_.Set();
}));
});
return SEND_PACKET;
}

View File

@ -18,12 +18,12 @@
#include <utility>
#include "absl/algorithm/container.h"
#include "absl/cleanup/cleanup.h"
#include "absl/types/optional.h"
#include "api/field_trials_view.h"
#include "api/sequence_checker.h"
#include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/to_queued_task.h"
#include "api/video/encoded_image.h"
#include "api/video/i420_buffer.h"
#include "api/video/render_resolution.h"
@ -738,9 +738,9 @@ void VideoStreamEncoder::Stop() {
video_source_sink_controller_.SetSource(nullptr);
rtc::Event shutdown_event;
encoder_queue_.PostTask(webrtc::ToQueuedTask(
[this] {
absl::Cleanup shutdown = [&shutdown_event] { shutdown_event.Set(); };
encoder_queue_.PostTask(
[this, shutdown = std::move(shutdown)] {
RTC_DCHECK_RUN_ON(&encoder_queue_);
if (resource_adaptation_processor_) {
stream_resource_manager_.StopManagedResources();
@ -763,8 +763,7 @@ void VideoStreamEncoder::Stop() {
ReleaseEncoder();
encoder_ = nullptr;
frame_cadence_adapter_ = nullptr;
},
[&shutdown_event]() { shutdown_event.Set(); }));
});
shutdown_event.Wait(rtc::Event::kForever);
}
@ -1148,9 +1147,10 @@ void VideoStreamEncoder::ReconfigureEncoder() {
encoder_resolutions.emplace_back(simulcastStream.width,
simulcastStream.height);
}
worker_queue_->PostTask(ToQueuedTask(
task_safety_, [this, max_framerate, alignment,
encoder_resolutions = std::move(encoder_resolutions)]() {
worker_queue_->PostTask(SafeTask(
task_safety_.flag(),
[this, max_framerate, alignment,
encoder_resolutions = std::move(encoder_resolutions)]() {
RTC_DCHECK_RUN_ON(worker_queue_);
if (max_framerate !=
video_source_sink_controller_.frame_rate_upper_limit() ||
@ -1910,7 +1910,7 @@ void VideoStreamEncoder::EncodeVideoFrame(const VideoFrame& video_frame,
}
void VideoStreamEncoder::RequestRefreshFrame() {
worker_queue_->PostTask(ToQueuedTask(task_safety_, [this] {
worker_queue_->PostTask(SafeTask(task_safety_.flag(), [this] {
RTC_DCHECK_RUN_ON(worker_queue_);
video_source_sink_controller_.RequestRefreshFrame();
}));
@ -2249,8 +2249,8 @@ void VideoStreamEncoder::OnVideoSourceRestrictionsUpdated(
RTC_LOG(LS_INFO) << "Updating sink restrictions from "
<< (reason ? reason->Name() : std::string("<null>"))
<< " to " << restrictions.ToString();
worker_queue_->PostTask(ToQueuedTask(
task_safety_, [this, restrictions = std::move(restrictions)]() {
worker_queue_->PostTask(SafeTask(
task_safety_.flag(), [this, restrictions = std::move(restrictions)]() {
RTC_DCHECK_RUN_ON(worker_queue_);
video_source_sink_controller_.SetRestrictions(std::move(restrictions));
video_source_sink_controller_.PushSourceSinkSettings();
@ -2391,7 +2391,7 @@ void VideoStreamEncoder::CheckForAnimatedContent(
"animation detection.";
}
worker_queue_->PostTask(
ToQueuedTask(task_safety_, [this, should_cap_resolution]() {
SafeTask(task_safety_.flag(), [this, should_cap_resolution]() {
RTC_DCHECK_RUN_ON(worker_queue_);
video_source_sink_controller_.SetPixelsPerFrameUpperLimit(
should_cap_resolution

View File

@ -133,11 +133,10 @@ void PassAFrame(
TaskQueueBase* encoder_queue,
FrameCadenceAdapterInterface::Callback* video_stream_encoder_callback,
int64_t ntp_time_ms) {
encoder_queue->PostTask(
ToQueuedTask([video_stream_encoder_callback, ntp_time_ms] {
video_stream_encoder_callback->OnFrame(Timestamp::Millis(ntp_time_ms),
1, CreateSimpleNV12Frame());
}));
encoder_queue->PostTask([video_stream_encoder_callback, ntp_time_ms] {
video_stream_encoder_callback->OnFrame(Timestamp::Millis(ntp_time_ms), 1,
CreateSimpleNV12Frame());
});
}
class TestBuffer : public webrtc::I420Buffer {
@ -9076,13 +9075,17 @@ TEST(VideoStreamEncoderSimpleTest, CreateDestroy) {
private:
void Delete() override { delete this; }
void PostTask(std::unique_ptr<QueuedTask> task) override {
void PostTask(absl::AnyInvocable<void() &&> task) override {
// meh.
}
void PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) override {
void PostDelayedTask(absl::AnyInvocable<void() &&> task,
TimeDelta delay) override {
ASSERT_TRUE(false);
}
void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
TimeDelta delay) override {
ADD_FAILURE();
}
};
// Lots of boiler plate.