ModuleRtpRtcpImpl2: remove RTCP send polling.

This change migrates RTCP send polling happening in
ModuleRtpRtcpImpl2::Process to task queues.

ModuleRtpRtcpImpl2 would previously only cause RTCP sends while being
registered with a ProcessThread. This is now relaxed so that RTCP will
be sent regardless of ProcessThread registration status, and it seems
no tests cared.

Now there's only one piece of polling left in Process.

Bug: webrtc:11581
Change-Id: Ibdcffefccef7363f2089c34a9c7d694d222445c0
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/222603
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Reviewed-by: Tommi <tommi@webrtc.org>
Commit-Queue: Markus Handell <handellm@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34350}
This commit is contained in:
Markus Handell 2021-06-21 18:57:36 +02:00 committed by WebRTC LUCI CQ
parent 20862093e5
commit 885d538cdd
6 changed files with 180 additions and 33 deletions

View File

@ -138,7 +138,7 @@ RTCPSender::Configuration RTCPSender::Configuration::FromRtpRtcpConfiguration(
return result;
}
RTCPSender::RTCPSender(const Configuration& config)
RTCPSender::RTCPSender(Configuration config)
: audio_(config.audio),
ssrc_(config.local_media_ssrc),
clock_(config.clock),
@ -149,6 +149,8 @@ RTCPSender::RTCPSender(const Configuration& config)
report_interval_(config.rtcp_report_interval.value_or(
TimeDelta::Millis(config.audio ? kDefaultAudioReportInterval
: kDefaultVideoReportInterval))),
schedule_next_rtcp_send_evaluation_function_(
std::move(config.schedule_next_rtcp_send_evaluation_function)),
sending_(false),
timestamp_offset_(0),
last_rtp_timestamp_(0),
@ -201,7 +203,7 @@ void RTCPSender::SetRTCPStatus(RtcpMode new_method) {
next_time_to_send_rtcp_ = absl::nullopt;
} else if (method_ == RtcpMode::kOff) {
// When switching on, reschedule the next packet
next_time_to_send_rtcp_ = clock_->CurrentTime() + report_interval_ / 2;
SetNextRtcpSendEvaluationDuration(report_interval_ / 2);
}
method_ = new_method;
}
@ -284,7 +286,7 @@ void RTCPSender::SetRemb(int64_t bitrate_bps, std::vector<uint32_t> ssrcs) {
SetFlag(kRtcpRemb, /*is_volatile=*/false);
// Send a REMB immediately if we have a new REMB. The frequency of REMBs is
// throttled by the caller.
next_time_to_send_rtcp_ = clock_->CurrentTime();
SetNextRtcpSendEvaluationDuration(TimeDelta::Zero());
}
void RTCPSender::UnsetRemb() {
@ -428,7 +430,9 @@ bool RTCPSender::TimeToSendRTCPReport(bool sendKeyframeBeforeRTP) const {
Timestamp now = clock_->CurrentTime();
MutexLock lock(&mutex_rtcp_sender_);
RTC_DCHECK(
(method_ == RtcpMode::kOff && !next_time_to_send_rtcp_.has_value()) ||
(method_ != RtcpMode::kOff && next_time_to_send_rtcp_.has_value()));
if (method_ == RtcpMode::kOff)
return false;
@ -807,7 +811,7 @@ void RTCPSender::PrepareReport(const FeedbackState& feedback_state) {
random_.Rand(min_interval_int * 1 / 2, min_interval_int * 3 / 2));
RTC_DCHECK(!time_to_next.IsZero());
next_time_to_send_rtcp_ = clock_->CurrentTime() + time_to_next;
SetNextRtcpSendEvaluationDuration(time_to_next);
// RtcpSender expected to be used for sending either just sender reports
// or just receiver reports.
@ -901,7 +905,7 @@ void RTCPSender::SetVideoBitrateAllocation(
RTC_LOG(LS_INFO) << "Emitting TargetBitrate XR for SSRC " << ssrc_
<< " with new layers enabled/disabled: "
<< video_bitrate_allocation_.ToString();
next_time_to_send_rtcp_ = clock_->CurrentTime();
SetNextRtcpSendEvaluationDuration(TimeDelta::Zero());
} else {
video_bitrate_allocation_ = bitrate;
}
@ -962,4 +966,10 @@ void RTCPSender::SendCombinedRtcpPacket(
sender.Send();
}
void RTCPSender::SetNextRtcpSendEvaluationDuration(TimeDelta duration) {
next_time_to_send_rtcp_ = clock_->CurrentTime() + duration;
if (schedule_next_rtcp_send_evaluation_function_)
schedule_next_rtcp_send_evaluation_function_(duration);
}
} // namespace webrtc

View File

@ -64,6 +64,12 @@ class RTCPSender final {
// Estimate RTT as non-sender as described in
// https://tools.ietf.org/html/rfc3611#section-4.4 and #section-4.5
bool non_sender_rtt_measurement = false;
// Optional callback which, if specified, is used by RTCPSender to schedule
// the next time to evaluate if RTCP should be sent by means of
// TimeToSendRTCPReport/SendRTCP.
// The RTCPSender client still needs to call TimeToSendRTCPReport/SendRTCP
// to actually get RTCP sent.
std::function<void(TimeDelta)> schedule_next_rtcp_send_evaluation_function;
RtcEventLog* event_log = nullptr;
absl::optional<TimeDelta> rtcp_report_interval;
@ -91,7 +97,7 @@ class RTCPSender final {
RTCPReceiver* receiver;
};
explicit RTCPSender(const Configuration& config);
explicit RTCPSender(Configuration config);
// TODO(bugs.webrtc.org/11581): delete this temporary compatibility helper
// once downstream dependencies migrates.
explicit RTCPSender(const RtpRtcpInterface::Configuration& config);
@ -224,6 +230,10 @@ class RTCPSender final {
void BuildNACK(const RtcpContext& context, PacketSender& sender)
RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_rtcp_sender_);
// |duration| being TimeDelta::Zero() means schedule immediately.
void SetNextRtcpSendEvaluationDuration(TimeDelta duration)
RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_rtcp_sender_);
const bool audio_;
// TODO(bugs.webrtc.org/11581): `mutex_rtcp_sender_` shouldn't be required if
// we consistently run network related operations on the network thread.
@ -238,6 +248,10 @@ class RTCPSender final {
Transport* const transport_;
const TimeDelta report_interval_;
// Set from
// RTCPSender::Configuration::schedule_next_rtcp_send_evaluation_function.
const std::function<void(TimeDelta)>
schedule_next_rtcp_send_evaluation_function_;
mutable Mutex mutex_rtcp_sender_;
bool sending_ RTC_GUARDED_BY(mutex_rtcp_sender_);

View File

@ -20,12 +20,16 @@
#include <utility>
#include "absl/types/optional.h"
#include "api/sequence_checker.h"
#include "api/transport/field_trial_based_config.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "modules/rtp_rtcp/source/rtcp_packet/dlrr.h"
#include "modules/rtp_rtcp/source/rtp_rtcp_config.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/time_utils.h"
#include "system_wrappers/include/ntp_time.h"
#ifdef _WIN32
@ -39,6 +43,22 @@ const int64_t kRtpRtcpMaxIdleTimeProcessMs = 5;
const int64_t kDefaultExpectedRetransmissionTimeMs = 125;
constexpr TimeDelta kRttUpdateInterval = TimeDelta::Millis(1000);
RTCPSender::Configuration AddRtcpSendEvaluationCallback(
RTCPSender::Configuration config,
std::function<void(TimeDelta)> send_evaluation_callback) {
config.schedule_next_rtcp_send_evaluation_function =
std::move(send_evaluation_callback);
return config;
}
int DelayMillisForDuration(TimeDelta duration) {
// TimeDelta::ms() rounds downwards sometimes which leads to too little time
// slept. Account for this, unless |duration| is exactly representable in
// millisecs.
return (duration.us() + rtc::kNumMillisecsPerSec - 1) /
rtc::kNumMicrosecsPerMillisec;
}
} // namespace
ModuleRtpRtcpImpl2::RtpSenderContext::RtpSenderContext(
@ -57,8 +77,11 @@ void ModuleRtpRtcpImpl2::RtpSenderContext::AssignSequenceNumber(
ModuleRtpRtcpImpl2::ModuleRtpRtcpImpl2(const Configuration& configuration)
: worker_queue_(TaskQueueBase::Current()),
rtcp_sender_(
RTCPSender::Configuration::FromRtpRtcpConfiguration(configuration)),
rtcp_sender_(AddRtcpSendEvaluationCallback(
RTCPSender::Configuration::FromRtpRtcpConfiguration(configuration),
[this](TimeDelta duration) {
ScheduleRtcpSendEvaluation(duration);
})),
rtcp_receiver_(configuration, this),
clock_(configuration.clock),
last_rtt_process_time_(clock_->TimeInMilliseconds()),
@ -139,11 +162,6 @@ void ModuleRtpRtcpImpl2::Process() {
rtcp_sender_.SetTargetBitrate(target_bitrate);
}
}
// TODO(bugs.webrtc.org/11581): Run this on a separate set of delayed tasks
// based off of next_time_to_send_rtcp_ in RTCPSender.
if (rtcp_sender_.TimeToSendRTCPReport())
rtcp_sender_.SendRTCP(GetFeedbackState(), kRtcpReport);
}
void ModuleRtpRtcpImpl2::SetRtxSendStatus(int mode) {
@ -771,4 +789,60 @@ void ModuleRtpRtcpImpl2::PeriodicUpdate() {
rtcp_receiver_.NotifyTmmbrUpdated();
}
// RTC_RUN_ON(worker_queue_);
void ModuleRtpRtcpImpl2::MaybeSendRtcp() {
if (rtcp_sender_.TimeToSendRTCPReport())
rtcp_sender_.SendRTCP(GetFeedbackState(), kRtcpReport);
}
// TODO(bugs.webrtc.org/12889): Consider removing this function when the issue
// is resolved.
// RTC_RUN_ON(worker_queue_);
void ModuleRtpRtcpImpl2::MaybeSendRtcpAtOrAfterTimestamp(
Timestamp execution_time) {
Timestamp now = clock_->CurrentTime();
if (now >= execution_time) {
MaybeSendRtcp();
return;
}
RTC_DLOG(LS_WARNING)
<< "BUGBUG: Task queue scheduled delayed call too early.";
ScheduleMaybeSendRtcpAtOrAfterTimestamp(execution_time, execution_time - now);
}
void ModuleRtpRtcpImpl2::ScheduleRtcpSendEvaluation(TimeDelta duration) {
// We end up here under various sequences including the worker queue, and
// the RTCPSender lock is held.
// We're assuming that the fact that RTCPSender executes under other sequences
// than the worker queue on which it's created on implies that external
// synchronization is present and removes this activity before destruction.
if (duration.IsZero()) {
worker_queue_->PostTask(ToQueuedTask(task_safety_, [this] {
RTC_DCHECK_RUN_ON(worker_queue_);
MaybeSendRtcp();
}));
} else {
Timestamp execution_time = clock_->CurrentTime() + duration;
ScheduleMaybeSendRtcpAtOrAfterTimestamp(execution_time, duration);
}
}
void ModuleRtpRtcpImpl2::ScheduleMaybeSendRtcpAtOrAfterTimestamp(
Timestamp execution_time,
TimeDelta duration) {
// We end up here under various sequences including the worker queue, and
// the RTCPSender lock is held.
// See note in ScheduleRtcpSendEvaluation about why |worker_queue_| can be
// accessed.
worker_queue_->PostDelayedTask(
ToQueuedTask(task_safety_,
[this, execution_time] {
RTC_DCHECK_RUN_ON(worker_queue_);
MaybeSendRtcpAtOrAfterTimestamp(execution_time);
}),
DelayMillisForDuration(duration));
}
} // namespace webrtc

View File

@ -23,6 +23,7 @@
#include "api/rtp_headers.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "api/video/video_bitrate_allocation.h"
#include "modules/include/module_fec_types.h"
#include "modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h"
@ -32,7 +33,6 @@
#include "modules/rtp_rtcp/source/rtcp_sender.h"
#include "modules/rtp_rtcp/source/rtp_packet_history.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "modules/rtp_rtcp/source/rtp_rtcp_impl2.h"
#include "modules/rtp_rtcp/source/rtp_sender.h"
#include "modules/rtp_rtcp/source/rtp_sender_egress.h"
#include "rtc_base/gtest_prod_util.h"
@ -40,6 +40,8 @@
#include "rtc_base/system/no_unique_address.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/task_utils/repeating_task.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/thread_annotations.h"
namespace webrtc {
@ -200,7 +202,8 @@ class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface,
int64_t ExpectedRetransmissionTimeMs() const override;
// Force a send of an RTCP packet.
// Normal SR and RR are triggered via the process function.
// Normal SR and RR are triggered via the task queue that's current when this
// object is created.
int32_t SendRTCP(RTCPPacketType rtcpPacketType) override;
void GetSendStreamDataCounters(
@ -289,12 +292,28 @@ class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface,
// Returns true if the module is configured to store packets.
bool StorePackets() const;
// Used from RtcpSenderMediator to maybe send rtcp.
void MaybeSendRtcp() RTC_RUN_ON(worker_queue_);
// Called when |rtcp_sender_| informs of the next RTCP instant. The method may
// be called on various sequences, and is called under a RTCPSenderLock.
void ScheduleRtcpSendEvaluation(TimeDelta duration);
// Helper method combating too early delayed calls from task queues.
// TODO(bugs.webrtc.org/12889): Consider removing this function when the issue
// is resolved.
void MaybeSendRtcpAtOrAfterTimestamp(Timestamp execution_time)
RTC_RUN_ON(worker_queue_);
// Schedules a call to MaybeSendRtcpAtOrAfterTimestamp delayed by |duration|.
void ScheduleMaybeSendRtcpAtOrAfterTimestamp(Timestamp execution_time,
TimeDelta duration);
TaskQueueBase* const worker_queue_;
RTC_NO_UNIQUE_ADDRESS SequenceChecker process_thread_checker_;
RTC_NO_UNIQUE_ADDRESS SequenceChecker packet_sequence_checker_;
std::unique_ptr<RtpSenderContext> rtp_sender_;
RTCPSender rtcp_sender_;
RTCPReceiver rtcp_receiver_;
@ -316,6 +335,8 @@ class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface,
// The processed RTT from RtcpRttStats.
mutable Mutex mutex_rtt_;
int64_t rtt_ms_ RTC_GUARDED_BY(mutex_rtt_);
RTC_NO_UNIQUE_ADDRESS ScopedTaskSafety task_safety_;
};
} // namespace webrtc

View File

@ -10,13 +10,19 @@
#include <memory>
#include "api/media_types.h"
#include "api/task_queue/default_task_queue_factory.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/test/simulated_network.h"
#include "api/video_codecs/video_encoder.h"
#include "call/fake_network_pipe.h"
#include "call/simulated_network.h"
#include "modules/rtp_rtcp/source/rtp_packet.h"
#include "rtc_base/location.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/task_queue_for_test.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "system_wrappers/include/sleep.h"
#include "test/call_test.h"
#include "test/fake_encoder.h"
@ -166,7 +172,10 @@ TEST_F(NetworkStateEndToEndTest, RespectsNetworkState) {
explicit NetworkStateTest(TaskQueueBase* task_queue)
: EndToEndTest(kDefaultTimeoutMs),
FakeEncoder(Clock::GetRealTimeClock()),
task_queue_(task_queue),
e2e_test_task_queue_(task_queue),
task_queue_(CreateDefaultTaskQueueFactory()->CreateTaskQueue(
"NetworkStateTest",
TaskQueueFactory::Priority::NORMAL)),
sender_call_(nullptr),
receiver_call_(nullptr),
encoder_factory_(this),
@ -219,26 +228,36 @@ TEST_F(NetworkStateEndToEndTest, RespectsNetworkState) {
send_config->encoder_settings.encoder_factory = &encoder_factory_;
}
void SignalChannelNetworkState(Call* call,
MediaType media_type,
NetworkState network_state) {
SendTask(RTC_FROM_HERE, e2e_test_task_queue_,
[call, media_type, network_state] {
call->SignalChannelNetworkState(media_type, network_state);
});
}
void PerformTest() override {
EXPECT_TRUE(encoded_frames_.Wait(kDefaultTimeoutMs))
<< "No frames received by the encoder.";
SendTask(RTC_FROM_HERE, task_queue_, [this]() {
SendTask(RTC_FROM_HERE, task_queue_.get(), [this]() {
// Wait for packets from both sender/receiver.
WaitForPacketsOrSilence(false, false);
// Sender-side network down for audio; there should be no effect on
// video
sender_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkDown);
SignalChannelNetworkState(sender_call_, MediaType::AUDIO, kNetworkDown);
WaitForPacketsOrSilence(false, false);
// Receiver-side network down for audio; no change expected
receiver_call_->SignalChannelNetworkState(MediaType::AUDIO,
kNetworkDown);
SignalChannelNetworkState(receiver_call_, MediaType::AUDIO,
kNetworkDown);
WaitForPacketsOrSilence(false, false);
// Sender-side network down.
sender_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkDown);
SignalChannelNetworkState(sender_call_, MediaType::VIDEO, kNetworkDown);
{
MutexLock lock(&test_mutex_);
// After network goes down we shouldn't be encoding more frames.
@ -248,14 +267,14 @@ TEST_F(NetworkStateEndToEndTest, RespectsNetworkState) {
WaitForPacketsOrSilence(true, false);
// Receiver-side network down.
receiver_call_->SignalChannelNetworkState(MediaType::VIDEO,
kNetworkDown);
SignalChannelNetworkState(receiver_call_, MediaType::VIDEO,
kNetworkDown);
WaitForPacketsOrSilence(true, true);
// Network up for audio for both sides; video is still not expected to
// start
sender_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp);
receiver_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp);
SignalChannelNetworkState(sender_call_, MediaType::AUDIO, kNetworkUp);
SignalChannelNetworkState(receiver_call_, MediaType::AUDIO, kNetworkUp);
WaitForPacketsOrSilence(true, true);
// Network back up again for both.
@ -265,8 +284,8 @@ TEST_F(NetworkStateEndToEndTest, RespectsNetworkState) {
// network.
sender_state_ = kNetworkUp;
}
sender_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
receiver_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
SignalChannelNetworkState(sender_call_, MediaType::VIDEO, kNetworkUp);
SignalChannelNetworkState(receiver_call_, MediaType::VIDEO, kNetworkUp);
WaitForPacketsOrSilence(false, false);
// TODO(skvlad): add tests to verify that the audio streams are stopped
@ -340,7 +359,8 @@ TEST_F(NetworkStateEndToEndTest, RespectsNetworkState) {
}
}
TaskQueueBase* const task_queue_;
TaskQueueBase* const e2e_test_task_queue_;
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> task_queue_;
Mutex test_mutex_;
rtc::Event encoded_frames_;
rtc::Event packet_event_;

View File

@ -13,6 +13,7 @@
#include <memory>
#include <utility>
#include "api/task_queue/task_queue_base.h"
#include "api/video/video_codec_type.h"
#include "api/video/video_frame_type.h"
#include "common_video/h264/h264_common.h"
@ -38,6 +39,7 @@
#include "test/gtest.h"
#include "test/mock_frame_transformer.h"
#include "test/time_controller/simulated_task_queue.h"
#include "test/time_controller/simulated_time_controller.h"
using ::testing::_;
using ::testing::ElementsAre;
@ -158,7 +160,12 @@ class RtpVideoStreamReceiver2Test : public ::testing::Test,
public:
RtpVideoStreamReceiver2Test() : RtpVideoStreamReceiver2Test("") {}
explicit RtpVideoStreamReceiver2Test(std::string field_trials)
: override_field_trials_(field_trials),
: time_controller_(Timestamp::Millis(100)),
task_queue_(time_controller_.GetTaskQueueFactory()->CreateTaskQueue(
"RtpVideoStreamReceiver2Test",
TaskQueueFactory::Priority::NORMAL)),
task_queue_setter_(task_queue_.get()),
override_field_trials_(field_trials),
config_(CreateConfig()),
process_thread_(ProcessThread::Create("TestThread")) {
rtp_receive_statistics_ =
@ -233,8 +240,9 @@ class RtpVideoStreamReceiver2Test : public ::testing::Test,
return config;
}
TokenTaskQueue task_queue_;
TokenTaskQueue::CurrentTaskQueueSetter task_queue_setter_{&task_queue_};
GlobalSimulatedTimeController time_controller_;
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> task_queue_;
TokenTaskQueue::CurrentTaskQueueSetter task_queue_setter_;
const webrtc::test::ScopedFieldTrials override_field_trials_;
VideoReceiveStream::Config config_;