diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index 144328c511..54713fef76 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -1124,6 +1124,8 @@ bool PeerConnection::GetStats(StatsObserver* observer, return false; } + RTC_LOG_THREAD_BLOCK_COUNT(); + stats_->UpdateStats(level); // The StatsCollector is used to tell if a track is valid because it may // remember tracks that the PeerConnection previously removed. @@ -1133,6 +1135,7 @@ bool PeerConnection::GetStats(StatsObserver* observer, return false; } message_handler_.PostGetStats(observer, stats_.get(), track); + return true; } @@ -1141,6 +1144,7 @@ void PeerConnection::GetStats(RTCStatsCollectorCallback* callback) { RTC_DCHECK_RUN_ON(signaling_thread()); RTC_DCHECK(stats_collector_); RTC_DCHECK(callback); + RTC_LOG_THREAD_BLOCK_COUNT(); stats_collector_->GetStatsReport(callback); } @@ -1685,6 +1689,8 @@ void PeerConnection::Close() { RTC_DCHECK_RUN_ON(signaling_thread()); TRACE_EVENT0("webrtc", "PeerConnection::Close"); + RTC_LOG_THREAD_BLOCK_COUNT(); + if (IsClosed()) { return; } diff --git a/pc/rtp_transceiver.cc b/pc/rtp_transceiver.cc index f8c5bd5ec8..53a3702c40 100644 --- a/pc/rtp_transceiver.cc +++ b/pc/rtp_transceiver.cc @@ -150,6 +150,8 @@ void RtpTransceiver::SetChannel(cricket::ChannelInterface* channel) { return; } + RTC_LOG_THREAD_BLOCK_COUNT(); + if (channel) { RTC_DCHECK_EQ(media_type(), channel->media_type()); } @@ -170,14 +172,21 @@ void RtpTransceiver::SetChannel(cricket::ChannelInterface* channel) { : nullptr); } + RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(0); + for (const auto& receiver : receivers_) { if (!channel_) { + // TODO(tommi): This can internally block and hop to the worker thread. + // It's likely that SetMediaChannel also does that, so perhaps we should + // require SetMediaChannel(nullptr) to also Stop() and skip this call. receiver->internal()->Stop(); } receiver->internal()->SetMediaChannel(channel_ ? channel_->media_channel() : nullptr); } + + RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(receivers_.size() * 2); } void RtpTransceiver::AddSender( diff --git a/pc/sdp_offer_answer.cc b/pc/sdp_offer_answer.cc index 1381bf99db..2d9f9c82f0 100644 --- a/pc/sdp_offer_answer.cc +++ b/pc/sdp_offer_answer.cc @@ -4700,11 +4700,29 @@ void SdpOfferAnswerHandler::DestroyTransceiverChannel( rtc::scoped_refptr> transceiver) { RTC_DCHECK(transceiver); + RTC_LOG_THREAD_BLOCK_COUNT(); + + // TODO(tommi): We're currently on the signaling thread. + // There are multiple hops to the worker ahead. + // Consider if we can make the call to SetChannel() on the worker thread + // (and require that to be the context it's always called in) and also + // call DestroyChannelInterface there, since it also needs to hop to the + // worker. cricket::ChannelInterface* channel = transceiver->internal()->channel(); + RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(0); if (channel) { + // TODO(tommi): VideoRtpReceiver::SetMediaChannel blocks and jumps to the + // worker thread. When being set to nullptr, there are additional blocking + // calls to e.g. ClearRecordableEncodedFrameCallback which triggers another + // blocking call or Stop() for video channels. transceiver->internal()->SetChannel(nullptr); + RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(2); + // TODO(tommi): All channel objects end up getting deleted on the + // worker thread. Can DestroyTransceiverChannel be purely posted to the + // worker? DestroyChannelInterface(channel); + RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(3); } } @@ -4734,6 +4752,9 @@ void SdpOfferAnswerHandler::DestroyChannelInterface( // DestroyChannelInterface to either be called on the worker thread, or do // this asynchronously on the worker. RTC_DCHECK(channel); + + RTC_LOG_THREAD_BLOCK_COUNT(); + switch (channel->media_type()) { case cricket::MEDIA_TYPE_AUDIO: channel_manager()->DestroyVoiceChannel( @@ -4751,6 +4772,10 @@ void SdpOfferAnswerHandler::DestroyChannelInterface( RTC_NOTREACHED() << "Unknown media type: " << channel->media_type(); break; } + + // TODO(tommi): Figure out why we can get 2 blocking calls when running + // PeerConnectionCryptoTest.CreateAnswerWithDifferentSslRoles. + RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(2); } void SdpOfferAnswerHandler::DestroyAllChannels() { @@ -4758,18 +4783,25 @@ void SdpOfferAnswerHandler::DestroyAllChannels() { if (!transceivers()) { return; } + + RTC_LOG_THREAD_BLOCK_COUNT(); + // Destroy video channels first since they may have a pointer to a voice // channel. - for (const auto& transceiver : transceivers()->List()) { + auto list = transceivers()->List(); + RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(0); + + for (const auto& transceiver : list) { if (transceiver->media_type() == cricket::MEDIA_TYPE_VIDEO) { DestroyTransceiverChannel(transceiver); } } - for (const auto& transceiver : transceivers()->List()) { + for (const auto& transceiver : list) { if (transceiver->media_type() == cricket::MEDIA_TYPE_AUDIO) { DestroyTransceiverChannel(transceiver); } } + DestroyDataChannelTransport(); } diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc index a047eeda0f..039f82ad92 100644 --- a/rtc_base/thread.cc +++ b/rtc_base/thread.cc @@ -352,6 +352,33 @@ Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() { thread_->SetAllowBlockingCalls(previous_state_); } +#if RTC_DCHECK_IS_ON +Thread::ScopedCountBlockingCalls::ScopedCountBlockingCalls( + std::function callback) + : thread_(Thread::Current()), + base_blocking_call_count_(thread_->GetBlockingCallCount()), + base_could_be_blocking_call_count_( + thread_->GetCouldBeBlockingCallCount()), + result_callback_(std::move(callback)) {} + +Thread::ScopedCountBlockingCalls::~ScopedCountBlockingCalls() { + result_callback_(GetBlockingCallCount(), GetCouldBeBlockingCallCount()); +} + +uint32_t Thread::ScopedCountBlockingCalls::GetBlockingCallCount() const { + return thread_->GetBlockingCallCount() - base_blocking_call_count_; +} + +uint32_t Thread::ScopedCountBlockingCalls::GetCouldBeBlockingCallCount() const { + return thread_->GetCouldBeBlockingCallCount() - + base_could_be_blocking_call_count_; +} + +uint32_t Thread::ScopedCountBlockingCalls::GetTotalBlockedCallCount() const { + return GetBlockingCallCount() + GetCouldBeBlockingCallCount(); +} +#endif + Thread::Thread(SocketServer* ss) : Thread(ss, /*do_init=*/true) {} Thread::Thread(std::unique_ptr ss) @@ -901,6 +928,10 @@ void Thread::Send(const Location& posted_from, msg.message_id = id; msg.pdata = pdata; if (IsCurrent()) { +#if RTC_DCHECK_IS_ON + RTC_DCHECK_RUN_ON(this); + could_be_blocking_call_count_++; +#endif msg.phandler->OnMessage(&msg); return; } @@ -911,6 +942,8 @@ void Thread::Send(const Location& posted_from, #if RTC_DCHECK_IS_ON if (current_thread) { + RTC_DCHECK_RUN_ON(current_thread); + current_thread->blocking_call_count_++; RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this)); ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread, this); @@ -1034,6 +1067,17 @@ void Thread::DisallowAllInvokes() { #endif } +#if RTC_DCHECK_IS_ON +uint32_t Thread::GetBlockingCallCount() const { + RTC_DCHECK_RUN_ON(this); + return blocking_call_count_; +} +uint32_t Thread::GetCouldBeBlockingCallCount() const { + RTC_DCHECK_RUN_ON(this); + return could_be_blocking_call_count_; +} +#endif + // Returns true if no policies added or if there is at least one policy // that permits invocation to |target| thread. bool Thread::IsInvokeToThreadAllowed(rtc::Thread* target) { diff --git a/rtc_base/thread.h b/rtc_base/thread.h index e16d5d1dc2..6d3c39b8ac 100644 --- a/rtc_base/thread.h +++ b/rtc_base/thread.h @@ -42,6 +42,32 @@ #include "rtc_base/win32.h" #endif +#if RTC_DCHECK_IS_ON +// Counts how many blocking Thread::Invoke or Thread::Send calls are made from +// within a scope and logs the number of blocking calls at the end of the scope. +#define RTC_LOG_THREAD_BLOCK_COUNT() \ + rtc::Thread::ScopedCountBlockingCalls blocked_call_count_printer( \ + [func = __func__](uint32_t actual_block, uint32_t could_block) { \ + auto total = actual_block + could_block; \ + if (total) { \ + RTC_LOG(LS_WARNING) << "Blocking " << func << ": total=" << total \ + << " (actual=" << actual_block \ + << ", could=" << could_block << ")"; \ + } \ + }) + +// Adds an RTC_DCHECK_LE that checks that the number of blocking calls are +// less than or equal to a specific value. Use to avoid regressing in the +// number of blocking thread calls. +// Note: Use of this macro, requires RTC_LOG_THREAD_BLOCK_COUNT() to be called +// first. +#define RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(x) \ + RTC_DCHECK_LE(blocked_call_count_printer.GetTotalBlockedCallCount(), x) +#else +#define RTC_LOG_THREAD_BLOCK_COUNT() +#define RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(x) +#endif + namespace rtc { class Thread; @@ -212,6 +238,30 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { const bool previous_state_; }; +#if RTC_DCHECK_IS_ON + class ScopedCountBlockingCalls { + public: + ScopedCountBlockingCalls(std::function callback); + ScopedCountBlockingCalls(const ScopedDisallowBlockingCalls&) = delete; + ScopedCountBlockingCalls& operator=(const ScopedDisallowBlockingCalls&) = + delete; + ~ScopedCountBlockingCalls(); + + uint32_t GetBlockingCallCount() const; + uint32_t GetCouldBeBlockingCallCount() const; + uint32_t GetTotalBlockedCallCount() const; + + private: + Thread* const thread_; + const uint32_t base_blocking_call_count_; + const uint32_t base_could_be_blocking_call_count_; + std::function result_callback_; + }; + + uint32_t GetBlockingCallCount() const; + uint32_t GetCouldBeBlockingCallCount() const; +#endif + SocketServer* socketserver(); // Note: The behavior of Thread has changed. When a thread is stopped, @@ -577,7 +627,9 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { MessageList messages_ RTC_GUARDED_BY(crit_); PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_); uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_); -#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)) +#if RTC_DCHECK_IS_ON + uint32_t blocking_call_count_ RTC_GUARDED_BY(this) = 0; + uint32_t could_be_blocking_call_count_ RTC_GUARDED_BY(this) = 0; std::vector allowed_threads_ RTC_GUARDED_BY(this); bool invoke_policy_enabled_ RTC_GUARDED_BY(this) = false; #endif diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc index eb3b4ec003..86e429e72f 100644 --- a/rtc_base/thread_unittest.cc +++ b/rtc_base/thread_unittest.cc @@ -256,6 +256,47 @@ TEST(ThreadTest, DISABLED_Main) { EXPECT_EQ(55, sock_client.last); } +TEST(ThreadTest, CountBlockingCalls) { + // When the test runs, this will print out: + // (thread_unittest.cc:262): Blocking TestBody: total=2 (actual=1, could=1) + RTC_LOG_THREAD_BLOCK_COUNT(); +#if RTC_DCHECK_IS_ON + rtc::Thread* current = rtc::Thread::Current(); + ASSERT_TRUE(current); + rtc::Thread::ScopedCountBlockingCalls blocked_calls( + [&](uint32_t actual_block, uint32_t could_block) { + EXPECT_EQ(1u, actual_block); + EXPECT_EQ(1u, could_block); + }); + + EXPECT_EQ(0u, blocked_calls.GetBlockingCallCount()); + EXPECT_EQ(0u, blocked_calls.GetCouldBeBlockingCallCount()); + EXPECT_EQ(0u, blocked_calls.GetTotalBlockedCallCount()); + + // Test invoking on the current thread. This should not count as an 'actual' + // invoke, but should still count as an invoke that could block since we + // that the call to Invoke serves a purpose in some configurations (and should + // not be used a general way to call methods on the same thread). + current->Invoke(RTC_FROM_HERE, []() {}); + EXPECT_EQ(0u, blocked_calls.GetBlockingCallCount()); + EXPECT_EQ(1u, blocked_calls.GetCouldBeBlockingCallCount()); + EXPECT_EQ(1u, blocked_calls.GetTotalBlockedCallCount()); + + // Create a new thread to invoke on. + auto thread = Thread::CreateWithSocketServer(); + thread->Start(); + EXPECT_EQ(42, thread->Invoke(RTC_FROM_HERE, []() { return 42; })); + EXPECT_EQ(1u, blocked_calls.GetBlockingCallCount()); + EXPECT_EQ(1u, blocked_calls.GetCouldBeBlockingCallCount()); + EXPECT_EQ(2u, blocked_calls.GetTotalBlockedCallCount()); + thread->Stop(); + RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(2); +#else + RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(0); + RTC_LOG(LS_INFO) << "Test not active in this config"; +#endif +} + // Test that setting thread names doesn't cause a malfunction. // There's no easy way to verify the name was set properly at this time. TEST(ThreadTest, Names) {