Replace TaskQueue with MaybeWorkerThread in RtpTransportControllerInterface

This spills to a few more clasess....

Change-Id: Iea79e3b4ac86b30db6f13da89a47ab7000c5440a
Bug: webrtc:14502
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/277803
Reviewed-by: Jakob Ivarsson‎ <jakobi@webrtc.org>
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Reviewed-by: Evan Shrubsole <eshr@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38334}
This commit is contained in:
Per Kjellander 2022-10-10 12:53:41 +02:00 committed by WebRTC LUCI CQ
parent 5c9b7da038
commit 828ef91817
17 changed files with 849 additions and 822 deletions

View File

@ -84,6 +84,7 @@ rtc_library("audio") {
"../modules/pacing",
"../modules/rtp_rtcp",
"../modules/rtp_rtcp:rtp_rtcp_format",
"../modules/utility:utility",
"../rtc_base:audio_format_to_string",
"../rtc_base:buffer",
"../rtc_base:checks",
@ -111,6 +112,7 @@ rtc_library("audio") {
"utility:audio_frame_operations",
]
absl_deps = [
"//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",
@ -185,6 +187,7 @@ if (rtc_include_tests) {
"../modules/pacing",
"../modules/rtp_rtcp:mock_rtp_rtcp",
"../modules/rtp_rtcp:rtp_rtcp_format",
"../modules/utility:utility",
"../rtc_base:checks",
"../rtc_base:macromagic",
"../rtc_base:refcount",
@ -202,6 +205,7 @@ if (rtc_include_tests) {
"../test:scoped_key_value_config",
"../test:test_common",
"../test:test_support",
"../test/time_controller:time_controller",
"utility:utility_tests",
"//testing/gtest",
]

View File

@ -22,6 +22,7 @@
#include "api/crypto/frame_encryptor_interface.h"
#include "api/function_view.h"
#include "api/rtc_event_log/rtc_event_log.h"
#include "api/task_queue/task_queue_base.h"
#include "audio/audio_state.h"
#include "audio/channel_send.h"
#include "audio/conversion.h"
@ -35,10 +36,8 @@
#include "modules/audio_processing/include/audio_processing.h"
#include "modules/rtp_rtcp/source/rtp_header_extensions.h"
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
#include "rtc_base/logging.h"
#include "rtc_base/strings/audio_format_to_string.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/trace_event.h"
namespace webrtc {
@ -75,6 +74,7 @@ void UpdateEventLogStreamConfig(RtcEventLog* event_log,
event_log->Log(std::make_unique<RtcEventAudioSendStreamConfig>(
std::move(rtclog_config)));
}
} // namespace
constexpr char AudioAllocationConfig::kKey[];
@ -176,7 +176,6 @@ AudioSendStream::AudioSendStream(
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
ConfigureStream(config, true);
UpdateCachedTargetAudioBitrateConstraints();
pacer_thread_checker_.Detach();
}
AudioSendStream::~AudioSendStream() {
@ -184,11 +183,10 @@ AudioSendStream::~AudioSendStream() {
RTC_LOG(LS_INFO) << "~AudioSendStream: " << config_.rtp.ssrc;
RTC_DCHECK(!sending_);
channel_send_->ResetSenderCongestionControlObjects();
// Blocking call to synchronize state with worker queue to ensure that there
// are no pending tasks left that keeps references to audio.
rtc::Event thread_sync_event;
rtp_transport_queue_->PostTask([&] { thread_sync_event.Set(); });
thread_sync_event.Wait(rtc::Event::kForever);
rtp_transport_queue_->RunSynchronous([] {});
}
const webrtc::AudioSendStream::Config& AudioSendStream::GetConfig() const {
@ -846,7 +844,7 @@ void AudioSendStream::ConfigureBitrateObserver() {
if (allocation_settings_.priority_bitrate_raw)
priority_bitrate = *allocation_settings_.priority_bitrate_raw;
rtp_transport_queue_->PostTask([this, constraints, priority_bitrate,
rtp_transport_queue_->RunOrPost([this, constraints, priority_bitrate,
config_bitrate_priority =
config_.bitrate_priority] {
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
@ -863,13 +861,10 @@ void AudioSendStream::ConfigureBitrateObserver() {
void AudioSendStream::RemoveBitrateObserver() {
registered_with_allocator_ = false;
rtc::Event thread_sync_event;
rtp_transport_queue_->PostTask([this, &thread_sync_event] {
rtp_transport_queue_->RunSynchronous([this] {
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
bitrate_allocator_->RemoveObserver(this);
thread_sync_event.Set();
});
thread_sync_event.Wait(rtc::Event::kForever);
}
absl::optional<AudioSendStream::TargetAudioBitrateConstraints>
@ -932,7 +927,7 @@ void AudioSendStream::UpdateCachedTargetAudioBitrateConstraints() {
if (!new_constraints.has_value()) {
return;
}
rtp_transport_queue_->PostTask([this, new_constraints]() {
rtp_transport_queue_->RunOrPost([this, new_constraints]() {
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
cached_constraints_ = new_constraints;
});

View File

@ -15,14 +15,17 @@
#include <utility>
#include <vector>
#include "absl/functional/any_invocable.h"
#include "api/field_trials_view.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_base.h"
#include "audio/audio_level.h"
#include "audio/channel_send.h"
#include "call/audio_send_stream.h"
#include "call/audio_state.h"
#include "call/bitrate_allocator.h"
#include "modules/rtp_rtcp/source/rtp_rtcp_interface.h"
#include "modules/utility/maybe_worker_thread.h"
#include "rtc_base/experiments/struct_parameters_parser.h"
#include "rtc_base/race_checker.h"
#include "rtc_base/synchronization/mutex.h"
@ -166,9 +169,8 @@ class AudioSendStream final : public webrtc::AudioSendStream,
const FieldTrialsView& field_trials_;
SequenceChecker worker_thread_checker_;
SequenceChecker pacer_thread_checker_;
rtc::RaceChecker audio_capture_race_checker_;
rtc::TaskQueue* rtp_transport_queue_;
MaybeWorkerThread* rtp_transport_queue_;
const bool allocate_audio_without_feedback_;
const bool force_no_audio_feedback_ = allocate_audio_without_feedback_;

View File

@ -30,12 +30,13 @@
#include "modules/audio_processing/include/mock_audio_processing.h"
#include "modules/rtp_rtcp/mocks/mock_rtcp_bandwidth_observer.h"
#include "modules/rtp_rtcp/mocks/mock_rtp_rtcp.h"
#include "rtc_base/task_queue_for_test.h"
#include "modules/utility/maybe_worker_thread.h"
#include "system_wrappers/include/clock.h"
#include "test/gtest.h"
#include "test/mock_audio_encoder.h"
#include "test/mock_audio_encoder_factory.h"
#include "test/scoped_key_value_config.h"
#include "test/time_controller/real_time_controller.h"
namespace webrtc {
namespace test {
@ -148,17 +149,15 @@ struct ConfigHelper {
ConfigHelper(bool audio_bwe_enabled,
bool expect_set_encoder_call,
bool use_null_audio_processing)
: clock_(1000000),
task_queue_factory_(CreateDefaultTaskQueueFactory()),
stream_config_(/*send_transport=*/nullptr),
: stream_config_(/*send_transport=*/nullptr),
audio_processing_(
use_null_audio_processing
? nullptr
: rtc::make_ref_counted<NiceMock<MockAudioProcessing>>()),
bitrate_allocator_(&limit_observer_),
worker_queue_(task_queue_factory_->CreateTaskQueue(
worker_queue_(field_trials,
"ConfigHelper_worker_queue",
TaskQueueFactory::Priority::NORMAL)),
time_controller_.GetTaskQueueFactory()),
audio_encoder_(nullptr) {
using ::testing::Invoke;
@ -193,9 +192,9 @@ struct ConfigHelper {
.WillRepeatedly(Return(&worker_queue_));
return std::unique_ptr<internal::AudioSendStream>(
new internal::AudioSendStream(
Clock::GetRealTimeClock(), stream_config_, audio_state_,
task_queue_factory_.get(), &rtp_transport_, &bitrate_allocator_,
&event_log_, absl::nullopt,
time_controller_.GetClock(), stream_config_, audio_state_,
time_controller_.GetTaskQueueFactory(), &rtp_transport_,
&bitrate_allocator_, &event_log_, absl::nullopt,
std::unique_ptr<voe::ChannelSendInterface>(channel_send_),
field_trials));
}
@ -320,13 +319,12 @@ struct ConfigHelper {
}
}
TaskQueueForTest* worker() { return &worker_queue_; }
MaybeWorkerThread* worker() { return &worker_queue_; }
test::ScopedKeyValueConfig field_trials;
private:
SimulatedClock clock_;
std::unique_ptr<TaskQueueFactory> task_queue_factory_;
RealTimeController time_controller_;
rtc::scoped_refptr<AudioState> audio_state_;
AudioSendStream::Config stream_config_;
::testing::StrictMock<MockChannelSend>* channel_send_ = nullptr;
@ -340,7 +338,7 @@ struct ConfigHelper {
BitrateAllocator bitrate_allocator_;
// `worker_queue` is defined last to ensure all pending tasks are cancelled
// and deleted before any other members.
TaskQueueForTest worker_queue_;
MaybeWorkerThread worker_queue_;
std::unique_ptr<AudioEncoder> audio_encoder_;
};
@ -638,7 +636,8 @@ TEST(AudioSendStreamTest, DoesNotPassHigherBitrateThanMaxBitrate) {
update.packet_loss_ratio = 0;
update.round_trip_time = TimeDelta::Millis(50);
update.bwe_period = TimeDelta::Millis(6000);
helper.worker()->SendTask([&] { send_stream->OnBitrateUpdated(update); });
helper.worker()->RunSynchronous(
[&] { send_stream->OnBitrateUpdated(update); });
}
}
@ -654,7 +653,8 @@ TEST(AudioSendStreamTest, SSBweTargetInRangeRespected) {
BitrateAllocationUpdate update;
update.target_bitrate =
DataRate::BitsPerSec(helper.config().max_bitrate_bps - 5000);
helper.worker()->SendTask([&] { send_stream->OnBitrateUpdated(update); });
helper.worker()->RunSynchronous(
[&] { send_stream->OnBitrateUpdated(update); });
}
}
@ -670,7 +670,8 @@ TEST(AudioSendStreamTest, SSBweFieldTrialMinRespected) {
Eq(DataRate::KilobitsPerSec(6)))));
BitrateAllocationUpdate update;
update.target_bitrate = DataRate::KilobitsPerSec(1);
helper.worker()->SendTask([&] { send_stream->OnBitrateUpdated(update); });
helper.worker()->RunSynchronous(
[&] { send_stream->OnBitrateUpdated(update); });
}
}
@ -686,7 +687,8 @@ TEST(AudioSendStreamTest, SSBweFieldTrialMaxRespected) {
Eq(DataRate::KilobitsPerSec(64)))));
BitrateAllocationUpdate update;
update.target_bitrate = DataRate::KilobitsPerSec(128);
helper.worker()->SendTask([&] { send_stream->OnBitrateUpdated(update); });
helper.worker()->RunSynchronous(
[&] { send_stream->OnBitrateUpdated(update); });
}
}
@ -706,7 +708,8 @@ TEST(AudioSendStreamTest, SSBweWithOverhead) {
&BitrateAllocationUpdate::target_bitrate, Eq(bitrate))));
BitrateAllocationUpdate update;
update.target_bitrate = bitrate;
helper.worker()->SendTask([&] { send_stream->OnBitrateUpdated(update); });
helper.worker()->RunSynchronous(
[&] { send_stream->OnBitrateUpdated(update); });
}
}
@ -726,7 +729,8 @@ TEST(AudioSendStreamTest, SSBweWithOverheadMinRespected) {
&BitrateAllocationUpdate::target_bitrate, Eq(bitrate))));
BitrateAllocationUpdate update;
update.target_bitrate = DataRate::KilobitsPerSec(1);
helper.worker()->SendTask([&] { send_stream->OnBitrateUpdated(update); });
helper.worker()->RunSynchronous(
[&] { send_stream->OnBitrateUpdated(update); });
}
}
@ -746,7 +750,8 @@ TEST(AudioSendStreamTest, SSBweWithOverheadMaxRespected) {
&BitrateAllocationUpdate::target_bitrate, Eq(bitrate))));
BitrateAllocationUpdate update;
update.target_bitrate = DataRate::KilobitsPerSec(128);
helper.worker()->SendTask([&] { send_stream->OnBitrateUpdated(update); });
helper.worker()->RunSynchronous(
[&] { send_stream->OnBitrateUpdated(update); });
}
}
@ -764,7 +769,8 @@ TEST(AudioSendStreamTest, ProbingIntervalOnBitrateUpdated) {
update.packet_loss_ratio = 0;
update.round_trip_time = TimeDelta::Millis(50);
update.bwe_period = TimeDelta::Millis(5000);
helper.worker()->SendTask([&] { send_stream->OnBitrateUpdated(update); });
helper.worker()->RunSynchronous(
[&] { send_stream->OnBitrateUpdated(update); });
}
}
@ -866,7 +872,8 @@ TEST(AudioSendStreamTest, AudioOverheadChanged) {
DataRate::BitsPerSec(helper.config().max_bitrate_bps) +
kMaxOverheadRate;
EXPECT_CALL(*helper.channel_send(), OnBitrateAllocation);
helper.worker()->SendTask([&] { send_stream->OnBitrateUpdated(update); });
helper.worker()->RunSynchronous(
[&] { send_stream->OnBitrateUpdated(update); });
EXPECT_EQ(audio_overhead_per_packet_bytes,
send_stream->TestOnlyGetPerPacketOverheadBytes());
@ -874,7 +881,8 @@ TEST(AudioSendStreamTest, AudioOverheadChanged) {
EXPECT_CALL(*helper.rtp_rtcp(), ExpectedPerPacketOverhead)
.WillRepeatedly(Return(audio_overhead_per_packet_bytes + 20));
EXPECT_CALL(*helper.channel_send(), OnBitrateAllocation);
helper.worker()->SendTask([&] { send_stream->OnBitrateUpdated(update); });
helper.worker()->RunSynchronous(
[&] { send_stream->OnBitrateUpdated(update); });
EXPECT_EQ(audio_overhead_per_packet_bytes + 20,
send_stream->TestOnlyGetPerPacketOverheadBytes());
@ -898,7 +906,8 @@ TEST(AudioSendStreamTest, OnAudioAndTransportOverheadChanged) {
DataRate::BitsPerSec(helper.config().max_bitrate_bps) +
kMaxOverheadRate;
EXPECT_CALL(*helper.channel_send(), OnBitrateAllocation);
helper.worker()->SendTask([&] { send_stream->OnBitrateUpdated(update); });
helper.worker()->RunSynchronous(
[&] { send_stream->OnBitrateUpdated(update); });
EXPECT_EQ(
transport_overhead_per_packet_bytes + audio_overhead_per_packet_bytes,

View File

@ -184,7 +184,8 @@ rtc_library("rtp_sender") {
"../api:sequence_checker",
"../api:transport_api",
"../api/rtc_event_log",
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:task_queue",
"../api/transport:field_trial_based_config",
"../api/transport:goog_cc",
"../api/transport:network_control",
@ -203,6 +204,7 @@ rtc_library("rtp_sender") {
"../modules/rtp_rtcp",
"../modules/rtp_rtcp:rtp_rtcp_format",
"../modules/rtp_rtcp:rtp_video_header",
"../modules/utility:utility",
"../modules/video_coding:chain_diff_calculator",
"../modules/video_coding:codec_globals_headers",
"../modules/video_coding:frame_dependencies_calculator",

View File

@ -16,6 +16,7 @@
#include "absl/strings/match.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/transport/goog_cc_factory.h"
#include "api/transport/network_types.h"
#include "api/units/data_rate.h"
@ -119,9 +120,7 @@ RtpTransportControllerSend::RtpTransportControllerSend(
congestion_window_size_(DataSize::PlusInfinity()),
is_congested_(false),
retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs),
task_queue_(task_queue_factory->CreateTaskQueue(
"rtp_send_controller",
TaskQueueFactory::Priority::NORMAL)),
task_queue_(trials, "rtp_send_controller", task_queue_factory),
field_trials_(trials) {
ParseFieldTrial({&relay_bandwidth_cap_},
trials.Lookup("WebRTC-Bwe-NetworkRouteConstraints"));
@ -135,7 +134,15 @@ RtpTransportControllerSend::RtpTransportControllerSend(
}
RtpTransportControllerSend::~RtpTransportControllerSend() {
RTC_DCHECK_RUN_ON(&main_thread_);
RTC_DCHECK(video_rtp_senders_.empty());
if (task_queue_.IsCurrent()) {
// If these repeated tasks run on a task queue owned by
// `task_queue_`, they are stopped when the task queue is deleted.
// Otherwise, stop them here.
pacer_queue_update_task_.Stop();
controller_task_.Stop();
}
}
RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender(
@ -195,7 +202,7 @@ void RtpTransportControllerSend::UpdateCongestedState() {
}
}
rtc::TaskQueue* RtpTransportControllerSend::GetWorkerQueue() {
MaybeWorkerThread* RtpTransportControllerSend::GetWorkerQueue() {
return &task_queue_;
}
@ -240,7 +247,7 @@ RtpTransportControllerSend::GetStreamFeedbackProvider() {
void RtpTransportControllerSend::RegisterTargetTransferRateObserver(
TargetTransferRateObserver* observer) {
task_queue_.PostTask([this, observer] {
task_queue_.RunOrPost([this, observer] {
RTC_DCHECK_RUN_ON(&task_queue_);
RTC_DCHECK(observer_ == nullptr);
observer_ = observer;
@ -300,7 +307,7 @@ void RtpTransportControllerSend::OnNetworkRouteChanged(
if (relay_constraint_update.has_value()) {
UpdateBitrateConstraints(*relay_constraint_update);
}
task_queue_.PostTask([this, network_route] {
task_queue_.RunOrPost([this, network_route] {
RTC_DCHECK_RUN_ON(&task_queue_);
transport_overhead_bytes_per_packet_ = network_route.packet_overhead;
});
@ -329,7 +336,7 @@ void RtpTransportControllerSend::OnNetworkRouteChanged(
NetworkRouteChange msg;
msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
msg.constraints = ConvertConstraints(bitrate_config, clock_);
task_queue_.PostTask([this, msg, network_route] {
task_queue_.RunOrPost([this, msg, network_route] {
RTC_DCHECK_RUN_ON(&task_queue_);
transport_overhead_bytes_per_packet_ = network_route.packet_overhead;
if (reset_feedback_on_route_change_) {
@ -352,7 +359,7 @@ void RtpTransportControllerSend::OnNetworkAvailability(bool network_available) {
NetworkAvailability msg;
msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
msg.network_available = network_available;
task_queue_.PostTask([this, msg]() {
task_queue_.RunOrPost([this, msg]() {
RTC_DCHECK_RUN_ON(&task_queue_);
if (network_available_ == msg.network_available)
return;
@ -389,7 +396,7 @@ absl::optional<Timestamp> RtpTransportControllerSend::GetFirstPacketTime()
return pacer_.FirstSentPacketTime();
}
void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) {
task_queue_.PostTask([this, enable]() {
task_queue_.RunOrPost([this, enable]() {
RTC_DCHECK_RUN_ON(&task_queue_);
streams_config_.requests_alr_probing = enable;
UpdateStreamsConfig();
@ -397,25 +404,33 @@ void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) {
}
void RtpTransportControllerSend::OnSentPacket(
const rtc::SentPacket& sent_packet) {
task_queue_.PostTask([this, sent_packet]() {
// Normally called on the network thread !
// We can not use SafeTask here if we are using an owned task queue, because
// the safety flag will be destroyed when RtpTransportControllerSend is
// destroyed on the worker thread. But we must use SafeTask if we are using
// the worker thread, since the worker thread outlive
// RtpTransportControllerSend.
task_queue_.TaskQueueForPost()->PostTask(
task_queue_.MaybeSafeTask(safety_.flag(), [this, sent_packet]() {
RTC_DCHECK_RUN_ON(&task_queue_);
absl::optional<SentPacket> packet_msg =
transport_feedback_adapter_.ProcessSentPacket(sent_packet);
if (packet_msg) {
// Only update outstanding data if:
// 1. Packet feadback is used.
// 1. Packet feedback is used.
// 2. The packet has not yet received an acknowledgement.
// 3. It is not a retransmission of an earlier packet.
UpdateCongestedState();
if (controller_)
PostUpdates(controller_->OnSentPacket(*packet_msg));
}
});
}));
}
void RtpTransportControllerSend::OnReceivedPacket(
const ReceivedPacket& packet_msg) {
task_queue_.PostTask([this, packet_msg]() {
task_queue_.RunOrPost([this, packet_msg]() {
RTC_DCHECK_RUN_ON(&task_queue_);
if (controller_)
PostUpdates(controller_->OnReceivedPacket(packet_msg));
@ -425,7 +440,7 @@ void RtpTransportControllerSend::OnReceivedPacket(
void RtpTransportControllerSend::UpdateBitrateConstraints(
const BitrateConstraints& updated) {
TargetRateConstraints msg = ConvertConstraints(updated, clock_);
task_queue_.PostTask([this, msg]() {
task_queue_.RunOrPost([this, msg]() {
RTC_DCHECK_RUN_ON(&task_queue_);
if (controller_) {
PostUpdates(controller_->OnTargetRateConstraints(msg));
@ -506,7 +521,7 @@ void RtpTransportControllerSend::OnReceivedEstimatedBitrate(uint32_t bitrate) {
RemoteBitrateReport msg;
msg.receive_time = Timestamp::Millis(clock_->TimeInMilliseconds());
msg.bandwidth = DataRate::BitsPerSec(bitrate);
task_queue_.PostTask([this, msg]() {
task_queue_.RunOrPost([this, msg]() {
RTC_DCHECK_RUN_ON(&task_queue_);
if (controller_)
PostUpdates(controller_->OnRemoteBitrateReport(msg));
@ -517,13 +532,9 @@ void RtpTransportControllerSend::OnReceivedRtcpReceiverReport(
const ReportBlockList& report_blocks,
int64_t rtt_ms,
int64_t now_ms) {
task_queue_.PostTask([this, report_blocks, now_ms]() {
task_queue_.RunOrPost([this, report_blocks, now_ms, rtt_ms]() {
RTC_DCHECK_RUN_ON(&task_queue_);
OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms);
});
task_queue_.PostTask([this, now_ms, rtt_ms]() {
RTC_DCHECK_RUN_ON(&task_queue_);
RoundTripTimeUpdate report;
report.receive_time = Timestamp::Millis(now_ms);
report.round_trip_time = TimeDelta::Millis(rtt_ms);
@ -536,7 +547,8 @@ void RtpTransportControllerSend::OnReceivedRtcpReceiverReport(
void RtpTransportControllerSend::OnAddPacket(
const RtpPacketSendInfo& packet_info) {
Timestamp creation_time = Timestamp::Millis(clock_->TimeInMilliseconds());
task_queue_.PostTask([this, packet_info, creation_time]() {
task_queue_.RunOrPost([this, packet_info, creation_time]() {
RTC_DCHECK_RUN_ON(&task_queue_);
feedback_demuxer_.AddPacket(packet_info);
transport_feedback_adapter_.AddPacket(
@ -549,7 +561,7 @@ void RtpTransportControllerSend::OnAddPacket(
void RtpTransportControllerSend::OnTransportFeedback(
const rtcp::TransportFeedback& feedback) {
auto feedback_time = Timestamp::Millis(clock_->TimeInMilliseconds());
task_queue_.PostTask([this, feedback, feedback_time]() {
task_queue_.RunOrPost([this, feedback, feedback_time]() {
RTC_DCHECK_RUN_ON(&task_queue_);
feedback_demuxer_.OnTransportFeedback(feedback);
absl::optional<TransportPacketsFeedback> feedback_msg =
@ -572,7 +584,7 @@ void RtpTransportControllerSend::OnRemoteNetworkEstimate(
estimate.link_capacity_lower, estimate.link_capacity_upper));
}
estimate.update_time = Timestamp::Millis(clock_->TimeInMilliseconds());
task_queue_.PostTask([this, estimate] {
task_queue_.RunOrPost([this, estimate] {
RTC_DCHECK_RUN_ON(&task_queue_);
if (controller_)
PostUpdates(controller_->OnNetworkStateEstimate(estimate));
@ -614,9 +626,11 @@ void RtpTransportControllerSend::UpdateInitialConstraints(
}
void RtpTransportControllerSend::StartProcessPeriodicTasks() {
RTC_DCHECK_RUN_ON(&task_queue_);
if (!pacer_queue_update_task_.Running()) {
pacer_queue_update_task_ = RepeatingTaskHandle::DelayedStart(
task_queue_.Get(), kPacerQueueUpdateInterval, [this]() {
task_queue_.TaskQueueForDelayedTasks(), kPacerQueueUpdateInterval,
[this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
TimeDelta expected_queue_time = pacer_.ExpectedQueueTime();
control_handler_->SetPacerQueue(expected_queue_time);
@ -627,7 +641,7 @@ void RtpTransportControllerSend::StartProcessPeriodicTasks() {
controller_task_.Stop();
if (process_interval_.IsFinite()) {
controller_task_ = RepeatingTaskHandle::DelayedStart(
task_queue_.Get(), process_interval_, [this]() {
task_queue_.TaskQueueForDelayedTasks(), process_interval_, [this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
UpdateControllerWithTimeInterval();
return process_interval_;

View File

@ -20,6 +20,7 @@
#include "absl/strings/string_view.h"
#include "api/network_state_predictor.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/transport/network_control.h"
#include "api/units/data_rate.h"
@ -32,6 +33,7 @@
#include "modules/pacing/packet_router.h"
#include "modules/pacing/rtp_packet_pacer.h"
#include "modules/pacing/task_queue_paced_sender.h"
#include "modules/utility/maybe_worker_thread.h"
#include "rtc_base/network_route.h"
#include "rtc_base/race_checker.h"
#include "rtc_base/task_queue.h"
@ -79,7 +81,7 @@ class RtpTransportControllerSend final
RtpVideoSenderInterface* rtp_video_sender) override;
// Implements RtpTransportControllerSendInterface
rtc::TaskQueue* GetWorkerQueue() override;
MaybeWorkerThread* GetWorkerQueue() override;
PacketRouter* packet_router() override;
NetworkStateEstimateObserver* network_state_estimate_observer() override;
@ -208,10 +210,8 @@ class RtpTransportControllerSend final
// Protected by internal locks.
RateLimiter retransmission_rate_limiter_;
// TODO(perkj): `task_queue_` is supposed to replace `process_thread_`.
// `task_queue_` is defined last to ensure all pending tasks are cancelled
// and deleted before any other members.
rtc::TaskQueue task_queue_;
ScopedTaskSafety safety_;
MaybeWorkerThread task_queue_;
const FieldTrialsView& field_trials_;
};

View File

@ -42,6 +42,7 @@ class TaskQueue;
namespace webrtc {
class FrameEncryptorInterface;
class MaybeWorkerThread;
class TargetTransferRateObserver;
class Transport;
class PacketRouter;
@ -93,7 +94,9 @@ struct RtpSenderFrameEncryptionConfig {
class RtpTransportControllerSendInterface {
public:
virtual ~RtpTransportControllerSendInterface() {}
virtual rtc::TaskQueue* GetWorkerQueue() = 0;
// TODO(webrtc:14502): Remove MaybeWorkerThread when experiment has been
// evaluated.
virtual MaybeWorkerThread* GetWorkerQueue() = 0;
virtual PacketRouter* packet_router() = 0;
virtual RtpVideoSenderInterface* CreateRtpVideoSender(

View File

@ -27,6 +27,7 @@
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/rtp_rtcp_impl2.h"
#include "modules/rtp_rtcp/source/rtp_sender.h"
#include "modules/utility/maybe_worker_thread.h"
#include "modules/video_coding/include/video_codec_interface.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"

View File

@ -208,7 +208,7 @@ class RtpVideoSenderTestFixture {
// that allow for running a `task` on the transport queue, similar to
// SendTask().
void RunOnTransportQueue(absl::AnyInvocable<void() &&> task) {
transport_controller_.GetWorkerQueue()->PostTask(std::move(task));
transport_controller_.GetWorkerQueue()->RunOrPost(std::move(task));
AdvanceTime(TimeDelta::Zero());
}

View File

@ -50,7 +50,7 @@ class MockRtpTransportControllerSend
DestroyRtpVideoSender,
(RtpVideoSenderInterface*),
(override));
MOCK_METHOD(rtc::TaskQueue*, GetWorkerQueue, (), (override));
MOCK_METHOD(MaybeWorkerThread*, GetWorkerQueue, (), (override));
MOCK_METHOD(PacketRouter*, packet_router, (), (override));
MOCK_METHOD(NetworkStateEstimateObserver*,
network_state_estimate_observer,

View File

@ -123,6 +123,7 @@ rtc_library("video") {
"../modules/rtp_rtcp",
"../modules/rtp_rtcp:rtp_rtcp_format",
"../modules/rtp_rtcp:rtp_video_header",
"../modules/utility:utility",
"../modules/video_coding",
"../modules/video_coding:nack_requester",
"../modules/video_coding:packet_buffer",
@ -876,6 +877,7 @@ if (rtc_include_tests) {
"../modules/rtp_rtcp",
"../modules/rtp_rtcp:mock_rtp_rtcp",
"../modules/rtp_rtcp:rtp_rtcp_format",
"../modules/utility:utility",
"../modules/video_coding",
"../modules/video_coding:codec_globals_headers",
"../modules/video_coding:encoded_frame",

View File

@ -149,7 +149,7 @@ VideoSendStream::VideoSendStream(
const std::map<uint32_t, RtpPayloadState>& suspended_payload_states,
std::unique_ptr<FecController> fec_controller,
const FieldTrialsView& field_trials)
: rtp_transport_queue_(transport->GetWorkerQueue()->Get()),
: rtp_transport_queue_(transport->GetWorkerQueue()),
transport_(transport),
stats_proxy_(clock, config, encoder_config.content_type, field_trials),
config_(std::move(config)),
@ -186,7 +186,6 @@ VideoSendStream::VideoSendStream(
config_.frame_transformer)),
send_stream_(clock,
&stats_proxy_,
rtp_transport_queue_,
transport,
bitrate_allocator,
video_stream_encoder_.get(),
@ -236,7 +235,7 @@ void VideoSendStream::UpdateActiveSimulcastLayers(
RTC_LOG(LS_INFO) << "UpdateActiveSimulcastLayers: "
<< active_layers_string.str();
rtp_transport_queue_->PostTask(
rtp_transport_queue_->RunOrPost(
SafeTask(transport_queue_safety_, [this, active_layers] {
send_stream_.UpdateActiveSimulcastLayers(active_layers);
}));
@ -252,17 +251,14 @@ void VideoSendStream::Start() {
running_ = true;
rtp_transport_queue_->PostTask([this] {
transport_queue_safety_->SetAlive();
send_stream_.Start();
thread_sync_event_.Set();
});
// It is expected that after VideoSendStream::Start has been called, incoming
// frames are not dropped in VideoStreamEncoder. To ensure this, Start has to
// be synchronized.
// TODO(tommi): ^^^ Validate if this still holds.
thread_sync_event_.Wait(rtc::Event::kForever);
rtp_transport_queue_->RunSynchronous([this] {
transport_queue_safety_->SetAlive();
send_stream_.Start();
});
}
void VideoSendStream::Stop() {
@ -271,7 +267,7 @@ void VideoSendStream::Stop() {
return;
RTC_DLOG(LS_INFO) << "VideoSendStream::Stop";
running_ = false;
rtp_transport_queue_->PostTask(SafeTask(transport_queue_safety_, [this] {
rtp_transport_queue_->RunOrPost(SafeTask(transport_queue_safety_, [this] {
// As the stream can get re-used and implicitly restarted via changing
// the state of the active layers, we do not mark the
// `transport_queue_safety_` flag with `SetNotAlive()` here. That's only
@ -333,18 +329,17 @@ void VideoSendStream::StopPermanentlyAndGetRtpStates(
// Always run these cleanup steps regardless of whether running_ was set
// or not. This will unregister callbacks before destruction.
// See `VideoSendStreamImpl::StopVideoSendStream` for more.
rtp_transport_queue_->PostTask([this, rtp_state_map, payload_state_map]() {
rtp_transport_queue_->RunSynchronous(
[this, rtp_state_map, payload_state_map]() {
transport_queue_safety_->SetNotAlive();
send_stream_.Stop();
*rtp_state_map = send_stream_.GetRtpStates();
*payload_state_map = send_stream_.GetRtpPayloadStates();
thread_sync_event_.Set();
});
thread_sync_event_.Wait(rtc::Event::kForever);
}
void VideoSendStream::DeliverRtcp(const uint8_t* packet, size_t length) {
// Called on a network thread.
RTC_DCHECK_RUN_ON(&thread_checker_);
send_stream_.DeliverRtcp(packet, length);
}

View File

@ -22,6 +22,7 @@
#include "call/bitrate_allocator.h"
#include "call/video_receive_stream.h"
#include "call/video_send_stream.h"
#include "modules/utility/maybe_worker_thread.h"
#include "rtc_base/event.h"
#include "rtc_base/system/no_unique_address.h"
#include "video/encoder_rtcp_feedback.h"
@ -99,7 +100,7 @@ class VideoSendStream : public webrtc::VideoSendStream {
absl::optional<float> GetPacingFactorOverride() const;
RTC_NO_UNIQUE_ADDRESS SequenceChecker thread_checker_;
TaskQueueBase* const rtp_transport_queue_;
MaybeWorkerThread* const rtp_transport_queue_;
RtpTransportControllerSendInterface* const transport_;
rtc::Event thread_sync_event_;
rtc::scoped_refptr<PendingTaskSafetyFlag> transport_queue_safety_ =

View File

@ -21,6 +21,8 @@
#include "api/rtp_parameters.h"
#include "api/scoped_refptr.h"
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "api/video_codecs/video_codec.h"
#include "call/rtp_transport_controller_send_interface.h"
#include "call/video_send_stream.h"
@ -206,7 +208,6 @@ PacingConfig::~PacingConfig() = default;
VideoSendStreamImpl::VideoSendStreamImpl(
Clock* clock,
SendStatisticsProxy* stats_proxy,
TaskQueueBase* rtp_transport_queue,
RtpTransportControllerSendInterface* transport,
BitrateAllocatorInterface* bitrate_allocator,
VideoStreamEncoderInterface* video_stream_encoder,
@ -222,7 +223,7 @@ VideoSendStreamImpl::VideoSendStreamImpl(
pacing_config_(PacingConfig(field_trials)),
stats_proxy_(stats_proxy),
config_(config),
rtp_transport_queue_(rtp_transport_queue),
rtp_transport_queue_(transport->GetWorkerQueue()),
timed_out_(false),
transport_(transport),
bitrate_allocator_(bitrate_allocator),
@ -287,7 +288,7 @@ VideoSendStreamImpl::VideoSendStreamImpl(
transport->EnablePeriodicAlrProbing(*enable_alr_bw_probing);
}
rtp_transport_queue_->PostTask(SafeTask(transport_queue_safety_, [this] {
rtp_transport_queue_->RunOrPost(SafeTask(transport_queue_safety_, [this] {
if (configured_pacing_factor_)
transport_->SetPacingFactor(*configured_pacing_factor_);
@ -302,8 +303,7 @@ VideoSendStreamImpl::~VideoSendStreamImpl() {
}
void VideoSendStreamImpl::DeliverRtcp(const uint8_t* packet, size_t length) {
// Runs on a network thread.
RTC_DCHECK(!rtp_transport_queue_->IsCurrent());
// Runs on a worker thread.
rtp_video_sender_->DeliverRtcp(packet, length);
}
@ -345,7 +345,8 @@ void VideoSendStreamImpl::StartupVideoSendStream() {
activity_ = false;
timed_out_ = false;
check_encoder_activity_task_ = RepeatingTaskHandle::DelayedStart(
rtp_transport_queue_, kEncoderTimeOut, [this] {
rtp_transport_queue_->TaskQueueForDelayedTasks(), kEncoderTimeOut,
[this] {
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
if (!activity_) {
if (!timed_out_) {
@ -400,22 +401,19 @@ void VideoSendStreamImpl::SignalEncoderTimedOut() {
void VideoSendStreamImpl::OnBitrateAllocationUpdated(
const VideoBitrateAllocation& allocation) {
if (!rtp_transport_queue_->IsCurrent()) {
rtp_transport_queue_->PostTask(SafeTask(transport_queue_safety_, [=] {
OnBitrateAllocationUpdated(allocation);
}));
// OnBitrateAllocationUpdated is invoked from the encoder task queue or
// the rtp_transport_queue_.
auto task = [=] {
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
if (encoder_target_rate_bps_ == 0) {
return;
}
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
int64_t now_ms = clock_->TimeInMilliseconds();
if (encoder_target_rate_bps_ != 0) {
if (video_bitrate_allocation_context_) {
// If new allocation is within kMaxVbaSizeDifferencePercent larger than
// the previously sent allocation and the same streams are still enabled,
// it is considered "similar". We do not want send similar allocations
// more once per kMaxVbaThrottleTimeMs.
// If new allocation is within kMaxVbaSizeDifferencePercent larger
// than the previously sent allocation and the same streams are still
// enabled, it is considered "similar". We do not want send similar
// allocations more once per kMaxVbaThrottleTimeMs.
const VideoBitrateAllocation& last =
video_bitrate_allocation_context_->last_sent_allocation;
const bool is_similar =
@ -441,6 +439,12 @@ void VideoSendStreamImpl::OnBitrateAllocationUpdated(
// Send bitrate allocation metadata only if encoder is not paused.
rtp_video_sender_->OnBitrateAllocationUpdated(allocation);
};
if (!rtp_transport_queue_->IsCurrent()) {
rtp_transport_queue_->TaskQueueForPost()->PostTask(
SafeTask(transport_queue_safety_, std::move(task)));
} else {
task();
}
}
@ -474,17 +478,10 @@ void VideoSendStreamImpl::OnEncoderConfigurationChanged(
bool is_svc,
VideoEncoderConfig::ContentType content_type,
int min_transmit_bitrate_bps) {
if (!rtp_transport_queue_->IsCurrent()) {
rtp_transport_queue_->PostTask(SafeTask(
transport_queue_safety_,
[this, streams = std::move(streams), is_svc, content_type,
// Currently called on the encoder TQ
RTC_DCHECK(!rtp_transport_queue_->IsCurrent());
auto closure = [this, streams = std::move(streams), is_svc, content_type,
min_transmit_bitrate_bps]() mutable {
OnEncoderConfigurationChanged(std::move(streams), is_svc,
content_type, min_transmit_bitrate_bps);
}));
return;
}
RTC_DCHECK_GE(config_->rtp.ssrcs.size(), streams.size());
TRACE_EVENT0("webrtc", "VideoSendStream::OnEncoderConfigurationChanged");
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
@ -532,10 +529,14 @@ void VideoSendStreamImpl::OnEncoderConfigurationChanged(
num_temporal_layers);
if (rtp_video_sender_->IsActive()) {
// The send stream is started already. Update the allocator with new bitrate
// limits.
// The send stream is started already. Update the allocator with new
// bitrate limits.
bitrate_allocator_->AddObserver(this, GetAllocationConfig());
}
};
rtp_transport_queue_->TaskQueueForPost()->PostTask(
SafeTask(transport_queue_safety_, std::move(closure)));
}
EncodedImageCallback::Result VideoSendStreamImpl::OnEncodedImage(
@ -547,42 +548,26 @@ EncodedImageCallback::Result VideoSendStreamImpl::OnEncodedImage(
// Indicate that there still is activity going on.
activity_ = true;
RTC_DCHECK(!rtp_transport_queue_->IsCurrent());
auto enable_padding_task = [this]() {
if (disable_padding_) {
auto task_to_run_on_worker = [this]() {
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
if (disable_padding_) {
disable_padding_ = false;
// To ensure that padding bitrate is propagated to the bitrate allocator.
SignalEncoderActive();
}
};
if (!rtp_transport_queue_->IsCurrent()) {
rtp_transport_queue_->PostTask(
SafeTask(transport_queue_safety_, std::move(enable_padding_task)));
} else {
enable_padding_task();
}
EncodedImageCallback::Result result(EncodedImageCallback::Result::OK);
result =
rtp_video_sender_->OnEncodedImage(encoded_image, codec_specific_info);
// Check if there's a throttled VideoBitrateAllocation that we should try
// sending.
auto update_task = [this]() {
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
auto& context = video_bitrate_allocation_context_;
if (context && context->throttled_allocation) {
OnBitrateAllocationUpdated(*context->throttled_allocation);
}
};
if (!rtp_transport_queue_->IsCurrent()) {
rtp_transport_queue_->PostTask(
SafeTask(transport_queue_safety_, std::move(update_task)));
} else {
update_task();
}
rtp_transport_queue_->TaskQueueForPost()->PostTask(
SafeTask(transport_queue_safety_, std::move(task_to_run_on_worker)));
return result;
return rtp_video_sender_->OnEncodedImage(encoded_image, codec_specific_info);
}
void VideoSendStreamImpl::OnDroppedFrame(

View File

@ -32,6 +32,7 @@
#include "call/rtp_video_sender_interface.h"
#include "modules/include/module_common_types.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/utility/maybe_worker_thread.h"
#include "modules/video_coding/include/video_codec_interface.h"
#include "rtc_base/experiments/field_trial_parser.h"
#include "rtc_base/system/no_unique_address.h"
@ -66,7 +67,6 @@ class VideoSendStreamImpl : public webrtc::BitrateAllocatorObserver,
public:
VideoSendStreamImpl(Clock* clock,
SendStatisticsProxy* stats_proxy,
TaskQueueBase* rtp_transport_queue,
RtpTransportControllerSendInterface* transport,
BitrateAllocatorInterface* bitrate_allocator,
VideoStreamEncoderInterface* video_stream_encoder,
@ -139,7 +139,7 @@ class VideoSendStreamImpl : public webrtc::BitrateAllocatorObserver,
SendStatisticsProxy* const stats_proxy_;
const VideoSendStream::Config* const config_;
TaskQueueBase* const rtp_transport_queue_;
MaybeWorkerThread* const rtp_transport_queue_;
RepeatingTaskHandle check_encoder_activity_task_
RTC_GUARDED_BY(rtp_transport_queue_);

View File

@ -16,19 +16,24 @@
#include "absl/types/optional.h"
#include "api/rtc_event_log/rtc_event_log.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "call/rtp_video_sender.h"
#include "call/test/mock_bitrate_allocator.h"
#include "call/test/mock_rtp_transport_controller_send.h"
#include "modules/rtp_rtcp/source/rtp_sequence_number_map.h"
#include "modules/utility/maybe_worker_thread.h"
#include "modules/video_coding/fec_controller_default.h"
#include "rtc_base/event.h"
#include "rtc_base/experiments/alr_experiment.h"
#include "rtc_base/fake_clock.h"
#include "rtc_base/task_queue_for_test.h"
#include "rtc_base/logging.h"
#include "test/gmock.h"
#include "test/gtest.h"
#include "test/mock_transport.h"
#include "test/scoped_key_value_config.h"
#include "test/time_controller/simulated_time_controller.h"
#include "video/test/mock_video_stream_encoder.h"
#include "video/video_send_stream.h"
@ -114,11 +119,16 @@ BitrateAllocationUpdate CreateAllocation(int bitrate_bps) {
class VideoSendStreamImplTest : public ::testing::Test {
protected:
VideoSendStreamImplTest()
: clock_(1000 * 1000 * 1000),
: time_controller_(Timestamp::Seconds(1000)),
config_(&transport_),
send_delay_stats_(&clock_),
test_queue_("test_queue"),
stats_proxy_(&clock_,
send_delay_stats_(time_controller_.GetClock()),
worker_queue_(field_trials_,
"worker_queue",
time_controller_.GetTaskQueueFactory()),
encoder_queue_(time_controller_.GetTaskQueueFactory()->CreateTaskQueue(
"encoder_queue",
TaskQueueFactory::Priority::NORMAL)),
stats_proxy_(time_controller_.GetClock(),
config_,
VideoEncoderConfig::ContentType::kRealtimeVideo,
field_trials_) {
@ -135,6 +145,8 @@ class VideoSendStreamImplTest : public ::testing::Test {
EXPECT_CALL(rtp_video_sender_, IsActive())
.WillRepeatedly(
::testing::Invoke([&]() { return rtp_video_sender_active_; }));
ON_CALL(transport_controller_, GetWorkerQueue())
.WillByDefault(Return(&worker_queue_));
}
~VideoSendStreamImplTest() {}
@ -142,7 +154,7 @@ class VideoSendStreamImplTest : public ::testing::Test {
int initial_encoder_max_bitrate,
double initial_encoder_bitrate_priority,
VideoEncoderConfig::ContentType content_type) {
RTC_DCHECK(!test_queue_.IsCurrent());
RTC_DCHECK(!worker_queue_.IsCurrent());
EXPECT_CALL(bitrate_allocator_, GetStartBitrate(_))
.WillOnce(Return(123000));
@ -150,19 +162,21 @@ class VideoSendStreamImplTest : public ::testing::Test {
std::map<uint32_t, RtpState> suspended_ssrcs;
std::map<uint32_t, RtpPayloadState> suspended_payload_states;
auto ret = std::make_unique<VideoSendStreamImpl>(
&clock_, &stats_proxy_, test_queue_.Get(), &transport_controller_,
time_controller_.GetClock(), &stats_proxy_, &transport_controller_,
&bitrate_allocator_, &video_stream_encoder_, &config_,
initial_encoder_max_bitrate, initial_encoder_bitrate_priority,
content_type, &rtp_video_sender_, field_trials_);
// The call to GetStartBitrate() executes asynchronously on the tq.
test_queue_.WaitForPreviouslyPostedTasks();
// Ensure all tasks get to run.
time_controller_.AdvanceTime(TimeDelta::Zero());
testing::Mock::VerifyAndClearExpectations(&bitrate_allocator_);
return ret;
}
protected:
GlobalSimulatedTimeController time_controller_;
webrtc::test::ScopedKeyValueConfig field_trials_;
NiceMock<MockTransport> transport_;
NiceMock<MockRtpTransportControllerSend> transport_controller_;
@ -171,11 +185,11 @@ class VideoSendStreamImplTest : public ::testing::Test {
NiceMock<MockRtpVideoSender> rtp_video_sender_;
bool rtp_video_sender_active_ = false;
SimulatedClock clock_;
RtcEventLogNull event_log_;
VideoSendStream::Config config_;
SendDelayStats send_delay_stats_;
TaskQueueForTest test_queue_;
MaybeWorkerThread worker_queue_;
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> encoder_queue_;
SendStatisticsProxy stats_proxy_;
PacketRouter packet_router_;
};
@ -195,7 +209,7 @@ TEST_F(VideoSendStreamImplTest, RegistersAsBitrateObserverOnStart) {
EXPECT_EQ(config.enforce_min_bitrate, !kSuspend);
EXPECT_EQ(config.bitrate_priority, kDefaultBitratePriority);
}));
test_queue_.SendTask([&] {
worker_queue_.RunSynchronous([&] {
vss_impl->Start();
EXPECT_CALL(bitrate_allocator_, RemoveObserver(vss_impl.get())).Times(1);
vss_impl->Stop();
@ -211,9 +225,7 @@ TEST_F(VideoSendStreamImplTest, UpdatesObserverOnConfigurationChange) {
kDefaultInitialBitrateBps, kDefaultBitratePriority,
VideoEncoderConfig::ContentType::kRealtimeVideo);
test_queue_.SendTask(
[&] {
vss_impl->Start();
worker_queue_.RunSynchronous([&] { vss_impl->Start(); });
// QVGA + VGA configuration matching defaults in
// media/engine/simulcast.cc.
@ -243,8 +255,9 @@ TEST_F(VideoSendStreamImplTest, UpdatesObserverOnConfigurationChange) {
config_.rtp.ssrcs.emplace_back(2);
EXPECT_CALL(bitrate_allocator_, AddObserver(vss_impl.get(), _))
.WillRepeatedly(Invoke([&](BitrateAllocatorObserver*,
MediaStreamAllocationConfig config) {
.WillRepeatedly(Invoke(
[&](BitrateAllocatorObserver*, MediaStreamAllocationConfig config) {
EXPECT_TRUE(worker_queue_.IsCurrent());
EXPECT_EQ(config.min_bitrate_bps,
static_cast<uint32_t>(min_transmit_bitrate_bps));
EXPECT_EQ(config.max_bitrate_bps,
@ -258,13 +271,15 @@ TEST_F(VideoSendStreamImplTest, UpdatesObserverOnConfigurationChange) {
EXPECT_EQ(config.enforce_min_bitrate, !kSuspend);
}));
encoder_queue_->PostTask([&] {
static_cast<VideoStreamEncoderInterface::EncoderSink*>(vss_impl.get())
->OnEncoderConfigurationChanged(
std::vector<VideoStream>{qvga_stream, vga_stream}, false,
VideoEncoderConfig::ContentType::kRealtimeVideo,
min_transmit_bitrate_bps);
vss_impl->Stop();
});
time_controller_.AdvanceTime(TimeDelta::Zero());
worker_queue_.RunSynchronous([&] { vss_impl->Stop(); });
}
TEST_F(VideoSendStreamImplTest, UpdatesObserverOnConfigurationChangeWithAlr) {
@ -276,9 +291,7 @@ TEST_F(VideoSendStreamImplTest, UpdatesObserverOnConfigurationChangeWithAlr) {
auto vss_impl = CreateVideoSendStreamImpl(
kDefaultInitialBitrateBps, kDefaultBitratePriority,
VideoEncoderConfig::ContentType::kScreen);
test_queue_.SendTask(
[&] {
vss_impl->Start();
worker_queue_.RunSynchronous([&] { vss_impl->Start(); });
// Simulcast screenshare.
VideoStream low_stream;
@ -311,8 +324,9 @@ TEST_F(VideoSendStreamImplTest, UpdatesObserverOnConfigurationChangeWithAlr) {
config_.rtp.ssrcs.emplace_back(2);
EXPECT_CALL(bitrate_allocator_, AddObserver(vss_impl.get(), _))
.WillRepeatedly(Invoke([&](BitrateAllocatorObserver*,
MediaStreamAllocationConfig config) {
.WillRepeatedly(Invoke(
[&](BitrateAllocatorObserver*, MediaStreamAllocationConfig config) {
EXPECT_TRUE(worker_queue_.IsCurrent());
EXPECT_EQ(config.min_bitrate_bps,
static_cast<uint32_t>(low_stream.min_bitrate_bps));
EXPECT_EQ(config.max_bitrate_bps,
@ -324,14 +338,14 @@ TEST_F(VideoSendStreamImplTest, UpdatesObserverOnConfigurationChangeWithAlr) {
}
EXPECT_EQ(config.enforce_min_bitrate, !kSuspend);
}));
encoder_queue_->PostTask([&] {
static_cast<VideoStreamEncoderInterface::EncoderSink*>(vss_impl.get())
->OnEncoderConfigurationChanged(
std::vector<VideoStream>{low_stream, high_stream}, false,
VideoEncoderConfig::ContentType::kScreen,
min_transmit_bitrate_bps);
vss_impl->Stop();
VideoEncoderConfig::ContentType::kScreen, min_transmit_bitrate_bps);
});
time_controller_.AdvanceTime(TimeDelta::Zero());
worker_queue_.RunSynchronous([&] { vss_impl->Stop(); });
}
TEST_F(VideoSendStreamImplTest,
@ -343,10 +357,7 @@ TEST_F(VideoSendStreamImplTest,
kDefaultInitialBitrateBps, kDefaultBitratePriority,
VideoEncoderConfig::ContentType::kRealtimeVideo);
test_queue_.SendTask(
[&] {
vss_impl->Start();
worker_queue_.RunSynchronous([&] { vss_impl->Start(); });
// 2-layer video simulcast.
VideoStream low_stream;
low_stream.width = 320;
@ -374,26 +385,28 @@ TEST_F(VideoSendStreamImplTest,
EXPECT_CALL(bitrate_allocator_, AddObserver(vss_impl.get(), _))
.WillRepeatedly(Invoke([&](BitrateAllocatorObserver*,
MediaStreamAllocationConfig config) {
EXPECT_TRUE(worker_queue_.IsCurrent());
EXPECT_EQ(config.min_bitrate_bps,
static_cast<uint32_t>(low_stream.min_bitrate_bps));
EXPECT_EQ(config.max_bitrate_bps,
static_cast<uint32_t>(low_stream.max_bitrate_bps +
high_stream.max_bitrate_bps));
if (config.pad_up_bitrate_bps != 0) {
EXPECT_EQ(
config.pad_up_bitrate_bps,
EXPECT_EQ(config.pad_up_bitrate_bps,
static_cast<uint32_t>(low_stream.target_bitrate_bps +
1.25 * high_stream.min_bitrate_bps));
}
}));
encoder_queue_->PostTask([&] {
static_cast<VideoStreamEncoderInterface::EncoderSink*>(vss_impl.get())
->OnEncoderConfigurationChanged(
std::vector<VideoStream>{low_stream, high_stream}, false,
VideoEncoderConfig::ContentType::kRealtimeVideo,
/*min_transmit_bitrate_bps=*/0);
vss_impl->Stop();
});
time_controller_.AdvanceTime(TimeDelta::Zero());
worker_queue_.RunSynchronous([&] { vss_impl->Stop(); });
}
TEST_F(VideoSendStreamImplTest, SetsScreensharePacingFactorWithFeedback) {
@ -408,7 +421,7 @@ TEST_F(VideoSendStreamImplTest, SetsScreensharePacingFactorWithFeedback) {
auto vss_impl = CreateVideoSendStreamImpl(
kDefaultInitialBitrateBps, kDefaultBitratePriority,
VideoEncoderConfig::ContentType::kScreen);
test_queue_.SendTask([&] {
worker_queue_.RunSynchronous([&] {
vss_impl->Start();
vss_impl->Stop();
});
@ -419,7 +432,7 @@ TEST_F(VideoSendStreamImplTest, DoesNotSetPacingFactorWithoutFeedback) {
auto vss_impl = CreateVideoSendStreamImpl(
kDefaultInitialBitrateBps, kDefaultBitratePriority,
VideoEncoderConfig::ContentType::kScreen);
test_queue_.SendTask([&] {
worker_queue_.RunSynchronous([&] {
EXPECT_CALL(transport_controller_, SetPacingFactor(_)).Times(0);
vss_impl->Start();
vss_impl->Stop();
@ -430,14 +443,11 @@ TEST_F(VideoSendStreamImplTest, ForwardsVideoBitrateAllocationWhenEnabled) {
auto vss_impl = CreateVideoSendStreamImpl(
kDefaultInitialBitrateBps, kDefaultBitratePriority,
VideoEncoderConfig::ContentType::kScreen);
test_queue_.SendTask(
[&] {
EXPECT_CALL(transport_controller_, SetPacingFactor(_)).Times(0);
VideoStreamEncoderInterface::EncoderSink* const sink =
static_cast<VideoStreamEncoderInterface::EncoderSink*>(
vss_impl.get());
vss_impl->Start();
static_cast<VideoStreamEncoderInterface::EncoderSink*>(vss_impl.get());
worker_queue_.RunSynchronous([&] { vss_impl->Start(); });
// Populate a test instance of video bitrate allocation.
VideoBitrateAllocation alloc;
alloc.SetBitrate(0, 0, 10000);
@ -445,11 +455,15 @@ TEST_F(VideoSendStreamImplTest, ForwardsVideoBitrateAllocationWhenEnabled) {
alloc.SetBitrate(1, 0, 30000);
alloc.SetBitrate(1, 1, 40000);
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc)).Times(0);
encoder_queue_->PostTask([&] {
// Encoder starts out paused, don't forward allocation.
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc))
.Times(0);
sink->OnBitrateAllocationUpdated(alloc);
sink->OnBitrateAllocationUpdated(alloc);
});
time_controller_.AdvanceTime(TimeDelta::Zero());
worker_queue_.RunSynchronous([&] {
// Unpause encoder, allocation should be passed through.
const uint32_t kBitrateBps = 100000;
EXPECT_CALL(rtp_video_sender_, GetPayloadBitrateBps())
@ -457,30 +471,29 @@ TEST_F(VideoSendStreamImplTest, ForwardsVideoBitrateAllocationWhenEnabled) {
.WillOnce(Return(kBitrateBps));
static_cast<BitrateAllocatorObserver*>(vss_impl.get())
->OnBitrateUpdated(CreateAllocation(kBitrateBps));
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc))
.Times(1);
sink->OnBitrateAllocationUpdated(alloc);
});
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc)).Times(1);
encoder_queue_->PostTask([&] { sink->OnBitrateAllocationUpdated(alloc); });
time_controller_.AdvanceTime(TimeDelta::Zero());
worker_queue_.RunSynchronous([&] {
// Pause encoder again, and block allocations.
EXPECT_CALL(rtp_video_sender_, GetPayloadBitrateBps())
.Times(1)
.WillOnce(Return(0));
static_cast<BitrateAllocatorObserver*>(vss_impl.get())
->OnBitrateUpdated(CreateAllocation(0));
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc))
.Times(0);
sink->OnBitrateAllocationUpdated(alloc);
vss_impl->Stop();
});
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc)).Times(0);
encoder_queue_->PostTask([&] { sink->OnBitrateAllocationUpdated(alloc); });
time_controller_.AdvanceTime(TimeDelta::Zero());
worker_queue_.RunSynchronous([&] { vss_impl->Stop(); });
}
TEST_F(VideoSendStreamImplTest, ThrottlesVideoBitrateAllocationWhenTooSimilar) {
auto vss_impl = CreateVideoSendStreamImpl(
kDefaultInitialBitrateBps, kDefaultBitratePriority,
VideoEncoderConfig::ContentType::kScreen);
test_queue_.SendTask(
[&] {
worker_queue_.RunSynchronous([&] {
vss_impl->Start();
// Unpause encoder, to allows allocations to be passed through.
const uint32_t kBitrateBps = 100000;
@ -489,9 +502,9 @@ TEST_F(VideoSendStreamImplTest, ThrottlesVideoBitrateAllocationWhenTooSimilar) {
.WillOnce(Return(kBitrateBps));
static_cast<BitrateAllocatorObserver*>(vss_impl.get())
->OnBitrateUpdated(CreateAllocation(kBitrateBps));
});
VideoStreamEncoderInterface::EncoderSink* const sink =
static_cast<VideoStreamEncoderInterface::EncoderSink*>(
vss_impl.get());
static_cast<VideoStreamEncoderInterface::EncoderSink*>(vss_impl.get());
// Populate a test instance of video bitrate allocation.
VideoBitrateAllocation alloc;
@ -501,9 +514,9 @@ TEST_F(VideoSendStreamImplTest, ThrottlesVideoBitrateAllocationWhenTooSimilar) {
alloc.SetBitrate(1, 1, 40000);
// Initial value.
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc))
.Times(1);
sink->OnBitrateAllocationUpdated(alloc);
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc)).Times(1);
encoder_queue_->PostTask([&] { sink->OnBitrateAllocationUpdated(alloc); });
time_controller_.AdvanceTime(TimeDelta::Zero());
VideoBitrateAllocation updated_alloc = alloc;
// Needs 10% increase in bitrate to trigger immediate forward.
@ -513,33 +526,36 @@ TEST_F(VideoSendStreamImplTest, ThrottlesVideoBitrateAllocationWhenTooSimilar) {
// Too small increase, don't forward.
updated_alloc.SetBitrate(0, 0, base_layer_min_update_bitrate_bps - 1);
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(_)).Times(0);
sink->OnBitrateAllocationUpdated(updated_alloc);
encoder_queue_->PostTask(
[&] { sink->OnBitrateAllocationUpdated(updated_alloc); });
time_controller_.AdvanceTime(TimeDelta::Zero());
// Large enough increase, do forward.
updated_alloc.SetBitrate(0, 0, base_layer_min_update_bitrate_bps);
EXPECT_CALL(rtp_video_sender_,
OnBitrateAllocationUpdated(updated_alloc))
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(updated_alloc))
.Times(1);
sink->OnBitrateAllocationUpdated(updated_alloc);
encoder_queue_->PostTask(
[&] { sink->OnBitrateAllocationUpdated(updated_alloc); });
time_controller_.AdvanceTime(TimeDelta::Zero());
// This is now a decrease compared to last forward allocation,
// forward immediately.
updated_alloc.SetBitrate(0, 0, base_layer_min_update_bitrate_bps - 1);
EXPECT_CALL(rtp_video_sender_,
OnBitrateAllocationUpdated(updated_alloc))
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(updated_alloc))
.Times(1);
sink->OnBitrateAllocationUpdated(updated_alloc);
encoder_queue_->PostTask(
[&] { sink->OnBitrateAllocationUpdated(updated_alloc); });
time_controller_.AdvanceTime(TimeDelta::Zero());
vss_impl->Stop();
});
worker_queue_.RunSynchronous([&] { vss_impl->Stop(); });
}
TEST_F(VideoSendStreamImplTest, ForwardsVideoBitrateAllocationOnLayerChange) {
auto vss_impl = CreateVideoSendStreamImpl(
kDefaultInitialBitrateBps, kDefaultBitratePriority,
VideoEncoderConfig::ContentType::kScreen);
test_queue_.SendTask(
[&] {
worker_queue_.RunSynchronous([&] {
vss_impl->Start();
// Unpause encoder, to allows allocations to be passed through.
const uint32_t kBitrateBps = 100000;
@ -548,9 +564,9 @@ TEST_F(VideoSendStreamImplTest, ForwardsVideoBitrateAllocationOnLayerChange) {
.WillOnce(Return(kBitrateBps));
static_cast<BitrateAllocatorObserver*>(vss_impl.get())
->OnBitrateUpdated(CreateAllocation(kBitrateBps));
});
VideoStreamEncoderInterface::EncoderSink* const sink =
static_cast<VideoStreamEncoderInterface::EncoderSink*>(
vss_impl.get());
static_cast<VideoStreamEncoderInterface::EncoderSink*>(vss_impl.get());
// Populate a test instance of video bitrate allocation.
VideoBitrateAllocation alloc;
@ -560,8 +576,7 @@ TEST_F(VideoSendStreamImplTest, ForwardsVideoBitrateAllocationOnLayerChange) {
alloc.SetBitrate(1, 1, 40000);
// Initial value.
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc))
.Times(1);
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc)).Times(1);
sink->OnBitrateAllocationUpdated(alloc);
// Move some bitrate from one layer to a new one, but keep sum the
@ -570,21 +585,20 @@ TEST_F(VideoSendStreamImplTest, ForwardsVideoBitrateAllocationOnLayerChange) {
updated_alloc.SetBitrate(2, 0, 10000);
updated_alloc.SetBitrate(1, 1, alloc.GetBitrate(1, 1) - 10000);
EXPECT_EQ(alloc.get_sum_bps(), updated_alloc.get_sum_bps());
EXPECT_CALL(rtp_video_sender_,
OnBitrateAllocationUpdated(updated_alloc))
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(updated_alloc))
.Times(1);
sink->OnBitrateAllocationUpdated(updated_alloc);
encoder_queue_->PostTask(
[&] { sink->OnBitrateAllocationUpdated(updated_alloc); });
time_controller_.AdvanceTime(TimeDelta::Zero());
vss_impl->Stop();
});
worker_queue_.RunSynchronous([&] { vss_impl->Stop(); });
}
TEST_F(VideoSendStreamImplTest, ForwardsVideoBitrateAllocationAfterTimeout) {
auto vss_impl = CreateVideoSendStreamImpl(
kDefaultInitialBitrateBps, kDefaultBitratePriority,
VideoEncoderConfig::ContentType::kScreen);
test_queue_.SendTask(
[&] {
worker_queue_.RunSynchronous([&] {
vss_impl->Start();
const uint32_t kBitrateBps = 100000;
// Unpause encoder, to allows allocations to be passed through.
@ -593,12 +607,13 @@ TEST_F(VideoSendStreamImplTest, ForwardsVideoBitrateAllocationAfterTimeout) {
.WillRepeatedly(Return(kBitrateBps));
static_cast<BitrateAllocatorObserver*>(vss_impl.get())
->OnBitrateUpdated(CreateAllocation(kBitrateBps));
});
VideoStreamEncoderInterface::EncoderSink* const sink =
static_cast<VideoStreamEncoderInterface::EncoderSink*>(
vss_impl.get());
static_cast<VideoStreamEncoderInterface::EncoderSink*>(vss_impl.get());
// Populate a test instance of video bitrate allocation.
VideoBitrateAllocation alloc;
alloc.SetBitrate(0, 0, 10000);
alloc.SetBitrate(0, 1, 20000);
alloc.SetBitrate(1, 0, 30000);
@ -607,73 +622,83 @@ TEST_F(VideoSendStreamImplTest, ForwardsVideoBitrateAllocationAfterTimeout) {
EncodedImage encoded_image;
CodecSpecificInfo codec_specific;
EXPECT_CALL(rtp_video_sender_, OnEncodedImage)
.WillRepeatedly(Return(EncodedImageCallback::Result(
EncodedImageCallback::Result::OK)));
.WillRepeatedly(Return(
EncodedImageCallback::Result(EncodedImageCallback::Result::OK)));
// Max time we will throttle similar video bitrate allocations.
static constexpr int64_t kMaxVbaThrottleTimeMs = 500;
{
// Initial value.
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc))
.Times(1);
sink->OnBitrateAllocationUpdated(alloc);
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc)).Times(1);
encoder_queue_->PostTask([&] { sink->OnBitrateAllocationUpdated(alloc); });
time_controller_.AdvanceTime(TimeDelta::Zero());
}
{
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc)).Times(0);
encoder_queue_->PostTask([&] {
// Sending same allocation again, this one should be throttled.
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc))
.Times(0);
sink->OnBitrateAllocationUpdated(alloc);
});
time_controller_.AdvanceTime(TimeDelta::Zero());
}
clock_.AdvanceTimeMicroseconds(kMaxVbaThrottleTimeMs * 1000);
time_controller_.AdvanceTime(TimeDelta::Millis(kMaxVbaThrottleTimeMs));
{
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc)).Times(1);
encoder_queue_->PostTask([&] {
// Sending similar allocation again after timeout, should
// forward.
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc))
.Times(1);
sink->OnBitrateAllocationUpdated(alloc);
});
time_controller_.AdvanceTime(TimeDelta::Zero());
}
{
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc)).Times(0);
encoder_queue_->PostTask([&] {
// Sending similar allocation again without timeout, throttle.
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc))
.Times(0);
sink->OnBitrateAllocationUpdated(alloc);
});
time_controller_.AdvanceTime(TimeDelta::Zero());
}
{
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc)).Times(0);
encoder_queue_->PostTask([&] {
// Send encoded image, should be a noop.
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc))
.Times(0);
static_cast<EncodedImageCallback*>(vss_impl.get())
->OnEncodedImage(encoded_image, &codec_specific);
});
time_controller_.AdvanceTime(TimeDelta::Zero());
}
{
// Advance time and send encoded image, this should wake up and
// send cached bitrate allocation.
clock_.AdvanceTimeMicroseconds(kMaxVbaThrottleTimeMs * 1000);
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc))
.Times(1);
time_controller_.AdvanceTime(TimeDelta::Millis(kMaxVbaThrottleTimeMs));
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc)).Times(1);
encoder_queue_->PostTask([&] {
static_cast<EncodedImageCallback*>(vss_impl.get())
->OnEncodedImage(encoded_image, &codec_specific);
});
time_controller_.AdvanceTime(TimeDelta::Zero());
}
{
// Advance time and send encoded image, there should be no
// cached allocation to send.
clock_.AdvanceTimeMicroseconds(kMaxVbaThrottleTimeMs * 1000);
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc))
.Times(0);
time_controller_.AdvanceTime(TimeDelta::Millis(kMaxVbaThrottleTimeMs));
EXPECT_CALL(rtp_video_sender_, OnBitrateAllocationUpdated(alloc)).Times(0);
encoder_queue_->PostTask([&] {
static_cast<EncodedImageCallback*>(vss_impl.get())
->OnEncodedImage(encoded_image, &codec_specific);
});
time_controller_.AdvanceTime(TimeDelta::Zero());
}
vss_impl->Stop();
});
worker_queue_.RunSynchronous([&] { vss_impl->Stop(); });
}
TEST_F(VideoSendStreamImplTest, CallsVideoStreamEncoderOnBitrateUpdate) {
@ -684,10 +709,7 @@ TEST_F(VideoSendStreamImplTest, CallsVideoStreamEncoderOnBitrateUpdate) {
auto vss_impl = CreateVideoSendStreamImpl(
kDefaultInitialBitrateBps, kDefaultBitratePriority,
VideoEncoderConfig::ContentType::kRealtimeVideo);
test_queue_.SendTask(
[&] {
vss_impl->Start();
worker_queue_.RunSynchronous([&] { vss_impl->Start(); });
VideoStream qvga_stream;
qvga_stream.width = 320;
qvga_stream.height = 180;
@ -702,12 +724,16 @@ TEST_F(VideoSendStreamImplTest, CallsVideoStreamEncoderOnBitrateUpdate) {
config_.rtp.ssrcs.emplace_back(1);
encoder_queue_->PostTask([&] {
static_cast<VideoStreamEncoderInterface::EncoderSink*>(vss_impl.get())
->OnEncoderConfigurationChanged(
std::vector<VideoStream>{qvga_stream}, false,
VideoEncoderConfig::ContentType::kRealtimeVideo,
min_transmit_bitrate_bps);
});
time_controller_.AdvanceTime(TimeDelta::Zero());
worker_queue_.RunSynchronous([&] {
const DataRate network_constrained_rate =
DataRate::BitsPerSec(qvga_stream.target_bitrate_bps);
BitrateAllocationUpdate update;
@ -784,26 +810,25 @@ TEST_F(VideoSendStreamImplTest, DisablesPaddingOnPausedEncoder) {
std::unique_ptr<VideoSendStreamImpl> vss_impl = CreateVideoSendStreamImpl(
kDefaultInitialBitrateBps, kDefaultBitratePriority,
VideoEncoderConfig::ContentType::kRealtimeVideo);
test_queue_.SendTask(
[&] {
// Capture padding bitrate for testing.
EXPECT_CALL(bitrate_allocator_, AddObserver(vss_impl.get(), _))
.WillRepeatedly(Invoke([&](BitrateAllocatorObserver*,
MediaStreamAllocationConfig config) {
.WillRepeatedly(Invoke(
[&](BitrateAllocatorObserver*, MediaStreamAllocationConfig config) {
padding_bitrate = config.pad_up_bitrate_bps;
}));
// If observer is removed, no padding will be sent.
EXPECT_CALL(bitrate_allocator_, RemoveObserver(vss_impl.get()))
.WillRepeatedly(Invoke(
[&](BitrateAllocatorObserver*) { padding_bitrate = 0; }));
.WillRepeatedly(
Invoke([&](BitrateAllocatorObserver*) { padding_bitrate = 0; }));
EXPECT_CALL(rtp_video_sender_, OnEncodedImage)
.WillRepeatedly(Return(EncodedImageCallback::Result(
EncodedImageCallback::Result::OK)));
.WillRepeatedly(Return(
EncodedImageCallback::Result(EncodedImageCallback::Result::OK)));
const bool kSuspend = false;
config_.suspend_below_min_bitrate = kSuspend;
config_.rtp.extensions.emplace_back(
RtpExtension::kTransportSequenceNumberUri, 1);
config_.rtp.extensions.emplace_back(RtpExtension::kTransportSequenceNumberUri,
1);
VideoStream qvga_stream;
qvga_stream.width = 320;
qvga_stream.height = 180;
@ -817,22 +842,23 @@ TEST_F(VideoSendStreamImplTest, DisablesPaddingOnPausedEncoder) {
int min_transmit_bitrate_bps = 30000;
config_.rtp.ssrcs.emplace_back(1);
vss_impl->Start();
worker_queue_.RunSynchronous([&] { vss_impl->Start(); });
// Starts without padding.
EXPECT_EQ(0, padding_bitrate);
encoder_queue_->PostTask([&] {
// Reconfigure e.g. due to a fake frame.
static_cast<VideoStreamEncoderInterface::EncoderSink*>(vss_impl.get())
->OnEncoderConfigurationChanged(
std::vector<VideoStream>{qvga_stream}, false,
VideoEncoderConfig::ContentType::kRealtimeVideo,
min_transmit_bitrate_bps);
});
time_controller_.AdvanceTime(TimeDelta::Zero());
// Still no padding because no actual frames were passed, only
// reconfiguration happened.
EXPECT_EQ(0, padding_bitrate);
worker_queue_.RunSynchronous([&] {
// Unpause encoder.
const uint32_t kBitrateBps = 100000;
EXPECT_CALL(rtp_video_sender_, GetPayloadBitrateBps())
@ -840,37 +866,33 @@ TEST_F(VideoSendStreamImplTest, DisablesPaddingOnPausedEncoder) {
.WillOnce(Return(kBitrateBps));
static_cast<BitrateAllocatorObserver*>(vss_impl.get())
->OnBitrateUpdated(CreateAllocation(kBitrateBps));
});
encoder_queue_->PostTask([&] {
// A frame is encoded.
EncodedImage encoded_image;
CodecSpecificInfo codec_specific;
static_cast<EncodedImageCallback*>(vss_impl.get())
->OnEncodedImage(encoded_image, &codec_specific);
});
time_controller_.AdvanceTime(TimeDelta::Zero());
// Only after actual frame is encoded are we enabling the padding.
EXPECT_GT(padding_bitrate, 0);
});
rtc::Event done;
test_queue_.Get()->PostDelayedTask(
[&] {
// No padding supposed to be sent for paused observer
time_controller_.AdvanceTime(TimeDelta::Seconds(5));
// Since no more frames are sent the last 5s, no padding is supposed to be
// sent.
EXPECT_EQ(0, padding_bitrate);
testing::Mock::VerifyAndClearExpectations(&bitrate_allocator_);
vss_impl->Stop();
done.Set();
},
TimeDelta::Seconds(5));
// Pause the test suite so that the last delayed task executes.
ASSERT_TRUE(done.Wait(TimeDelta::Seconds(10)));
worker_queue_.RunSynchronous([&] { vss_impl->Stop(); });
}
TEST_F(VideoSendStreamImplTest, KeepAliveOnDroppedFrame) {
std::unique_ptr<VideoSendStreamImpl> vss_impl = CreateVideoSendStreamImpl(
kDefaultInitialBitrateBps, kDefaultBitratePriority,
VideoEncoderConfig::ContentType::kRealtimeVideo);
test_queue_.SendTask(
[&] {
EXPECT_CALL(bitrate_allocator_, RemoveObserver(vss_impl.get())).Times(0);
worker_queue_.RunSynchronous([&] {
vss_impl->Start();
const uint32_t kBitrateBps = 100000;
EXPECT_CALL(rtp_video_sender_, GetPayloadBitrateBps())
@ -878,24 +900,17 @@ TEST_F(VideoSendStreamImplTest, KeepAliveOnDroppedFrame) {
.WillOnce(Return(kBitrateBps));
static_cast<BitrateAllocatorObserver*>(vss_impl.get())
->OnBitrateUpdated(CreateAllocation(kBitrateBps));
});
encoder_queue_->PostTask([&] {
// Keep the stream from deallocating by dropping a frame.
static_cast<EncodedImageCallback*>(vss_impl.get())
->OnDroppedFrame(
EncodedImageCallback::DropReason::kDroppedByEncoder);
EXPECT_CALL(bitrate_allocator_, RemoveObserver(vss_impl.get()))
.Times(0);
->OnDroppedFrame(EncodedImageCallback::DropReason::kDroppedByEncoder);
});
rtc::Event done;
test_queue_.Get()->PostDelayedTask(
[&] {
time_controller_.AdvanceTime(TimeDelta::Seconds(2));
worker_queue_.RunSynchronous([&] {
testing::Mock::VerifyAndClearExpectations(&bitrate_allocator_);
vss_impl->Stop();
done.Set();
},
TimeDelta::Seconds(2));
ASSERT_TRUE(done.Wait(TimeDelta::Seconds(5)));
});
}
TEST_F(VideoSendStreamImplTest, ConfiguresBitratesForSvc) {
@ -925,9 +940,8 @@ TEST_F(VideoSendStreamImplTest, ConfiguresBitratesForSvc) {
test_config.screenshare
? VideoEncoderConfig::ContentType::kScreen
: VideoEncoderConfig::ContentType::kRealtimeVideo);
test_queue_.SendTask(
[&] {
vss_impl->Start();
worker_queue_.RunSynchronous([&] { vss_impl->Start(); });
// Svc
VideoStream stream;
@ -953,11 +967,10 @@ TEST_F(VideoSendStreamImplTest, ConfiguresBitratesForSvc) {
Field(&MediaStreamAllocationConfig::max_bitrate_bps,
static_cast<uint32_t>(stream.max_bitrate_bps)),
// Stream not yet active - no padding.
Field(&MediaStreamAllocationConfig::pad_up_bitrate_bps,
0u),
Field(&MediaStreamAllocationConfig::pad_up_bitrate_bps, 0u),
Field(&MediaStreamAllocationConfig::enforce_min_bitrate,
!kSuspend))));
encoder_queue_->PostTask([&] {
static_cast<VideoStreamEncoderInterface::EncoderSink*>(vss_impl.get())
->OnEncoderConfigurationChanged(
std::vector<VideoStream>{stream}, true,
@ -965,25 +978,21 @@ TEST_F(VideoSendStreamImplTest, ConfiguresBitratesForSvc) {
? VideoEncoderConfig::ContentType::kScreen
: VideoEncoderConfig::ContentType::kRealtimeVideo,
test_config.min_padding_bitrate_bps);
});
time_controller_.AdvanceTime(TimeDelta::Zero());
::testing::Mock::VerifyAndClearExpectations(&bitrate_allocator_);
// Simulate an encoded image, this will turn the stream active and
// enable padding.
EncodedImage encoded_image;
CodecSpecificInfo codec_specific;
EXPECT_CALL(rtp_video_sender_, OnEncodedImage)
.WillRepeatedly(Return(EncodedImageCallback::Result(
EncodedImageCallback::Result::OK)));
.WillRepeatedly(Return(
EncodedImageCallback::Result(EncodedImageCallback::Result::OK)));
// Screensharing implicitly forces ALR.
const bool using_alr = test_config.alr || test_config.screenshare;
// If ALR is used, pads only to min bitrate as rampup is handled by
// probing. Otherwise target_bitrate contains the padding target.
const RateControlSettings trials =
RateControlSettings::ParseFromFieldTrials();
int expected_padding =
using_alr
? stream.min_bitrate_bps
using_alr ? stream.min_bitrate_bps
: static_cast<int>(stream.target_bitrate_bps *
(test_config.screenshare ? 1.35 : 1.2));
// Min padding bitrate may override padding target.
@ -1003,12 +1012,17 @@ TEST_F(VideoSendStreamImplTest, ConfiguresBitratesForSvc) {
expected_padding),
Field(&MediaStreamAllocationConfig::enforce_min_bitrate,
!kSuspend))));
encoder_queue_->PostTask([&] {
EncodedImage encoded_image;
CodecSpecificInfo codec_specific;
static_cast<EncodedImageCallback*>(vss_impl.get())
->OnEncodedImage(encoded_image, &codec_specific);
});
time_controller_.AdvanceTime(TimeDelta::Zero());
::testing::Mock::VerifyAndClearExpectations(&bitrate_allocator_);
vss_impl->Stop();
});
worker_queue_.RunSynchronous([&] { vss_impl->Stop(); });
}
}
} // namespace internal