diff --git a/call/BUILD.gn b/call/BUILD.gn index 38b2f4145c..9011094ce7 100644 --- a/call/BUILD.gn +++ b/call/BUILD.gn @@ -228,6 +228,7 @@ rtc_static_library("call") { "../logging:rtc_event_rtp_rtcp", "../logging:rtc_event_video", "../logging:rtc_stream_config", + "../modules:module_api", "../modules/bitrate_controller", "../modules/congestion_controller", "../modules/pacing", @@ -300,7 +301,6 @@ rtc_source_set("simulated_packet_receiver") { deps = [ ":call_interfaces", "../api:simulated_network_api", - "../modules:module_api", ] } @@ -317,7 +317,6 @@ rtc_source_set("fake_network") { "../api:libjingle_peerconnection_api", "../api:simulated_network_api", "../api:transport_api", - "../modules:module_api", "../modules/utility", "../rtc_base:checks", "../rtc_base:rtc_base_approved", diff --git a/call/degraded_call.cc b/call/degraded_call.cc index c066d5d6d3..4b6d1af7dc 100644 --- a/call/degraded_call.cc +++ b/call/degraded_call.cc @@ -15,6 +15,55 @@ #include "rtc_base/location.h" namespace webrtc { + +namespace { +constexpr int64_t kDoNothingProcessIntervalMs = 5000; +} // namespace + +FakeNetworkPipeModule::~FakeNetworkPipeModule() = default; + +FakeNetworkPipeModule::FakeNetworkPipeModule( + Clock* clock, + std::unique_ptr network_behavior, + Transport* transport) + : pipe_(clock, std::move(network_behavior), transport) {} + +void FakeNetworkPipeModule::SendRtp(const uint8_t* packet, + size_t length, + const PacketOptions& options) { + pipe_.SendRtp(packet, length, options); + MaybeResumeProcess(); +} + +void FakeNetworkPipeModule::SendRtcp(const uint8_t* packet, size_t length) { + pipe_.SendRtcp(packet, length); + MaybeResumeProcess(); +} + +void FakeNetworkPipeModule::MaybeResumeProcess() { + rtc::CritScope cs(&process_thread_lock_); + if (!pending_process_ && pipe_.TimeUntilNextProcess() && process_thread_) { + process_thread_->WakeUp(nullptr); + } +} + +int64_t FakeNetworkPipeModule::TimeUntilNextProcess() { + auto delay = pipe_.TimeUntilNextProcess(); + rtc::CritScope cs(&process_thread_lock_); + pending_process_ = delay.has_value(); + return delay.value_or(kDoNothingProcessIntervalMs); +} + +void FakeNetworkPipeModule::ProcessThreadAttached( + ProcessThread* process_thread) { + rtc::CritScope cs(&process_thread_lock_); + process_thread_ = process_thread; +} + +void FakeNetworkPipeModule::Process() { + pipe_.Process(); +} + DegradedCall::DegradedCall( std::unique_ptr call, absl::optional send_config, @@ -72,8 +121,8 @@ VideoSendStream* DegradedCall::CreateVideoSendStream( if (send_config_ && !send_pipe_) { auto network = absl::make_unique(*send_config_); send_simulated_network_ = network.get(); - send_pipe_ = absl::make_unique(clock_, std::move(network), - config.send_transport); + send_pipe_ = absl::make_unique( + clock_, std::move(network), config.send_transport); config.send_transport = this; send_process_thread_->RegisterModule(send_pipe_.get(), RTC_FROM_HERE); } @@ -89,8 +138,8 @@ VideoSendStream* DegradedCall::CreateVideoSendStream( if (send_config_ && !send_pipe_) { auto network = absl::make_unique(*send_config_); send_simulated_network_ = network.get(); - send_pipe_ = absl::make_unique(clock_, std::move(network), - config.send_transport); + send_pipe_ = absl::make_unique( + clock_, std::move(network), config.send_transport); config.send_transport = this; send_process_thread_->RegisterModule(send_pipe_.get(), RTC_FROM_HERE); } diff --git a/call/degraded_call.h b/call/degraded_call.h index 8f062c3a7b..89eafdb9dc 100644 --- a/call/degraded_call.h +++ b/call/degraded_call.h @@ -32,6 +32,7 @@ #include "call/simulated_network.h" #include "call/video_receive_stream.h" #include "call/video_send_stream.h" +#include "modules/include/module.h" #include "modules/utility/include/process_thread.h" #include "rtc_base/bitrate_allocation_strategy.h" #include "rtc_base/copy_on_write_buffer.h" @@ -39,6 +40,30 @@ #include "system_wrappers/include/clock.h" namespace webrtc { +class FakeNetworkPipeModule : public Module { + public: + FakeNetworkPipeModule( + Clock* clock, + std::unique_ptr network_behavior, + Transport* transport); + ~FakeNetworkPipeModule() override; + void SendRtp(const uint8_t* packet, + size_t length, + const PacketOptions& options); + void SendRtcp(const uint8_t* packet, size_t length); + + // Implements Module interface + int64_t TimeUntilNextProcess() override; + void ProcessThreadAttached(ProcessThread* process_thread) override; + void Process() override; + + private: + void MaybeResumeProcess(); + FakeNetworkPipe pipe_; + rtc::CriticalSection process_thread_lock_; + ProcessThread* process_thread_ RTC_GUARDED_BY(process_thread_lock_) = nullptr; + bool pending_process_ RTC_GUARDED_BY(process_thread_lock_) = false; +}; class DegradedCall : public Call, private Transport, private PacketReceiver { public: @@ -111,7 +136,7 @@ class DegradedCall : public Call, private Transport, private PacketReceiver { const absl::optional send_config_; const std::unique_ptr send_process_thread_; SimulatedNetwork* send_simulated_network_; - std::unique_ptr send_pipe_; + std::unique_ptr send_pipe_; size_t num_send_streams_; const absl::optional receive_config_; diff --git a/call/fake_network_pipe.cc b/call/fake_network_pipe.cc index b5c0cb52e3..46adcb47a9 100644 --- a/call/fake_network_pipe.cc +++ b/call/fake_network_pipe.cc @@ -24,7 +24,6 @@ namespace webrtc { namespace { -constexpr int64_t kDefaultProcessIntervalMs = 5; constexpr int64_t kLogIntervalMs = 5000; } // namespace @@ -167,12 +166,6 @@ bool FakeNetworkPipe::EnqueuePacket(rtc::CopyOnWriteBuffer packet, packets_in_flight_.pop_back(); ++dropped_packets_; } - if (network_behavior_->NextDeliveryTimeUs()) { - rtc::CritScope crit(&process_thread_lock_); - if (process_thread_) - process_thread_->WakeUp(nullptr); - } - return sent; } @@ -292,19 +285,14 @@ void FakeNetworkPipe::DeliverNetworkPacket(NetworkPacket* packet) { } } -int64_t FakeNetworkPipe::TimeUntilNextProcess() { +absl::optional FakeNetworkPipe::TimeUntilNextProcess() { rtc::CritScope crit(&process_lock_); absl::optional delivery_us = network_behavior_->NextDeliveryTimeUs(); if (delivery_us) { int64_t delay_us = *delivery_us - clock_->TimeInMicroseconds(); return std::max((delay_us + 500) / 1000, 0); } - return kDefaultProcessIntervalMs; -} - -void FakeNetworkPipe::ProcessThreadAttached(ProcessThread* process_thread) { - rtc::CritScope cs(&process_thread_lock_); - process_thread_ = process_thread; + return absl::nullopt; } bool FakeNetworkPipe::HasTransport() const { diff --git a/call/fake_network_pipe.h b/call/fake_network_pipe.h index 2c41dbfd93..661815bf44 100644 --- a/call/fake_network_pipe.h +++ b/call/fake_network_pipe.h @@ -86,8 +86,7 @@ class NetworkPacket { // Class faking a network link, internally is uses an implementation of a // SimulatedNetworkInterface to simulate network behavior. -class FakeNetworkPipe : public webrtc::SimulatedPacketReceiverInterface, - public Transport { +class FakeNetworkPipe : public SimulatedPacketReceiverInterface { public: // Will keep |network_behavior| alive while pipe is alive itself. // Use these constructors if you plan to insert packets using DeliverPacket(). @@ -119,8 +118,8 @@ class FakeNetworkPipe : public webrtc::SimulatedPacketReceiverInterface, // constructor. bool SendRtp(const uint8_t* packet, size_t length, - const PacketOptions& options) override; - bool SendRtcp(const uint8_t* packet, size_t length) override; + const PacketOptions& options); + bool SendRtcp(const uint8_t* packet, size_t length); // Implements the PacketReceiver interface. When/if packets are delivered, // they will be passed directly to the receiver instance given in @@ -138,8 +137,7 @@ class FakeNetworkPipe : public webrtc::SimulatedPacketReceiverInterface, // Processes the network queues and trigger PacketReceiver::IncomingPacket for // packets ready to be delivered. void Process() override; - int64_t TimeUntilNextProcess() override; - void ProcessThreadAttached(ProcessThread* process_thread) override; + absl::optional TimeUntilNextProcess() override; // Get statistics. float PercentageLoss(); @@ -193,10 +191,6 @@ class FakeNetworkPipe : public webrtc::SimulatedPacketReceiverInterface, // |process_lock| guards the data structures involved in delay and loss // processes, such as the packet queues. rtc::CriticalSection process_lock_; - - rtc::CriticalSection process_thread_lock_; - ProcessThread* process_thread_ RTC_GUARDED_BY(process_thread_lock_) = nullptr; - // Packets are added at the back of the deque, this makes the deque ordered // by increasing send time. The common case when removing packets from the // deque is removing early packets, which will be close to the front of the diff --git a/call/simulated_network.cc b/call/simulated_network.cc index 0884b295f8..c80255f388 100644 --- a/call/simulated_network.cc +++ b/call/simulated_network.cc @@ -20,6 +20,9 @@ #include "rtc_base/checks.h" namespace webrtc { +namespace { +constexpr int64_t kDefaultProcessDelayUs = 5000; +} SimulatedNetwork::SimulatedNetwork(SimulatedNetwork::Config config, uint64_t random_seed) @@ -76,15 +79,16 @@ bool SimulatedNetwork::EnqueuePacket(PacketInFlightInfo packet) { // calculated in UpdateCapacityQueue. queue_size_bytes_ += packet.size; capacity_link_.push({packet, packet.send_time_us}); + if (!next_process_time_us_) { + next_process_time_us_ = packet.send_time_us + kDefaultProcessDelayUs; + } return true; } absl::optional SimulatedNetwork::NextDeliveryTimeUs() const { RTC_DCHECK_RUNS_SERIALIZED(&process_checker_); - if (!delay_link_.empty()) - return delay_link_.begin()->arrival_time_us; - return absl::nullopt; + return next_process_time_us_; } void SimulatedNetwork::UpdateCapacityQueue(ConfigState state, @@ -198,6 +202,14 @@ std::vector SimulatedNetwork::DequeueDeliverablePackets( PacketDeliveryInfo(packet_info.packet, packet_info.arrival_time_us)); delay_link_.pop_front(); } + + if (!delay_link_.empty()) { + next_process_time_us_ = delay_link_.front().arrival_time_us; + } else if (!capacity_link_.empty()) { + next_process_time_us_ = receive_time_us + kDefaultProcessDelayUs; + } else { + next_process_time_us_.reset(); + } return packets_to_deliver; } diff --git a/call/simulated_network.h b/call/simulated_network.h index 6adb412edf..632eb5dfed 100644 --- a/call/simulated_network.h +++ b/call/simulated_network.h @@ -87,6 +87,8 @@ class SimulatedNetwork : public NetworkBehaviorInterface { int64_t pending_drain_bits_ RTC_GUARDED_BY(process_checker_) = 0; absl::optional last_capacity_link_visit_us_ RTC_GUARDED_BY(process_checker_); + absl::optional next_process_time_us_ + RTC_GUARDED_BY(process_checker_); }; } // namespace webrtc diff --git a/call/simulated_packet_receiver.h b/call/simulated_packet_receiver.h index 03d7e96ea6..2db46e8c38 100644 --- a/call/simulated_packet_receiver.h +++ b/call/simulated_packet_receiver.h @@ -13,13 +13,12 @@ #include "api/test/simulated_network.h" #include "call/packet_receiver.h" -#include "modules/include/module.h" namespace webrtc { // Private API that is fixing surface between DirectTransport and underlying // network conditions simulation implementation. -class SimulatedPacketReceiverInterface : public PacketReceiver, public Module { +class SimulatedPacketReceiverInterface : public PacketReceiver { public: // Must not be called in parallel with DeliverPacket or Process. // Destination receiver will be injected with this method @@ -27,6 +26,15 @@ class SimulatedPacketReceiverInterface : public PacketReceiver, public Module { // Reports average packet delay. virtual int AverageDelay() = 0; + + // Process any pending tasks such as timeouts. + // Called on a worker thread. + virtual void Process() = 0; + + // Returns the time until next process or nullopt to indicate that the next + // process time is unknown. If the next process time is unknown, this should + // be checked again any time a packet is enqueued. + virtual absl::optional TimeUntilNextProcess() = 0; }; } // namespace webrtc diff --git a/call/test/fake_network_pipe_unittest.cc b/call/test/fake_network_pipe_unittest.cc index 9f2a663403..b8c7e56d17 100644 --- a/call/test/fake_network_pipe_unittest.cc +++ b/call/test/fake_network_pipe_unittest.cc @@ -259,7 +259,7 @@ TEST_F(FakeNetworkPipeTest, ChangingCapacityWithEmptyPipeTest) { // Check that all the packets were sent. EXPECT_EQ(static_cast(2 * kNumPackets), pipe->SentPackets()); - fake_clock_.AdvanceTimeMilliseconds(pipe->TimeUntilNextProcess()); + fake_clock_.AdvanceTimeMilliseconds(*pipe->TimeUntilNextProcess()); EXPECT_CALL(receiver, DeliverPacket(_, _, _)).Times(0); pipe->Process(); } @@ -307,7 +307,7 @@ TEST_F(FakeNetworkPipeTest, ChangingCapacityWithPacketsInPipeTest) { // Check that all the packets were sent. EXPECT_EQ(static_cast(kNumPackets), pipe->SentPackets()); - fake_clock_.AdvanceTimeMilliseconds(pipe->TimeUntilNextProcess()); + fake_clock_.AdvanceTimeMilliseconds(*pipe->TimeUntilNextProcess()); EXPECT_CALL(receiver, DeliverPacket(_, _, _)).Times(0); pipe->Process(); } diff --git a/test/direct_transport.cc b/test/direct_transport.cc index fd7369171a..b7554496c6 100644 --- a/test/direct_transport.cc +++ b/test/direct_transport.cc @@ -50,19 +50,18 @@ DirectTransport::DirectTransport( } DirectTransport::~DirectTransport() { - RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_); - // Constructor updates |next_scheduled_task_|, so it's guaranteed to - // be initialized. - task_queue_->CancelTask(next_scheduled_task_); + if (next_process_task_) + task_queue_->CancelTask(*next_process_task_); } void DirectTransport::StopSending() { - RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_); - task_queue_->CancelTask(next_scheduled_task_); + rtc::CritScope cs(&process_lock_); + if (next_process_task_) + task_queue_->CancelTask(*next_process_task_); } void DirectTransport::SetReceiver(PacketReceiver* receiver) { - RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_); + rtc::CritScope cs(&process_lock_); fake_network_->SetReceiver(receiver); } @@ -92,6 +91,9 @@ void DirectTransport::SendPacket(const uint8_t* data, size_t length) { int64_t send_time = clock_->TimeInMicroseconds(); fake_network_->DeliverPacket(media_type, rtc::CopyOnWriteBuffer(data, length), send_time); + rtc::CritScope cs(&process_lock_); + if (!next_process_task_) + ProcessPackets(); } int DirectTransport::GetAverageDelayMs() { @@ -104,17 +106,20 @@ void DirectTransport::Start() { send_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp); send_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp); } - SendPackets(); } -void DirectTransport::SendPackets() { - RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_); - - fake_network_->Process(); - - int64_t delay_ms = fake_network_->TimeUntilNextProcess(); - next_scheduled_task_ = - task_queue_->PostDelayedTask([this]() { SendPackets(); }, delay_ms); +void DirectTransport::ProcessPackets() { + next_process_task_.reset(); + auto delay_ms = fake_network_->TimeUntilNextProcess(); + if (delay_ms) { + next_process_task_ = task_queue_->PostDelayedTask( + [this]() { + fake_network_->Process(); + rtc::CritScope cs(&process_lock_); + ProcessPackets(); + }, + *delay_ms); + } } } // namespace test } // namespace webrtc diff --git a/test/direct_transport.h b/test/direct_transport.h index f926ec5b6a..d70748ffc6 100644 --- a/test/direct_transport.h +++ b/test/direct_transport.h @@ -60,7 +60,7 @@ class DirectTransport : public Transport { int GetAverageDelayMs(); private: - void SendPackets(); + void ProcessPackets() RTC_EXCLUSIVE_LOCKS_REQUIRED(&process_lock_); void SendPacket(const uint8_t* data, size_t length); void Start(); @@ -68,13 +68,14 @@ class DirectTransport : public Transport { Clock* const clock_; SingleThreadedTaskQueueForTesting* const task_queue_; - SingleThreadedTaskQueueForTesting::TaskId next_scheduled_task_ - RTC_GUARDED_BY(&sequence_checker_); + + rtc::CriticalSection process_lock_; + absl::optional next_process_task_ + RTC_GUARDED_BY(&process_lock_); const Demuxer demuxer_; const std::unique_ptr fake_network_; - rtc::SequencedTaskChecker sequence_checker_; }; } // namespace test } // namespace webrtc