diff --git a/modules/video_coding/BUILD.gn b/modules/video_coding/BUILD.gn index 2d0a6b3426..4061ab31ca 100644 --- a/modules/video_coding/BUILD.gn +++ b/modules/video_coding/BUILD.gn @@ -84,7 +84,11 @@ rtc_library("nack_module") { "../../rtc_base:checks", "../../rtc_base:rtc_base_approved", "../../rtc_base:rtc_numerics", + "../../rtc_base:rtc_task_queue", "../../rtc_base/experiments:field_trial_parser", + "../../rtc_base/synchronization:sequence_checker", + "../../rtc_base/task_utils:pending_task_safety_flag", + "../../rtc_base/task_utils:repeating_task", "../../system_wrappers", "../../system_wrappers:field_trial", "../utility", diff --git a/modules/video_coding/nack_module2.cc b/modules/video_coding/nack_module2.cc index 267eaebb7a..8a3a731ed0 100644 --- a/modules/video_coding/nack_module2.cc +++ b/modules/video_coding/nack_module2.cc @@ -14,10 +14,10 @@ #include #include "api/units/timestamp.h" -#include "modules/utility/include/process_thread.h" #include "rtc_base/checks.h" #include "rtc_base/experiments/field_trial_parser.h" #include "rtc_base/logging.h" +#include "rtc_base/task_queue.h" #include "system_wrappers/include/field_trial.h" namespace webrtc { @@ -27,8 +27,6 @@ const int kMaxPacketAge = 10000; const int kMaxNackPackets = 1000; const int kDefaultRttMs = 100; const int kMaxNackRetries = 10; -const int kProcessFrequency = 50; -const int kProcessIntervalMs = 1000 / kProcessFrequency; const int kMaxReorderedPackets = 128; const int kNumReorderingBuckets = 10; const int kDefaultSendNackDelayMs = 0; @@ -45,6 +43,8 @@ int64_t GetSendNackDelay() { } } // namespace +constexpr TimeDelta NackModule2::kUpdateInterval; + NackModule2::NackInfo::NackInfo() : seq_num(0), send_at_seq_num(0), sent_at_time(-1), retries(0) {} @@ -88,32 +88,58 @@ NackModule2::BackoffSettings::ParseFromFieldTrials() { return absl::nullopt; } -NackModule2::NackModule2(Clock* clock, +NackModule2::NackModule2(TaskQueueBase* current_queue, + Clock* clock, NackSender* nack_sender, - KeyFrameRequestSender* keyframe_request_sender) - : clock_(clock), + KeyFrameRequestSender* keyframe_request_sender, + TimeDelta update_interval /*= kUpdateInterval*/) + : worker_thread_(current_queue), + update_interval_(update_interval), + clock_(clock), nack_sender_(nack_sender), keyframe_request_sender_(keyframe_request_sender), reordering_histogram_(kNumReorderingBuckets, kMaxReorderedPackets), initialized_(false), rtt_ms_(kDefaultRttMs), newest_seq_num_(0), - next_process_time_ms_(-1), send_nack_delay_ms_(GetSendNackDelay()), backoff_settings_(BackoffSettings::ParseFromFieldTrials()) { RTC_DCHECK(clock_); RTC_DCHECK(nack_sender_); RTC_DCHECK(keyframe_request_sender_); + RTC_DCHECK_GT(update_interval.ms(), 0); + RTC_DCHECK(worker_thread_); + RTC_DCHECK(worker_thread_->IsCurrent()); + + repeating_task_ = RepeatingTaskHandle::DelayedStart( + TaskQueueBase::Current(), update_interval_, + [this]() { + RTC_DCHECK_RUN_ON(worker_thread_); + std::vector nack_batch = GetNackBatch(kTimeOnly); + if (!nack_batch.empty()) { + // This batch of NACKs is triggered externally; there is no external + // initiator who can batch them with other feedback messages. + nack_sender_->SendNack(nack_batch, /*buffering_allowed=*/false); + } + return update_interval_; + }, + clock_); +} + +NackModule2::~NackModule2() { + RTC_DCHECK_RUN_ON(worker_thread_); + repeating_task_.Stop(); } int NackModule2::OnReceivedPacket(uint16_t seq_num, bool is_keyframe) { + RTC_DCHECK_RUN_ON(worker_thread_); return OnReceivedPacket(seq_num, is_keyframe, false); } int NackModule2::OnReceivedPacket(uint16_t seq_num, bool is_keyframe, bool is_recovered) { - rtc::CritScope lock(&crit_); + RTC_DCHECK_RUN_ON(worker_thread_); // TODO(philipel): When the packet includes information whether it is // retransmitted or not, use that value instead. For // now set it to true, which will cause the reordering @@ -182,61 +208,24 @@ int NackModule2::OnReceivedPacket(uint16_t seq_num, } void NackModule2::ClearUpTo(uint16_t seq_num) { - rtc::CritScope lock(&crit_); - nack_list_.erase(nack_list_.begin(), nack_list_.lower_bound(seq_num)); - keyframe_list_.erase(keyframe_list_.begin(), - keyframe_list_.lower_bound(seq_num)); - recovered_list_.erase(recovered_list_.begin(), - recovered_list_.lower_bound(seq_num)); + // Called via RtpVideoStreamReceiver2::FrameContinuous on the network thread. + worker_thread_->PostTask(ToQueuedTask(task_safety_, [seq_num, this]() { + RTC_DCHECK_RUN_ON(worker_thread_); + nack_list_.erase(nack_list_.begin(), nack_list_.lower_bound(seq_num)); + keyframe_list_.erase(keyframe_list_.begin(), + keyframe_list_.lower_bound(seq_num)); + recovered_list_.erase(recovered_list_.begin(), + recovered_list_.lower_bound(seq_num)); + })); } void NackModule2::UpdateRtt(int64_t rtt_ms) { - rtc::CritScope lock(&crit_); + RTC_DCHECK_RUN_ON(worker_thread_); rtt_ms_ = rtt_ms; } -void NackModule2::Clear() { - rtc::CritScope lock(&crit_); - nack_list_.clear(); - keyframe_list_.clear(); - recovered_list_.clear(); -} - -int64_t NackModule2::TimeUntilNextProcess() { - return std::max(next_process_time_ms_ - clock_->TimeInMilliseconds(), - 0); -} - -void NackModule2::Process() { - if (nack_sender_) { - std::vector nack_batch; - { - rtc::CritScope lock(&crit_); - nack_batch = GetNackBatch(kTimeOnly); - } - - if (!nack_batch.empty()) { - // This batch of NACKs is triggered externally; there is no external - // initiator who can batch them with other feedback messages. - nack_sender_->SendNack(nack_batch, /*buffering_allowed=*/false); - } - } - - // Update the next_process_time_ms_ in intervals to achieve - // the targeted frequency over time. Also add multiple intervals - // in case of a skip in time as to not make uneccessary - // calls to Process in order to catch up. - int64_t now_ms = clock_->TimeInMilliseconds(); - if (next_process_time_ms_ == -1) { - next_process_time_ms_ = now_ms + kProcessIntervalMs; - } else { - next_process_time_ms_ = next_process_time_ms_ + kProcessIntervalMs + - (now_ms - next_process_time_ms_) / - kProcessIntervalMs * kProcessIntervalMs; - } -} - bool NackModule2::RemovePacketsUntilKeyFrame() { + // Called on worker_thread_. while (!keyframe_list_.empty()) { auto it = nack_list_.lower_bound(*keyframe_list_.begin()); @@ -256,6 +245,7 @@ bool NackModule2::RemovePacketsUntilKeyFrame() { void NackModule2::AddPacketsToNack(uint16_t seq_num_start, uint16_t seq_num_end) { + // Called on worker_thread_. // Remove old packets. auto it = nack_list_.lower_bound(seq_num_end - kMaxPacketAge); nack_list_.erase(nack_list_.begin(), it); @@ -290,6 +280,8 @@ void NackModule2::AddPacketsToNack(uint16_t seq_num_start, } std::vector NackModule2::GetNackBatch(NackFilterOptions options) { + // Called on worker_thread_. + bool consider_seq_num = options != kTimeOnly; bool consider_timestamp = options != kSeqNumOnly; Timestamp now = clock_->CurrentTime(); @@ -335,12 +327,14 @@ std::vector NackModule2::GetNackBatch(NackFilterOptions options) { } void NackModule2::UpdateReorderingStatistics(uint16_t seq_num) { + // Running on worker_thread_. RTC_DCHECK(AheadOf(newest_seq_num_, seq_num)); uint16_t diff = ReverseDiff(newest_seq_num_, seq_num); reordering_histogram_.Add(diff); } int NackModule2::WaitNumberOfPackets(float probability) const { + // Called on worker_thread_; if (reordering_histogram_.NumValues() == 0) return 0; return reordering_histogram_.InverseCdf(probability); diff --git a/modules/video_coding/nack_module2.h b/modules/video_coding/nack_module2.h index 6518f32bb6..89dd082192 100644 --- a/modules/video_coding/nack_module2.h +++ b/modules/video_coding/nack_module2.h @@ -18,32 +18,37 @@ #include #include "api/units/time_delta.h" -#include "modules/include/module.h" #include "modules/include/module_common_types.h" #include "modules/video_coding/histogram.h" -#include "rtc_base/critical_section.h" #include "rtc_base/numerics/sequence_number_util.h" +#include "rtc_base/synchronization/sequence_checker.h" +#include "rtc_base/task_queue.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" +#include "rtc_base/task_utils/repeating_task.h" #include "rtc_base/thread_annotations.h" #include "system_wrappers/include/clock.h" namespace webrtc { -class NackModule2 final : public Module { +// TODO(bugs.webrtc.org/11594): This class no longer implements the Module +// interface and therefore "NackModule" may not be a descriptive name anymore. +// Consider renaming to e.g. NackTracker or NackRequester. +class NackModule2 final { public: - NackModule2(Clock* clock, + static constexpr TimeDelta kUpdateInterval = TimeDelta::Millis(20); + + NackModule2(TaskQueueBase* current_queue, + Clock* clock, NackSender* nack_sender, - KeyFrameRequestSender* keyframe_request_sender); + KeyFrameRequestSender* keyframe_request_sender, + TimeDelta update_interval = kUpdateInterval); + ~NackModule2(); int OnReceivedPacket(uint16_t seq_num, bool is_keyframe); int OnReceivedPacket(uint16_t seq_num, bool is_keyframe, bool is_recovered); void ClearUpTo(uint16_t seq_num); void UpdateRtt(int64_t rtt_ms); - void Clear(); - - // Module implementation - int64_t TimeUntilNextProcess() override; - void Process() override; private: // Which fields to consider when deciding which packet to nack in @@ -79,24 +84,30 @@ class NackModule2 final : public Module { }; void AddPacketsToNack(uint16_t seq_num_start, uint16_t seq_num_end) - RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_); + RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_); // Removes packets from the nack list until the next keyframe. Returns true // if packets were removed. - bool RemovePacketsUntilKeyFrame() RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_); + bool RemovePacketsUntilKeyFrame() + RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_); std::vector GetNackBatch(NackFilterOptions options) - RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_); + RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_); // Update the reordering distribution. void UpdateReorderingStatistics(uint16_t seq_num) - RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_); + RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_); // Returns how many packets we have to wait in order to receive the packet // with probability |probabilty| or higher. int WaitNumberOfPackets(float probability) const - RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_); + RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_); + + TaskQueueBase* const worker_thread_; + + // Used to regularly call SendNack if needed. + RepeatingTaskHandle repeating_task_ RTC_GUARDED_BY(worker_thread_); + const TimeDelta update_interval_; - rtc::CriticalSection crit_; Clock* const clock_; NackSender* const nack_sender_; KeyFrameRequestSender* const keyframe_request_sender_; @@ -105,23 +116,23 @@ class NackModule2 final : public Module { // known thread (e.g. see |initialized_|). Those probably do not need // synchronized access. std::map> nack_list_ - RTC_GUARDED_BY(crit_); + RTC_GUARDED_BY(worker_thread_); std::set> keyframe_list_ - RTC_GUARDED_BY(crit_); + RTC_GUARDED_BY(worker_thread_); std::set> recovered_list_ - RTC_GUARDED_BY(crit_); - video_coding::Histogram reordering_histogram_ RTC_GUARDED_BY(crit_); - bool initialized_ RTC_GUARDED_BY(crit_); - int64_t rtt_ms_ RTC_GUARDED_BY(crit_); - uint16_t newest_seq_num_ RTC_GUARDED_BY(crit_); - - // Only touched on the process thread. - int64_t next_process_time_ms_; + RTC_GUARDED_BY(worker_thread_); + video_coding::Histogram reordering_histogram_ RTC_GUARDED_BY(worker_thread_); + bool initialized_ RTC_GUARDED_BY(worker_thread_); + int64_t rtt_ms_ RTC_GUARDED_BY(worker_thread_); + uint16_t newest_seq_num_ RTC_GUARDED_BY(worker_thread_); // Adds a delay before send nack on packet received. const int64_t send_nack_delay_ms_; const absl::optional backoff_settings_; + + // Used to signal destruction to potentially pending tasks. + ScopedTaskSafety task_safety_; }; } // namespace webrtc diff --git a/modules/video_coding/nack_module2_unittest.cc b/modules/video_coding/nack_module2_unittest.cc index ebc28ecb5a..acd1eead01 100644 --- a/modules/video_coding/nack_module2_unittest.cc +++ b/modules/video_coding/nack_module2_unittest.cc @@ -18,8 +18,12 @@ #include "system_wrappers/include/clock.h" #include "test/field_trial.h" #include "test/gtest.h" +#include "test/run_loop.h" namespace webrtc { +// TODO(bugs.webrtc.org/11594): Use the use the GlobalSimulatedTimeController +// instead of RunLoop. At the moment we mix use of the Clock and the underlying +// implementation of RunLoop, which is realtime. class TestNackModule2 : public ::testing::TestWithParam, public NackSender, public KeyFrameRequestSender { @@ -29,68 +33,116 @@ class TestNackModule2 : public ::testing::TestWithParam, field_trial_(GetParam() ? "WebRTC-ExponentialNackBackoff/enabled:true/" : "WebRTC-ExponentialNackBackoff/enabled:false/"), - nack_module_(clock_.get(), this, this), keyframes_requested_(0) {} - void SetUp() override { nack_module_.UpdateRtt(kDefaultRttMs); } + void SetUp() override {} void SendNack(const std::vector& sequence_numbers, bool buffering_allowed) override { sent_nacks_.insert(sent_nacks_.end(), sequence_numbers.begin(), sequence_numbers.end()); + if (waiting_for_send_nack_) { + waiting_for_send_nack_ = false; + loop_.Quit(); + } } void RequestKeyFrame() override { ++keyframes_requested_; } + void Flush() { + // nack_module.Process(); + loop_.Flush(); + } + + bool WaitForSendNack() { + if (timed_out_) { + RTC_NOTREACHED(); + return false; + } + + RTC_DCHECK(!waiting_for_send_nack_); + + waiting_for_send_nack_ = true; + loop_.PostDelayedTask( + [this]() { + timed_out_ = true; + loop_.Quit(); + }, + 1000); + + loop_.Run(); + + if (timed_out_) + return false; + + RTC_DCHECK(!waiting_for_send_nack_); + return true; + } + + NackModule2& CreateNackModule( + TimeDelta interval = NackModule2::kUpdateInterval) { + RTC_DCHECK(!nack_module_.get()); + nack_module_ = std::make_unique( + TaskQueueBase::Current(), clock_.get(), this, this, interval); + nack_module_->UpdateRtt(kDefaultRttMs); + return *nack_module_.get(); + } + static constexpr int64_t kDefaultRttMs = 20; + test::RunLoop loop_; std::unique_ptr clock_; test::ScopedFieldTrials field_trial_; - NackModule2 nack_module_; + std::unique_ptr nack_module_; std::vector sent_nacks_; int keyframes_requested_; + bool waiting_for_send_nack_ = false; + bool timed_out_ = false; }; TEST_P(TestNackModule2, NackOnePacket) { - nack_module_.OnReceivedPacket(1, false, false); - nack_module_.OnReceivedPacket(3, false, false); - EXPECT_EQ(1u, sent_nacks_.size()); + NackModule2& nack_module = CreateNackModule(); + nack_module.OnReceivedPacket(1, false, false); + nack_module.OnReceivedPacket(3, false, false); + ASSERT_EQ(1u, sent_nacks_.size()); EXPECT_EQ(2, sent_nacks_[0]); } TEST_P(TestNackModule2, WrappingSeqNum) { - nack_module_.OnReceivedPacket(0xfffe, false, false); - nack_module_.OnReceivedPacket(1, false, false); - EXPECT_EQ(2u, sent_nacks_.size()); + NackModule2& nack_module = CreateNackModule(); + nack_module.OnReceivedPacket(0xfffe, false, false); + nack_module.OnReceivedPacket(1, false, false); + ASSERT_EQ(2u, sent_nacks_.size()); EXPECT_EQ(0xffff, sent_nacks_[0]); EXPECT_EQ(0, sent_nacks_[1]); } TEST_P(TestNackModule2, WrappingSeqNumClearToKeyframe) { - nack_module_.OnReceivedPacket(0xfffe, false, false); - nack_module_.OnReceivedPacket(1, false, false); - EXPECT_EQ(2u, sent_nacks_.size()); + NackModule2& nack_module = CreateNackModule(TimeDelta::Millis(10)); + nack_module.OnReceivedPacket(0xfffe, false, false); + nack_module.OnReceivedPacket(1, false, false); + ASSERT_EQ(2u, sent_nacks_.size()); EXPECT_EQ(0xffff, sent_nacks_[0]); EXPECT_EQ(0, sent_nacks_[1]); sent_nacks_.clear(); - nack_module_.OnReceivedPacket(2, true, false); - EXPECT_EQ(0u, sent_nacks_.size()); + nack_module.OnReceivedPacket(2, true, false); + ASSERT_EQ(0u, sent_nacks_.size()); - nack_module_.OnReceivedPacket(501, true, false); - EXPECT_EQ(498u, sent_nacks_.size()); + nack_module.OnReceivedPacket(501, true, false); + ASSERT_EQ(498u, sent_nacks_.size()); for (int seq_num = 3; seq_num < 501; ++seq_num) EXPECT_EQ(seq_num, sent_nacks_[seq_num - 3]); sent_nacks_.clear(); - nack_module_.OnReceivedPacket(1001, false, false); + nack_module.OnReceivedPacket(1001, false, false); EXPECT_EQ(499u, sent_nacks_.size()); for (int seq_num = 502; seq_num < 1001; ++seq_num) EXPECT_EQ(seq_num, sent_nacks_[seq_num - 502]); sent_nacks_.clear(); clock_->AdvanceTimeMilliseconds(100); - nack_module_.Process(); - EXPECT_EQ(999u, sent_nacks_.size()); + ASSERT_TRUE(WaitForSendNack()); + ASSERT_EQ(999u, sent_nacks_.size()); EXPECT_EQ(0xffff, sent_nacks_[0]); EXPECT_EQ(0, sent_nacks_[1]); for (int seq_num = 3; seq_num < 501; ++seq_num) @@ -102,15 +154,15 @@ TEST_P(TestNackModule2, WrappingSeqNumClearToKeyframe) { // It will then clear all nacks up to the next keyframe (seq num 2), // thus removing 0xffff and 0 from the nack list. sent_nacks_.clear(); - nack_module_.OnReceivedPacket(1004, false, false); - EXPECT_EQ(2u, sent_nacks_.size()); + nack_module.OnReceivedPacket(1004, false, false); + ASSERT_EQ(2u, sent_nacks_.size()); EXPECT_EQ(1002, sent_nacks_[0]); EXPECT_EQ(1003, sent_nacks_[1]); sent_nacks_.clear(); clock_->AdvanceTimeMilliseconds(100); - nack_module_.Process(); - EXPECT_EQ(999u, sent_nacks_.size()); + ASSERT_TRUE(WaitForSendNack()); + ASSERT_EQ(999u, sent_nacks_.size()); for (int seq_num = 3; seq_num < 501; ++seq_num) EXPECT_EQ(seq_num, sent_nacks_[seq_num - 3]); for (int seq_num = 502; seq_num < 1001; ++seq_num) @@ -118,65 +170,39 @@ TEST_P(TestNackModule2, WrappingSeqNumClearToKeyframe) { // Adding packet 1007 will cause the nack module to overflow again, thus // clearing everything up to 501 which is the next keyframe. - nack_module_.OnReceivedPacket(1007, false, false); + nack_module.OnReceivedPacket(1007, false, false); sent_nacks_.clear(); clock_->AdvanceTimeMilliseconds(100); - nack_module_.Process(); - EXPECT_EQ(503u, sent_nacks_.size()); + ASSERT_TRUE(WaitForSendNack()); + ASSERT_EQ(503u, sent_nacks_.size()); for (int seq_num = 502; seq_num < 1001; ++seq_num) EXPECT_EQ(seq_num, sent_nacks_[seq_num - 502]); EXPECT_EQ(1005, sent_nacks_[501]); EXPECT_EQ(1006, sent_nacks_[502]); } -TEST_P(TestNackModule2, DontBurstOnTimeSkip) { - nack_module_.Process(); - clock_->AdvanceTimeMilliseconds(20); - EXPECT_EQ(0, nack_module_.TimeUntilNextProcess()); - nack_module_.Process(); - - clock_->AdvanceTimeMilliseconds(100); - EXPECT_EQ(0, nack_module_.TimeUntilNextProcess()); - nack_module_.Process(); - EXPECT_EQ(20, nack_module_.TimeUntilNextProcess()); - - clock_->AdvanceTimeMilliseconds(19); - EXPECT_EQ(1, nack_module_.TimeUntilNextProcess()); - clock_->AdvanceTimeMilliseconds(2); - nack_module_.Process(); - EXPECT_EQ(19, nack_module_.TimeUntilNextProcess()); - - clock_->AdvanceTimeMilliseconds(19); - EXPECT_EQ(0, nack_module_.TimeUntilNextProcess()); - nack_module_.Process(); - - clock_->AdvanceTimeMilliseconds(21); - EXPECT_EQ(0, nack_module_.TimeUntilNextProcess()); - nack_module_.Process(); - EXPECT_EQ(19, nack_module_.TimeUntilNextProcess()); -} - TEST_P(TestNackModule2, ResendNack) { - nack_module_.OnReceivedPacket(1, false, false); - nack_module_.OnReceivedPacket(3, false, false); + NackModule2& nack_module = CreateNackModule(TimeDelta::Millis(1)); + nack_module.OnReceivedPacket(1, false, false); + nack_module.OnReceivedPacket(3, false, false); size_t expected_nacks_sent = 1; - EXPECT_EQ(expected_nacks_sent, sent_nacks_.size()); + ASSERT_EQ(expected_nacks_sent, sent_nacks_.size()); EXPECT_EQ(2, sent_nacks_[0]); if (GetParam()) { // Retry has to wait at least 5ms by default. - nack_module_.UpdateRtt(1); + nack_module.UpdateRtt(1); clock_->AdvanceTimeMilliseconds(4); - nack_module_.Process(); // Too early. + Flush(); // Too early. EXPECT_EQ(expected_nacks_sent, sent_nacks_.size()); clock_->AdvanceTimeMilliseconds(1); - nack_module_.Process(); // Now allowed. + WaitForSendNack(); // Now allowed. EXPECT_EQ(++expected_nacks_sent, sent_nacks_.size()); } else { - nack_module_.UpdateRtt(1); + nack_module.UpdateRtt(1); clock_->AdvanceTimeMilliseconds(1); - nack_module_.Process(); // Fast retransmit allowed. + WaitForSendNack(); // Fast retransmit allowed. EXPECT_EQ(++expected_nacks_sent, sent_nacks_.size()); } @@ -185,7 +211,7 @@ TEST_P(TestNackModule2, ResendNack) { for (int i = 2; i < 10; ++i) { // Change RTT, above the 40ms max for exponential backoff. TimeDelta rtt = TimeDelta::Millis(160); // + (i * 10 - 40) - nack_module_.UpdateRtt(rtt.ms()); + nack_module.UpdateRtt(rtt.ms()); // RTT gets capped at 160ms in backoff calculations. TimeDelta expected_backoff_delay = @@ -193,26 +219,27 @@ TEST_P(TestNackModule2, ResendNack) { // Move to one millisecond before next allowed NACK. clock_->AdvanceTimeMilliseconds(expected_backoff_delay.ms() - 1); - nack_module_.Process(); + Flush(); EXPECT_EQ(expected_nacks_sent, sent_nacks_.size()); // Move to one millisecond after next allowed NACK. // After rather than on to avoid rounding errors. clock_->AdvanceTimeMilliseconds(2); - nack_module_.Process(); // Now allowed. + WaitForSendNack(); // Now allowed. EXPECT_EQ(++expected_nacks_sent, sent_nacks_.size()); } // Giving up after 10 tries. clock_->AdvanceTimeMilliseconds(3000); - nack_module_.Process(); + Flush(); EXPECT_EQ(expected_nacks_sent, sent_nacks_.size()); } TEST_P(TestNackModule2, ResendPacketMaxRetries) { - nack_module_.OnReceivedPacket(1, false, false); - nack_module_.OnReceivedPacket(3, false, false); - EXPECT_EQ(1u, sent_nacks_.size()); + NackModule2& nack_module = CreateNackModule(TimeDelta::Millis(1)); + nack_module.OnReceivedPacket(1, false, false); + nack_module.OnReceivedPacket(3, false, false); + ASSERT_EQ(1u, sent_nacks_.size()); EXPECT_EQ(2, sent_nacks_[0]); int backoff_factor = 1; @@ -220,111 +247,124 @@ TEST_P(TestNackModule2, ResendPacketMaxRetries) { // Exponential backoff, so that we don't reject NACK because of time. clock_->AdvanceTimeMilliseconds(backoff_factor * kDefaultRttMs); backoff_factor *= 2; - nack_module_.Process(); + WaitForSendNack(); EXPECT_EQ(retries + 1, sent_nacks_.size()); } clock_->AdvanceTimeMilliseconds(backoff_factor * kDefaultRttMs); - nack_module_.Process(); + Flush(); EXPECT_EQ(10u, sent_nacks_.size()); } TEST_P(TestNackModule2, TooLargeNackList) { - nack_module_.OnReceivedPacket(0, false, false); - nack_module_.OnReceivedPacket(1001, false, false); + NackModule2& nack_module = CreateNackModule(); + nack_module.OnReceivedPacket(0, false, false); + nack_module.OnReceivedPacket(1001, false, false); EXPECT_EQ(1000u, sent_nacks_.size()); EXPECT_EQ(0, keyframes_requested_); - nack_module_.OnReceivedPacket(1003, false, false); + nack_module.OnReceivedPacket(1003, false, false); EXPECT_EQ(1000u, sent_nacks_.size()); EXPECT_EQ(1, keyframes_requested_); - nack_module_.OnReceivedPacket(1004, false, false); + nack_module.OnReceivedPacket(1004, false, false); EXPECT_EQ(1000u, sent_nacks_.size()); EXPECT_EQ(1, keyframes_requested_); } TEST_P(TestNackModule2, TooLargeNackListWithKeyFrame) { - nack_module_.OnReceivedPacket(0, false, false); - nack_module_.OnReceivedPacket(1, true, false); - nack_module_.OnReceivedPacket(1001, false, false); + NackModule2& nack_module = CreateNackModule(); + nack_module.OnReceivedPacket(0, false, false); + nack_module.OnReceivedPacket(1, true, false); + nack_module.OnReceivedPacket(1001, false, false); EXPECT_EQ(999u, sent_nacks_.size()); EXPECT_EQ(0, keyframes_requested_); - nack_module_.OnReceivedPacket(1003, false, false); + nack_module.OnReceivedPacket(1003, false, false); EXPECT_EQ(1000u, sent_nacks_.size()); EXPECT_EQ(0, keyframes_requested_); - nack_module_.OnReceivedPacket(1005, false, false); + nack_module.OnReceivedPacket(1005, false, false); EXPECT_EQ(1000u, sent_nacks_.size()); EXPECT_EQ(1, keyframes_requested_); } TEST_P(TestNackModule2, ClearUpTo) { - nack_module_.OnReceivedPacket(0, false, false); - nack_module_.OnReceivedPacket(100, false, false); + NackModule2& nack_module = CreateNackModule(TimeDelta::Millis(1)); + nack_module.OnReceivedPacket(0, false, false); + nack_module.OnReceivedPacket(100, false, false); EXPECT_EQ(99u, sent_nacks_.size()); sent_nacks_.clear(); clock_->AdvanceTimeMilliseconds(100); - nack_module_.ClearUpTo(50); - nack_module_.Process(); - EXPECT_EQ(50u, sent_nacks_.size()); + nack_module.ClearUpTo(50); + WaitForSendNack(); + ASSERT_EQ(50u, sent_nacks_.size()); EXPECT_EQ(50, sent_nacks_[0]); } TEST_P(TestNackModule2, ClearUpToWrap) { - nack_module_.OnReceivedPacket(0xfff0, false, false); - nack_module_.OnReceivedPacket(0xf, false, false); + NackModule2& nack_module = CreateNackModule(); + nack_module.OnReceivedPacket(0xfff0, false, false); + nack_module.OnReceivedPacket(0xf, false, false); EXPECT_EQ(30u, sent_nacks_.size()); sent_nacks_.clear(); clock_->AdvanceTimeMilliseconds(100); - nack_module_.ClearUpTo(0); - nack_module_.Process(); - EXPECT_EQ(15u, sent_nacks_.size()); + nack_module.ClearUpTo(0); + WaitForSendNack(); + ASSERT_EQ(15u, sent_nacks_.size()); EXPECT_EQ(0, sent_nacks_[0]); } TEST_P(TestNackModule2, PacketNackCount) { - EXPECT_EQ(0, nack_module_.OnReceivedPacket(0, false, false)); - EXPECT_EQ(0, nack_module_.OnReceivedPacket(2, false, false)); - EXPECT_EQ(1, nack_module_.OnReceivedPacket(1, false, false)); + NackModule2& nack_module = CreateNackModule(TimeDelta::Millis(1)); + EXPECT_EQ(0, nack_module.OnReceivedPacket(0, false, false)); + EXPECT_EQ(0, nack_module.OnReceivedPacket(2, false, false)); + EXPECT_EQ(1, nack_module.OnReceivedPacket(1, false, false)); sent_nacks_.clear(); - nack_module_.UpdateRtt(100); - EXPECT_EQ(0, nack_module_.OnReceivedPacket(5, false, false)); + nack_module.UpdateRtt(100); + EXPECT_EQ(0, nack_module.OnReceivedPacket(5, false, false)); clock_->AdvanceTimeMilliseconds(100); - nack_module_.Process(); + WaitForSendNack(); + EXPECT_EQ(4u, sent_nacks_.size()); + clock_->AdvanceTimeMilliseconds(125); - nack_module_.Process(); - EXPECT_EQ(3, nack_module_.OnReceivedPacket(3, false, false)); - EXPECT_EQ(3, nack_module_.OnReceivedPacket(4, false, false)); - EXPECT_EQ(0, nack_module_.OnReceivedPacket(4, false, false)); + WaitForSendNack(); + + EXPECT_EQ(6u, sent_nacks_.size()); + + EXPECT_EQ(3, nack_module.OnReceivedPacket(3, false, false)); + EXPECT_EQ(3, nack_module.OnReceivedPacket(4, false, false)); + EXPECT_EQ(0, nack_module.OnReceivedPacket(4, false, false)); } TEST_P(TestNackModule2, NackListFullAndNoOverlapWithKeyframes) { + NackModule2& nack_module = CreateNackModule(); const int kMaxNackPackets = 1000; const unsigned int kFirstGap = kMaxNackPackets - 20; const unsigned int kSecondGap = 200; uint16_t seq_num = 0; - nack_module_.OnReceivedPacket(seq_num++, true, false); + nack_module.OnReceivedPacket(seq_num++, true, false); seq_num += kFirstGap; - nack_module_.OnReceivedPacket(seq_num++, true, false); + nack_module.OnReceivedPacket(seq_num++, true, false); EXPECT_EQ(kFirstGap, sent_nacks_.size()); sent_nacks_.clear(); seq_num += kSecondGap; - nack_module_.OnReceivedPacket(seq_num, true, false); + nack_module.OnReceivedPacket(seq_num, true, false); EXPECT_EQ(kSecondGap, sent_nacks_.size()); } TEST_P(TestNackModule2, HandleFecRecoveredPacket) { - nack_module_.OnReceivedPacket(1, false, false); - nack_module_.OnReceivedPacket(4, false, true); + NackModule2& nack_module = CreateNackModule(); + nack_module.OnReceivedPacket(1, false, false); + nack_module.OnReceivedPacket(4, false, true); EXPECT_EQ(0u, sent_nacks_.size()); - nack_module_.OnReceivedPacket(5, false, false); + nack_module.OnReceivedPacket(5, false, false); EXPECT_EQ(2u, sent_nacks_.size()); } TEST_P(TestNackModule2, SendNackWithoutDelay) { - nack_module_.OnReceivedPacket(0, false, false); - nack_module_.OnReceivedPacket(100, false, false); + NackModule2& nack_module = CreateNackModule(); + nack_module.OnReceivedPacket(0, false, false); + nack_module.OnReceivedPacket(100, false, false); EXPECT_EQ(99u, sent_nacks_.size()); } @@ -339,7 +379,7 @@ class TestNackModule2WithFieldTrial : public ::testing::Test, TestNackModule2WithFieldTrial() : nack_delay_field_trial_("WebRTC-SendNackDelayMs/10/"), clock_(new SimulatedClock(0)), - nack_module_(clock_.get(), this, this), + nack_module_(TaskQueueBase::Current(), clock_.get(), this, this), keyframes_requested_(0) {} void SendNack(const std::vector& sequence_numbers, diff --git a/video/rtp_video_stream_receiver2.cc b/video/rtp_video_stream_receiver2.cc index 98351c49cf..2c7bd4bb4e 100644 --- a/video/rtp_video_stream_receiver2.cc +++ b/video/rtp_video_stream_receiver2.cc @@ -103,7 +103,21 @@ std::unique_ptr CreateRtpRtcpModule( return rtp_rtcp; } +std::unique_ptr MaybeConstructNackModule( + TaskQueueBase* current_queue, + const VideoReceiveStream::Config& config, + Clock* clock, + NackSender* nack_sender, + KeyFrameRequestSender* keyframe_request_sender) { + if (config.rtp.nack.rtp_history_ms == 0) + return nullptr; + + return std::make_unique(current_queue, clock, nack_sender, + keyframe_request_sender); +} + static const int kPacketLogIntervalMs = 10000; + } // namespace RtpVideoStreamReceiver2::RtcpFeedbackBuffer::RtcpFeedbackBuffer( @@ -120,22 +134,22 @@ RtpVideoStreamReceiver2::RtcpFeedbackBuffer::RtcpFeedbackBuffer( } void RtpVideoStreamReceiver2::RtcpFeedbackBuffer::RequestKeyFrame() { - rtc::CritScope lock(&cs_); + RTC_DCHECK_RUN_ON(&worker_task_checker_); request_key_frame_ = true; } void RtpVideoStreamReceiver2::RtcpFeedbackBuffer::SendNack( const std::vector& sequence_numbers, bool buffering_allowed) { + RTC_DCHECK_RUN_ON(&worker_task_checker_); RTC_DCHECK(!sequence_numbers.empty()); - rtc::CritScope lock(&cs_); nack_sequence_numbers_.insert(nack_sequence_numbers_.end(), sequence_numbers.cbegin(), sequence_numbers.cend()); if (!buffering_allowed) { // Note that while *buffering* is not allowed, *batching* is, meaning that // previously buffered messages may be sent along with the current message. - SendRtcpFeedback(ConsumeRtcpFeedbackLocked()); + SendBufferedRtcpFeedback(); } } @@ -144,8 +158,8 @@ void RtpVideoStreamReceiver2::RtcpFeedbackBuffer::SendLossNotification( uint16_t last_received_seq_num, bool decodability_flag, bool buffering_allowed) { + RTC_DCHECK_RUN_ON(&worker_task_checker_); RTC_DCHECK(buffering_allowed); - rtc::CritScope lock(&cs_); RTC_DCHECK(!lntf_state_) << "SendLossNotification() called twice in a row with no call to " "SendBufferedRtcpFeedback() in between."; @@ -154,48 +168,38 @@ void RtpVideoStreamReceiver2::RtcpFeedbackBuffer::SendLossNotification( } void RtpVideoStreamReceiver2::RtcpFeedbackBuffer::SendBufferedRtcpFeedback() { - SendRtcpFeedback(ConsumeRtcpFeedback()); -} + RTC_DCHECK_RUN_ON(&worker_task_checker_); -RtpVideoStreamReceiver2::RtcpFeedbackBuffer::ConsumedRtcpFeedback -RtpVideoStreamReceiver2::RtcpFeedbackBuffer::ConsumeRtcpFeedback() { - rtc::CritScope lock(&cs_); - return ConsumeRtcpFeedbackLocked(); -} + bool request_key_frame = false; + std::vector nack_sequence_numbers; + absl::optional lntf_state; -RtpVideoStreamReceiver2::RtcpFeedbackBuffer::ConsumedRtcpFeedback -RtpVideoStreamReceiver2::RtcpFeedbackBuffer::ConsumeRtcpFeedbackLocked() { - ConsumedRtcpFeedback feedback; - std::swap(feedback.request_key_frame, request_key_frame_); - std::swap(feedback.nack_sequence_numbers, nack_sequence_numbers_); - std::swap(feedback.lntf_state, lntf_state_); - return feedback; -} + std::swap(request_key_frame, request_key_frame_); + std::swap(nack_sequence_numbers, nack_sequence_numbers_); + std::swap(lntf_state, lntf_state_); -void RtpVideoStreamReceiver2::RtcpFeedbackBuffer::SendRtcpFeedback( - ConsumedRtcpFeedback feedback) { - if (feedback.lntf_state) { + if (lntf_state) { // If either a NACK or a key frame request is sent, we should buffer // the LNTF and wait for them (NACK or key frame request) to trigger // the compound feedback message. // Otherwise, the LNTF should be sent out immediately. const bool buffering_allowed = - feedback.request_key_frame || !feedback.nack_sequence_numbers.empty(); + request_key_frame || !nack_sequence_numbers.empty(); loss_notification_sender_->SendLossNotification( - feedback.lntf_state->last_decoded_seq_num, - feedback.lntf_state->last_received_seq_num, - feedback.lntf_state->decodability_flag, buffering_allowed); + lntf_state->last_decoded_seq_num, lntf_state->last_received_seq_num, + lntf_state->decodability_flag, buffering_allowed); } - if (feedback.request_key_frame) { + if (request_key_frame) { key_frame_request_sender_->RequestKeyFrame(); - } else if (!feedback.nack_sequence_numbers.empty()) { - nack_sender_->SendNack(feedback.nack_sequence_numbers, true); + } else if (!nack_sequence_numbers.empty()) { + nack_sender_->SendNack(nack_sequence_numbers, true); } } RtpVideoStreamReceiver2::RtpVideoStreamReceiver2( + TaskQueueBase* current_queue, Clock* clock, Transport* transport, RtcpRttStats* rtt_stats, @@ -236,6 +240,11 @@ RtpVideoStreamReceiver2::RtpVideoStreamReceiver2( // TODO(bugs.webrtc.org/10336): Let |rtcp_feedback_buffer_| communicate // directly with |rtp_rtcp_|. rtcp_feedback_buffer_(this, nack_sender, this), + nack_module_(MaybeConstructNackModule(current_queue, + config_, + clock_, + &rtcp_feedback_buffer_, + &rtcp_feedback_buffer_)), packet_buffer_(clock_, kPacketBufferStartSize, PacketBufferMaxSize()), has_received_frame_(false), frames_decryptable_(false), @@ -283,12 +292,6 @@ RtpVideoStreamReceiver2::RtpVideoStreamReceiver2( &rtcp_feedback_buffer_); } - if (config_.rtp.nack.rtp_history_ms != 0) { - nack_module_ = std::make_unique(clock_, &rtcp_feedback_buffer_, - &rtcp_feedback_buffer_); - process_thread_->RegisterModule(nack_module_.get(), RTC_FROM_HERE); - } - reference_finder_ = std::make_unique(this); @@ -313,10 +316,6 @@ RtpVideoStreamReceiver2::RtpVideoStreamReceiver2( RtpVideoStreamReceiver2::~RtpVideoStreamReceiver2() { RTC_DCHECK(secondary_sinks_.empty()); - if (nack_module_) { - process_thread_->DeRegisterModule(nack_module_.get()); - } - process_thread_->DeRegisterModule(rtp_rtcp_.get()); if (packet_router_) @@ -330,6 +329,7 @@ void RtpVideoStreamReceiver2::AddReceiveCodec( const VideoCodec& video_codec, const std::map& codec_params, bool raw_payload) { + RTC_DCHECK_RUN_ON(&worker_task_checker_); payload_type_map_.emplace( video_codec.plType, raw_payload ? std::make_unique() @@ -338,20 +338,19 @@ void RtpVideoStreamReceiver2::AddReceiveCodec( } absl::optional RtpVideoStreamReceiver2::GetSyncInfo() const { + RTC_DCHECK_RUN_ON(&worker_task_checker_); Syncable::Info info; if (rtp_rtcp_->RemoteNTP(&info.capture_time_ntp_secs, &info.capture_time_ntp_frac, nullptr, nullptr, &info.capture_time_source_clock) != 0) { return absl::nullopt; } - { - rtc::CritScope lock(&sync_info_lock_); - if (!last_received_rtp_timestamp_ || !last_received_rtp_system_time_ms_) { - return absl::nullopt; - } - info.latest_received_capture_timestamp = *last_received_rtp_timestamp_; - info.latest_receive_time_ms = *last_received_rtp_system_time_ms_; + + if (!last_received_rtp_timestamp_ || !last_received_rtp_system_time_ms_) { + return absl::nullopt; } + info.latest_received_capture_timestamp = *last_received_rtp_timestamp_; + info.latest_receive_time_ms = *last_received_rtp_system_time_ms_; // Leaves info.current_delay_ms uninitialized. return info; @@ -637,11 +636,10 @@ void RtpVideoStreamReceiver2::OnRtpPacket(const RtpPacketReceived& packet) { if (!packet.recovered()) { // TODO(nisse): Exclude out-of-order packets? int64_t now_ms = clock_->TimeInMilliseconds(); - { - rtc::CritScope cs(&sync_info_lock_); - last_received_rtp_timestamp_ = packet.Timestamp(); - last_received_rtp_system_time_ms_ = now_ms; - } + + last_received_rtp_timestamp_ = packet.Timestamp(); + last_received_rtp_system_time_ms_ = now_ms; + // Periodically log the RTP header of incoming packets. if (now_ms - last_packet_log_ms_ > kPacketLogIntervalMs) { rtc::StringBuilder ss; @@ -678,6 +676,7 @@ void RtpVideoStreamReceiver2::OnRtpPacket(const RtpPacketReceived& packet) { } void RtpVideoStreamReceiver2::RequestKeyFrame() { + RTC_DCHECK_RUN_ON(&worker_task_checker_); // TODO(bugs.webrtc.org/10336): Allow the sender to ignore key frame requests // issued by anything other than the LossNotificationController if it (the // sender) is relying on LNTF alone. @@ -708,15 +707,18 @@ bool RtpVideoStreamReceiver2::IsRetransmissionsEnabled() const { void RtpVideoStreamReceiver2::RequestPacketRetransmit( const std::vector& sequence_numbers) { + RTC_DCHECK_RUN_ON(&worker_task_checker_); rtp_rtcp_->SendNack(sequence_numbers); } bool RtpVideoStreamReceiver2::IsDecryptable() const { - return frames_decryptable_.load(); + RTC_DCHECK_RUN_ON(&worker_task_checker_); + return frames_decryptable_; } void RtpVideoStreamReceiver2::OnInsertedPacket( video_coding::PacketBuffer::InsertResult result) { + RTC_DCHECK_RUN_ON(&worker_task_checker_); video_coding::PacketBuffer::Packet* first_packet = nullptr; int max_nack_count; int64_t min_recv_time; @@ -787,7 +789,7 @@ void RtpVideoStreamReceiver2::OnInsertedPacket( void RtpVideoStreamReceiver2::OnAssembledFrame( std::unique_ptr frame) { - RTC_DCHECK_RUN_ON(&network_tc_); + RTC_DCHECK_RUN_ON(&worker_task_checker_); RTC_DCHECK(frame); const absl::optional& descriptor = @@ -815,7 +817,6 @@ void RtpVideoStreamReceiver2::OnAssembledFrame( has_received_frame_ = true; } - rtc::CritScope lock(&reference_finder_lock_); // Reset |reference_finder_| if |frame| is new and the codec have changed. if (current_codec_) { bool frame_is_newer = @@ -857,13 +858,12 @@ void RtpVideoStreamReceiver2::OnAssembledFrame( void RtpVideoStreamReceiver2::OnCompleteFrame( std::unique_ptr frame) { - { - rtc::CritScope lock(&last_seq_num_cs_); - video_coding::RtpFrameObject* rtp_frame = - static_cast(frame.get()); - last_seq_num_for_pic_id_[rtp_frame->id.picture_id] = - rtp_frame->last_seq_num(); - } + RTC_DCHECK_RUN_ON(&worker_task_checker_); + video_coding::RtpFrameObject* rtp_frame = + static_cast(frame.get()); + last_seq_num_for_pic_id_[rtp_frame->id.picture_id] = + rtp_frame->last_seq_num(); + last_completed_picture_id_ = std::max(last_completed_picture_id_, frame->id.picture_id); complete_frame_callback_->OnCompleteFrame(std::move(frame)); @@ -871,20 +871,22 @@ void RtpVideoStreamReceiver2::OnCompleteFrame( void RtpVideoStreamReceiver2::OnDecryptedFrame( std::unique_ptr frame) { - rtc::CritScope lock(&reference_finder_lock_); + RTC_DCHECK_RUN_ON(&worker_task_checker_); reference_finder_->ManageFrame(std::move(frame)); } void RtpVideoStreamReceiver2::OnDecryptionStatusChange( FrameDecryptorInterface::Status status) { - frames_decryptable_.store( + RTC_DCHECK_RUN_ON(&worker_task_checker_); + // Called from BufferedFrameDecryptor::DecryptFrame. + frames_decryptable_ = (status == FrameDecryptorInterface::Status::kOk) || - (status == FrameDecryptorInterface::Status::kRecoverable)); + (status == FrameDecryptorInterface::Status::kRecoverable); } void RtpVideoStreamReceiver2::SetFrameDecryptor( rtc::scoped_refptr frame_decryptor) { - RTC_DCHECK_RUN_ON(&network_tc_); + RTC_DCHECK_RUN_ON(&worker_task_checker_); if (buffered_frame_decryptor_ == nullptr) { buffered_frame_decryptor_ = std::make_unique(this, this); @@ -894,7 +896,7 @@ void RtpVideoStreamReceiver2::SetFrameDecryptor( void RtpVideoStreamReceiver2::SetDepacketizerToDecoderFrameTransformer( rtc::scoped_refptr frame_transformer) { - RTC_DCHECK_RUN_ON(&network_tc_); + RTC_DCHECK_RUN_ON(&worker_task_checker_); frame_transformer_delegate_ = new rtc::RefCountedObject( this, std::move(frame_transformer), rtc::Thread::Current(), @@ -903,6 +905,7 @@ void RtpVideoStreamReceiver2::SetDepacketizerToDecoderFrameTransformer( } void RtpVideoStreamReceiver2::UpdateRtt(int64_t max_rtt_ms) { + RTC_DCHECK_RUN_ON(&worker_task_checker_); if (nack_module_) nack_module_->UpdateRtt(max_rtt_ms); } @@ -938,11 +941,12 @@ void RtpVideoStreamReceiver2::RemoveSecondarySink( void RtpVideoStreamReceiver2::ManageFrame( std::unique_ptr frame) { - rtc::CritScope lock(&reference_finder_lock_); + RTC_DCHECK_RUN_ON(&worker_task_checker_); reference_finder_->ManageFrame(std::move(frame)); } void RtpVideoStreamReceiver2::ReceivePacket(const RtpPacketReceived& packet) { + RTC_DCHECK_RUN_ON(&worker_task_checker_); if (packet.payload_size() == 0) { // Padding or keep-alive packet. // TODO(nisse): Could drop empty packets earlier, but need to figure out how @@ -992,10 +996,10 @@ void RtpVideoStreamReceiver2::ParseAndHandleEncapsulatingHeader( // RtpFrameReferenceFinder will need to know about padding to // correctly calculate frame references. void RtpVideoStreamReceiver2::NotifyReceiverOfEmptyPacket(uint16_t seq_num) { - { - rtc::CritScope lock(&reference_finder_lock_); - reference_finder_->PaddingReceived(seq_num); - } + RTC_DCHECK_RUN_ON(&worker_task_checker_); + + reference_finder_->PaddingReceived(seq_num); + OnInsertedPacket(packet_buffer_.InsertPadding(seq_num)); if (nack_module_) { nack_module_->OnReceivedPacket(seq_num, /* is_keyframe = */ false, @@ -1052,39 +1056,37 @@ bool RtpVideoStreamReceiver2::DeliverRtcp(const uint8_t* rtcp_packet, } void RtpVideoStreamReceiver2::FrameContinuous(int64_t picture_id) { + RTC_DCHECK_RUN_ON(&worker_task_checker_); if (!nack_module_) return; int seq_num = -1; - { - rtc::CritScope lock(&last_seq_num_cs_); - auto seq_num_it = last_seq_num_for_pic_id_.find(picture_id); - if (seq_num_it != last_seq_num_for_pic_id_.end()) - seq_num = seq_num_it->second; - } + auto seq_num_it = last_seq_num_for_pic_id_.find(picture_id); + if (seq_num_it != last_seq_num_for_pic_id_.end()) + seq_num = seq_num_it->second; if (seq_num != -1) nack_module_->ClearUpTo(seq_num); } void RtpVideoStreamReceiver2::FrameDecoded(int64_t picture_id) { + RTC_DCHECK_RUN_ON(&worker_task_checker_); + // Running on the decoder thread. int seq_num = -1; - { - rtc::CritScope lock(&last_seq_num_cs_); - auto seq_num_it = last_seq_num_for_pic_id_.find(picture_id); - if (seq_num_it != last_seq_num_for_pic_id_.end()) { - seq_num = seq_num_it->second; - last_seq_num_for_pic_id_.erase(last_seq_num_for_pic_id_.begin(), - ++seq_num_it); - } + auto seq_num_it = last_seq_num_for_pic_id_.find(picture_id); + if (seq_num_it != last_seq_num_for_pic_id_.end()) { + seq_num = seq_num_it->second; + last_seq_num_for_pic_id_.erase(last_seq_num_for_pic_id_.begin(), + ++seq_num_it); } + if (seq_num != -1) { packet_buffer_.ClearTo(seq_num); - rtc::CritScope lock(&reference_finder_lock_); reference_finder_->ClearTo(seq_num); } } void RtpVideoStreamReceiver2::SignalNetworkState(NetworkState state) { + RTC_DCHECK_RUN_ON(&worker_task_checker_); rtp_rtcp_->SetRTCPStatus(state == kNetworkUp ? config_.rtp.rtcp_mode : RtcpMode::kOff); } @@ -1127,6 +1129,8 @@ void RtpVideoStreamReceiver2::UpdateHistograms() { } void RtpVideoStreamReceiver2::InsertSpsPpsIntoTracker(uint8_t payload_type) { + RTC_DCHECK_RUN_ON(&worker_task_checker_); + auto codec_params_it = pt_codec_params_.find(payload_type); if (codec_params_it == pt_codec_params_.end()) return; diff --git a/video/rtp_video_stream_receiver2.h b/video/rtp_video_stream_receiver2.h index 3026e1dac3..287bb4fd41 100644 --- a/video/rtp_video_stream_receiver2.h +++ b/video/rtp_video_stream_receiver2.h @@ -11,15 +11,12 @@ #ifndef VIDEO_RTP_VIDEO_STREAM_RECEIVER2_H_ #define VIDEO_RTP_VIDEO_STREAM_RECEIVER2_H_ -#include -#include #include #include #include #include #include "absl/types/optional.h" -#include "api/array_view.h" #include "api/crypto/frame_decryptor_interface.h" #include "api/video/color_space.h" #include "api/video_codecs/video_codec.h" @@ -42,12 +39,10 @@ #include "modules/video_coding/rtp_frame_reference_finder.h" #include "modules/video_coding/unique_timestamp_counter.h" #include "rtc_base/constructor_magic.h" -#include "rtc_base/critical_section.h" #include "rtc_base/experiments/field_trial_parser.h" #include "rtc_base/numerics/sequence_number_util.h" #include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/thread_annotations.h" -#include "rtc_base/thread_checker.h" #include "video/buffered_frame_decryptor.h" #include "video/rtp_video_stream_receiver_frame_transformer_delegate.h" @@ -72,6 +67,7 @@ class RtpVideoStreamReceiver2 : public LossNotificationSender, public RtpVideoFrameReceiver { public: RtpVideoStreamReceiver2( + TaskQueueBase* current_queue, Clock* clock, Transport* transport, RtcpRttStats* rtt_stats, @@ -206,21 +202,20 @@ class RtpVideoStreamReceiver2 : public LossNotificationSender, ~RtcpFeedbackBuffer() override = default; // KeyFrameRequestSender implementation. - void RequestKeyFrame() RTC_LOCKS_EXCLUDED(cs_) override; + void RequestKeyFrame() override; // NackSender implementation. void SendNack(const std::vector& sequence_numbers, - bool buffering_allowed) RTC_LOCKS_EXCLUDED(cs_) override; + bool buffering_allowed) override; // LossNotificationSender implementation. void SendLossNotification(uint16_t last_decoded_seq_num, uint16_t last_received_seq_num, bool decodability_flag, - bool buffering_allowed) - RTC_LOCKS_EXCLUDED(cs_) override; + bool buffering_allowed) override; // Send all RTCP feedback messages buffered thus far. - void SendBufferedRtcpFeedback() RTC_LOCKS_EXCLUDED(cs_); + void SendBufferedRtcpFeedback(); private: // LNTF-related state. @@ -236,32 +231,21 @@ class RtpVideoStreamReceiver2 : public LossNotificationSender, uint16_t last_received_seq_num; bool decodability_flag; }; - struct ConsumedRtcpFeedback { - bool request_key_frame = false; - std::vector nack_sequence_numbers; - absl::optional lntf_state; - }; - - ConsumedRtcpFeedback ConsumeRtcpFeedback() RTC_LOCKS_EXCLUDED(cs_); - ConsumedRtcpFeedback ConsumeRtcpFeedbackLocked() - RTC_EXCLUSIVE_LOCKS_REQUIRED(cs_); - // This method is called both with and without cs_ held. - void SendRtcpFeedback(ConsumedRtcpFeedback feedback); + SequenceChecker worker_task_checker_; KeyFrameRequestSender* const key_frame_request_sender_; NackSender* const nack_sender_; LossNotificationSender* const loss_notification_sender_; - // NACKs are accessible from two threads due to nack_module_ being a module. - rtc::CriticalSection cs_; - // Key-frame-request-related state. - bool request_key_frame_ RTC_GUARDED_BY(cs_); + bool request_key_frame_ RTC_GUARDED_BY(worker_task_checker_); // NACK-related state. - std::vector nack_sequence_numbers_ RTC_GUARDED_BY(cs_); + std::vector nack_sequence_numbers_ + RTC_GUARDED_BY(worker_task_checker_); - absl::optional lntf_state_ RTC_GUARDED_BY(cs_); + absl::optional lntf_state_ + RTC_GUARDED_BY(worker_task_checker_); }; enum ParseGenericDependenciesResult { kDropPacket, @@ -311,7 +295,7 @@ class RtpVideoStreamReceiver2 : public LossNotificationSender, KeyFrameRequestSender* const keyframe_request_sender_; RtcpFeedbackBuffer rtcp_feedback_buffer_; - std::unique_ptr nack_module_; + const std::unique_ptr nack_module_; std::unique_ptr loss_notification_controller_; video_coding::PacketBuffer packet_buffer_; @@ -329,47 +313,43 @@ class RtpVideoStreamReceiver2 : public LossNotificationSender, absl::optional video_structure_frame_id_ RTC_GUARDED_BY(worker_task_checker_); - rtc::CriticalSection reference_finder_lock_; std::unique_ptr reference_finder_ - RTC_GUARDED_BY(reference_finder_lock_); - absl::optional current_codec_; - uint32_t last_assembled_frame_rtp_timestamp_; + RTC_GUARDED_BY(worker_task_checker_); + absl::optional current_codec_ + RTC_GUARDED_BY(worker_task_checker_); + uint32_t last_assembled_frame_rtp_timestamp_ + RTC_GUARDED_BY(worker_task_checker_); - rtc::CriticalSection last_seq_num_cs_; std::map last_seq_num_for_pic_id_ - RTC_GUARDED_BY(last_seq_num_cs_); - video_coding::H264SpsPpsTracker tracker_; + RTC_GUARDED_BY(worker_task_checker_); + video_coding::H264SpsPpsTracker tracker_ RTC_GUARDED_BY(worker_task_checker_); // Maps payload id to the depacketizer. - std::map> payload_type_map_; + std::map> payload_type_map_ + RTC_GUARDED_BY(worker_task_checker_); // TODO(johan): Remove pt_codec_params_ once // https://bugs.chromium.org/p/webrtc/issues/detail?id=6883 is resolved. // Maps a payload type to a map of out-of-band supplied codec parameters. - std::map> pt_codec_params_; - int16_t last_payload_type_ = -1; + std::map> pt_codec_params_ + RTC_GUARDED_BY(worker_task_checker_); + int16_t last_payload_type_ RTC_GUARDED_BY(worker_task_checker_) = -1; - bool has_received_frame_; + bool has_received_frame_ RTC_GUARDED_BY(worker_task_checker_); std::vector secondary_sinks_ RTC_GUARDED_BY(worker_task_checker_); - // Info for GetSyncInfo is updated on network or worker thread, and queried on - // the worker thread. - rtc::CriticalSection sync_info_lock_; absl::optional last_received_rtp_timestamp_ - RTC_GUARDED_BY(sync_info_lock_); + RTC_GUARDED_BY(worker_task_checker_); absl::optional last_received_rtp_system_time_ms_ - RTC_GUARDED_BY(sync_info_lock_); + RTC_GUARDED_BY(worker_task_checker_); - // Used to validate the buffered frame decryptor is always run on the correct - // thread. - rtc::ThreadChecker network_tc_; // Handles incoming encrypted frames and forwards them to the // rtp_reference_finder if they are decryptable. std::unique_ptr buffered_frame_decryptor_ - RTC_PT_GUARDED_BY(network_tc_); - std::atomic frames_decryptable_; + RTC_PT_GUARDED_BY(worker_task_checker_); + bool frames_decryptable_ RTC_GUARDED_BY(worker_task_checker_); absl::optional last_color_space_; AbsoluteCaptureTimeReceiver absolute_capture_time_receiver_ diff --git a/video/rtp_video_stream_receiver2_unittest.cc b/video/rtp_video_stream_receiver2_unittest.cc index d8784e7d45..57fba8f9cf 100644 --- a/video/rtp_video_stream_receiver2_unittest.cc +++ b/video/rtp_video_stream_receiver2_unittest.cc @@ -173,10 +173,11 @@ class RtpVideoStreamReceiver2Test : public ::testing::Test { rtp_receive_statistics_ = ReceiveStatistics::Create(Clock::GetRealTimeClock()); rtp_video_stream_receiver_ = std::make_unique( - Clock::GetRealTimeClock(), &mock_transport_, nullptr, nullptr, &config_, - rtp_receive_statistics_.get(), nullptr, nullptr, process_thread_.get(), - &mock_nack_sender_, &mock_key_frame_request_sender_, - &mock_on_complete_frame_callback_, nullptr, nullptr); + TaskQueueBase::Current(), Clock::GetRealTimeClock(), &mock_transport_, + nullptr, nullptr, &config_, rtp_receive_statistics_.get(), nullptr, + nullptr, process_thread_.get(), &mock_nack_sender_, + &mock_key_frame_request_sender_, &mock_on_complete_frame_callback_, + nullptr, nullptr); VideoCodec codec; codec.plType = kPayloadType; codec.codecType = kVideoCodecGeneric; @@ -1131,10 +1132,10 @@ TEST_F(RtpVideoStreamReceiver2Test, TransformFrame) { EXPECT_CALL(*mock_frame_transformer, RegisterTransformedFrameSinkCallback(_, config_.rtp.remote_ssrc)); auto receiver = std::make_unique( - Clock::GetRealTimeClock(), &mock_transport_, nullptr, nullptr, &config_, - rtp_receive_statistics_.get(), nullptr, nullptr, process_thread_.get(), - &mock_nack_sender_, nullptr, &mock_on_complete_frame_callback_, nullptr, - mock_frame_transformer); + TaskQueueBase::Current(), Clock::GetRealTimeClock(), &mock_transport_, + nullptr, nullptr, &config_, rtp_receive_statistics_.get(), nullptr, + nullptr, process_thread_.get(), &mock_nack_sender_, nullptr, + &mock_on_complete_frame_callback_, nullptr, mock_frame_transformer); VideoCodec video_codec; video_codec.plType = kPayloadType; video_codec.codecType = kVideoCodecGeneric; diff --git a/video/video_receive_stream2.cc b/video/video_receive_stream2.cc index 1470123cd5..9413b72354 100644 --- a/video/video_receive_stream2.cc +++ b/video/video_receive_stream2.cc @@ -201,7 +201,8 @@ VideoReceiveStream2::VideoReceiveStream2( rtp_receive_statistics_(ReceiveStatistics::Create(clock_)), timing_(timing), video_receiver_(clock_, timing_.get()), - rtp_video_stream_receiver_(clock_, + rtp_video_stream_receiver_(worker_thread_, + clock_, &transport_adapter_, call_stats->AsRtcpRttStats(), packet_router, @@ -232,7 +233,6 @@ VideoReceiveStream2::VideoReceiveStream2( RTC_DCHECK(call_stats_); module_process_sequence_checker_.Detach(); - network_sequence_checker_.Detach(); RTC_DCHECK(!config_.decoders.empty()); std::set decoder_payload_types; @@ -472,8 +472,6 @@ bool VideoReceiveStream2::SetBaseMinimumPlayoutDelayMs(int delay_ms) { return false; } - // TODO(bugs.webrtc.org/11489): Consider posting to worker. - rtc::CritScope cs(&playout_delay_lock_); base_minimum_playout_delay_ms_ = delay_ms; UpdatePlayoutDelays(); return true; @@ -481,8 +479,6 @@ bool VideoReceiveStream2::SetBaseMinimumPlayoutDelayMs(int delay_ms) { int VideoReceiveStream2::GetBaseMinimumPlayoutDelayMs() const { RTC_DCHECK_RUN_ON(&worker_sequence_checker_); - - rtc::CritScope cs(&playout_delay_lock_); return base_minimum_playout_delay_ms_; } @@ -522,18 +518,26 @@ void VideoReceiveStream2::SetDepacketizerToDecoderFrameTransformer( void VideoReceiveStream2::SendNack( const std::vector& sequence_numbers, bool buffering_allowed) { + RTC_DCHECK_RUN_ON(&worker_sequence_checker_); RTC_DCHECK(buffering_allowed); rtp_video_stream_receiver_.RequestPacketRetransmit(sequence_numbers); } void VideoReceiveStream2::RequestKeyFrame(int64_t timestamp_ms) { + // Running on worker_sequence_checker_. + // Called from RtpVideoStreamReceiver (rtp_video_stream_receiver_ is + // ultimately responsible). rtp_video_stream_receiver_.RequestKeyFrame(); - last_keyframe_request_ms_ = timestamp_ms; + decode_queue_.PostTask([this, timestamp_ms]() { + RTC_DCHECK_RUN_ON(&decode_queue_); + last_keyframe_request_ms_ = timestamp_ms; + }); } void VideoReceiveStream2::OnCompleteFrame( std::unique_ptr frame) { - RTC_DCHECK_RUN_ON(&network_sequence_checker_); + RTC_DCHECK_RUN_ON(&worker_sequence_checker_); + // TODO(https://bugs.webrtc.org/9974): Consider removing this workaround. int64_t time_now_ms = clock_->TimeInMilliseconds(); if (last_complete_frame_time_ms_ > 0 && @@ -542,19 +546,13 @@ void VideoReceiveStream2::OnCompleteFrame( } last_complete_frame_time_ms_ = time_now_ms; - // TODO(bugs.webrtc.org/11489): We grab the playout_delay_lock_ lock - // potentially twice. Consider checking both min/max and posting to worker if - // there's a change. If we always update playout delays on the worker, we - // don't need a lock. const PlayoutDelay& playout_delay = frame->EncodedImage().playout_delay_; if (playout_delay.min_ms >= 0) { - rtc::CritScope cs(&playout_delay_lock_); frame_minimum_playout_delay_ms_ = playout_delay.min_ms; UpdatePlayoutDelays(); } if (playout_delay.max_ms >= 0) { - rtc::CritScope cs(&playout_delay_lock_); frame_maximum_playout_delay_ms_ = playout_delay.max_ms; UpdatePlayoutDelays(); } @@ -602,22 +600,20 @@ void VideoReceiveStream2::SetEstimatedPlayoutNtpTimestampMs( void VideoReceiveStream2::SetMinimumPlayoutDelay(int delay_ms) { RTC_DCHECK_RUN_ON(&worker_sequence_checker_); - // TODO(bugs.webrtc.org/11489): See if we can't get rid of the - // |playout_delay_lock_| - rtc::CritScope cs(&playout_delay_lock_); syncable_minimum_playout_delay_ms_ = delay_ms; UpdatePlayoutDelays(); } -int64_t VideoReceiveStream2::GetWaitMs() const { +int64_t VideoReceiveStream2::GetMaxWaitMs() const { return keyframe_required_ ? max_wait_for_keyframe_ms_ : max_wait_for_frame_ms_; } void VideoReceiveStream2::StartNextDecode() { + // Running on the decode thread. TRACE_EVENT0("webrtc", "VideoReceiveStream2::StartNextDecode"); frame_buffer_->NextFrame( - GetWaitMs(), keyframe_required_, &decode_queue_, + GetMaxWaitMs(), keyframe_required_, &decode_queue_, /* encoded frame handler */ [this](std::unique_ptr frame, ReturnReason res) { RTC_DCHECK_EQ(frame == nullptr, res == ReturnReason::kTimeout); @@ -629,7 +625,12 @@ void VideoReceiveStream2::StartNextDecode() { if (frame) { HandleEncodedFrame(std::move(frame)); } else { - HandleFrameBufferTimeout(); + int64_t now_ms = clock_->TimeInMilliseconds(); + worker_thread_->PostTask(ToQueuedTask( + task_safety_, [this, now_ms, wait_ms = GetMaxWaitMs()]() { + RTC_DCHECK_RUN_ON(&worker_sequence_checker_); + HandleFrameBufferTimeout(now_ms, wait_ms); + })); } StartNextDecode(); }); @@ -649,25 +650,48 @@ void VideoReceiveStream2::HandleEncodedFrame( } } stats_proxy_.OnPreDecode(frame->CodecSpecific()->codecType, qp); - HandleKeyFrameGeneration(frame->FrameType() == VideoFrameType::kVideoFrameKey, - now_ms); + + bool force_request_key_frame = false; + int64_t decoded_frame_picture_id = -1; + + const bool keyframe_request_is_due = + now_ms >= (last_keyframe_request_ms_ + max_wait_for_keyframe_ms_); + int decode_result = video_receiver_.Decode(frame.get()); if (decode_result == WEBRTC_VIDEO_CODEC_OK || decode_result == WEBRTC_VIDEO_CODEC_OK_REQUEST_KEYFRAME) { keyframe_required_ = false; frame_decoded_ = true; - rtp_video_stream_receiver_.FrameDecoded(frame->id.picture_id); + + decoded_frame_picture_id = frame->id.picture_id; if (decode_result == WEBRTC_VIDEO_CODEC_OK_REQUEST_KEYFRAME) - RequestKeyFrame(now_ms); + force_request_key_frame = true; } else if (!frame_decoded_ || !keyframe_required_ || - (last_keyframe_request_ms_ + max_wait_for_keyframe_ms_ < now_ms)) { + keyframe_request_is_due) { keyframe_required_ = true; // TODO(philipel): Remove this keyframe request when downstream project // has been fixed. - RequestKeyFrame(now_ms); + force_request_key_frame = true; } + bool received_frame_is_keyframe = + frame->FrameType() == VideoFrameType::kVideoFrameKey; + + worker_thread_->PostTask(ToQueuedTask( + task_safety_, + [this, now_ms, received_frame_is_keyframe, force_request_key_frame, + decoded_frame_picture_id, keyframe_request_is_due]() { + RTC_DCHECK_RUN_ON(&worker_sequence_checker_); + + if (decoded_frame_picture_id != -1) + rtp_video_stream_receiver_.FrameDecoded(decoded_frame_picture_id); + + HandleKeyFrameGeneration(received_frame_is_keyframe, now_ms, + force_request_key_frame, + keyframe_request_is_due); + })); + if (encoded_frame_buffer_function_) { frame->Retain(); encoded_frame_buffer_function_(WebRtcRecordableEncodedFrame(*frame)); @@ -676,48 +700,58 @@ void VideoReceiveStream2::HandleEncodedFrame( void VideoReceiveStream2::HandleKeyFrameGeneration( bool received_frame_is_keyframe, - int64_t now_ms) { + int64_t now_ms, + bool always_request_key_frame, + bool keyframe_request_is_due) { + // Running on worker_sequence_checker_. + + bool request_key_frame = always_request_key_frame; + // Repeat sending keyframe requests if we've requested a keyframe. - if (!keyframe_generation_requested_) { - return; - } - if (received_frame_is_keyframe) { - keyframe_generation_requested_ = false; - } else if (last_keyframe_request_ms_ + max_wait_for_keyframe_ms_ <= now_ms) { - if (!IsReceivingKeyFrame(now_ms)) { - RequestKeyFrame(now_ms); + if (keyframe_generation_requested_) { + if (received_frame_is_keyframe) { + keyframe_generation_requested_ = false; + } else if (keyframe_request_is_due) { + if (!IsReceivingKeyFrame(now_ms)) { + request_key_frame = true; + } + } else { + // It hasn't been long enough since the last keyframe request, do nothing. } - } else { - // It hasn't been long enough since the last keyframe request, do nothing. + } + + if (request_key_frame) { + // HandleKeyFrameGeneration is initated from the decode thread - + // RequestKeyFrame() triggers a call back to the decode thread. + // Perhaps there's a way to avoid that. + RequestKeyFrame(now_ms); } } -void VideoReceiveStream2::HandleFrameBufferTimeout() { - // Running on |decode_queue_|. - int64_t now_ms = clock_->TimeInMilliseconds(); +void VideoReceiveStream2::HandleFrameBufferTimeout(int64_t now_ms, + int64_t wait_ms) { + // Running on |worker_sequence_checker_|. absl::optional last_packet_ms = rtp_video_stream_receiver_.LastReceivedPacketMs(); // To avoid spamming keyframe requests for a stream that is not active we // check if we have received a packet within the last 5 seconds. - bool stream_is_active = last_packet_ms && now_ms - *last_packet_ms < 5000; - if (!stream_is_active) { - worker_thread_->PostTask(ToQueuedTask(task_safety_, [this]() { - RTC_DCHECK_RUN_ON(&worker_sequence_checker_); - stats_proxy_.OnStreamInactive(); - })); - } + const bool stream_is_active = + last_packet_ms && now_ms - *last_packet_ms < 5000; + if (!stream_is_active) + stats_proxy_.OnStreamInactive(); if (stream_is_active && !IsReceivingKeyFrame(now_ms) && (!config_.crypto_options.sframe.require_frame_encryption || rtp_video_stream_receiver_.IsDecryptable())) { - RTC_LOG(LS_WARNING) << "No decodable frame in " << GetWaitMs() + RTC_LOG(LS_WARNING) << "No decodable frame in " << wait_ms << " ms, requesting keyframe."; RequestKeyFrame(now_ms); } } bool VideoReceiveStream2::IsReceivingKeyFrame(int64_t timestamp_ms) const { + // Running on worker_sequence_checker_. absl::optional last_keyframe_packet_ms = rtp_video_stream_receiver_.LastReceivedKeyframePacketMs(); @@ -730,6 +764,7 @@ bool VideoReceiveStream2::IsReceivingKeyFrame(int64_t timestamp_ms) const { } void VideoReceiveStream2::UpdatePlayoutDelays() const { + // Running on worker_sequence_checker_. const int minimum_delay_ms = std::max({frame_minimum_playout_delay_ms_, base_minimum_playout_delay_ms_, syncable_minimum_playout_delay_ms_}); @@ -752,36 +787,43 @@ VideoReceiveStream2::SetAndGetRecordingState(RecordingState state, bool generate_key_frame) { RTC_DCHECK_RUN_ON(&worker_sequence_checker_); rtc::Event event; - RecordingState old_state; - decode_queue_.PostTask([this, &event, &old_state, generate_key_frame, - state = std::move(state)] { - RTC_DCHECK_RUN_ON(&decode_queue_); - // Save old state. - old_state.callback = std::move(encoded_frame_buffer_function_); - old_state.keyframe_needed = keyframe_generation_requested_; - old_state.last_keyframe_request_ms = last_keyframe_request_ms_; - // Set new state. - encoded_frame_buffer_function_ = std::move(state.callback); - if (generate_key_frame) { - RequestKeyFrame(clock_->TimeInMilliseconds()); - keyframe_generation_requested_ = true; - } else { - keyframe_generation_requested_ = state.keyframe_needed; - last_keyframe_request_ms_ = state.last_keyframe_request_ms.value_or(0); - } - event.Set(); - }); + // Save old state, set the new state. + RecordingState old_state; + + decode_queue_.PostTask( + [this, &event, &old_state, callback = std::move(state.callback), + generate_key_frame, + last_keyframe_request = state.last_keyframe_request_ms.value_or(0)] { + RTC_DCHECK_RUN_ON(&decode_queue_); + old_state.callback = std::move(encoded_frame_buffer_function_); + encoded_frame_buffer_function_ = std::move(callback); + + old_state.last_keyframe_request_ms = last_keyframe_request_ms_; + last_keyframe_request_ms_ = generate_key_frame + ? clock_->TimeInMilliseconds() + : last_keyframe_request; + + event.Set(); + }); + + old_state.keyframe_needed = keyframe_generation_requested_; + + if (generate_key_frame) { + rtp_video_stream_receiver_.RequestKeyFrame(); + keyframe_generation_requested_ = true; + } else { + keyframe_generation_requested_ = state.keyframe_needed; + } + event.Wait(rtc::Event::kForever); return old_state; } void VideoReceiveStream2::GenerateKeyFrame() { - decode_queue_.PostTask([this]() { - RTC_DCHECK_RUN_ON(&decode_queue_); - RequestKeyFrame(clock_->TimeInMilliseconds()); - keyframe_generation_requested_ = true; - }); + RTC_DCHECK_RUN_ON(&worker_sequence_checker_); + RequestKeyFrame(clock_->TimeInMilliseconds()); + keyframe_generation_requested_ = true; } } // namespace internal diff --git a/video/video_receive_stream2.h b/video/video_receive_stream2.h index 0eab5dd293..ef0a002cbe 100644 --- a/video/video_receive_stream2.h +++ b/video/video_receive_stream2.h @@ -27,6 +27,7 @@ #include "modules/video_coding/video_receiver2.h" #include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/task_queue.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "system_wrappers/include/clock.h" #include "video/receive_statistics_proxy2.h" #include "video/rtp_streams_synchronizer2.h" @@ -158,24 +159,28 @@ class VideoReceiveStream2 : public webrtc::VideoReceiveStream, void GenerateKeyFrame() override; private: - int64_t GetWaitMs() const; + int64_t GetMaxWaitMs() const RTC_RUN_ON(decode_queue_); void StartNextDecode() RTC_RUN_ON(decode_queue_); void HandleEncodedFrame(std::unique_ptr frame) RTC_RUN_ON(decode_queue_); - void HandleFrameBufferTimeout() RTC_RUN_ON(decode_queue_); + void HandleFrameBufferTimeout(int64_t now_ms, int64_t wait_ms) + RTC_RUN_ON(worker_sequence_checker_); void UpdatePlayoutDelays() const - RTC_EXCLUSIVE_LOCKS_REQUIRED(playout_delay_lock_); - void RequestKeyFrame(int64_t timestamp_ms) RTC_RUN_ON(decode_queue_); - void HandleKeyFrameGeneration(bool received_frame_is_keyframe, int64_t now_ms) - RTC_RUN_ON(decode_queue_); + RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_sequence_checker_); + void RequestKeyFrame(int64_t timestamp_ms) + RTC_RUN_ON(worker_sequence_checker_); + void HandleKeyFrameGeneration(bool received_frame_is_keyframe, + int64_t now_ms, + bool always_request_key_frame, + bool keyframe_request_is_due) + RTC_RUN_ON(worker_sequence_checker_); bool IsReceivingKeyFrame(int64_t timestamp_ms) const - RTC_RUN_ON(decode_queue_); + RTC_RUN_ON(worker_sequence_checker_); void UpdateHistograms(); SequenceChecker worker_sequence_checker_; SequenceChecker module_process_sequence_checker_; - SequenceChecker network_sequence_checker_; TaskQueueFactory* const task_queue_factory_; @@ -216,40 +221,43 @@ class VideoReceiveStream2 : public webrtc::VideoReceiveStream, // Whenever we are in an undecodable state (stream has just started or due to // a decoding error) we require a keyframe to restart the stream. - bool keyframe_required_ = true; + bool keyframe_required_ RTC_GUARDED_BY(decode_queue_) = true; // If we have successfully decoded any frame. - bool frame_decoded_ = false; + bool frame_decoded_ RTC_GUARDED_BY(decode_queue_) = false; - int64_t last_keyframe_request_ms_ = 0; - int64_t last_complete_frame_time_ms_ = 0; + int64_t last_keyframe_request_ms_ RTC_GUARDED_BY(decode_queue_) = 0; + int64_t last_complete_frame_time_ms_ + RTC_GUARDED_BY(worker_sequence_checker_) = 0; // Keyframe request intervals are configurable through field trials. const int max_wait_for_keyframe_ms_; const int max_wait_for_frame_ms_; - rtc::CriticalSection playout_delay_lock_; - // All of them tries to change current min_playout_delay on |timing_| but // source of the change request is different in each case. Among them the // biggest delay is used. -1 means use default value from the |timing_|. // // Minimum delay as decided by the RTP playout delay extension. - int frame_minimum_playout_delay_ms_ RTC_GUARDED_BY(playout_delay_lock_) = -1; - // Minimum delay as decided by the setLatency function in "webrtc/api". - int base_minimum_playout_delay_ms_ RTC_GUARDED_BY(playout_delay_lock_) = -1; - // Minimum delay as decided by the A/V synchronization feature. - int syncable_minimum_playout_delay_ms_ RTC_GUARDED_BY(playout_delay_lock_) = + int frame_minimum_playout_delay_ms_ RTC_GUARDED_BY(worker_sequence_checker_) = -1; + // Minimum delay as decided by the setLatency function in "webrtc/api". + int base_minimum_playout_delay_ms_ RTC_GUARDED_BY(worker_sequence_checker_) = + -1; + // Minimum delay as decided by the A/V synchronization feature. + int syncable_minimum_playout_delay_ms_ + RTC_GUARDED_BY(worker_sequence_checker_) = -1; // Maximum delay as decided by the RTP playout delay extension. - int frame_maximum_playout_delay_ms_ RTC_GUARDED_BY(playout_delay_lock_) = -1; + int frame_maximum_playout_delay_ms_ RTC_GUARDED_BY(worker_sequence_checker_) = + -1; // Function that is triggered with encoded frames, if not empty. std::function encoded_frame_buffer_function_ RTC_GUARDED_BY(decode_queue_); // Set to true while we're requesting keyframes but not yet received one. - bool keyframe_generation_requested_ RTC_GUARDED_BY(decode_queue_) = false; + bool keyframe_generation_requested_ RTC_GUARDED_BY(worker_sequence_checker_) = + false; // Defined last so they are destroyed before all other members. rtc::TaskQueue decode_queue_;