diff --git a/call/call_factory.cc b/call/call_factory.cc index 6d489359d3..6b4f419742 100644 --- a/call/call_factory.cc +++ b/call/call_factory.cc @@ -78,8 +78,8 @@ Call* CallFactory::CreateCall(const Call::Config& config) { if (send_degradation_config || receive_degradation_config) { return new DegradedCall(std::unique_ptr(Call::Create(config)), - send_degradation_config, - receive_degradation_config); + send_degradation_config, receive_degradation_config, + config.task_queue_factory); } return Call::Create(config); diff --git a/call/degraded_call.cc b/call/degraded_call.cc index 4b71e86f9d..5a185d5665 100644 --- a/call/degraded_call.cc +++ b/call/degraded_call.cc @@ -17,64 +17,68 @@ namespace webrtc { -namespace { -constexpr int64_t kDoNothingProcessIntervalMs = 5000; -} // namespace - -FakeNetworkPipeModule::~FakeNetworkPipeModule() = default; - -FakeNetworkPipeModule::FakeNetworkPipeModule( +DegradedCall::FakeNetworkPipeOnTaskQueue::FakeNetworkPipeOnTaskQueue( + TaskQueueFactory* task_queue_factory, Clock* clock, std::unique_ptr network_behavior, Transport* transport) - : pipe_(clock, std::move(network_behavior), transport) {} + : clock_(clock), + task_queue_(task_queue_factory->CreateTaskQueue( + "DegradedSendQueue", + TaskQueueFactory::Priority::NORMAL)), + pipe_(clock, std::move(network_behavior), transport) {} -void FakeNetworkPipeModule::SendRtp(const uint8_t* packet, - size_t length, - const PacketOptions& options) { +void DegradedCall::FakeNetworkPipeOnTaskQueue::SendRtp( + const uint8_t* packet, + size_t length, + const PacketOptions& options) { pipe_.SendRtp(packet, length, options); - MaybeResumeProcess(); + Process(); } -void FakeNetworkPipeModule::SendRtcp(const uint8_t* packet, size_t length) { +void DegradedCall::FakeNetworkPipeOnTaskQueue::SendRtcp(const uint8_t* packet, + size_t length) { pipe_.SendRtcp(packet, length); - MaybeResumeProcess(); + Process(); } -void FakeNetworkPipeModule::MaybeResumeProcess() { - rtc::CritScope cs(&process_thread_lock_); - if (!pending_process_ && pipe_.TimeUntilNextProcess() && process_thread_) { - process_thread_->WakeUp(nullptr); - } -} - -int64_t FakeNetworkPipeModule::TimeUntilNextProcess() { - auto delay = pipe_.TimeUntilNextProcess(); - rtc::CritScope cs(&process_thread_lock_); - pending_process_ = delay.has_value(); - return delay.value_or(kDoNothingProcessIntervalMs); -} - -void FakeNetworkPipeModule::ProcessThreadAttached( - ProcessThread* process_thread) { - rtc::CritScope cs(&process_thread_lock_); - process_thread_ = process_thread; -} - -void FakeNetworkPipeModule::Process() { +bool DegradedCall::FakeNetworkPipeOnTaskQueue::Process() { pipe_.Process(); + auto time_to_next = pipe_.TimeUntilNextProcess(); + if (!time_to_next) { + // Packet was probably sent immediately. + return false; + } + + task_queue_.PostTask([this, time_to_next]() { + RTC_DCHECK_RUN_ON(&task_queue_); + int64_t next_process_time = *time_to_next + clock_->TimeInMilliseconds(); + if (!next_process_ms_ || next_process_time < *next_process_ms_) { + next_process_ms_ = next_process_time; + task_queue_.PostDelayedTask( + [this]() { + RTC_DCHECK_RUN_ON(&task_queue_); + if (!Process()) { + next_process_ms_.reset(); + } + }, + *time_to_next); + } + }); + + return true; } DegradedCall::DegradedCall( std::unique_ptr call, absl::optional send_config, - absl::optional receive_config) + absl::optional receive_config, + TaskQueueFactory* task_queue_factory) : clock_(Clock::GetRealTimeClock()), call_(std::move(call)), + task_queue_factory_(task_queue_factory), send_config_(send_config), - send_process_thread_( - send_config_ ? ProcessThread::Create("DegradedSendThread") : nullptr), - num_send_streams_(0), + send_simulated_network_(nullptr), receive_config_(receive_config) { if (receive_config_) { auto network = absl::make_unique(*receive_config_); @@ -83,19 +87,9 @@ DegradedCall::DegradedCall( absl::make_unique(clock_, std::move(network)); receive_pipe_->SetReceiver(call_->Receiver()); } - if (send_process_thread_) { - send_process_thread_->Start(); - } } -DegradedCall::~DegradedCall() { - if (send_pipe_) { - send_process_thread_->DeRegisterModule(send_pipe_.get()); - } - if (send_process_thread_) { - send_process_thread_->Stop(); - } -} +DegradedCall::~DegradedCall() = default; AudioSendStream* DegradedCall::CreateAudioSendStream( const AudioSendStream::Config& config) { @@ -122,12 +116,10 @@ VideoSendStream* DegradedCall::CreateVideoSendStream( if (send_config_ && !send_pipe_) { auto network = absl::make_unique(*send_config_); send_simulated_network_ = network.get(); - send_pipe_ = absl::make_unique( - clock_, std::move(network), config.send_transport); + send_pipe_ = absl::make_unique( + task_queue_factory_, clock_, std::move(network), config.send_transport); config.send_transport = this; - send_process_thread_->RegisterModule(send_pipe_.get(), RTC_FROM_HERE); } - ++num_send_streams_; return call_->CreateVideoSendStream(std::move(config), std::move(encoder_config)); } @@ -139,25 +131,16 @@ VideoSendStream* DegradedCall::CreateVideoSendStream( if (send_config_ && !send_pipe_) { auto network = absl::make_unique(*send_config_); send_simulated_network_ = network.get(); - send_pipe_ = absl::make_unique( - clock_, std::move(network), config.send_transport); + send_pipe_ = absl::make_unique( + task_queue_factory_, clock_, std::move(network), config.send_transport); config.send_transport = this; - send_process_thread_->RegisterModule(send_pipe_.get(), RTC_FROM_HERE); } - ++num_send_streams_; return call_->CreateVideoSendStream( std::move(config), std::move(encoder_config), std::move(fec_controller)); } void DegradedCall::DestroyVideoSendStream(VideoSendStream* send_stream) { call_->DestroyVideoSendStream(send_stream); - if (send_pipe_ && num_send_streams_ > 0) { - --num_send_streams_; - if (num_send_streams_ == 0) { - send_process_thread_->DeRegisterModule(send_pipe_.get()); - send_pipe_.reset(); - } - } } VideoReceiveStream* DegradedCall::CreateVideoReceiveStream( diff --git a/call/degraded_call.h b/call/degraded_call.h index adac631478..400450ea08 100644 --- a/call/degraded_call.h +++ b/call/degraded_call.h @@ -33,44 +33,20 @@ #include "call/simulated_network.h" #include "call/video_receive_stream.h" #include "call/video_send_stream.h" -#include "modules/include/module.h" #include "modules/utility/include/process_thread.h" #include "rtc_base/copy_on_write_buffer.h" #include "rtc_base/network/sent_packet.h" +#include "rtc_base/task_queue.h" #include "system_wrappers/include/clock.h" namespace webrtc { -class FakeNetworkPipeModule : public Module { - public: - FakeNetworkPipeModule( - Clock* clock, - std::unique_ptr network_behavior, - Transport* transport); - ~FakeNetworkPipeModule() override; - void SendRtp(const uint8_t* packet, - size_t length, - const PacketOptions& options); - void SendRtcp(const uint8_t* packet, size_t length); - - // Implements Module interface - int64_t TimeUntilNextProcess() override; - void ProcessThreadAttached(ProcessThread* process_thread) override; - void Process() override; - - private: - void MaybeResumeProcess(); - FakeNetworkPipe pipe_; - rtc::CriticalSection process_thread_lock_; - ProcessThread* process_thread_ RTC_GUARDED_BY(process_thread_lock_) = nullptr; - bool pending_process_ RTC_GUARDED_BY(process_thread_lock_) = false; -}; - class DegradedCall : public Call, private Transport, private PacketReceiver { public: explicit DegradedCall( std::unique_ptr call, absl::optional send_config, - absl::optional receive_config); + absl::optional receive_config, + TaskQueueFactory* task_queue_factory); ~DegradedCall() override; // Implements Call. @@ -125,17 +101,40 @@ class DegradedCall : public Call, private Transport, private PacketReceiver { int64_t packet_time_us) override; private: + class FakeNetworkPipeOnTaskQueue { + public: + FakeNetworkPipeOnTaskQueue( + TaskQueueFactory* task_queue_factory, + Clock* clock, + std::unique_ptr network_behavior, + Transport* transport); + + void SendRtp(const uint8_t* packet, + size_t length, + const PacketOptions& options); + void SendRtcp(const uint8_t* packet, size_t length); + + private: + // Try to process packets on the fake network queue. + // Returns true if call resulted in a delayed process, false if queue empty. + bool Process(); + + Clock* const clock_; + rtc::TaskQueue task_queue_; + FakeNetworkPipe pipe_; + absl::optional next_process_ms_ RTC_GUARDED_BY(&task_queue_); + }; + Clock* const clock_; const std::unique_ptr call_; + TaskQueueFactory* const task_queue_factory_; void MediaTransportChange(MediaTransportInterface* media_transport) override; void SetClientBitratePreferences( const webrtc::BitrateSettings& preferences) override {} const absl::optional send_config_; - const std::unique_ptr send_process_thread_; SimulatedNetwork* send_simulated_network_; - std::unique_ptr send_pipe_; - size_t num_send_streams_; + std::unique_ptr send_pipe_; const absl::optional receive_config_; SimulatedNetwork* receive_simulated_network_;