diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 20c7627d80..82b345c67a 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -22,7 +22,6 @@ #include "call/rtp_video_sender.h" #include "logging/rtc_event_log/events/rtc_event_route_change.h" #include "rtc_base/checks.h" -#include "rtc_base/location.h" #include "rtc_base/logging.h" #include "rtc_base/rate_limiter.h" #include "system_wrappers/include/field_trial.h" @@ -67,9 +66,9 @@ RtpTransportControllerSend::RtpTransportControllerSend( TaskQueueFactory* task_queue_factory) : clock_(clock), event_log_(event_log), - pacer_(clock, &packet_router_, event_log), bitrate_configurator_(bitrate_config), process_thread_(std::move(process_thread)), + pacer_(clock, &packet_router_, event_log, nullptr, process_thread_.get()), observer_(nullptr), controller_factory_override_(controller_factory), controller_factory_fallback_( @@ -96,13 +95,11 @@ RtpTransportControllerSend::RtpTransportControllerSend( pacer()->SetPacingRates(DataRate::bps(bitrate_config.start_bitrate_bps), DataRate::Zero()); - process_thread_->RegisterModule(&pacer_, RTC_FROM_HERE); process_thread_->Start(); } RtpTransportControllerSend::~RtpTransportControllerSend() { process_thread_->Stop(); - process_thread_->DeRegisterModule(&pacer_); } RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender( diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index bbf3e238e5..75e29e472a 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -144,10 +144,10 @@ class RtpTransportControllerSend final const FieldTrialBasedConfig trial_based_config_; PacketRouter packet_router_; std::vector> video_rtp_senders_; - PacedSender pacer_; RtpBitrateConfigurator bitrate_configurator_; std::map network_routes_; const std::unique_ptr process_thread_; + PacedSender pacer_; TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_); diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn index bb39f1f553..30b1b244a9 100644 --- a/modules/pacing/BUILD.gn +++ b/modules/pacing/BUILD.gn @@ -85,6 +85,7 @@ if (rtc_include_tests) { ":pacing", "../../api/units:data_rate", "../../api/units:time_delta", + "../../modules/utility:mock_process_thread", "../../rtc_base:checks", "../../rtc_base:rtc_base_approved", "../../rtc_base:rtc_base_tests_utils", diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index 7891897d5f..de9a4205ad 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -18,6 +18,7 @@ #include "api/rtc_event_log/rtc_event_log.h" #include "modules/utility/include/process_thread.h" #include "rtc_base/checks.h" +#include "rtc_base/location.h" #include "rtc_base/logging.h" #include "rtc_base/time_utils.h" #include "system_wrappers/include/clock.h" @@ -29,15 +30,22 @@ const float PacedSender::kDefaultPaceMultiplier = 2.5f; PacedSender::PacedSender(Clock* clock, PacketRouter* packet_router, RtcEventLog* event_log, - const WebRtcKeyValueConfig* field_trials) + const WebRtcKeyValueConfig* field_trials, + ProcessThread* process_thread) : pacing_controller_(clock, static_cast(this), event_log, field_trials), packet_router_(packet_router), - process_thread_(nullptr) {} + process_thread_(process_thread) { + if (process_thread_) + process_thread_->RegisterModule(&module_proxy_, RTC_FROM_HERE); +} -PacedSender::~PacedSender() = default; +PacedSender::~PacedSender() { + if (process_thread_) + process_thread_->DeRegisterModule(&module_proxy_); +} void PacedSender::CreateProbeCluster(DataRate bitrate, int cluster_id) { rtc::CritScope cs(&critsect_); @@ -49,11 +57,11 @@ void PacedSender::Pause() { rtc::CritScope cs(&critsect_); pacing_controller_.Pause(); } - rtc::CritScope cs(&process_thread_lock_); + // Tell the process thread to call our TimeUntilNextProcess() method to get // a new (longer) estimate for when to call Process(). if (process_thread_) - process_thread_->WakeUp(this); + process_thread_->WakeUp(&module_proxy_); } void PacedSender::Resume() { @@ -61,11 +69,11 @@ void PacedSender::Resume() { rtc::CritScope cs(&critsect_); pacing_controller_.Resume(); } - rtc::CritScope cs(&process_thread_lock_); + // Tell the process thread to call our TimeUntilNextProcess() method to // refresh the estimate for when to call Process(). if (process_thread_) - process_thread_->WakeUp(this); + process_thread_->WakeUp(&module_proxy_); } void PacedSender::SetCongestionWindow(DataSize congestion_window_size) { @@ -146,8 +154,7 @@ void PacedSender::Process() { void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) { RTC_LOG(LS_INFO) << "ProcessThreadAttached 0x" << process_thread; - rtc::CritScope cs(&process_thread_lock_); - process_thread_ = process_thread; + RTC_DCHECK(!process_thread || process_thread == process_thread_); } void PacedSender::SetQueueTimeLimit(TimeDelta limit) { diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h index 30fcdab4cf..34141d832a 100644 --- a/modules/pacing/paced_sender.h +++ b/modules/pacing/paced_sender.h @@ -39,6 +39,8 @@ namespace webrtc { class Clock; class RtcEventLog; +// TODO(bugs.webrtc.org/10937): Remove the inheritance from Module after +// updating dependencies. class PacedSender : public Module, public RtpPacketPacer, public RtpPacketSender, @@ -56,10 +58,13 @@ class PacedSender : public Module, // overshoots from the encoder. static const float kDefaultPaceMultiplier; + // TODO(bugs.webrtc.org/10937): Make the |process_thread| argument be non + // optional once all callers have been updated. PacedSender(Clock* clock, PacketRouter* packet_router, RtcEventLog* event_log, - const WebRtcKeyValueConfig* field_trials = nullptr); + const WebRtcKeyValueConfig* field_trials = nullptr, + ProcessThread* process_thread = nullptr); ~PacedSender() override; @@ -109,15 +114,23 @@ class PacedSender : public Module, // Below are methods specific to this implementation, such as things related // to module processing thread specifics or methods exposed for test. + private: // Methods implementing Module. + // TODO(bugs.webrtc.org/10937): Remove the inheritance from Module once all + // use of it has been cleared up. // Returns the number of milliseconds until the module want a worker thread // to call Process. int64_t TimeUntilNextProcess() override; + // TODO(bugs.webrtc.org/10937): Make this private (and non virtual) once + // dependencies have been updated to not call this via the PacedSender + // interface. + public: // Process any pending packets in the queue(s). void Process() override; + private: // Called when the prober is associated with a process thread. void ProcessThreadAttached(ProcessThread* process_thread) override; @@ -131,18 +144,29 @@ class PacedSender : public Module, std::vector> GeneratePadding( DataSize size) override RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); + // Private implementation of Module to not expose those implementation details + // publicly and control when the class is registered/deregistered. + class ModuleProxy : public Module { + public: + explicit ModuleProxy(PacedSender* delegate) : delegate_(delegate) {} + + private: + int64_t TimeUntilNextProcess() override { + return delegate_->TimeUntilNextProcess(); + } + void Process() override { return delegate_->Process(); } + void ProcessThreadAttached(ProcessThread* process_thread) override { + return delegate_->ProcessThreadAttached(process_thread); + } + + PacedSender* const delegate_; + } module_proxy_{this}; + rtc::CriticalSection critsect_; PacingController pacing_controller_ RTC_GUARDED_BY(critsect_); PacketRouter* const packet_router_; - - // Lock to avoid race when attaching process thread. This can happen due to - // the Call class setting network state on RtpTransportControllerSend, which - // in turn calls Pause/Resume on Pacedsender, before actually starting the - // pacer process thread. If RtpTransportControllerSend is running on a task - // queue separate from the thread used by Call, this causes a race. - rtc::CriticalSection process_thread_lock_; - ProcessThread* process_thread_ RTC_GUARDED_BY(process_thread_lock_); + ProcessThread* const process_thread_; }; } // namespace webrtc #endif // MODULES_PACING_PACED_SENDER_H_ diff --git a/modules/pacing/paced_sender_unittest.cc b/modules/pacing/paced_sender_unittest.cc index f4fca6cfb6..581ff20ce4 100644 --- a/modules/pacing/paced_sender_unittest.cc +++ b/modules/pacing/paced_sender_unittest.cc @@ -18,6 +18,7 @@ #include "absl/memory/memory.h" #include "modules/pacing/packet_router.h" +#include "modules/utility/include/mock/mock_process_thread.h" #include "system_wrappers/include/clock.h" #include "system_wrappers/include/field_trial.h" #include "test/field_trial.h" @@ -26,6 +27,7 @@ using ::testing::_; using ::testing::Return; +using ::testing::SaveArg; namespace { constexpr uint32_t kAudioSsrc = 12345; @@ -76,7 +78,12 @@ std::unique_ptr BuildRtpPacket(RtpPacketToSend::Type type) { TEST(PacedSenderTest, PacesPackets) { SimulatedClock clock(0); MockCallback callback; - PacedSender pacer(&clock, &callback, nullptr, nullptr); + MockProcessThread process_thread; + Module* paced_module = nullptr; + EXPECT_CALL(process_thread, RegisterModule(_, _)) + .WillOnce(SaveArg<0>(&paced_module)); + PacedSender pacer(&clock, &callback, nullptr, nullptr, &process_thread); + EXPECT_CALL(process_thread, DeRegisterModule(paced_module)).Times(1); // Insert a number of packets, covering one second. static constexpr size_t kPacketsToSend = 42; @@ -88,7 +95,7 @@ TEST(PacedSenderTest, PacesPackets) { // Expect all of them to be sent. size_t packets_sent = 0; - clock.AdvanceTimeMilliseconds(pacer.TimeUntilNextProcess()); + clock.AdvanceTimeMilliseconds(paced_module->TimeUntilNextProcess()); EXPECT_CALL(callback, SendPacket) .WillRepeatedly( [&](std::unique_ptr packet, @@ -97,8 +104,8 @@ TEST(PacedSenderTest, PacesPackets) { const Timestamp start_time = clock.CurrentTime(); while (packets_sent < kPacketsToSend) { - clock.AdvanceTimeMilliseconds(pacer.TimeUntilNextProcess()); - pacer.Process(); + clock.AdvanceTimeMilliseconds(paced_module->TimeUntilNextProcess()); + paced_module->Process(); } // Packets should be sent over a period of close to 1s. Expect a little lower