From c228575bafc5c6ce6d3de75cee7eba05b9c3bcf9 Mon Sep 17 00:00:00 2001 From: Byoungchan Lee Date: Tue, 27 Sep 2022 14:27:35 +0900 Subject: [PATCH] Make DegradedCall work without DCHECK failure Ability to emulate degraded networks using DegradedCall has not been covered by tests and it is crashing due to DCHECKs. Fix threading issues so this no longer crash. Bug: None Change-Id: I9276dfb1f71762faa02146af0bfaab713bebb7f7 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/276060 Reviewed-by: Tomas Gunnarsson Commit-Queue: Daniel.L (Byoungchan) Lee Cr-Commit-Position: refs/heads/main@{#38216} --- call/BUILD.gn | 1 + call/degraded_call.cc | 90 +++++++++++++++++++++---- call/degraded_call.h | 27 +++++++- pc/peer_connection_field_trial_tests.cc | 82 +++++++++++++++++++++- 4 files changed, 182 insertions(+), 18 deletions(-) diff --git a/call/BUILD.gn b/call/BUILD.gn index efcb59107e..c56c557ecc 100644 --- a/call/BUILD.gn +++ b/call/BUILD.gn @@ -320,6 +320,7 @@ rtc_library("call") { "../rtc_base:logging", "../rtc_base:macromagic", "../rtc_base:rate_limiter", + "../rtc_base:rtc_event", "../rtc_base:rtc_task_queue", "../rtc_base:safe_minmax", "../rtc_base:stringutils", diff --git a/call/degraded_call.cc b/call/degraded_call.cc index 8c3da5730f..0090d3a081 100644 --- a/call/degraded_call.cc +++ b/call/degraded_call.cc @@ -14,17 +14,19 @@ #include #include "absl/strings/string_view.h" +#include "modules/rtp_rtcp/source/rtp_util.h" +#include "rtc_base/event.h" namespace webrtc { DegradedCall::FakeNetworkPipeOnTaskQueue::FakeNetworkPipeOnTaskQueue( TaskQueueBase* task_queue, - const ScopedTaskSafety& task_safety, + rtc::scoped_refptr call_alive, Clock* clock, std::unique_ptr network_behavior) : clock_(clock), task_queue_(task_queue), - task_safety_(task_safety), + call_alive_(std::move(call_alive)), pipe_(clock, std::move(network_behavior)) {} void DegradedCall::FakeNetworkPipeOnTaskQueue::SendRtp( @@ -61,13 +63,13 @@ bool DegradedCall::FakeNetworkPipeOnTaskQueue::Process() { return false; } - task_queue_->PostTask(SafeTask(task_safety_.flag(), [this, time_to_next] { + task_queue_->PostTask(SafeTask(call_alive_, [this, time_to_next] { RTC_DCHECK_RUN_ON(task_queue_); int64_t next_process_time = *time_to_next + clock_->TimeInMilliseconds(); if (!next_process_ms_ || next_process_time < *next_process_ms_) { next_process_ms_ = next_process_time; task_queue_->PostDelayedHighPrecisionTask( - SafeTask(task_safety_.flag(), + SafeTask(call_alive_, [this] { RTC_DCHECK_RUN_ON(task_queue_); if (!Process()) { @@ -126,12 +128,61 @@ bool DegradedCall::FakeNetworkPipeTransportAdapter::SendRtcp( return true; } +DegradedCall::ThreadedPacketReceiver::ThreadedPacketReceiver( + webrtc::TaskQueueBase* worker_thread, + webrtc::TaskQueueBase* network_thread, + rtc::scoped_refptr call_alive, + webrtc::PacketReceiver* receiver) + : worker_thread_(worker_thread), + network_thread_(network_thread), + call_alive_(std::move(call_alive)), + receiver_(receiver) {} + +DegradedCall::ThreadedPacketReceiver::~ThreadedPacketReceiver() = default; + +PacketReceiver::DeliveryStatus +DegradedCall::ThreadedPacketReceiver::DeliverPacket( + MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us) { + // `Call::DeliverPacket` expects RTCP packets to be delivered from the + // network thread and RTP packets to be delivered from the worker thread. + // Because `FakeNetworkPipe` queues packets, the thread used when this packet + // is delivered to `DegradedCall::DeliverPacket` may differ from the thread + // used when this packet is delivered to + // `ThreadedPacketReceiver::DeliverPacket`. To solve this problem, always + // make sure that packets are sent in the correct thread. + if (IsRtcpPacket(packet)) { + if (!network_thread_->IsCurrent()) { + network_thread_->PostTask( + SafeTask(call_alive_, [receiver = receiver_, media_type, + packet = std::move(packet), packet_time_us]() { + receiver->DeliverPacket(media_type, std::move(packet), + packet_time_us); + })); + return DELIVERY_OK; + } + } else { + if (!worker_thread_->IsCurrent()) { + worker_thread_->PostTask([receiver = receiver_, media_type, + packet = std::move(packet), packet_time_us]() { + receiver->DeliverPacket(media_type, std::move(packet), packet_time_us); + }); + return DELIVERY_OK; + } + } + + return receiver_->DeliverPacket(media_type, std::move(packet), + packet_time_us); +} + DegradedCall::DegradedCall( std::unique_ptr call, const std::vector& send_configs, const std::vector& receive_configs) : clock_(Clock::GetRealTimeClock()), call_(std::move(call)), + call_alive_(PendingTaskSafetyFlag::CreateDetached()), send_config_index_(0), send_configs_(send_configs), send_simulated_network_(nullptr), @@ -142,11 +193,13 @@ DegradedCall::DegradedCall( receive_simulated_network_ = network.get(); receive_pipe_ = std::make_unique(clock_, std::move(network)); - receive_pipe_->SetReceiver(call_->Receiver()); + packet_receiver_ = std::make_unique( + call_->worker_thread(), call_->network_thread(), call_alive_, + call_->Receiver()); + receive_pipe_->SetReceiver(packet_receiver_.get()); if (receive_configs_.size() > 1) { call_->network_thread()->PostDelayedTask( - SafeTask(task_safety_.flag(), - [this] { UpdateReceiveNetworkConfig(); }), + SafeTask(call_alive_, [this] { UpdateReceiveNetworkConfig(); }), receive_configs_[0].duration); } } @@ -154,16 +207,29 @@ DegradedCall::DegradedCall( auto network = std::make_unique(send_configs_[0]); send_simulated_network_ = network.get(); send_pipe_ = std::make_unique( - call_->network_thread(), task_safety_, clock_, std::move(network)); + call_->network_thread(), call_alive_, clock_, std::move(network)); if (send_configs_.size() > 1) { call_->network_thread()->PostDelayedTask( - SafeTask(task_safety_.flag(), [this] { UpdateSendNetworkConfig(); }), + SafeTask(call_alive_, [this] { UpdateSendNetworkConfig(); }), send_configs_[0].duration); } } } -DegradedCall::~DegradedCall() = default; +DegradedCall::~DegradedCall() { + RTC_DCHECK_RUN_ON(call_->worker_thread()); + // Thread synchronization is required to call `SetNotAlive`. + // Otherwise, when the `DegradedCall` object is destroyed but + // `SetNotAlive` has not yet been called, + // another Closure guarded by `call_alive_` may be called. + rtc::Event event; + call_->network_thread()->PostTask( + [flag = std::move(call_alive_), &event]() mutable { + flag->SetNotAlive(); + event.Set(); + }); + event.Wait(rtc::Event::kForever); +} AudioSendStream* DegradedCall::CreateAudioSendStream( const AudioSendStream::Config& config) { @@ -352,7 +418,7 @@ void DegradedCall::UpdateSendNetworkConfig() { send_config_index_ = (send_config_index_ + 1) % send_configs_.size(); send_simulated_network_->SetConfig(send_configs_[send_config_index_]); call_->network_thread()->PostDelayedTask( - SafeTask(task_safety_.flag(), [this] { UpdateSendNetworkConfig(); }), + SafeTask(call_alive_, [this] { UpdateSendNetworkConfig(); }), send_configs_[send_config_index_].duration); } @@ -361,7 +427,7 @@ void DegradedCall::UpdateReceiveNetworkConfig() { receive_simulated_network_->SetConfig( receive_configs_[receive_config_index_]); call_->network_thread()->PostDelayedTask( - SafeTask(task_safety_.flag(), [this] { UpdateReceiveNetworkConfig(); }), + SafeTask(call_alive_, [this] { UpdateReceiveNetworkConfig(); }), receive_configs_[receive_config_index_].duration); } } // namespace webrtc diff --git a/call/degraded_call.h b/call/degraded_call.h index fe5fd7c46c..522302283a 100644 --- a/call/degraded_call.h +++ b/call/degraded_call.h @@ -122,7 +122,7 @@ class DegradedCall : public Call, private PacketReceiver { public: FakeNetworkPipeOnTaskQueue( TaskQueueBase* task_queue, - const ScopedTaskSafety& task_safety, + rtc::scoped_refptr call_alive, Clock* clock, std::unique_ptr network_behavior); @@ -142,11 +142,30 @@ class DegradedCall : public Call, private PacketReceiver { Clock* const clock_; TaskQueueBase* const task_queue_; - const ScopedTaskSafety& task_safety_; + rtc::scoped_refptr call_alive_; FakeNetworkPipe pipe_; absl::optional next_process_ms_ RTC_GUARDED_BY(&task_queue_); }; + class ThreadedPacketReceiver : public PacketReceiver { + public: + ThreadedPacketReceiver(webrtc::TaskQueueBase* worker_thread, + webrtc::TaskQueueBase* network_thread, + rtc::scoped_refptr call_alive, + PacketReceiver* receiver); + ~ThreadedPacketReceiver() override; + + DeliveryStatus DeliverPacket(MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us) override; + + private: + webrtc::TaskQueueBase* const worker_thread_; + webrtc::TaskQueueBase* const network_thread_; + rtc::scoped_refptr call_alive_; + webrtc::PacketReceiver* const receiver_; + }; + // For audio/video send stream, a TransportAdapter instance is used to // intercept packets to be sent, and put them into a common FakeNetworkPipe // in such as way that they will eventually (unless dropped) be forwarded to @@ -178,7 +197,8 @@ class DegradedCall : public Call, private PacketReceiver { Clock* const clock_; const std::unique_ptr call_; - ScopedTaskSafety task_safety_; + // For cancelling tasks on the network thread when DegradedCall is destroyed + rtc::scoped_refptr call_alive_; size_t send_config_index_; const std::vector send_configs_; SimulatedNetwork* send_simulated_network_; @@ -192,6 +212,7 @@ class DegradedCall : public Call, private PacketReceiver { const std::vector receive_configs_; SimulatedNetwork* receive_simulated_network_; std::unique_ptr receive_pipe_; + std::unique_ptr packet_receiver_; }; } // namespace webrtc diff --git a/pc/peer_connection_field_trial_tests.cc b/pc/peer_connection_field_trial_tests.cc index 528b6ba2be..0e6e451a9a 100644 --- a/pc/peer_connection_field_trial_tests.cc +++ b/pc/peer_connection_field_trial_tests.cc @@ -25,15 +25,35 @@ #include "pc/peer_connection_wrapper.h" #include "pc/session_description.h" #include "pc/test/fake_audio_capture_module.h" +#include "pc/test/frame_generator_capturer_video_track_source.h" #include "pc/test/peer_connection_test_wrapper.h" +#include "rtc_base/gunit.h" #include "rtc_base/internal/default_socket_server.h" #include "rtc_base/physical_socket_server.h" #include "rtc_base/thread.h" #include "test/gtest.h" #include "test/scoped_key_value_config.h" +#ifdef WEBRTC_ANDROID +#include "pc/test/android_test_initializer.h" +#endif + namespace webrtc { +namespace { +static const int kDefaultTimeoutMs = 5000; + +bool AddIceCandidates(PeerConnectionWrapper* peer, + std::vector candidates) { + for (const auto candidate : candidates) { + if (!peer->pc()->AddIceCandidate(candidate)) { + return false; + } + } + return true; +} +} // namespace + using RTCConfiguration = PeerConnectionInterface::RTCConfiguration; class PeerConnectionFieldTrialTest : public ::testing::Test { @@ -41,8 +61,12 @@ class PeerConnectionFieldTrialTest : public ::testing::Test { typedef std::unique_ptr WrapperPtr; PeerConnectionFieldTrialTest() - : socket_server_(rtc::CreateDefaultSocketServer()), + : clock_(Clock::GetRealTimeClock()), + socket_server_(rtc::CreateDefaultSocketServer()), main_thread_(socket_server_.get()) { +#ifdef WEBRTC_ANDROID + InitializeAndroidObjects(); +#endif webrtc::PeerConnectionInterface::IceServer ice_server; ice_server.uri = "stun:stun.l.google.com:19302"; config_.servers.push_back(ice_server); @@ -54,8 +78,6 @@ class PeerConnectionFieldTrialTest : public ::testing::Test { void CreatePCFactory(std::unique_ptr field_trials) { PeerConnectionFactoryDependencies pcf_deps; pcf_deps.signaling_thread = rtc::Thread::Current(); - pcf_deps.worker_thread = rtc::Thread::Current(); - pcf_deps.network_thread = rtc::Thread::Current(); pcf_deps.trials = std::move(field_trials); pcf_deps.task_queue_factory = CreateDefaultTaskQueueFactory(); pcf_deps.call_factory = webrtc::CreateCallFactory(); @@ -66,6 +88,13 @@ class PeerConnectionFieldTrialTest : public ::testing::Test { webrtc::SetMediaEngineDefaults(&media_deps); pcf_deps.media_engine = cricket::CreateMediaEngine(std::move(media_deps)); pc_factory_ = CreateModularPeerConnectionFactory(std::move(pcf_deps)); + + // Allow ADAPTER_TYPE_LOOPBACK to create PeerConnections with loopback in + // this test. + RTC_DCHECK(pc_factory_); + PeerConnectionFactoryInterface::Options options; + options.network_ignore_mask = 0; + pc_factory_->SetOptions(options); } WrapperPtr CreatePeerConnection() { @@ -79,6 +108,7 @@ class PeerConnectionFieldTrialTest : public ::testing::Test { pc_factory_, result.MoveValue(), std::move(observer)); } + Clock* const clock_; std::unique_ptr socket_server_; rtc::AutoSocketServerThread main_thread_; rtc::scoped_refptr pc_factory_ = nullptr; @@ -188,4 +218,50 @@ TEST_F(PeerConnectionFieldTrialTest, InjectDependencyDescriptor) { EXPECT_TRUE(found2); } +// Test that the ability to emulate degraded networks works without crashing. +TEST_F(PeerConnectionFieldTrialTest, ApplyFakeNetworkConfig) { + std::unique_ptr field_trials = + std::make_unique( + "WebRTC-FakeNetworkSendConfig/link_capacity_kbps:500/" + "WebRTC-FakeNetworkReceiveConfig/loss_percent:1/"); + + CreatePCFactory(std::move(field_trials)); + + WrapperPtr caller = CreatePeerConnection(); + FrameGeneratorCapturerVideoTrackSource::Config config; + auto video_track_source = + rtc::make_ref_counted( + config, clock_, /*is_screencast=*/false); + caller->AddTrack( + pc_factory_->CreateVideoTrack("v", video_track_source.get())); + WrapperPtr callee = CreatePeerConnection(); + + ASSERT_TRUE(callee->SetRemoteDescription(caller->CreateOfferAndSetAsLocal())); + ASSERT_TRUE( + caller->SetRemoteDescription(callee->CreateAnswerAndSetAsLocal())); + + // Do the SDP negotiation, and also exchange ice candidates. + ASSERT_TRUE(caller->ExchangeOfferAnswerWith(callee.get())); + ASSERT_TRUE_WAIT( + caller->signaling_state() == PeerConnectionInterface::kStable, + kDefaultTimeoutMs); + ASSERT_TRUE_WAIT(caller->IsIceGatheringDone(), kDefaultTimeoutMs); + ASSERT_TRUE_WAIT(callee->IsIceGatheringDone(), kDefaultTimeoutMs); + + // Connect an ICE candidate pairs. + ASSERT_TRUE( + AddIceCandidates(callee.get(), caller->observer()->GetAllCandidates())); + ASSERT_TRUE( + AddIceCandidates(caller.get(), callee->observer()->GetAllCandidates())); + + // This means that ICE and DTLS are connected. + ASSERT_TRUE_WAIT(callee->IsIceConnected(), kDefaultTimeoutMs); + ASSERT_TRUE_WAIT(caller->IsIceConnected(), kDefaultTimeoutMs); + + // Send packets for kDefaultTimeoutMs + // For now, whether this field trial works or not is checked by + // whether a crash occurs. Additional validation can be added later. + WAIT(false, kDefaultTimeoutMs); +} + } // namespace webrtc