Move receive side congestion controller periodic task to worker thread
This way call no longer needs dedicated process thread Bug: webrtc:7219 Change-Id: I8ab677b1e6b909eeb726aefed5e6d10ce4bc43b7 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/265921 Commit-Queue: Danil Chapovalov <danilchap@webrtc.org> Reviewed-by: Philip Eliasson <philipel@webrtc.org> Cr-Commit-Position: refs/heads/main@{#37279}
This commit is contained in:
parent
c6014bcbb1
commit
675dfb4a1f
@ -332,6 +332,7 @@ rtc_library("call") {
|
||||
"../rtc_base/experiments:field_trial_parser",
|
||||
"../rtc_base/network:sent_packet",
|
||||
"../rtc_base/system:no_unique_address",
|
||||
"../rtc_base/task_utils:repeating_task",
|
||||
"../system_wrappers",
|
||||
"../system_wrappers:field_trial",
|
||||
"../system_wrappers:metrics",
|
||||
|
||||
28
call/call.cc
28
call/call.cc
@ -57,6 +57,7 @@
|
||||
#include "rtc_base/logging.h"
|
||||
#include "rtc_base/strings/string_builder.h"
|
||||
#include "rtc_base/system/no_unique_address.h"
|
||||
#include "rtc_base/task_utils/repeating_task.h"
|
||||
#include "rtc_base/thread_annotations.h"
|
||||
#include "rtc_base/time_utils.h"
|
||||
#include "rtc_base/trace_event.h"
|
||||
@ -203,7 +204,6 @@ class Call final : public webrtc::Call,
|
||||
Call(Clock* clock,
|
||||
const Call::Config& config,
|
||||
std::unique_ptr<RtpTransportControllerSendInterface> transport_send,
|
||||
rtc::scoped_refptr<SharedModuleThread> module_process_thread,
|
||||
TaskQueueFactory* task_queue_factory);
|
||||
~Call() override;
|
||||
|
||||
@ -378,7 +378,6 @@ class Call final : public webrtc::Call,
|
||||
RTC_NO_UNIQUE_ADDRESS SequenceChecker send_transport_sequence_checker_;
|
||||
|
||||
const int num_cpu_cores_;
|
||||
const rtc::scoped_refptr<SharedModuleThread> module_process_thread_;
|
||||
const std::unique_ptr<CallStats> call_stats_;
|
||||
const std::unique_ptr<BitrateAllocator> bitrate_allocator_;
|
||||
const Call::Config config_ RTC_GUARDED_BY(worker_thread_);
|
||||
@ -456,6 +455,7 @@ class Call final : public webrtc::Call,
|
||||
std::atomic<uint32_t> configured_max_padding_bitrate_bps_{0};
|
||||
|
||||
ReceiveSideCongestionController receive_side_cc_;
|
||||
RepeatingTaskHandle receive_side_cc_periodic_task_;
|
||||
|
||||
const std::unique_ptr<ReceiveTimeCalculator> receive_time_calculator_;
|
||||
|
||||
@ -513,7 +513,7 @@ Call* Call::Create(const Call::Config& config) {
|
||||
|
||||
Call* Call::Create(const Call::Config& config,
|
||||
Clock* clock,
|
||||
rtc::scoped_refptr<SharedModuleThread> call_thread,
|
||||
rtc::scoped_refptr<SharedModuleThread> /*call_thread*/,
|
||||
std::unique_ptr<ProcessThread> pacer_thread) {
|
||||
RTC_DCHECK(config.task_queue_factory);
|
||||
|
||||
@ -524,17 +524,17 @@ Call* Call::Create(const Call::Config& config,
|
||||
return new internal::Call(
|
||||
clock, config,
|
||||
transport_controller_factory_.Create(transportConfig, clock),
|
||||
std::move(call_thread), config.task_queue_factory);
|
||||
config.task_queue_factory);
|
||||
}
|
||||
|
||||
Call* Call::Create(const Call::Config& config,
|
||||
Clock* clock,
|
||||
rtc::scoped_refptr<SharedModuleThread> call_thread,
|
||||
rtc::scoped_refptr<SharedModuleThread> /*call_thread*/,
|
||||
std::unique_ptr<RtpTransportControllerSendInterface>
|
||||
transportControllerSend) {
|
||||
RTC_DCHECK(config.task_queue_factory);
|
||||
return new internal::Call(clock, config, std::move(transportControllerSend),
|
||||
std::move(call_thread), config.task_queue_factory);
|
||||
config.task_queue_factory);
|
||||
}
|
||||
|
||||
class SharedModuleThread::Impl {
|
||||
@ -796,7 +796,6 @@ void Call::SendStats::SetMinAllocatableRate(BitrateAllocationLimits limits) {
|
||||
Call::Call(Clock* clock,
|
||||
const Call::Config& config,
|
||||
std::unique_ptr<RtpTransportControllerSendInterface> transport_send,
|
||||
rtc::scoped_refptr<SharedModuleThread> module_process_thread,
|
||||
TaskQueueFactory* task_queue_factory)
|
||||
: clock_(clock),
|
||||
task_queue_factory_(task_queue_factory),
|
||||
@ -811,7 +810,6 @@ Call::Call(Clock* clock,
|
||||
worker_thread_)
|
||||
: nullptr),
|
||||
num_cpu_cores_(CpuInfo::DetectNumberOfCores()),
|
||||
module_process_thread_(std::move(module_process_thread)),
|
||||
call_stats_(new CallStats(clock_, worker_thread_)),
|
||||
bitrate_allocator_(new BitrateAllocator(this)),
|
||||
config_(config),
|
||||
@ -849,10 +847,11 @@ Call::Call(Clock* clock,
|
||||
|
||||
call_stats_->RegisterStatsObserver(&receive_side_cc_);
|
||||
|
||||
module_process_thread_->process_thread()->RegisterModule(
|
||||
receive_side_cc_.GetRemoteBitrateEstimator(true), RTC_FROM_HERE);
|
||||
module_process_thread_->process_thread()->RegisterModule(&receive_side_cc_,
|
||||
RTC_FROM_HERE);
|
||||
ReceiveSideCongestionController* receive_side_cc = &receive_side_cc_;
|
||||
receive_side_cc_periodic_task_ = RepeatingTaskHandle::Start(
|
||||
worker_thread_,
|
||||
[receive_side_cc] { return receive_side_cc->MaybeProcess(); },
|
||||
TaskQueueBase::DelayPrecision::kLow, clock_);
|
||||
}
|
||||
|
||||
Call::~Call() {
|
||||
@ -864,9 +863,7 @@ Call::~Call() {
|
||||
RTC_CHECK(audio_receive_streams_.empty());
|
||||
RTC_CHECK(video_receive_streams_.empty());
|
||||
|
||||
module_process_thread_->process_thread()->DeRegisterModule(
|
||||
receive_side_cc_.GetRemoteBitrateEstimator(true));
|
||||
module_process_thread_->process_thread()->DeRegisterModule(&receive_side_cc_);
|
||||
receive_side_cc_periodic_task_.Stop();
|
||||
call_stats_->DeregisterStatsObserver(&receive_side_cc_);
|
||||
send_stats_.SetFirstPacketTime(transport_send_->GetFirstPacketTime());
|
||||
|
||||
@ -887,7 +884,6 @@ void Call::EnsureStarted() {
|
||||
// off being kicked off on request rather than in the ctor.
|
||||
transport_send_->RegisterTargetTransferRateObserver(this);
|
||||
|
||||
module_process_thread_->EnsureStarted();
|
||||
transport_send_->EnsureStarted();
|
||||
}
|
||||
|
||||
|
||||
@ -32,8 +32,7 @@ class RemoteBitrateObserver;
|
||||
// relaying for each received RTP packet back to the sender. While for
|
||||
// receive side bandwidth estimation, we do the estimation locally and
|
||||
// send our results back to the sender.
|
||||
class ReceiveSideCongestionController : public CallStatsObserver,
|
||||
public Module {
|
||||
class ReceiveSideCongestionController : public CallStatsObserver {
|
||||
public:
|
||||
ReceiveSideCongestionController(
|
||||
Clock* clock,
|
||||
@ -65,9 +64,12 @@ class ReceiveSideCongestionController : public CallStatsObserver,
|
||||
|
||||
void SetTransportOverhead(DataSize overhead_per_packet);
|
||||
|
||||
// Implements Module.
|
||||
int64_t TimeUntilNextProcess() override;
|
||||
void Process() override;
|
||||
[[deprecated]] int64_t TimeUntilNextProcess();
|
||||
[[deprecated]] void Process();
|
||||
|
||||
// Runs periodic tasks if it is time to run them, returns time until next
|
||||
// call to `MaybeProcess` should be non idle.
|
||||
TimeDelta MaybeProcess();
|
||||
|
||||
private:
|
||||
class WrappingBitrateEstimator : public RemoteBitrateEstimator {
|
||||
|
||||
@ -184,6 +184,21 @@ void ReceiveSideCongestionController::Process() {
|
||||
remote_bitrate_estimator_.Process();
|
||||
}
|
||||
|
||||
TimeDelta ReceiveSideCongestionController::MaybeProcess() {
|
||||
int64_t time_until_rbe_ms = remote_bitrate_estimator_.TimeUntilNextProcess();
|
||||
if (time_until_rbe_ms <= 0) {
|
||||
remote_bitrate_estimator_.Process();
|
||||
time_until_rbe_ms = remote_bitrate_estimator_.TimeUntilNextProcess();
|
||||
}
|
||||
int64_t time_until_rep_ms = remote_estimator_proxy_.TimeUntilNextProcess();
|
||||
if (time_until_rep_ms <= 0) {
|
||||
remote_estimator_proxy_.Process();
|
||||
time_until_rep_ms = remote_estimator_proxy_.TimeUntilNextProcess();
|
||||
}
|
||||
int64_t time_until_next_ms = std::min(time_until_rbe_ms, time_until_rep_ms);
|
||||
return TimeDelta::Millis(std::max<int64_t>(time_until_next_ms, 0));
|
||||
}
|
||||
|
||||
void ReceiveSideCongestionController::SetMaxDesiredReceiveBitrate(
|
||||
DataRate bitrate) {
|
||||
remb_throttler_.SetMaxDesiredReceiveBitrate(bitrate);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user