Moving task queue from Call to transport controller.

Moving ownership of worker task queue in Call to
RtpTransportControllerSend. This CL also ensures that the task queue
is not destroyed until the process thread running
SendSideCongestionController is stopped.

The worker queue should be owned by RtpTransportControllerSend since
it is mainly used for rtp and transport related tasks such as bitrate
allocation and signaling network state.

Bug: webrtc:9232
Change-Id: I211edf1a3b9f9b2572875d5584cb788cb2449ef9
Reviewed-on: https://webrtc-review.googlesource.com/63023
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#23119}
This commit is contained in:
Sebastian Jansson 2018-05-04 14:08:15 +02:00 committed by Commit Bot
parent 9d96e92316
commit e6256055e7
6 changed files with 80 additions and 52 deletions

View File

@ -110,6 +110,7 @@ rtc_source_set("rtp_sender") {
"../modules/utility",
"../rtc_base:rtc_base",
"../rtc_base:rtc_base_approved",
"../rtc_base:rtc_task_queue",
"../system_wrappers:field_trial_api",
]
}

View File

@ -166,11 +166,11 @@ std::unique_ptr<rtclog::StreamConfig> CreateRtcLogStreamConfig(
namespace internal {
class Call : public webrtc::Call,
public PacketReceiver,
public RecoveredPacketReceiver,
public TargetTransferRateObserver,
public BitrateAllocator::LimitObserver {
class Call final : public webrtc::Call,
public PacketReceiver,
public RecoveredPacketReceiver,
public TargetTransferRateObserver,
public BitrateAllocator::LimitObserver {
public:
Call(const Call::Config& config,
std::unique_ptr<RtpTransportControllerSendInterface> transport_send);
@ -362,19 +362,21 @@ class Call : public webrtc::Call,
AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(&bitrate_crit_);
RateLimiter retransmission_rate_limiter_;
std::unique_ptr<RtpTransportControllerSendInterface> transport_send_;
ReceiveSideCongestionController receive_side_cc_;
const std::unique_ptr<ReceiveTimeCalculator> receive_time_calculator_;
const std::unique_ptr<SendDelayStats> video_send_delay_stats_;
const int64_t start_ms_;
// TODO(perkj): |worker_queue_| is supposed to replace
// |module_process_thread_|.
// |worker_queue| is defined last to ensure all pending tasks are cancelled
// and deleted before any other members.
rtc::TaskQueue worker_queue_;
// Caches transport_send_.get(), to avoid racing with destructor.
// Note that this is declared before transport_send_ to ensure that it is not
// invalidated until no more tasks can be running on the transport_send_ task
// queue.
RtpTransportControllerSendInterface* transport_send_ptr_;
// Declared last since it will issue callbacks from a task queue. Declaring it
// last ensures that it is destroyed first and any running tasks are finished.
std::unique_ptr<RtpTransportControllerSendInterface> transport_send_;
RTC_DISALLOW_COPY_AND_ASSIGN(Call);
};
} // namespace internal
@ -444,11 +446,11 @@ Call::Call(const Call::Config& config,
receive_side_cc_(clock_, transport_send->packet_router()),
receive_time_calculator_(ReceiveTimeCalculator::CreateFromFieldTrial()),
video_send_delay_stats_(new SendDelayStats(clock_)),
start_ms_(clock_->TimeInMilliseconds()),
worker_queue_("call_worker_queue") {
start_ms_(clock_->TimeInMilliseconds()) {
RTC_DCHECK(config.event_log != nullptr);
transport_send->RegisterTargetTransferRateObserver(this);
transport_send_ = std::move(transport_send);
transport_send_ptr_ = transport_send_.get();
call_stats_->RegisterStatsObserver(&receive_side_cc_);
call_stats_->RegisterStatsObserver(transport_send_->GetCallStatsObserver());
@ -591,10 +593,14 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream(
}
}
// TODO(srte): AudioSendStream should call GetWorkerQueue directly rather than
// having it injected.
AudioSendStream* send_stream = new AudioSendStream(
config, config_.audio_state, &worker_queue_, module_process_thread_.get(),
transport_send_.get(), bitrate_allocator_.get(), event_log_,
call_stats_.get(), suspended_rtp_state, &sent_rtp_audio_timer_ms_);
config, config_.audio_state, transport_send_ptr_->GetWorkerQueue(),
module_process_thread_.get(), transport_send_ptr_,
bitrate_allocator_.get(), event_log_, call_stats_.get(),
suspended_rtp_state, &sent_rtp_audio_timer_ms_);
{
WriteLockScoped write_lock(*send_crit_);
RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) ==
@ -649,7 +655,7 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream(
event_log_->Log(rtc::MakeUnique<RtcEventAudioReceiveStreamConfig>(
CreateRtcLogStreamConfig(config)));
AudioReceiveStream* receive_stream = new AudioReceiveStream(
&audio_receiver_controller_, transport_send_->packet_router(),
&audio_receiver_controller_, transport_send_ptr_->packet_router(),
module_process_thread_.get(), config, config_.audio_state, event_log_);
{
WriteLockScoped write_lock(*receive_crit_);
@ -718,9 +724,12 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream(
// Copy ssrcs from |config| since |config| is moved.
std::vector<uint32_t> ssrcs = config.rtp.ssrcs;
// TODO(srte): VideoSendStream should call GetWorkerQueue directly rather than
// having it injected.
VideoSendStream* send_stream = new VideoSendStream(
num_cpu_cores_, module_process_thread_.get(), &worker_queue_,
call_stats_.get(), transport_send_.get(), bitrate_allocator_.get(),
num_cpu_cores_, module_process_thread_.get(),
transport_send_ptr_->GetWorkerQueue(), call_stats_.get(),
transport_send_ptr_, bitrate_allocator_.get(),
video_send_delay_stats_.get(), event_log_, std::move(config),
std::move(encoder_config), suspended_video_send_ssrcs_,
suspended_video_payload_states_, std::move(fec_controller),
@ -799,7 +808,7 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream(
VideoReceiveStream* receive_stream = new VideoReceiveStream(
&video_receiver_controller_, num_cpu_cores_,
transport_send_->packet_router(), std::move(configuration),
transport_send_ptr_->packet_router(), std::move(configuration),
module_process_thread_.get(), call_stats_.get());
const webrtc::VideoReceiveStream::Config& config = receive_stream->config();
@ -907,7 +916,7 @@ void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) {
}
RtpTransportControllerSendInterface* Call::GetTransportControllerSend() {
return transport_send_.get();
return transport_send_ptr_;
}
Call::Stats Call::GetStats() const {
@ -930,8 +939,9 @@ Call::Stats Call::GetStats() const {
// available.
{
rtc::CritScope cs(&aggregate_network_up_crit_);
stats.pacer_delay_ms =
aggregate_network_up_ ? transport_send_->GetPacerQueuingDelayMs() : 0;
stats.pacer_delay_ms = aggregate_network_up_
? transport_send_ptr_->GetPacerQueuingDelayMs()
: 0;
}
stats.rtt_ms = call_stats_->LastProcessedRtt();
@ -945,19 +955,19 @@ Call::Stats Call::GetStats() const {
void Call::SetBitrateAllocationStrategy(
std::unique_ptr<rtc::BitrateAllocationStrategy>
bitrate_allocation_strategy) {
if (!worker_queue_.IsCurrent()) {
rtc::BitrateAllocationStrategy* strategy_raw =
bitrate_allocation_strategy.release();
auto functor = [this, strategy_raw]() {
SetBitrateAllocationStrategy(
rtc::WrapUnique<rtc::BitrateAllocationStrategy>(strategy_raw));
};
worker_queue_.PostTask([functor] { functor(); });
return;
}
RTC_DCHECK_RUN_ON(&worker_queue_);
bitrate_allocator_->SetBitrateAllocationStrategy(
std::move(bitrate_allocation_strategy));
// TODO(srte): This function should be moved to RtpTransportControllerSend
// when BitrateAllocator is moved there.
struct Functor {
void operator()() {
bitrate_allocator_->SetBitrateAllocationStrategy(
std::move(bitrate_allocation_strategy_));
}
BitrateAllocator* bitrate_allocator_;
std::unique_ptr<rtc::BitrateAllocationStrategy>
bitrate_allocation_strategy_;
};
transport_send_ptr_->GetWorkerQueue()->PostTask(Functor{
bitrate_allocator_.get(), std::move(bitrate_allocation_strategy)});
}
void Call::SignalChannelNetworkState(MediaType media, NetworkState state) {
@ -1050,23 +1060,16 @@ void Call::UpdateAggregateNetworkState() {
rtc::CritScope cs(&aggregate_network_up_crit_);
aggregate_network_up_ = aggregate_network_up;
}
transport_send_->OnNetworkAvailability(aggregate_network_up);
transport_send_ptr_->OnNetworkAvailability(aggregate_network_up);
}
void Call::OnSentPacket(const rtc::SentPacket& sent_packet) {
video_send_delay_stats_->OnSentPacket(sent_packet.packet_id,
clock_->TimeInMilliseconds());
transport_send_->OnSentPacket(sent_packet);
transport_send_ptr_->OnSentPacket(sent_packet);
}
void Call::OnTargetTransferRate(TargetTransferRate msg) {
// TODO(perkj): Consider making sure CongestionController operates on
// |worker_queue_|.
if (!worker_queue_.IsCurrent()) {
worker_queue_.PostTask([this, msg] { OnTargetTransferRate(msg); });
return;
}
RTC_DCHECK_RUN_ON(&worker_queue_);
uint32_t target_bitrate_bps = msg.target_rate.bps();
int loss_ratio_255 = msg.network_estimate.loss_rate_ratio * 255;
uint8_t fraction_loss =
@ -1116,9 +1119,9 @@ void Call::OnAllocationLimitsChanged(uint32_t min_send_bitrate_bps,
uint32_t max_padding_bitrate_bps,
uint32_t total_bitrate_bps,
bool has_packet_feedback) {
transport_send_->SetAllocatedSendBitrateLimits(
transport_send_ptr_->SetAllocatedSendBitrateLimits(
min_send_bitrate_bps, max_padding_bitrate_bps, total_bitrate_bps);
transport_send_->SetPerPacketFeedbackAvailable(has_packet_feedback);
transport_send_ptr_->SetPerPacketFeedbackAvailable(has_packet_feedback);
rtc::CritScope lock(&bitrate_crit_);
min_allocated_send_bitrate_bps_ = min_send_bitrate_bps;
configured_max_padding_bitrate_bps_ = max_padding_bitrate_bps;

View File

@ -63,7 +63,8 @@ RtpTransportControllerSend::RtpTransportControllerSend(
event_log,
&pacer_,
bitrate_config,
TaskQueueExperimentEnabled())) {
TaskQueueExperimentEnabled())),
task_queue_("rtp_send_controller") {
send_side_cc_ptr_ = send_side_cc_.get();
process_thread_->RegisterModule(&pacer_, RTC_FROM_HERE);
process_thread_->RegisterModule(send_side_cc_.get(), RTC_FROM_HERE);
@ -92,10 +93,24 @@ void RtpTransportControllerSend::OnNetworkChanged(uint32_t bitrate_bps,
msg.network_estimate.bandwidth = DataRate::bps(bandwidth_bps);
msg.network_estimate.loss_rate_ratio = fraction_loss / 255.0;
msg.network_estimate.round_trip_time = TimeDelta::ms(rtt_ms);
rtc::CritScope cs(&observer_crit_);
// We wont register as observer until we have an observer.
RTC_DCHECK(observer_ != nullptr);
observer_->OnTargetTransferRate(msg);
if (!task_queue_.IsCurrent()) {
task_queue_.PostTask([this, msg] {
rtc::CritScope cs(&observer_crit_);
// We won't register as observer until we have an observer.
RTC_DCHECK(observer_ != nullptr);
observer_->OnTargetTransferRate(msg);
});
} else {
rtc::CritScope cs(&observer_crit_);
// We won't register as observer until we have an observer.
RTC_DCHECK(observer_ != nullptr);
observer_->OnTargetTransferRate(msg);
}
}
rtc::TaskQueue* RtpTransportControllerSend::GetWorkerQueue() {
return &task_queue_;
}
PacketRouter* RtpTransportControllerSend::packet_router() {

View File

@ -23,6 +23,7 @@
#include "modules/utility/include/process_thread.h"
#include "rtc_base/constructormagic.h"
#include "rtc_base/networkroute.h"
#include "rtc_base/task_queue.h"
namespace webrtc {
class Clock;
@ -47,6 +48,7 @@ class RtpTransportControllerSend final
int64_t probing_interval_ms) override;
// Implements RtpTransportControllerSendInterface
rtc::TaskQueue* GetWorkerQueue() override;
PacketRouter* packet_router() override;
TransportFeedbackObserver* transport_feedback_observer() override;
@ -102,6 +104,10 @@ class RtpTransportControllerSend final
// Declared last since it will issue callbacks from a task queue. Declaring it
// last ensures that it is destroyed first.
const std::unique_ptr<SendSideCongestionControllerInterface> send_side_cc_;
// TODO(perkj): |task_queue_| is supposed to replace |process_thread_|.
// |task_queue_| is defined last to ensure all pending tasks are cancelled
// and deleted before any other members.
rtc::TaskQueue task_queue_;
RTC_DISALLOW_COPY_AND_ASSIGN(RtpTransportControllerSend);
};

View File

@ -21,6 +21,7 @@
namespace rtc {
struct SentPacket;
struct NetworkRoute;
class TaskQueue;
} // namespace rtc
namespace webrtc {
@ -62,6 +63,7 @@ class TransportFeedbackObserver;
class RtpTransportControllerSendInterface {
public:
virtual ~RtpTransportControllerSendInterface() {}
virtual rtc::TaskQueue* GetWorkerQueue() = 0;
virtual PacketRouter* packet_router() = 0;
virtual TransportFeedbackObserver* transport_feedback_observer() = 0;

View File

@ -27,6 +27,7 @@ namespace webrtc {
class MockRtpTransportControllerSend
: public RtpTransportControllerSendInterface {
public:
MOCK_METHOD0(GetWorkerQueue, rtc::TaskQueue*());
MOCK_METHOD0(packet_router, PacketRouter*());
MOCK_METHOD0(transport_feedback_observer, TransportFeedbackObserver*());
MOCK_METHOD0(packet_sender, RtpPacketSender*());