Block posting new tasks until queue size is reduced

Also use LimitedTaskQueue instead of TaskQueueForTest in VideoAnalyzer. This prevents buffering too many decoded frames.

Bug: webrtc:42225151, b/337757868
Change-Id: I75a304c7e4c8569505e31efc6455ce09d49f5a43
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/353380
Commit-Queue: Sergey Silkin <ssilkin@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Auto-Submit: Sergey Silkin <ssilkin@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#42425}
This commit is contained in:
Sergey Silkin 2024-06-03 10:50:04 +02:00 committed by WebRTC LUCI CQ
parent 3cfebdf0df
commit 3fa94c458f

View File

@ -186,25 +186,34 @@ class Pacer {
TimeDelta delay_; TimeDelta delay_;
}; };
// A task queue that limits its maximum size and guarantees FIFO execution of
// the scheduled tasks.
class LimitedTaskQueue { class LimitedTaskQueue {
public: public:
// The codec tester reads frames from video source in the main thread. // Frame reading, encoding and decoding are handled in separate threads. If
// Encoding and decoding are done in separate threads. If encoding or // encoding or decoding is slow, the frame reader may run far ahead, loading
// decoding is slow, the reading may go far ahead and may buffer too many // many large frames into memory. To prevent this, we limit the maximum size
// frames in memory. To prevent this we limit the encoding/decoding queue // of the task queue. When this limit is reached, posting new tasks is blocked
// size. When the queue is full, the main thread and, hence, reading frames // until the queue size is reduced by executing previous tasks.
// from video source is blocked until a previously posted encoding/decoding
// task starts.
static constexpr int kMaxTaskQueueSize = 3; static constexpr int kMaxTaskQueueSize = 3;
LimitedTaskQueue() : queue_size_(0) {} LimitedTaskQueue() : queue_size_(0) {}
void PostScheduledTask(absl::AnyInvocable<void() &&> task, Timestamp start) { void PostScheduledTask(absl::AnyInvocable<void() &&> task,
Timestamp scheduled) {
{
// Block posting new tasks until the queue size is reduced.
MutexLock lock(&mutex_);
while (queue_size_ >= kMaxTaskQueueSize) {
task_executed_.Wait(TimeDelta::Seconds(10));
task_executed_.Reset();
}
}
++queue_size_; ++queue_size_;
task_queue_.PostTask([this, task = std::move(task), start]() mutable { task_queue_.PostTask([this, task = std::move(task), scheduled]() mutable {
// `TaskQueue` doesn't guarantee FIFO order of execution for delayed Timestamp now = Timestamp::Millis(rtc::TimeMillis());
// tasks. int64_t wait_ms = (scheduled - now).ms();
int64_t wait_ms = (start - Timestamp::Millis(rtc::TimeMillis())).ms();
if (wait_ms > 0) { if (wait_ms > 0) {
RTC_CHECK_LT(wait_ms, 10000) << "Too high wait_ms " << wait_ms; RTC_CHECK_LT(wait_ms, 10000) << "Too high wait_ms " << wait_ms;
SleepMs(wait_ms); SleepMs(wait_ms);
@ -213,16 +222,19 @@ class LimitedTaskQueue {
--queue_size_; --queue_size_;
task_executed_.Set(); task_executed_.Set();
}); });
task_executed_.Reset();
if (queue_size_ > kMaxTaskQueueSize) {
task_executed_.Wait(rtc::Event::kForever);
RTC_CHECK(queue_size_ <= kMaxTaskQueueSize);
} }
void PostTask(absl::AnyInvocable<void() &&> task) {
Timestamp now = Timestamp::Millis(rtc::TimeMillis());
PostScheduledTask(std::move(task), now);
} }
void PostTaskAndWait(absl::AnyInvocable<void() &&> task) { void PostTaskAndWait(absl::AnyInvocable<void() &&> task) {
PostScheduledTask(std::move(task), Timestamp::Millis(rtc::TimeMillis())); PostTask(std::move(task));
WaitForPreviouslyPostedTasks();
}
void WaitForPreviouslyPostedTasks() {
task_queue_.WaitForPreviouslyPostedTasks(); task_queue_.WaitForPreviouslyPostedTasks();
} }
@ -230,6 +242,7 @@ class LimitedTaskQueue {
TaskQueueForTest task_queue_; TaskQueueForTest task_queue_;
std::atomic_int queue_size_; std::atomic_int queue_size_;
rtc::Event task_executed_; rtc::Event task_executed_;
Mutex mutex_;
}; };
class TesterY4mWriter { class TesterY4mWriter {
@ -776,7 +789,7 @@ class VideoCodecAnalyzer : public VideoCodecTester::VideoCodecStats {
} }
VideoSource* const video_source_; VideoSource* const video_source_;
TaskQueueForTest task_queue_; LimitedTaskQueue task_queue_;
// RTP timestamp -> spatial layer -> Frame // RTP timestamp -> spatial layer -> Frame
std::map<uint32_t, std::map<int, Frame>> frames_; std::map<uint32_t, std::map<int, Frame>> frames_;
std::map<uint32_t, EncodingSettings> encoding_settings_; std::map<uint32_t, EncodingSettings> encoding_settings_;