Stop using legacy rtc::TaskQueue in VideoReceiveStream2

Bug: webrtc:14169
Change-Id: Ib18a0bd4531d69055ae0131ac749745bd74651d8
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/334681
Reviewed-by: Philip Eliasson <philipel@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41545}
This commit is contained in:
Danil Chapovalov 2024-01-16 17:25:22 +01:00 committed by WebRTC LUCI CQ
parent 54be7084e0
commit 2c22da6220
3 changed files with 30 additions and 25 deletions

View File

@ -142,7 +142,6 @@ rtc_library("video") {
"../rtc_base:rate_tracker",
"../rtc_base:rtc_event",
"../rtc_base:rtc_numerics",
"../rtc_base:rtc_task_queue",
"../rtc_base:safe_conversions",
"../rtc_base:sample_counter",
"../rtc_base:stringutils",

View File

@ -187,6 +187,8 @@ VideoReceiveStream2::VideoReceiveStream2(
NackPeriodicProcessor* nack_periodic_processor,
DecodeSynchronizer* decode_sync)
: env_(env),
packet_sequence_checker_(SequenceChecker::kDetached),
decode_sequence_checker_(SequenceChecker::kDetached),
transport_adapter_(config.rtcp_send_transport),
config_(std::move(config)),
num_cpu_cores_(num_cpu_cores),
@ -227,7 +229,6 @@ VideoReceiveStream2::VideoReceiveStream2(
RTC_DCHECK(call_->worker_thread());
RTC_DCHECK(config_.renderer);
RTC_DCHECK(call_stats_);
packet_sequence_checker_.Detach();
RTC_DCHECK(!config_.decoders.empty());
RTC_CHECK(config_.decoder_factory);
@ -378,8 +379,8 @@ void VideoReceiveStream2::Start() {
// Start decoding on task queue.
stats_proxy_.DecoderThreadStarting();
decode_queue_.PostTask([this] {
RTC_DCHECK_RUN_ON(&decode_queue_);
decode_queue_->PostTask([this] {
RTC_DCHECK_RUN_ON(&decode_sequence_checker_);
decoder_stopped_ = false;
});
buffer_->StartNextDecode(true);
@ -409,8 +410,8 @@ void VideoReceiveStream2::Stop() {
if (decoder_running_) {
rtc::Event done;
decode_queue_.PostTask([this, &done] {
RTC_DCHECK_RUN_ON(&decode_queue_);
decode_queue_->PostTask([this, &done] {
RTC_DCHECK_RUN_ON(&decode_sequence_checker_);
// Set `decoder_stopped_` before deregistering all decoders. This means
// that any pending encoded frame will return early without trying to
// access the decoder database.
@ -763,10 +764,10 @@ void VideoReceiveStream2::OnEncodedFrame(std::unique_ptr<EncodedFrame> frame) {
}
stats_proxy_.OnPreDecode(frame->CodecSpecific()->codecType, qp);
decode_queue_.PostTask([this, now, keyframe_request_is_due,
received_frame_is_keyframe, frame = std::move(frame),
keyframe_required = keyframe_required_]() mutable {
RTC_DCHECK_RUN_ON(&decode_queue_);
decode_queue_->PostTask([this, now, keyframe_request_is_due,
received_frame_is_keyframe, frame = std::move(frame),
keyframe_required = keyframe_required_]() mutable {
RTC_DCHECK_RUN_ON(&decode_sequence_checker_);
if (decoder_stopped_)
return;
DecodeFrameResult result = HandleEncodedFrameOnDecodeQueue(
@ -830,7 +831,7 @@ VideoReceiveStream2::HandleEncodedFrameOnDecodeQueue(
std::unique_ptr<EncodedFrame> frame,
bool keyframe_request_is_due,
bool keyframe_required) {
RTC_DCHECK_RUN_ON(&decode_queue_);
RTC_DCHECK_RUN_ON(&decode_sequence_checker_);
bool force_request_key_frame = false;
absl::optional<int64_t> decoded_frame_picture_id;
@ -872,7 +873,7 @@ VideoReceiveStream2::HandleEncodedFrameOnDecodeQueue(
int VideoReceiveStream2::DecodeAndMaybeDispatchEncodedFrame(
std::unique_ptr<EncodedFrame> frame) {
RTC_DCHECK_RUN_ON(&decode_queue_);
RTC_DCHECK_RUN_ON(&decode_sequence_checker_);
// If `buffered_encoded_frames_` grows out of control (=60 queued frames),
// maybe due to a stuck decoder, we just halt the process here and log the
@ -1055,10 +1056,10 @@ VideoReceiveStream2::SetAndGetRecordingState(RecordingState state,
: Timestamp::Millis(state.last_keyframe_request_ms.value_or(0));
}
decode_queue_.PostTask(
decode_queue_->PostTask(
[this, &event, &old_state, callback = std::move(state.callback),
last_keyframe_request = std::move(last_keyframe_request)] {
RTC_DCHECK_RUN_ON(&decode_queue_);
RTC_DCHECK_RUN_ON(&decode_sequence_checker_);
old_state.callback = std::move(encoded_frame_buffer_function_);
encoded_frame_buffer_function_ = std::move(callback);

View File

@ -20,6 +20,7 @@
#include "api/environment/environment.h"
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "api/video/recordable_encoded_frame.h"
@ -31,7 +32,6 @@
#include "modules/video_coding/nack_requester.h"
#include "modules/video_coding/video_receiver2.h"
#include "rtc_base/system/no_unique_address.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/thread_annotations.h"
#include "video/receive_statistics_proxy.h"
#include "video/rtp_streams_synchronizer2.h"
@ -224,7 +224,7 @@ class VideoReceiveStream2
DecodeFrameResult HandleEncodedFrameOnDecodeQueue(
std::unique_ptr<EncodedFrame> frame,
bool keyframe_request_is_due,
bool keyframe_required) RTC_RUN_ON(decode_queue_);
bool keyframe_required) RTC_RUN_ON(decode_sequence_checker_);
void UpdatePlayoutDelays() const
RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_sequence_checker_);
void RequestKeyFrame(Timestamp now) RTC_RUN_ON(packet_sequence_checker_);
@ -236,10 +236,12 @@ class VideoReceiveStream2
bool IsReceivingKeyFrame(Timestamp timestamp) const
RTC_RUN_ON(packet_sequence_checker_);
int DecodeAndMaybeDispatchEncodedFrame(std::unique_ptr<EncodedFrame> frame)
RTC_RUN_ON(decode_queue_);
RTC_RUN_ON(decode_sequence_checker_);
void UpdateHistograms();
const Environment env_;
RTC_NO_UNIQUE_ADDRESS SequenceChecker worker_sequence_checker_;
// TODO(bugs.webrtc.org/11993): This checker conceptually represents
// operations that belong to the network thread. The Call class is currently
@ -250,7 +252,7 @@ class VideoReceiveStream2
// on the network thread, this comment will be deleted.
RTC_NO_UNIQUE_ADDRESS SequenceChecker packet_sequence_checker_;
const Environment env_;
RTC_NO_UNIQUE_ADDRESS SequenceChecker decode_sequence_checker_;
TransportAdapter transport_adapter_;
const VideoReceiveStreamInterface::Config config_;
@ -260,7 +262,7 @@ class VideoReceiveStream2
CallStats* const call_stats_;
bool decoder_running_ RTC_GUARDED_BY(worker_sequence_checker_) = false;
bool decoder_stopped_ RTC_GUARDED_BY(decode_queue_) = true;
bool decoder_stopped_ RTC_GUARDED_BY(decode_sequence_checker_) = true;
SourceTracker source_tracker_;
ReceiveStatisticsProxy stats_proxy_;
@ -296,7 +298,7 @@ class VideoReceiveStream2
bool keyframe_required_ RTC_GUARDED_BY(packet_sequence_checker_) = true;
// If we have successfully decoded any frame.
bool frame_decoded_ RTC_GUARDED_BY(decode_queue_) = false;
bool frame_decoded_ RTC_GUARDED_BY(decode_sequence_checker_) = false;
absl::optional<Timestamp> last_keyframe_request_
RTC_GUARDED_BY(packet_sequence_checker_);
@ -325,7 +327,7 @@ class VideoReceiveStream2
// Function that is triggered with encoded frames, if not empty.
std::function<void(const RecordableEncodedFrame&)>
encoded_frame_buffer_function_ RTC_GUARDED_BY(decode_queue_);
encoded_frame_buffer_function_ RTC_GUARDED_BY(decode_sequence_checker_);
// Set to true while we're requesting keyframes but not yet received one.
bool keyframe_generation_requested_ RTC_GUARDED_BY(packet_sequence_checker_) =
false;
@ -338,13 +340,16 @@ class VideoReceiveStream2
RTC_GUARDED_BY(pending_resolution_mutex_);
// Buffered encoded frames held while waiting for decoded resolution.
std::vector<std::unique_ptr<EncodedFrame>> buffered_encoded_frames_
RTC_GUARDED_BY(decode_queue_);
// Defined last so they are destroyed before all other members.
rtc::TaskQueue decode_queue_;
RTC_GUARDED_BY(decode_sequence_checker_);
// Used to signal destruction to potentially pending tasks.
ScopedTaskSafety task_safety_;
// Defined last so they are destroyed before all other members, in particular
// `decode_queue_` should be stopped before `decode_sequence_checker_` is
// destructed to avoid races when running tasks on the `decode_queue_` during
// VideoReceiveStream2 destruction.
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> decode_queue_;
};
} // namespace internal