diff --git a/test/video_codec_tester.cc b/test/video_codec_tester.cc index 8f5feeef04..1ec5bca244 100644 --- a/test/video_codec_tester.cc +++ b/test/video_codec_tester.cc @@ -186,25 +186,34 @@ class Pacer { TimeDelta delay_; }; +// A task queue that limits its maximum size and guarantees FIFO execution of +// the scheduled tasks. class LimitedTaskQueue { public: - // The codec tester reads frames from video source in the main thread. - // Encoding and decoding are done in separate threads. If encoding or - // decoding is slow, the reading may go far ahead and may buffer too many - // frames in memory. To prevent this we limit the encoding/decoding queue - // size. When the queue is full, the main thread and, hence, reading frames - // from video source is blocked until a previously posted encoding/decoding - // task starts. + // Frame reading, encoding and decoding are handled in separate threads. If + // encoding or decoding is slow, the frame reader may run far ahead, loading + // many large frames into memory. To prevent this, we limit the maximum size + // of the task queue. When this limit is reached, posting new tasks is blocked + // until the queue size is reduced by executing previous tasks. static constexpr int kMaxTaskQueueSize = 3; LimitedTaskQueue() : queue_size_(0) {} - void PostScheduledTask(absl::AnyInvocable task, Timestamp start) { + void PostScheduledTask(absl::AnyInvocable task, + Timestamp scheduled) { + { + // Block posting new tasks until the queue size is reduced. + MutexLock lock(&mutex_); + while (queue_size_ >= kMaxTaskQueueSize) { + task_executed_.Wait(TimeDelta::Seconds(10)); + task_executed_.Reset(); + } + } + ++queue_size_; - task_queue_.PostTask([this, task = std::move(task), start]() mutable { - // `TaskQueue` doesn't guarantee FIFO order of execution for delayed - // tasks. - int64_t wait_ms = (start - Timestamp::Millis(rtc::TimeMillis())).ms(); + task_queue_.PostTask([this, task = std::move(task), scheduled]() mutable { + Timestamp now = Timestamp::Millis(rtc::TimeMillis()); + int64_t wait_ms = (scheduled - now).ms(); if (wait_ms > 0) { RTC_CHECK_LT(wait_ms, 10000) << "Too high wait_ms " << wait_ms; SleepMs(wait_ms); @@ -213,16 +222,19 @@ class LimitedTaskQueue { --queue_size_; task_executed_.Set(); }); + } - task_executed_.Reset(); - if (queue_size_ > kMaxTaskQueueSize) { - task_executed_.Wait(rtc::Event::kForever); - RTC_CHECK(queue_size_ <= kMaxTaskQueueSize); - } + void PostTask(absl::AnyInvocable task) { + Timestamp now = Timestamp::Millis(rtc::TimeMillis()); + PostScheduledTask(std::move(task), now); } void PostTaskAndWait(absl::AnyInvocable task) { - PostScheduledTask(std::move(task), Timestamp::Millis(rtc::TimeMillis())); + PostTask(std::move(task)); + WaitForPreviouslyPostedTasks(); + } + + void WaitForPreviouslyPostedTasks() { task_queue_.WaitForPreviouslyPostedTasks(); } @@ -230,6 +242,7 @@ class LimitedTaskQueue { TaskQueueForTest task_queue_; std::atomic_int queue_size_; rtc::Event task_executed_; + Mutex mutex_; }; class TesterY4mWriter { @@ -776,7 +789,7 @@ class VideoCodecAnalyzer : public VideoCodecTester::VideoCodecStats { } VideoSource* const video_source_; - TaskQueueForTest task_queue_; + LimitedTaskQueue task_queue_; // RTP timestamp -> spatial layer -> Frame std::map> frames_; std::map encoding_settings_;