Revert "[WebRTC-SendPacketsOnWorkerThread] Cleanup VideoSendStream(Impl)"

This reverts commit 77c47947ad098e4182a6244cb998e4fa8c7bd37e.

Reason for revert: Breaks downstream project.

Original change's description:
> [WebRTC-SendPacketsOnWorkerThread] Cleanup VideoSendStream(Impl)
>
> Cleanup and remove usage of MaybeWorkerThread from VideoSendStream.
> VideoSendStream is now created and lives on the worker thread.
>
> Bug: webrtc:14502
> Change-Id: I81ccf6b9fc6e8889db81b09bd4a75a3831a003e2
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/300842
> Reviewed-by: Erik Språng <sprang@webrtc.org>
> Commit-Queue: Per Kjellander <perkj@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#39814}

Bug: webrtc:14502
Change-Id: Ic969071d8797204851ecbaeea3b37f9256303d3d
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/300962
Owners-Override: Mirko Bonadei <mbonadei@webrtc.org>
Bot-Commit: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com>
Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org>
Auto-Submit: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39819}
This commit is contained in:
Mirko Bonadei 2023-04-12 08:37:51 +00:00 committed by WebRTC LUCI CQ
parent 28d92f6804
commit 779aadeb2e
4 changed files with 83 additions and 50 deletions

View File

@ -149,7 +149,8 @@ VideoSendStream::VideoSendStream(
const std::map<uint32_t, RtpPayloadState>& suspended_payload_states,
std::unique_ptr<FecController> fec_controller,
const FieldTrialsView& field_trials)
: transport_(transport),
: rtp_transport_queue_(transport->GetWorkerQueue()),
transport_(transport),
stats_proxy_(clock, config, encoder_config.content_type, field_trials),
config_(std::move(config)),
content_type_(encoder_config.content_type),
@ -236,7 +237,12 @@ void VideoSendStream::StartPerRtpStream(const std::vector<bool> active_layers) {
}
active_layers_string << "}";
RTC_LOG(LS_INFO) << "StartPerRtpStream: " << active_layers_string.str();
send_stream_.StartPerRtpStream(active_layers);
rtp_transport_queue_->RunOrPost(
SafeTask(transport_queue_safety_, [this, active_layers] {
send_stream_.StartPerRtpStream(active_layers);
}));
running_ = running;
}
@ -246,7 +252,13 @@ void VideoSendStream::Stop() {
return;
RTC_DLOG(LS_INFO) << "VideoSendStream::Stop";
running_ = false;
send_stream_.Stop();
rtp_transport_queue_->RunOrPost(SafeTask(transport_queue_safety_, [this] {
// As the stream can get re-used and implicitly restarted via changing
// the state of the active layers, we do not mark the
// `transport_queue_safety_` flag with `SetNotAlive()` here. That's only
// done when we stop permanently via `StopPermanentlyAndGetRtpStates()`.
send_stream_.Stop();
}));
}
bool VideoSendStream::started() {
@ -288,7 +300,9 @@ void VideoSendStream::ReconfigureVideoEncoder(VideoEncoderConfig config,
}
VideoSendStream::Stats VideoSendStream::GetStats() {
RTC_DCHECK_RUN_ON(&thread_checker_);
// TODO(perkj, solenberg): Some test cases in EndToEndTest call GetStats from
// a network thread. See comment in Call::GetStats().
// RTC_DCHECK_RUN_ON(&thread_checker_);
return stats_proxy_.GetStats();
}
@ -306,9 +320,13 @@ void VideoSendStream::StopPermanentlyAndGetRtpStates(
// Always run these cleanup steps regardless of whether running_ was set
// or not. This will unregister callbacks before destruction.
// See `VideoSendStreamImpl::StopVideoSendStream` for more.
send_stream_.Stop();
*rtp_state_map = send_stream_.GetRtpStates();
*payload_state_map = send_stream_.GetRtpPayloadStates();
rtp_transport_queue_->RunSynchronous(
[this, rtp_state_map, payload_state_map]() {
transport_queue_safety_->SetNotAlive();
send_stream_.Stop();
*rtp_state_map = send_stream_.GetRtpStates();
*payload_state_map = send_stream_.GetRtpPayloadStates();
});
}
void VideoSendStream::DeliverRtcp(const uint8_t* packet, size_t length) {
@ -317,7 +335,6 @@ void VideoSendStream::DeliverRtcp(const uint8_t* packet, size_t length) {
}
void VideoSendStream::GenerateKeyFrame(const std::vector<std::string>& rids) {
RTC_DCHECK_RUN_ON(&thread_checker_);
// Map rids to layers. If rids is empty, generate a keyframe for all layers.
std::vector<VideoFrameType> next_frames(config_.rtp.ssrcs.size(),
VideoFrameType::kVideoFrameKey);

View File

@ -104,7 +104,11 @@ class VideoSendStream : public webrtc::VideoSendStream {
absl::optional<float> GetPacingFactorOverride() const;
RTC_NO_UNIQUE_ADDRESS SequenceChecker thread_checker_;
MaybeWorkerThread* const rtp_transport_queue_;
RtpTransportControllerSendInterface* const transport_;
rtc::Event thread_sync_event_;
rtc::scoped_refptr<PendingTaskSafetyFlag> transport_queue_safety_ =
PendingTaskSafetyFlag::CreateDetached();
SendStatisticsProxy stats_proxy_;
const VideoSendStream::Config config_;

View File

@ -233,7 +233,7 @@ VideoSendStreamImpl::VideoSendStreamImpl(
pacing_config_(PacingConfig(field_trials)),
stats_proxy_(stats_proxy),
config_(config),
worker_queue_(TaskQueueBase::Current()),
rtp_transport_queue_(transport->GetWorkerQueue()),
timed_out_(false),
transport_(transport),
bitrate_allocator_(bitrate_allocator),
@ -298,26 +298,33 @@ VideoSendStreamImpl::VideoSendStreamImpl(
transport->EnablePeriodicAlrProbing(*enable_alr_bw_probing);
}
if (configured_pacing_factor_)
transport_->SetPacingFactor(*configured_pacing_factor_);
rtp_transport_queue_->RunOrPost(SafeTask(transport_queue_safety_, [this] {
if (configured_pacing_factor_)
transport_->SetPacingFactor(*configured_pacing_factor_);
video_stream_encoder_->SetStartBitrate(
bitrate_allocator_->GetStartBitrate(this));
video_stream_encoder_->SetStartBitrate(
bitrate_allocator_->GetStartBitrate(this));
}));
}
VideoSendStreamImpl::~VideoSendStreamImpl() {
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_LOG(LS_INFO) << "~VideoSendStreamImpl: " << config_->ToString();
// TODO(webrtc:14502): Change `transport_queue_safety_` to be of type
// ScopedTaskSafety if experiment WebRTC-SendPacketsOnWorkerThread succeed.
if (rtp_transport_queue_->IsCurrent()) {
transport_queue_safety_->SetNotAlive();
}
}
void VideoSendStreamImpl::DeliverRtcp(const uint8_t* packet, size_t length) {
RTC_DCHECK_RUN_ON(&thread_checker_);
// Runs on a worker thread.
rtp_video_sender_->DeliverRtcp(packet, length);
}
void VideoSendStreamImpl::StartPerRtpStream(
const std::vector<bool> active_layers) {
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
bool previously_active = rtp_video_sender_->IsActive();
rtp_video_sender_->SetActiveModules(active_layers);
if (!rtp_video_sender_->IsActive() && previously_active) {
@ -328,7 +335,8 @@ void VideoSendStreamImpl::StartPerRtpStream(
}
void VideoSendStreamImpl::StartupVideoSendStream() {
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
transport_queue_safety_->SetAlive();
bitrate_allocator_->AddObserver(this, GetAllocationConfig());
// Start monitoring encoder activity.
@ -338,8 +346,9 @@ void VideoSendStreamImpl::StartupVideoSendStream() {
activity_ = false;
timed_out_ = false;
check_encoder_activity_task_ = RepeatingTaskHandle::DelayedStart(
worker_queue_, kEncoderTimeOut, [this] {
RTC_DCHECK_RUN_ON(&thread_checker_);
rtp_transport_queue_->TaskQueueForDelayedTasks(), kEncoderTimeOut,
[this] {
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
if (!activity_) {
if (!timed_out_) {
SignalEncoderTimedOut();
@ -359,27 +368,29 @@ void VideoSendStreamImpl::StartupVideoSendStream() {
}
void VideoSendStreamImpl::Stop() {
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
RTC_LOG(LS_INFO) << "VideoSendStreamImpl::Stop";
if (!rtp_video_sender_->IsActive())
return;
RTC_DCHECK(transport_queue_safety_->alive());
TRACE_EVENT_INSTANT0("webrtc", "VideoSendStream::Stop");
rtp_video_sender_->Stop();
StopVideoSendStream();
}
void VideoSendStreamImpl::StopVideoSendStream() {
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
bitrate_allocator_->RemoveObserver(this);
check_encoder_activity_task_.Stop();
video_stream_encoder_->OnBitrateUpdated(DataRate::Zero(), DataRate::Zero(),
DataRate::Zero(), 0, 0, 0);
stats_proxy_->OnSetEncoderTargetRate(0);
transport_queue_safety_->SetNotAlive();
}
void VideoSendStreamImpl::SignalEncoderTimedOut() {
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
// If the encoder has not produced anything the last kEncoderTimeOut and it
// is supposed to, deregister as BitrateAllocatorObserver. This can happen
// if a camera stops producing frames.
@ -392,9 +403,9 @@ void VideoSendStreamImpl::SignalEncoderTimedOut() {
void VideoSendStreamImpl::OnBitrateAllocationUpdated(
const VideoBitrateAllocation& allocation) {
// OnBitrateAllocationUpdated is invoked from the encoder task queue or
// the worker_queue_.
auto task = [this, allocation] {
RTC_DCHECK_RUN_ON(&thread_checker_);
// the rtp_transport_queue_.
auto task = [=] {
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
if (encoder_target_rate_bps_ == 0) {
return;
}
@ -430,9 +441,9 @@ void VideoSendStreamImpl::OnBitrateAllocationUpdated(
// Send bitrate allocation metadata only if encoder is not paused.
rtp_video_sender_->OnBitrateAllocationUpdated(allocation);
};
if (!worker_queue_->IsCurrent()) {
worker_queue_->PostTask(
SafeTask(worker_queue_safety_.flag(), std::move(task)));
if (!rtp_transport_queue_->IsCurrent()) {
rtp_transport_queue_->TaskQueueForPost()->PostTask(
SafeTask(transport_queue_safety_, std::move(task)));
} else {
task();
}
@ -446,7 +457,7 @@ void VideoSendStreamImpl::OnVideoLayersAllocationUpdated(
}
void VideoSendStreamImpl::SignalEncoderActive() {
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
if (rtp_video_sender_->IsActive()) {
RTC_LOG(LS_INFO) << "SignalEncoderActive, Encoder is active.";
bitrate_allocator_->AddObserver(this, GetAllocationConfig());
@ -469,12 +480,12 @@ void VideoSendStreamImpl::OnEncoderConfigurationChanged(
VideoEncoderConfig::ContentType content_type,
int min_transmit_bitrate_bps) {
// Currently called on the encoder TQ
RTC_DCHECK(!worker_queue_->IsCurrent());
RTC_DCHECK(!rtp_transport_queue_->IsCurrent());
auto closure = [this, streams = std::move(streams), is_svc, content_type,
min_transmit_bitrate_bps]() mutable {
RTC_DCHECK_GE(config_->rtp.ssrcs.size(), streams.size());
TRACE_EVENT0("webrtc", "VideoSendStream::OnEncoderConfigurationChanged");
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
const VideoCodecType codec_type =
PayloadStringToCodecType(config_->rtp.payload_name);
@ -526,8 +537,8 @@ void VideoSendStreamImpl::OnEncoderConfigurationChanged(
}
};
worker_queue_->PostTask(
SafeTask(worker_queue_safety_.flag(), std::move(closure)));
rtp_transport_queue_->TaskQueueForPost()->PostTask(
SafeTask(transport_queue_safety_, std::move(closure)));
}
EncodedImageCallback::Result VideoSendStreamImpl::OnEncodedImage(
@ -539,10 +550,10 @@ EncodedImageCallback::Result VideoSendStreamImpl::OnEncodedImage(
// Indicate that there still is activity going on.
activity_ = true;
RTC_DCHECK(!worker_queue_->IsCurrent());
RTC_DCHECK(!rtp_transport_queue_->IsCurrent());
auto task_to_run_on_worker = [this]() {
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
if (disable_padding_) {
disable_padding_ = false;
// To ensure that padding bitrate is propagated to the bitrate allocator.
@ -555,8 +566,8 @@ EncodedImageCallback::Result VideoSendStreamImpl::OnEncodedImage(
OnBitrateAllocationUpdated(*context->throttled_allocation);
}
};
worker_queue_->PostTask(
SafeTask(worker_queue_safety_.flag(), std::move(task_to_run_on_worker)));
rtp_transport_queue_->TaskQueueForPost()->PostTask(
SafeTask(transport_queue_safety_, std::move(task_to_run_on_worker)));
return rtp_video_sender_->OnEncodedImage(encoded_image, codec_specific_info);
}
@ -576,7 +587,7 @@ std::map<uint32_t, RtpPayloadState> VideoSendStreamImpl::GetRtpPayloadStates()
}
uint32_t VideoSendStreamImpl::OnBitrateUpdated(BitrateAllocationUpdate update) {
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
RTC_DCHECK(rtp_video_sender_->IsActive())
<< "VideoSendStream::Start has not been called.";

View File

@ -121,14 +121,14 @@ class VideoSendStreamImpl : public webrtc::BitrateAllocatorObserver,
void StartupVideoSendStream();
// Removes the bitrate observer, stops monitoring and notifies the video
// encoder of the bitrate update.
void StopVideoSendStream() RTC_RUN_ON(thread_checker_);
void StopVideoSendStream() RTC_RUN_ON(rtp_transport_queue_);
void ConfigureProtection();
void ConfigureSsrcs();
void SignalEncoderTimedOut();
void SignalEncoderActive();
MediaStreamAllocationConfig GetAllocationConfig() const
RTC_RUN_ON(thread_checker_);
RTC_RUN_ON(rtp_transport_queue_);
RTC_NO_UNIQUE_ADDRESS SequenceChecker thread_checker_;
Clock* const clock_;
@ -138,30 +138,31 @@ class VideoSendStreamImpl : public webrtc::BitrateAllocatorObserver,
SendStatisticsProxy* const stats_proxy_;
const VideoSendStream::Config* const config_;
TaskQueueBase* const worker_queue_;
MaybeWorkerThread* const rtp_transport_queue_;
RepeatingTaskHandle check_encoder_activity_task_
RTC_GUARDED_BY(thread_checker_);
RTC_GUARDED_BY(rtp_transport_queue_);
std::atomic_bool activity_;
bool timed_out_ RTC_GUARDED_BY(thread_checker_);
bool timed_out_ RTC_GUARDED_BY(rtp_transport_queue_);
RtpTransportControllerSendInterface* const transport_;
BitrateAllocatorInterface* const bitrate_allocator_;
bool disable_padding_ RTC_GUARDED_BY(thread_checker_);
int max_padding_bitrate_ RTC_GUARDED_BY(thread_checker_);
int encoder_min_bitrate_bps_ RTC_GUARDED_BY(thread_checker_);
uint32_t encoder_max_bitrate_bps_ RTC_GUARDED_BY(thread_checker_);
uint32_t encoder_target_rate_bps_ RTC_GUARDED_BY(thread_checker_);
double encoder_bitrate_priority_ RTC_GUARDED_BY(thread_checker_);
bool disable_padding_;
int max_padding_bitrate_;
int encoder_min_bitrate_bps_;
uint32_t encoder_max_bitrate_bps_;
uint32_t encoder_target_rate_bps_;
double encoder_bitrate_priority_;
VideoStreamEncoderInterface* const video_stream_encoder_;
RtcpBandwidthObserver* const bandwidth_observer_;
RtpVideoSenderInterface* const rtp_video_sender_;
ScopedTaskSafety worker_queue_safety_;
rtc::scoped_refptr<PendingTaskSafetyFlag> transport_queue_safety_ =
PendingTaskSafetyFlag::CreateDetached();
// Context for the most recent and last sent video bitrate allocation. Used to
// throttle sending of similar bitrate allocations.
@ -171,7 +172,7 @@ class VideoSendStreamImpl : public webrtc::BitrateAllocatorObserver,
int64_t last_send_time_ms;
};
absl::optional<VbaSendContext> video_bitrate_allocation_context_
RTC_GUARDED_BY(thread_checker_);
RTC_GUARDED_BY(rtp_transport_queue_);
const absl::optional<float> configured_pacing_factor_;
};
} // namespace internal