diff --git a/modules/rtp_rtcp/source/rtcp_sender.cc b/modules/rtp_rtcp/source/rtcp_sender.cc index f531a30cd5..c4c30a9467 100644 --- a/modules/rtp_rtcp/source/rtcp_sender.cc +++ b/modules/rtp_rtcp/source/rtcp_sender.cc @@ -138,7 +138,7 @@ RTCPSender::Configuration RTCPSender::Configuration::FromRtpRtcpConfiguration( return result; } -RTCPSender::RTCPSender(const Configuration& config) +RTCPSender::RTCPSender(Configuration config) : audio_(config.audio), ssrc_(config.local_media_ssrc), clock_(config.clock), @@ -149,6 +149,8 @@ RTCPSender::RTCPSender(const Configuration& config) report_interval_(config.rtcp_report_interval.value_or( TimeDelta::Millis(config.audio ? kDefaultAudioReportInterval : kDefaultVideoReportInterval))), + schedule_next_rtcp_send_evaluation_function_( + std::move(config.schedule_next_rtcp_send_evaluation_function)), sending_(false), timestamp_offset_(0), last_rtp_timestamp_(0), @@ -201,7 +203,7 @@ void RTCPSender::SetRTCPStatus(RtcpMode new_method) { next_time_to_send_rtcp_ = absl::nullopt; } else if (method_ == RtcpMode::kOff) { // When switching on, reschedule the next packet - next_time_to_send_rtcp_ = clock_->CurrentTime() + report_interval_ / 2; + SetNextRtcpSendEvaluationDuration(report_interval_ / 2); } method_ = new_method; } @@ -284,7 +286,7 @@ void RTCPSender::SetRemb(int64_t bitrate_bps, std::vector ssrcs) { SetFlag(kRtcpRemb, /*is_volatile=*/false); // Send a REMB immediately if we have a new REMB. The frequency of REMBs is // throttled by the caller. - next_time_to_send_rtcp_ = clock_->CurrentTime(); + SetNextRtcpSendEvaluationDuration(TimeDelta::Zero()); } void RTCPSender::UnsetRemb() { @@ -428,7 +430,9 @@ bool RTCPSender::TimeToSendRTCPReport(bool sendKeyframeBeforeRTP) const { Timestamp now = clock_->CurrentTime(); MutexLock lock(&mutex_rtcp_sender_); - + RTC_DCHECK( + (method_ == RtcpMode::kOff && !next_time_to_send_rtcp_.has_value()) || + (method_ != RtcpMode::kOff && next_time_to_send_rtcp_.has_value())); if (method_ == RtcpMode::kOff) return false; @@ -807,7 +811,7 @@ void RTCPSender::PrepareReport(const FeedbackState& feedback_state) { random_.Rand(min_interval_int * 1 / 2, min_interval_int * 3 / 2)); RTC_DCHECK(!time_to_next.IsZero()); - next_time_to_send_rtcp_ = clock_->CurrentTime() + time_to_next; + SetNextRtcpSendEvaluationDuration(time_to_next); // RtcpSender expected to be used for sending either just sender reports // or just receiver reports. @@ -901,7 +905,7 @@ void RTCPSender::SetVideoBitrateAllocation( RTC_LOG(LS_INFO) << "Emitting TargetBitrate XR for SSRC " << ssrc_ << " with new layers enabled/disabled: " << video_bitrate_allocation_.ToString(); - next_time_to_send_rtcp_ = clock_->CurrentTime(); + SetNextRtcpSendEvaluationDuration(TimeDelta::Zero()); } else { video_bitrate_allocation_ = bitrate; } @@ -962,4 +966,10 @@ void RTCPSender::SendCombinedRtcpPacket( sender.Send(); } +void RTCPSender::SetNextRtcpSendEvaluationDuration(TimeDelta duration) { + next_time_to_send_rtcp_ = clock_->CurrentTime() + duration; + if (schedule_next_rtcp_send_evaluation_function_) + schedule_next_rtcp_send_evaluation_function_(duration); +} + } // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtcp_sender.h b/modules/rtp_rtcp/source/rtcp_sender.h index e3fda383ba..e50ce44e13 100644 --- a/modules/rtp_rtcp/source/rtcp_sender.h +++ b/modules/rtp_rtcp/source/rtcp_sender.h @@ -64,6 +64,12 @@ class RTCPSender final { // Estimate RTT as non-sender as described in // https://tools.ietf.org/html/rfc3611#section-4.4 and #section-4.5 bool non_sender_rtt_measurement = false; + // Optional callback which, if specified, is used by RTCPSender to schedule + // the next time to evaluate if RTCP should be sent by means of + // TimeToSendRTCPReport/SendRTCP. + // The RTCPSender client still needs to call TimeToSendRTCPReport/SendRTCP + // to actually get RTCP sent. + std::function schedule_next_rtcp_send_evaluation_function; RtcEventLog* event_log = nullptr; absl::optional rtcp_report_interval; @@ -91,7 +97,7 @@ class RTCPSender final { RTCPReceiver* receiver; }; - explicit RTCPSender(const Configuration& config); + explicit RTCPSender(Configuration config); // TODO(bugs.webrtc.org/11581): delete this temporary compatibility helper // once downstream dependencies migrates. explicit RTCPSender(const RtpRtcpInterface::Configuration& config); @@ -224,6 +230,10 @@ class RTCPSender final { void BuildNACK(const RtcpContext& context, PacketSender& sender) RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_rtcp_sender_); + // |duration| being TimeDelta::Zero() means schedule immediately. + void SetNextRtcpSendEvaluationDuration(TimeDelta duration) + RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_rtcp_sender_); + const bool audio_; // TODO(bugs.webrtc.org/11581): `mutex_rtcp_sender_` shouldn't be required if // we consistently run network related operations on the network thread. @@ -238,6 +248,10 @@ class RTCPSender final { Transport* const transport_; const TimeDelta report_interval_; + // Set from + // RTCPSender::Configuration::schedule_next_rtcp_send_evaluation_function. + const std::function + schedule_next_rtcp_send_evaluation_function_; mutable Mutex mutex_rtcp_sender_; bool sending_ RTC_GUARDED_BY(mutex_rtcp_sender_); diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc index 25c92b267a..77054576a8 100644 --- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc +++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc @@ -20,12 +20,16 @@ #include #include "absl/types/optional.h" +#include "api/sequence_checker.h" #include "api/transport/field_trial_based_config.h" +#include "api/units/time_delta.h" #include "api/units/timestamp.h" #include "modules/rtp_rtcp/source/rtcp_packet/dlrr.h" #include "modules/rtp_rtcp/source/rtp_rtcp_config.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" +#include "rtc_base/task_utils/to_queued_task.h" +#include "rtc_base/time_utils.h" #include "system_wrappers/include/ntp_time.h" #ifdef _WIN32 @@ -39,6 +43,22 @@ const int64_t kRtpRtcpMaxIdleTimeProcessMs = 5; const int64_t kDefaultExpectedRetransmissionTimeMs = 125; constexpr TimeDelta kRttUpdateInterval = TimeDelta::Millis(1000); + +RTCPSender::Configuration AddRtcpSendEvaluationCallback( + RTCPSender::Configuration config, + std::function send_evaluation_callback) { + config.schedule_next_rtcp_send_evaluation_function = + std::move(send_evaluation_callback); + return config; +} + +int DelayMillisForDuration(TimeDelta duration) { + // TimeDelta::ms() rounds downwards sometimes which leads to too little time + // slept. Account for this, unless |duration| is exactly representable in + // millisecs. + return (duration.us() + rtc::kNumMillisecsPerSec - 1) / + rtc::kNumMicrosecsPerMillisec; +} } // namespace ModuleRtpRtcpImpl2::RtpSenderContext::RtpSenderContext( @@ -57,8 +77,11 @@ void ModuleRtpRtcpImpl2::RtpSenderContext::AssignSequenceNumber( ModuleRtpRtcpImpl2::ModuleRtpRtcpImpl2(const Configuration& configuration) : worker_queue_(TaskQueueBase::Current()), - rtcp_sender_( - RTCPSender::Configuration::FromRtpRtcpConfiguration(configuration)), + rtcp_sender_(AddRtcpSendEvaluationCallback( + RTCPSender::Configuration::FromRtpRtcpConfiguration(configuration), + [this](TimeDelta duration) { + ScheduleRtcpSendEvaluation(duration); + })), rtcp_receiver_(configuration, this), clock_(configuration.clock), last_rtt_process_time_(clock_->TimeInMilliseconds()), @@ -139,11 +162,6 @@ void ModuleRtpRtcpImpl2::Process() { rtcp_sender_.SetTargetBitrate(target_bitrate); } } - - // TODO(bugs.webrtc.org/11581): Run this on a separate set of delayed tasks - // based off of next_time_to_send_rtcp_ in RTCPSender. - if (rtcp_sender_.TimeToSendRTCPReport()) - rtcp_sender_.SendRTCP(GetFeedbackState(), kRtcpReport); } void ModuleRtpRtcpImpl2::SetRtxSendStatus(int mode) { @@ -771,4 +789,60 @@ void ModuleRtpRtcpImpl2::PeriodicUpdate() { rtcp_receiver_.NotifyTmmbrUpdated(); } +// RTC_RUN_ON(worker_queue_); +void ModuleRtpRtcpImpl2::MaybeSendRtcp() { + if (rtcp_sender_.TimeToSendRTCPReport()) + rtcp_sender_.SendRTCP(GetFeedbackState(), kRtcpReport); +} + +// TODO(bugs.webrtc.org/12889): Consider removing this function when the issue +// is resolved. +// RTC_RUN_ON(worker_queue_); +void ModuleRtpRtcpImpl2::MaybeSendRtcpAtOrAfterTimestamp( + Timestamp execution_time) { + Timestamp now = clock_->CurrentTime(); + if (now >= execution_time) { + MaybeSendRtcp(); + return; + } + + RTC_DLOG(LS_WARNING) + << "BUGBUG: Task queue scheduled delayed call too early."; + + ScheduleMaybeSendRtcpAtOrAfterTimestamp(execution_time, execution_time - now); +} + +void ModuleRtpRtcpImpl2::ScheduleRtcpSendEvaluation(TimeDelta duration) { + // We end up here under various sequences including the worker queue, and + // the RTCPSender lock is held. + // We're assuming that the fact that RTCPSender executes under other sequences + // than the worker queue on which it's created on implies that external + // synchronization is present and removes this activity before destruction. + if (duration.IsZero()) { + worker_queue_->PostTask(ToQueuedTask(task_safety_, [this] { + RTC_DCHECK_RUN_ON(worker_queue_); + MaybeSendRtcp(); + })); + } else { + Timestamp execution_time = clock_->CurrentTime() + duration; + ScheduleMaybeSendRtcpAtOrAfterTimestamp(execution_time, duration); + } +} + +void ModuleRtpRtcpImpl2::ScheduleMaybeSendRtcpAtOrAfterTimestamp( + Timestamp execution_time, + TimeDelta duration) { + // We end up here under various sequences including the worker queue, and + // the RTCPSender lock is held. + // See note in ScheduleRtcpSendEvaluation about why |worker_queue_| can be + // accessed. + worker_queue_->PostDelayedTask( + ToQueuedTask(task_safety_, + [this, execution_time] { + RTC_DCHECK_RUN_ON(worker_queue_); + MaybeSendRtcpAtOrAfterTimestamp(execution_time); + }), + DelayMillisForDuration(duration)); +} + } // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h index 4c38517e85..849cc42c5e 100644 --- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h +++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h @@ -23,6 +23,7 @@ #include "api/rtp_headers.h" #include "api/sequence_checker.h" #include "api/task_queue/task_queue_base.h" +#include "api/units/time_delta.h" #include "api/video/video_bitrate_allocation.h" #include "modules/include/module_fec_types.h" #include "modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h" @@ -32,7 +33,6 @@ #include "modules/rtp_rtcp/source/rtcp_sender.h" #include "modules/rtp_rtcp/source/rtp_packet_history.h" #include "modules/rtp_rtcp/source/rtp_packet_to_send.h" -#include "modules/rtp_rtcp/source/rtp_rtcp_impl2.h" #include "modules/rtp_rtcp/source/rtp_sender.h" #include "modules/rtp_rtcp/source/rtp_sender_egress.h" #include "rtc_base/gtest_prod_util.h" @@ -40,6 +40,8 @@ #include "rtc_base/system/no_unique_address.h" #include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/task_utils/repeating_task.h" +#include "rtc_base/task_utils/to_queued_task.h" +#include "rtc_base/thread_annotations.h" namespace webrtc { @@ -200,7 +202,8 @@ class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface, int64_t ExpectedRetransmissionTimeMs() const override; // Force a send of an RTCP packet. - // Normal SR and RR are triggered via the process function. + // Normal SR and RR are triggered via the task queue that's current when this + // object is created. int32_t SendRTCP(RTCPPacketType rtcpPacketType) override; void GetSendStreamDataCounters( @@ -289,12 +292,28 @@ class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface, // Returns true if the module is configured to store packets. bool StorePackets() const; + // Used from RtcpSenderMediator to maybe send rtcp. + void MaybeSendRtcp() RTC_RUN_ON(worker_queue_); + + // Called when |rtcp_sender_| informs of the next RTCP instant. The method may + // be called on various sequences, and is called under a RTCPSenderLock. + void ScheduleRtcpSendEvaluation(TimeDelta duration); + + // Helper method combating too early delayed calls from task queues. + // TODO(bugs.webrtc.org/12889): Consider removing this function when the issue + // is resolved. + void MaybeSendRtcpAtOrAfterTimestamp(Timestamp execution_time) + RTC_RUN_ON(worker_queue_); + + // Schedules a call to MaybeSendRtcpAtOrAfterTimestamp delayed by |duration|. + void ScheduleMaybeSendRtcpAtOrAfterTimestamp(Timestamp execution_time, + TimeDelta duration); + TaskQueueBase* const worker_queue_; RTC_NO_UNIQUE_ADDRESS SequenceChecker process_thread_checker_; RTC_NO_UNIQUE_ADDRESS SequenceChecker packet_sequence_checker_; std::unique_ptr rtp_sender_; - RTCPSender rtcp_sender_; RTCPReceiver rtcp_receiver_; @@ -316,6 +335,8 @@ class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface, // The processed RTT from RtcpRttStats. mutable Mutex mutex_rtt_; int64_t rtt_ms_ RTC_GUARDED_BY(mutex_rtt_); + + RTC_NO_UNIQUE_ADDRESS ScopedTaskSafety task_safety_; }; } // namespace webrtc diff --git a/video/end_to_end_tests/network_state_tests.cc b/video/end_to_end_tests/network_state_tests.cc index 9abde3bb32..4e0e86f987 100644 --- a/video/end_to_end_tests/network_state_tests.cc +++ b/video/end_to_end_tests/network_state_tests.cc @@ -10,13 +10,19 @@ #include +#include "api/media_types.h" +#include "api/task_queue/default_task_queue_factory.h" +#include "api/task_queue/task_queue_base.h" +#include "api/task_queue/task_queue_factory.h" #include "api/test/simulated_network.h" #include "api/video_codecs/video_encoder.h" #include "call/fake_network_pipe.h" #include "call/simulated_network.h" #include "modules/rtp_rtcp/source/rtp_packet.h" +#include "rtc_base/location.h" #include "rtc_base/synchronization/mutex.h" #include "rtc_base/task_queue_for_test.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "system_wrappers/include/sleep.h" #include "test/call_test.h" #include "test/fake_encoder.h" @@ -166,7 +172,10 @@ TEST_F(NetworkStateEndToEndTest, RespectsNetworkState) { explicit NetworkStateTest(TaskQueueBase* task_queue) : EndToEndTest(kDefaultTimeoutMs), FakeEncoder(Clock::GetRealTimeClock()), - task_queue_(task_queue), + e2e_test_task_queue_(task_queue), + task_queue_(CreateDefaultTaskQueueFactory()->CreateTaskQueue( + "NetworkStateTest", + TaskQueueFactory::Priority::NORMAL)), sender_call_(nullptr), receiver_call_(nullptr), encoder_factory_(this), @@ -219,26 +228,36 @@ TEST_F(NetworkStateEndToEndTest, RespectsNetworkState) { send_config->encoder_settings.encoder_factory = &encoder_factory_; } + void SignalChannelNetworkState(Call* call, + MediaType media_type, + NetworkState network_state) { + SendTask(RTC_FROM_HERE, e2e_test_task_queue_, + [call, media_type, network_state] { + call->SignalChannelNetworkState(media_type, network_state); + }); + } + void PerformTest() override { EXPECT_TRUE(encoded_frames_.Wait(kDefaultTimeoutMs)) << "No frames received by the encoder."; - SendTask(RTC_FROM_HERE, task_queue_, [this]() { + SendTask(RTC_FROM_HERE, task_queue_.get(), [this]() { // Wait for packets from both sender/receiver. WaitForPacketsOrSilence(false, false); // Sender-side network down for audio; there should be no effect on // video - sender_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkDown); + SignalChannelNetworkState(sender_call_, MediaType::AUDIO, kNetworkDown); + WaitForPacketsOrSilence(false, false); // Receiver-side network down for audio; no change expected - receiver_call_->SignalChannelNetworkState(MediaType::AUDIO, - kNetworkDown); + SignalChannelNetworkState(receiver_call_, MediaType::AUDIO, + kNetworkDown); WaitForPacketsOrSilence(false, false); // Sender-side network down. - sender_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkDown); + SignalChannelNetworkState(sender_call_, MediaType::VIDEO, kNetworkDown); { MutexLock lock(&test_mutex_); // After network goes down we shouldn't be encoding more frames. @@ -248,14 +267,14 @@ TEST_F(NetworkStateEndToEndTest, RespectsNetworkState) { WaitForPacketsOrSilence(true, false); // Receiver-side network down. - receiver_call_->SignalChannelNetworkState(MediaType::VIDEO, - kNetworkDown); + SignalChannelNetworkState(receiver_call_, MediaType::VIDEO, + kNetworkDown); WaitForPacketsOrSilence(true, true); // Network up for audio for both sides; video is still not expected to // start - sender_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp); - receiver_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp); + SignalChannelNetworkState(sender_call_, MediaType::AUDIO, kNetworkUp); + SignalChannelNetworkState(receiver_call_, MediaType::AUDIO, kNetworkUp); WaitForPacketsOrSilence(true, true); // Network back up again for both. @@ -265,8 +284,8 @@ TEST_F(NetworkStateEndToEndTest, RespectsNetworkState) { // network. sender_state_ = kNetworkUp; } - sender_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp); - receiver_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp); + SignalChannelNetworkState(sender_call_, MediaType::VIDEO, kNetworkUp); + SignalChannelNetworkState(receiver_call_, MediaType::VIDEO, kNetworkUp); WaitForPacketsOrSilence(false, false); // TODO(skvlad): add tests to verify that the audio streams are stopped @@ -340,7 +359,8 @@ TEST_F(NetworkStateEndToEndTest, RespectsNetworkState) { } } - TaskQueueBase* const task_queue_; + TaskQueueBase* const e2e_test_task_queue_; + std::unique_ptr task_queue_; Mutex test_mutex_; rtc::Event encoded_frames_; rtc::Event packet_event_; diff --git a/video/rtp_video_stream_receiver2_unittest.cc b/video/rtp_video_stream_receiver2_unittest.cc index d22f12e2b9..d23b6047aa 100644 --- a/video/rtp_video_stream_receiver2_unittest.cc +++ b/video/rtp_video_stream_receiver2_unittest.cc @@ -13,6 +13,7 @@ #include #include +#include "api/task_queue/task_queue_base.h" #include "api/video/video_codec_type.h" #include "api/video/video_frame_type.h" #include "common_video/h264/h264_common.h" @@ -38,6 +39,7 @@ #include "test/gtest.h" #include "test/mock_frame_transformer.h" #include "test/time_controller/simulated_task_queue.h" +#include "test/time_controller/simulated_time_controller.h" using ::testing::_; using ::testing::ElementsAre; @@ -158,7 +160,12 @@ class RtpVideoStreamReceiver2Test : public ::testing::Test, public: RtpVideoStreamReceiver2Test() : RtpVideoStreamReceiver2Test("") {} explicit RtpVideoStreamReceiver2Test(std::string field_trials) - : override_field_trials_(field_trials), + : time_controller_(Timestamp::Millis(100)), + task_queue_(time_controller_.GetTaskQueueFactory()->CreateTaskQueue( + "RtpVideoStreamReceiver2Test", + TaskQueueFactory::Priority::NORMAL)), + task_queue_setter_(task_queue_.get()), + override_field_trials_(field_trials), config_(CreateConfig()), process_thread_(ProcessThread::Create("TestThread")) { rtp_receive_statistics_ = @@ -233,8 +240,9 @@ class RtpVideoStreamReceiver2Test : public ::testing::Test, return config; } - TokenTaskQueue task_queue_; - TokenTaskQueue::CurrentTaskQueueSetter task_queue_setter_{&task_queue_}; + GlobalSimulatedTimeController time_controller_; + std::unique_ptr task_queue_; + TokenTaskQueue::CurrentTaskQueueSetter task_queue_setter_; const webrtc::test::ScopedFieldTrials override_field_trials_; VideoReceiveStream::Config config_;