NackModule2: coalesce repeating tasks.

NackModule2 creates repeating tasks, but as there are
many modules (one per receiver) these tasks execute out
of phase with each other, multipliying the amount of wakeups
caused.

Fix this by creating a single wakeup source that serves all
NackModule2 instances in a call.

Bug: webrtc:12989
Change-Id: Ia9c84307eb57349679e42b673474feb2cb43f08e
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/226464
Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Markus Handell <handellm@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34527}
This commit is contained in:
Markus Handell 2021-07-20 13:32:02 +02:00 committed by WebRTC LUCI CQ
parent 41e98bab1e
commit 0e62f7aa98
11 changed files with 161 additions and 54 deletions

View File

@ -376,6 +376,9 @@ class Call final : public webrtc::Call,
// network thread.
bool aggregate_network_up_ RTC_GUARDED_BY(worker_thread_);
// Schedules nack periodic processing on behalf of all streams.
NackPeriodicProcessor nack_periodic_processor_;
// Audio, Video, and FlexFEC receive streams are owned by the client that
// creates them.
// TODO(bugs.webrtc.org/11993): Move audio_receive_streams_,
@ -1124,7 +1127,8 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream(
VideoReceiveStream2* receive_stream = new VideoReceiveStream2(
task_queue_factory_, this, num_cpu_cores_,
transport_send_->packet_router(), std::move(configuration),
call_stats_.get(), clock_, new VCMTiming(clock_));
call_stats_.get(), clock_, new VCMTiming(clock_),
&nack_periodic_processor_);
// TODO(bugs.webrtc.org/11993): Set this up asynchronously on the network
// thread.
receive_stream->RegisterWithTransport(&video_receiver_controller_);

View File

@ -82,6 +82,7 @@ rtc_library("nack_module") {
deps = [
"..:module_api",
"../../api:sequence_checker",
"../../api/task_queue",
"../../api/units:time_delta",
"../../api/units:timestamp",
"../../rtc_base:checks",

View File

@ -13,6 +13,8 @@
#include <algorithm>
#include <limits>
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/timestamp.h"
#include "rtc_base/checks.h"
#include "rtc_base/experiments/field_trial_parser.h"
@ -43,7 +45,52 @@ int64_t GetSendNackDelay() {
}
} // namespace
constexpr TimeDelta NackModule2::kUpdateInterval;
constexpr TimeDelta NackPeriodicProcessor::kUpdateInterval;
NackPeriodicProcessor::NackPeriodicProcessor(TimeDelta update_interval)
: update_interval_(update_interval) {}
NackPeriodicProcessor::~NackPeriodicProcessor() {}
void NackPeriodicProcessor::RegisterNackModule(NackModuleBase* module) {
RTC_DCHECK_RUN_ON(&sequence_);
modules_.push_back(module);
if (modules_.size() != 1)
return;
repeating_task_ = RepeatingTaskHandle::DelayedStart(
TaskQueueBase::Current(), update_interval_, [this] {
RTC_DCHECK_RUN_ON(&sequence_);
ProcessNackModules();
return update_interval_;
});
}
void NackPeriodicProcessor::UnregisterNackModule(NackModuleBase* module) {
RTC_DCHECK_RUN_ON(&sequence_);
auto it = std::find(modules_.begin(), modules_.end(), module);
RTC_DCHECK(it != modules_.end());
modules_.erase(it);
if (modules_.empty())
repeating_task_.Stop();
}
// RTC_RUN_ON(sequence_)
void NackPeriodicProcessor::ProcessNackModules() {
for (NackModuleBase* module : modules_)
module->ProcessNacks();
}
ScopedNackPeriodicProcessorRegistration::
ScopedNackPeriodicProcessorRegistration(NackModuleBase* module,
NackPeriodicProcessor* processor)
: module_(module), processor_(processor) {
processor_->RegisterNackModule(module_);
}
ScopedNackPeriodicProcessorRegistration::
~ScopedNackPeriodicProcessorRegistration() {
processor_->UnregisterNackModule(module_);
}
NackModule2::NackInfo::NackInfo()
: seq_num(0), send_at_seq_num(0), sent_at_time(-1), retries(0) {}
@ -89,12 +136,11 @@ NackModule2::BackoffSettings::ParseFromFieldTrials() {
}
NackModule2::NackModule2(TaskQueueBase* current_queue,
NackPeriodicProcessor* periodic_processor,
Clock* clock,
NackSender* nack_sender,
KeyFrameRequestSender* keyframe_request_sender,
TimeDelta update_interval /*= kUpdateInterval*/)
KeyFrameRequestSender* keyframe_request_sender)
: worker_thread_(current_queue),
update_interval_(update_interval),
clock_(clock),
nack_sender_(nack_sender),
keyframe_request_sender_(keyframe_request_sender),
@ -103,17 +149,20 @@ NackModule2::NackModule2(TaskQueueBase* current_queue,
rtt_ms_(kDefaultRttMs),
newest_seq_num_(0),
send_nack_delay_ms_(GetSendNackDelay()),
backoff_settings_(BackoffSettings::ParseFromFieldTrials()) {
backoff_settings_(BackoffSettings::ParseFromFieldTrials()),
processor_registration_(this, periodic_processor) {
RTC_DCHECK(clock_);
RTC_DCHECK(nack_sender_);
RTC_DCHECK(keyframe_request_sender_);
RTC_DCHECK_GT(update_interval.ms(), 0);
RTC_DCHECK(worker_thread_);
RTC_DCHECK(worker_thread_->IsCurrent());
}
repeating_task_ = RepeatingTaskHandle::DelayedStart(
TaskQueueBase::Current(), update_interval_,
[this]() {
NackModule2::~NackModule2() {
RTC_DCHECK_RUN_ON(worker_thread_);
}
void NackModule2::ProcessNacks() {
RTC_DCHECK_RUN_ON(worker_thread_);
std::vector<uint16_t> nack_batch = GetNackBatch(kTimeOnly);
if (!nack_batch.empty()) {
@ -121,14 +170,6 @@ NackModule2::NackModule2(TaskQueueBase* current_queue,
// initiator who can batch them with other feedback messages.
nack_sender_->SendNack(nack_batch, /*buffering_allowed=*/false);
}
return update_interval_;
},
clock_);
}
NackModule2::~NackModule2() {
RTC_DCHECK_RUN_ON(worker_thread_);
repeating_task_.Stop();
}
int NackModule2::OnReceivedPacket(uint16_t seq_num, bool is_keyframe) {

View File

@ -30,20 +30,54 @@
namespace webrtc {
class NackModuleBase {
public:
virtual ~NackModuleBase() = default;
virtual void ProcessNacks() = 0;
};
class NackPeriodicProcessor {
public:
static constexpr TimeDelta kUpdateInterval = TimeDelta::Millis(20);
explicit NackPeriodicProcessor(TimeDelta update_interval = kUpdateInterval);
~NackPeriodicProcessor();
void RegisterNackModule(NackModuleBase* module);
void UnregisterNackModule(NackModuleBase* module);
private:
void ProcessNackModules() RTC_RUN_ON(sequence_);
const TimeDelta update_interval_;
RepeatingTaskHandle repeating_task_ RTC_GUARDED_BY(sequence_);
std::vector<NackModuleBase*> modules_ RTC_GUARDED_BY(sequence_);
RTC_NO_UNIQUE_ADDRESS SequenceChecker sequence_;
};
class ScopedNackPeriodicProcessorRegistration {
public:
ScopedNackPeriodicProcessorRegistration(NackModuleBase* module,
NackPeriodicProcessor* processor);
~ScopedNackPeriodicProcessorRegistration();
private:
NackModuleBase* const module_;
NackPeriodicProcessor* const processor_;
};
// TODO(bugs.webrtc.org/11594): This class no longer implements the Module
// interface and therefore "NackModule" may not be a descriptive name anymore.
// Consider renaming to e.g. NackTracker or NackRequester.
class NackModule2 final {
class NackModule2 final : public NackModuleBase {
public:
static constexpr TimeDelta kUpdateInterval = TimeDelta::Millis(20);
NackModule2(TaskQueueBase* current_queue,
NackPeriodicProcessor* periodic_processor,
Clock* clock,
NackSender* nack_sender,
KeyFrameRequestSender* keyframe_request_sender,
TimeDelta update_interval = kUpdateInterval);
KeyFrameRequestSender* keyframe_request_sender);
~NackModule2();
void ProcessNacks() override;
int OnReceivedPacket(uint16_t seq_num, bool is_keyframe);
int OnReceivedPacket(uint16_t seq_num, bool is_keyframe, bool is_recovered);
@ -103,11 +137,6 @@ class NackModule2 final {
RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
TaskQueueBase* const worker_thread_;
// Used to regularly call SendNack if needed.
RepeatingTaskHandle repeating_task_ RTC_GUARDED_BY(worker_thread_);
const TimeDelta update_interval_;
Clock* const clock_;
NackSender* const nack_sender_;
KeyFrameRequestSender* const keyframe_request_sender_;
@ -131,6 +160,8 @@ class NackModule2 final {
const absl::optional<BackoffSettings> backoff_settings_;
ScopedNackPeriodicProcessorRegistration processor_registration_;
// Used to signal destruction to potentially pending tasks.
ScopedTaskSafety task_safety_;
};

View File

@ -80,10 +80,13 @@ class TestNackModule2 : public ::testing::TestWithParam<bool>,
}
NackModule2& CreateNackModule(
TimeDelta interval = NackModule2::kUpdateInterval) {
TimeDelta interval = NackPeriodicProcessor::kUpdateInterval) {
RTC_DCHECK(!nack_module_.get());
nack_module_ = std::make_unique<NackModule2>(
TaskQueueBase::Current(), clock_.get(), this, this, interval);
nack_periodic_processor_ =
std::make_unique<NackPeriodicProcessor>(interval);
nack_module_ = std::make_unique<NackModule2>(TaskQueueBase::Current(),
nack_periodic_processor_.get(),
clock_.get(), this, this);
nack_module_->UpdateRtt(kDefaultRttMs);
return *nack_module_.get();
}
@ -92,6 +95,7 @@ class TestNackModule2 : public ::testing::TestWithParam<bool>,
test::RunLoop loop_;
std::unique_ptr<SimulatedClock> clock_;
test::ScopedFieldTrials field_trial_;
std::unique_ptr<NackPeriodicProcessor> nack_periodic_processor_;
std::unique_ptr<NackModule2> nack_module_;
std::vector<uint16_t> sent_nacks_;
int keyframes_requested_;
@ -379,7 +383,11 @@ class TestNackModule2WithFieldTrial : public ::testing::Test,
TestNackModule2WithFieldTrial()
: nack_delay_field_trial_("WebRTC-SendNackDelayMs/10/"),
clock_(new SimulatedClock(0)),
nack_module_(TaskQueueBase::Current(), clock_.get(), this, this),
nack_module_(TaskQueueBase::Current(),
&nack_periodic_processor_,
clock_.get(),
this,
this),
keyframes_requested_(0) {}
void SendNack(const std::vector<uint16_t>& sequence_numbers,
@ -392,6 +400,7 @@ class TestNackModule2WithFieldTrial : public ::testing::Test,
test::ScopedFieldTrials nack_delay_field_trial_;
std::unique_ptr<SimulatedClock> clock_;
NackPeriodicProcessor nack_periodic_processor_;
NackModule2 nack_module_;
std::vector<uint16_t> sent_nacks_;
int keyframes_requested_;

View File

@ -105,6 +105,7 @@ std::unique_ptr<ModuleRtpRtcpImpl2> CreateRtpRtcpModule(
std::unique_ptr<NackModule2> MaybeConstructNackModule(
TaskQueueBase* current_queue,
NackPeriodicProcessor* nack_periodic_processor,
const VideoReceiveStream::Config& config,
Clock* clock,
NackSender* nack_sender,
@ -113,7 +114,8 @@ std::unique_ptr<NackModule2> MaybeConstructNackModule(
return nullptr;
// TODO(bugs.webrtc.org/12420): pass rtp_history_ms to the nack module.
return std::make_unique<NackModule2>(current_queue, clock, nack_sender,
return std::make_unique<NackModule2>(current_queue, nack_periodic_processor,
clock, nack_sender,
keyframe_request_sender);
}
@ -210,6 +212,7 @@ RtpVideoStreamReceiver2::RtpVideoStreamReceiver2(
ReceiveStatistics* rtp_receive_statistics,
RtcpPacketTypeCounterObserver* rtcp_packet_type_counter_observer,
RtcpCnameCallback* rtcp_cname_callback,
NackPeriodicProcessor* nack_periodic_processor,
NackSender* nack_sender,
KeyFrameRequestSender* keyframe_request_sender,
OnCompleteFrameCallback* complete_frame_callback,
@ -243,6 +246,7 @@ RtpVideoStreamReceiver2::RtpVideoStreamReceiver2(
// directly with |rtp_rtcp_|.
rtcp_feedback_buffer_(this, nack_sender, this),
nack_module_(MaybeConstructNackModule(current_queue,
nack_periodic_processor,
config_,
clock_,
&rtcp_feedback_buffer_,

View File

@ -39,6 +39,7 @@
#include "modules/rtp_rtcp/source/video_rtp_depacketizer.h"
#include "modules/video_coding/h264_sps_pps_tracker.h"
#include "modules/video_coding/loss_notification_controller.h"
#include "modules/video_coding/nack_module2.h"
#include "modules/video_coding/packet_buffer.h"
#include "modules/video_coding/rtp_frame_reference_finder.h"
#include "modules/video_coding/unique_timestamp_counter.h"
@ -89,6 +90,7 @@ class RtpVideoStreamReceiver2 : public LossNotificationSender,
ReceiveStatistics* rtp_receive_statistics,
RtcpPacketTypeCounterObserver* rtcp_packet_type_counter_observer,
RtcpCnameCallback* rtcp_cname_callback,
NackPeriodicProcessor* nack_periodic_processor,
NackSender* nack_sender,
// The KeyFrameRequestSender is optional; if not provided, key frame
// requests are sent via the internal RtpRtcp module.

View File

@ -172,8 +172,9 @@ class RtpVideoStreamReceiver2Test : public ::testing::Test,
rtp_video_stream_receiver_ = std::make_unique<RtpVideoStreamReceiver2>(
TaskQueueBase::Current(), Clock::GetRealTimeClock(), &mock_transport_,
nullptr, nullptr, &config_, rtp_receive_statistics_.get(), nullptr,
nullptr, &mock_nack_sender_, &mock_key_frame_request_sender_,
&mock_on_complete_frame_callback_, nullptr, nullptr);
nullptr, &nack_periodic_processor_, &mock_nack_sender_,
&mock_key_frame_request_sender_, &mock_on_complete_frame_callback_,
nullptr, nullptr);
VideoCodec codec;
codec.codecType = kVideoCodecGeneric;
rtp_video_stream_receiver_->AddReceiveCodec(kPayloadType, codec, {},
@ -244,6 +245,7 @@ class RtpVideoStreamReceiver2Test : public ::testing::Test,
const webrtc::test::ScopedFieldTrials override_field_trials_;
VideoReceiveStream::Config config_;
NackPeriodicProcessor nack_periodic_processor_;
MockNackSender mock_nack_sender_;
MockKeyFrameRequestSender mock_key_frame_request_sender_;
MockTransport mock_transport_;
@ -1132,8 +1134,8 @@ TEST_F(RtpVideoStreamReceiver2Test, TransformFrame) {
auto receiver = std::make_unique<RtpVideoStreamReceiver2>(
TaskQueueBase::Current(), Clock::GetRealTimeClock(), &mock_transport_,
nullptr, nullptr, &config_, rtp_receive_statistics_.get(), nullptr,
nullptr, &mock_nack_sender_, nullptr, &mock_on_complete_frame_callback_,
nullptr, mock_frame_transformer);
nullptr, &nack_periodic_processor_, &mock_nack_sender_, nullptr,
&mock_on_complete_frame_callback_, nullptr, mock_frame_transformer);
VideoCodec video_codec;
video_codec.codecType = kVideoCodecGeneric;
receiver->AddReceiveCodec(kPayloadType, video_codec, {},

View File

@ -211,14 +211,16 @@ int DetermineMaxWaitForFrame(const VideoReceiveStream::Config& config,
: kMaxWaitForFrameMs;
}
VideoReceiveStream2::VideoReceiveStream2(TaskQueueFactory* task_queue_factory,
VideoReceiveStream2::VideoReceiveStream2(
TaskQueueFactory* task_queue_factory,
Call* call,
int num_cpu_cores,
PacketRouter* packet_router,
VideoReceiveStream::Config config,
CallStats* call_stats,
Clock* clock,
VCMTiming* timing)
VCMTiming* timing,
NackPeriodicProcessor* nack_periodic_processor)
: task_queue_factory_(task_queue_factory),
transport_adapter_(config.rtcp_send_transport),
config_(std::move(config)),
@ -240,6 +242,7 @@ VideoReceiveStream2::VideoReceiveStream2(TaskQueueFactory* task_queue_factory,
rtp_receive_statistics_.get(),
&stats_proxy_,
&stats_proxy_,
nack_periodic_processor,
this, // NackSender
nullptr, // Use default KeyFrameRequestSender
this, // OnCompleteFrameCallback

View File

@ -25,6 +25,7 @@
#include "modules/rtp_rtcp/include/flexfec_receiver.h"
#include "modules/rtp_rtcp/source/source_tracker.h"
#include "modules/video_coding/frame_buffer2.h"
#include "modules/video_coding/nack_module2.h"
#include "modules/video_coding/video_receiver2.h"
#include "rtc_base/system/no_unique_address.h"
#include "rtc_base/task_queue.h"
@ -97,7 +98,8 @@ class VideoReceiveStream2
VideoReceiveStream::Config config,
CallStats* call_stats,
Clock* clock,
VCMTiming* timing);
VCMTiming* timing,
NackPeriodicProcessor* nack_periodic_processor);
// Destruction happens on the worker thread. Prior to destruction the caller
// must ensure that a registration with the transport has been cleared. See
// `RegisterWithTransport` for details.

View File

@ -137,7 +137,8 @@ class VideoReceiveStream2Test : public ::testing::Test {
video_receive_stream_ =
std::make_unique<webrtc::internal::VideoReceiveStream2>(
task_queue_factory_.get(), &fake_call_, kDefaultNumCpuCores,
&packet_router_, config_.Copy(), &call_stats_, clock_, timing_);
&packet_router_, config_.Copy(), &call_stats_, clock_, timing_,
&nack_periodic_processor_);
video_receive_stream_->RegisterWithTransport(
&rtp_stream_receiver_controller_);
}
@ -146,6 +147,7 @@ class VideoReceiveStream2Test : public ::testing::Test {
test::RunLoop loop_;
const std::unique_ptr<TaskQueueFactory> task_queue_factory_;
test::VideoDecoderProxyFactory h264_decoder_factory_;
NackPeriodicProcessor nack_periodic_processor_;
VideoReceiveStream::Config config_;
internal::CallStats call_stats_;
MockVideoDecoder mock_h264_video_decoder_;
@ -316,7 +318,8 @@ class VideoReceiveStream2TestWithFakeDecoder : public ::testing::Test {
timing_ = new VCMTiming(clock_);
video_receive_stream_.reset(new webrtc::internal::VideoReceiveStream2(
task_queue_factory_.get(), &fake_call_, kDefaultNumCpuCores,
&packet_router_, config_.Copy(), &call_stats_, clock_, timing_));
&packet_router_, config_.Copy(), &call_stats_, clock_, timing_,
&nack_periodic_processor_));
video_receive_stream_->RegisterWithTransport(
&rtp_stream_receiver_controller_);
video_receive_stream_->SetAndGetRecordingState(std::move(state), false);
@ -326,6 +329,7 @@ class VideoReceiveStream2TestWithFakeDecoder : public ::testing::Test {
test::RunLoop loop_;
test::FunctionVideoDecoderFactory fake_decoder_factory_;
const std::unique_ptr<TaskQueueFactory> task_queue_factory_;
NackPeriodicProcessor nack_periodic_processor_;
VideoReceiveStream::Config config_;
internal::CallStats call_stats_;
cricket::FakeVideoRenderer fake_renderer_;
@ -581,7 +585,8 @@ class VideoReceiveStream2TestWithSimulatedClock
config_.Copy(),
&call_stats_,
time_controller_.GetClock(),
new VCMTiming(time_controller_.GetClock())) {
new VCMTiming(time_controller_.GetClock()),
&nack_periodic_processor_) {
video_receive_stream_.RegisterWithTransport(
&rtp_stream_receiver_controller_);
video_receive_stream_.Start();
@ -608,6 +613,7 @@ class VideoReceiveStream2TestWithSimulatedClock
MockTransport mock_transport_;
FakeRenderer fake_renderer_;
cricket::FakeCall fake_call_;
NackPeriodicProcessor nack_periodic_processor_;
VideoReceiveStream::Config config_;
internal::CallStats call_stats_;
PacketRouter packet_router_;
@ -749,7 +755,8 @@ class VideoReceiveStream2TestWithLazyDecoderCreation : public ::testing::Test {
video_receive_stream_ =
std::make_unique<webrtc::internal::VideoReceiveStream2>(
task_queue_factory_.get(), &fake_call_, kDefaultNumCpuCores,
&packet_router_, config_.Copy(), &call_stats_, clock_, timing_);
&packet_router_, config_.Copy(), &call_stats_, clock_, timing_,
&nack_periodic_processor_);
video_receive_stream_->RegisterWithTransport(
&rtp_stream_receiver_controller_);
}
@ -758,6 +765,7 @@ class VideoReceiveStream2TestWithLazyDecoderCreation : public ::testing::Test {
test::RunLoop loop_;
const std::unique_ptr<TaskQueueFactory> task_queue_factory_;
MockVideoDecoderFactory mock_h264_decoder_factory_;
NackPeriodicProcessor nack_periodic_processor_;
VideoReceiveStream::Config config_;
internal::CallStats call_stats_;
MockVideoDecoder mock_h264_video_decoder_;