diff --git a/media/BUILD.gn b/media/BUILD.gn index 55fc59c420..84f938b2d2 100644 --- a/media/BUILD.gn +++ b/media/BUILD.gn @@ -332,6 +332,8 @@ rtc_library("rtc_audio_video") { "../rtc_base/experiments:rate_control_settings", "../rtc_base/synchronization:mutex", "../rtc_base/system:rtc_export", + "../rtc_base/task_utils:pending_task_safety_flag", + "../rtc_base/task_utils:to_queued_task", "../rtc_base/third_party/base64", "../system_wrappers", "../system_wrappers:metrics", diff --git a/media/base/media_channel.h b/media/base/media_channel.h index a947b47998..95e4cc7aa0 100644 --- a/media/base/media_channel.h +++ b/media/base/media_channel.h @@ -179,7 +179,7 @@ class MediaChannel : public sigslot::has_slots<> { // Sets the abstract interface class for sending RTP/RTCP data. virtual void SetInterface(NetworkInterface* iface) RTC_LOCKS_EXCLUDED(network_interface_mutex_); - // Called when a RTP packet is received. + // Called on the network when an RTP packet is received. virtual void OnPacketReceived(rtc::CopyOnWriteBuffer packet, int64_t packet_time_us) = 0; // Called when the socket's ability to send has changed. diff --git a/media/engine/webrtc_video_engine.cc b/media/engine/webrtc_video_engine.cc index 758e495a9f..161c075674 100644 --- a/media/engine/webrtc_video_engine.cc +++ b/media/engine/webrtc_video_engine.cc @@ -716,7 +716,8 @@ WebRtcVideoChannel::WebRtcVideoChannel( "WebRTC-Video-BufferPacketsWithUnknownSsrc") ? new UnhandledPacketsBuffer() : nullptr) { - RTC_DCHECK(thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&thread_checker_); + network_thread_checker_.Detach(); rtcp_receiver_report_ssrc_ = kDefaultRtcpReceiverReportSsrc; sending_ = false; @@ -1684,67 +1685,75 @@ void WebRtcVideoChannel::FillSendAndReceiveCodecStats( void WebRtcVideoChannel::OnPacketReceived(rtc::CopyOnWriteBuffer packet, int64_t packet_time_us) { - RTC_DCHECK_RUN_ON(&thread_checker_); - const webrtc::PacketReceiver::DeliveryStatus delivery_result = - call_->Receiver()->DeliverPacket(webrtc::MediaType::VIDEO, packet, - packet_time_us); - switch (delivery_result) { - case webrtc::PacketReceiver::DELIVERY_OK: - return; - case webrtc::PacketReceiver::DELIVERY_PACKET_ERROR: - return; - case webrtc::PacketReceiver::DELIVERY_UNKNOWN_SSRC: - break; - } + RTC_DCHECK_RUN_ON(&network_thread_checker_); + // TODO(bugs.webrtc.org/11993): This code is very similar to what + // WebRtcVoiceMediaChannel::OnPacketReceived does. For maintainability and + // consistency it would be good to move the interaction with call_->Receiver() + // to a common implementation and provide a callback on the worker thread + // for the exception case (DELIVERY_UNKNOWN_SSRC) and how retry is attempted. + worker_thread_->PostTask( + ToQueuedTask(task_safety_, [this, packet, packet_time_us] { + RTC_DCHECK_RUN_ON(&thread_checker_); + const webrtc::PacketReceiver::DeliveryStatus delivery_result = + call_->Receiver()->DeliverPacket(webrtc::MediaType::VIDEO, packet, + packet_time_us); + switch (delivery_result) { + case webrtc::PacketReceiver::DELIVERY_OK: + return; + case webrtc::PacketReceiver::DELIVERY_PACKET_ERROR: + return; + case webrtc::PacketReceiver::DELIVERY_UNKNOWN_SSRC: + break; + } - uint32_t ssrc = 0; - if (!GetRtpSsrc(packet.cdata(), packet.size(), &ssrc)) { - return; - } + uint32_t ssrc = 0; + if (!GetRtpSsrc(packet.cdata(), packet.size(), &ssrc)) { + return; + } - if (unknown_ssrc_packet_buffer_) { - unknown_ssrc_packet_buffer_->AddPacket(ssrc, packet_time_us, packet); - return; - } + if (unknown_ssrc_packet_buffer_) { + unknown_ssrc_packet_buffer_->AddPacket(ssrc, packet_time_us, packet); + return; + } - if (discard_unknown_ssrc_packets_) { - return; - } + if (discard_unknown_ssrc_packets_) { + return; + } - int payload_type = 0; - if (!GetRtpPayloadType(packet.cdata(), packet.size(), &payload_type)) { - return; - } + int payload_type = 0; + if (!GetRtpPayloadType(packet.cdata(), packet.size(), &payload_type)) { + return; + } - // See if this payload_type is registered as one that usually gets its own - // SSRC (RTX) or at least is safe to drop either way (FEC). If it is, and - // it wasn't handled above by DeliverPacket, that means we don't know what - // stream it associates with, and we shouldn't ever create an implicit channel - // for these. - for (auto& codec : recv_codecs_) { - if (payload_type == codec.rtx_payload_type || - payload_type == codec.ulpfec.red_rtx_payload_type || - payload_type == codec.ulpfec.ulpfec_payload_type) { - return; - } - } - if (payload_type == recv_flexfec_payload_type_) { - return; - } + // See if this payload_type is registered as one that usually gets its + // own SSRC (RTX) or at least is safe to drop either way (FEC). If it + // is, and it wasn't handled above by DeliverPacket, that means we don't + // know what stream it associates with, and we shouldn't ever create an + // implicit channel for these. + for (auto& codec : recv_codecs_) { + if (payload_type == codec.rtx_payload_type || + payload_type == codec.ulpfec.red_rtx_payload_type || + payload_type == codec.ulpfec.ulpfec_payload_type) { + return; + } + } + if (payload_type == recv_flexfec_payload_type_) { + return; + } - switch (unsignalled_ssrc_handler_->OnUnsignalledSsrc(this, ssrc)) { - case UnsignalledSsrcHandler::kDropPacket: - return; - case UnsignalledSsrcHandler::kDeliverPacket: - break; - } + switch (unsignalled_ssrc_handler_->OnUnsignalledSsrc(this, ssrc)) { + case UnsignalledSsrcHandler::kDropPacket: + return; + case UnsignalledSsrcHandler::kDeliverPacket: + break; + } - if (call_->Receiver()->DeliverPacket(webrtc::MediaType::VIDEO, packet, - packet_time_us) != - webrtc::PacketReceiver::DELIVERY_OK) { - RTC_LOG(LS_WARNING) << "Failed to deliver RTP packet on re-delivery."; - return; - } + if (call_->Receiver()->DeliverPacket(webrtc::MediaType::VIDEO, packet, + packet_time_us) != + webrtc::PacketReceiver::DELIVERY_OK) { + RTC_LOG(LS_WARNING) << "Failed to deliver RTP packet on re-delivery."; + } + })); } void WebRtcVideoChannel::BackfillBufferedPackets( diff --git a/media/engine/webrtc_video_engine.h b/media/engine/webrtc_video_engine.h index 321a5a8c2e..327c4e7525 100644 --- a/media/engine/webrtc_video_engine.h +++ b/media/engine/webrtc_video_engine.h @@ -34,6 +34,7 @@ #include "media/engine/unhandled_packets_buffer.h" #include "rtc_base/network_route.h" #include "rtc_base/synchronization/mutex.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/thread_annotations.h" #include "rtc_base/thread_checker.h" @@ -554,11 +555,13 @@ class WebRtcVideoChannel : public VideoMediaChannel, RTC_EXCLUSIVE_LOCKS_REQUIRED(thread_checker_); rtc::Thread* const worker_thread_; + webrtc::ScopedTaskSafety task_safety_; + rtc::ThreadChecker network_thread_checker_; rtc::ThreadChecker thread_checker_; uint32_t rtcp_receiver_report_ssrc_ RTC_GUARDED_BY(thread_checker_); bool sending_ RTC_GUARDED_BY(thread_checker_); - webrtc::Call* const call_ RTC_GUARDED_BY(thread_checker_); + webrtc::Call* const call_; DefaultUnsignalledSsrcHandler default_unsignalled_ssrc_handler_ RTC_GUARDED_BY(thread_checker_); diff --git a/media/engine/webrtc_video_engine_unittest.cc b/media/engine/webrtc_video_engine_unittest.cc index 72fbc56885..21d1160070 100644 --- a/media/engine/webrtc_video_engine_unittest.cc +++ b/media/engine/webrtc_video_engine_unittest.cc @@ -6134,6 +6134,7 @@ TEST_F(WebRtcVideoChannelTest, DefaultReceiveStreamReconfiguresToUseRtx) { rtc::SetBE32(&data[8], ssrcs[0]); rtc::CopyOnWriteBuffer packet(data, kDataLength); channel_->OnPacketReceived(packet, /* packet_time_us */ -1); + rtc::Thread::Current()->ProcessMessages(0); ASSERT_EQ(1u, fake_call_->GetVideoReceiveStreams().size()) << "No default receive stream created."; @@ -6292,6 +6293,7 @@ TEST_F(WebRtcVideoChannelTest, RecvUnsignaledSsrcWithSignaledStreamId) { rtc::SetBE32(&data[8], kIncomingUnsignalledSsrc); rtc::CopyOnWriteBuffer packet(data, kDataLength); channel_->OnPacketReceived(packet, /* packet_time_us */ -1); + rtc::Thread::Current()->ProcessMessages(0); // The stream should now be created with the appropriate sync label. EXPECT_EQ(1u, fake_call_->GetVideoReceiveStreams().size()); @@ -6304,6 +6306,7 @@ TEST_F(WebRtcVideoChannelTest, RecvUnsignaledSsrcWithSignaledStreamId) { EXPECT_EQ(0u, fake_call_->GetVideoReceiveStreams().size()); channel_->OnPacketReceived(packet, /* packet_time_us */ -1); + rtc::Thread::Current()->ProcessMessages(0); EXPECT_EQ(1u, fake_call_->GetVideoReceiveStreams().size()); EXPECT_TRUE( fake_call_->GetVideoReceiveStreams()[0]->GetConfig().sync_group.empty()); @@ -6321,6 +6324,7 @@ TEST_F(WebRtcVideoChannelTest, rtc::SetBE32(&data[8], kIncomingUnsignalledSsrc); rtc::CopyOnWriteBuffer packet(data, kDataLength); channel_->OnPacketReceived(packet, /* packet_time_us */ -1); + rtc::Thread::Current()->ProcessMessages(0); // Default receive stream created. const auto& receivers1 = fake_call_->GetVideoReceiveStreams(); @@ -6374,6 +6378,7 @@ TEST_F(WebRtcVideoChannelTest, BaseMinimumPlayoutDelayMsUnsignaledRecvStream) { rtc::SetBE32(&data[8], kIncomingUnsignalledSsrc); rtc::CopyOnWriteBuffer packet(data, kDataLength); channel_->OnPacketReceived(packet, /* packet_time_us */ -1); + rtc::Thread::Current()->ProcessMessages(0); recv_stream = fake_call_->GetVideoReceiveStream(kIncomingUnsignalledSsrc); EXPECT_EQ(recv_stream->base_mininum_playout_delay_ms(), 200); @@ -6411,6 +6416,7 @@ void WebRtcVideoChannelTest::TestReceiveUnsignaledSsrcPacket( rtc::SetBE32(&data[8], kIncomingUnsignalledSsrc); rtc::CopyOnWriteBuffer packet(data, kDataLength); channel_->OnPacketReceived(packet, /* packet_time_us */ -1); + rtc::Thread::Current()->ProcessMessages(0); if (expect_created_receive_stream) { EXPECT_EQ(1u, fake_call_->GetVideoReceiveStreams().size()) @@ -6498,6 +6504,7 @@ TEST_F(WebRtcVideoChannelTest, ReceiveDifferentUnsignaledSsrc) { cricket::SetRtpHeader(data, sizeof(data), rtpHeader); rtc::CopyOnWriteBuffer packet(data, sizeof(data)); channel_->OnPacketReceived(packet, /* packet_time_us */ -1); + rtc::Thread::Current()->ProcessMessages(0); // VP8 packet should create default receive stream. ASSERT_EQ(1u, fake_call_->GetVideoReceiveStreams().size()); FakeVideoReceiveStream* recv_stream = fake_call_->GetVideoReceiveStreams()[0]; @@ -6519,6 +6526,7 @@ TEST_F(WebRtcVideoChannelTest, ReceiveDifferentUnsignaledSsrc) { cricket::SetRtpHeader(data, sizeof(data), rtpHeader); rtc::CopyOnWriteBuffer packet2(data, sizeof(data)); channel_->OnPacketReceived(packet2, /* packet_time_us */ -1); + rtc::Thread::Current()->ProcessMessages(0); // VP9 packet should replace the default receive SSRC. ASSERT_EQ(1u, fake_call_->GetVideoReceiveStreams().size()); recv_stream = fake_call_->GetVideoReceiveStreams()[0]; @@ -6541,6 +6549,7 @@ TEST_F(WebRtcVideoChannelTest, ReceiveDifferentUnsignaledSsrc) { cricket::SetRtpHeader(data, sizeof(data), rtpHeader); rtc::CopyOnWriteBuffer packet3(data, sizeof(data)); channel_->OnPacketReceived(packet3, /* packet_time_us */ -1); + rtc::Thread::Current()->ProcessMessages(0); // H264 packet should replace the default receive SSRC. ASSERT_EQ(1u, fake_call_->GetVideoReceiveStreams().size()); recv_stream = fake_call_->GetVideoReceiveStreams()[0]; @@ -6580,6 +6589,7 @@ TEST_F(WebRtcVideoChannelTest, cricket::SetRtpHeader(data, sizeof(data), rtp_header); rtc::CopyOnWriteBuffer packet(data, sizeof(data)); channel_->OnPacketReceived(packet, /* packet_time_us */ -1); + rtc::Thread::Current()->ProcessMessages(0); // Default receive stream should be created. ASSERT_EQ(1u, fake_call_->GetVideoReceiveStreams().size()); FakeVideoReceiveStream* recv_stream0 = @@ -6598,6 +6608,7 @@ TEST_F(WebRtcVideoChannelTest, cricket::SetRtpHeader(data, sizeof(data), rtp_header); packet.SetData(data, sizeof(data)); channel_->OnPacketReceived(packet, /* packet_time_us */ -1); + rtc::Thread::Current()->ProcessMessages(0); // New default receive stream should be created, but old stream should remain. ASSERT_EQ(2u, fake_call_->GetVideoReceiveStreams().size()); EXPECT_EQ(recv_stream0, fake_call_->GetVideoReceiveStreams()[0]); @@ -8211,6 +8222,7 @@ TEST_F(WebRtcVideoChannelTest, cricket::SetRtpHeader(data, sizeof(data), rtpHeader); rtc::CopyOnWriteBuffer packet(data, sizeof(data)); channel_->OnPacketReceived(packet, /* packet_time_us */ -1); + rtc::Thread::Current()->ProcessMessages(0); // The |ssrc| member should still be unset. rtp_parameters = channel_->GetDefaultRtpReceiveParameters(); diff --git a/media/engine/webrtc_voice_engine.cc b/media/engine/webrtc_voice_engine.cc index 72871d2e9c..f6b42369a1 100644 --- a/media/engine/webrtc_voice_engine.cc +++ b/media/engine/webrtc_voice_engine.cc @@ -46,7 +46,10 @@ #include "rtc_base/strings/audio_format_to_string.h" #include "rtc_base/strings/string_builder.h" #include "rtc_base/strings/string_format.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/third_party/base64/base64.h" +#include "rtc_base/thread.h" #include "rtc_base/trace_event.h" #include "system_wrappers/include/metrics.h" @@ -267,7 +270,7 @@ WebRtcVoiceEngine::WebRtcVoiceEngine( } WebRtcVoiceEngine::~WebRtcVoiceEngine() { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_LOG(LS_INFO) << "WebRtcVoiceEngine::~WebRtcVoiceEngine"; if (initialized_) { StopAecDump(); @@ -281,7 +284,7 @@ WebRtcVoiceEngine::~WebRtcVoiceEngine() { } void WebRtcVoiceEngine::Init() { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_LOG(LS_INFO) << "WebRtcVoiceEngine::Init"; // TaskQueue expects to be created/destroyed on the same thread. @@ -362,7 +365,7 @@ void WebRtcVoiceEngine::Init() { rtc::scoped_refptr WebRtcVoiceEngine::GetAudioState() const { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); return audio_state_; } @@ -371,13 +374,13 @@ VoiceMediaChannel* WebRtcVoiceEngine::CreateMediaChannel( const MediaConfig& config, const AudioOptions& options, const webrtc::CryptoOptions& crypto_options) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); return new WebRtcVoiceMediaChannel(this, config, options, crypto_options, call); } bool WebRtcVoiceEngine::ApplyOptions(const AudioOptions& options_in) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_LOG(LS_INFO) << "WebRtcVoiceEngine::ApplyOptions: " << options_in.ToString(); AudioOptions options = options_in; // The options are modified below. @@ -621,13 +624,13 @@ WebRtcVoiceEngine::GetRtpHeaderExtensions() const { } void WebRtcVoiceEngine::RegisterChannel(WebRtcVoiceMediaChannel* channel) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_DCHECK(channel); channels_.push_back(channel); } void WebRtcVoiceEngine::UnregisterChannel(WebRtcVoiceMediaChannel* channel) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); auto it = absl::c_find(channels_, channel); RTC_DCHECK(it != channels_.end()); channels_.erase(it); @@ -635,7 +638,7 @@ void WebRtcVoiceEngine::UnregisterChannel(WebRtcVoiceMediaChannel* channel) { bool WebRtcVoiceEngine::StartAecDump(webrtc::FileWrapper file, int64_t max_size_bytes) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); webrtc::AudioProcessing* ap = apm(); if (!ap) { @@ -650,7 +653,7 @@ bool WebRtcVoiceEngine::StartAecDump(webrtc::FileWrapper file, } void WebRtcVoiceEngine::StopAecDump() { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); webrtc::AudioProcessing* ap = apm(); if (ap) { ap->DetachAecDump(); @@ -661,18 +664,18 @@ void WebRtcVoiceEngine::StopAecDump() { } webrtc::AudioDeviceModule* WebRtcVoiceEngine::adm() { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_DCHECK(adm_); return adm_.get(); } webrtc::AudioProcessing* WebRtcVoiceEngine::apm() const { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); return apm_.get(); } webrtc::AudioState* WebRtcVoiceEngine::audio_state() { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_DCHECK(audio_state_); return audio_state_.get(); } @@ -814,7 +817,7 @@ class WebRtcVoiceMediaChannel::WebRtcAudioSendStream WebRtcAudioSendStream& operator=(const WebRtcAudioSendStream&) = delete; ~WebRtcAudioSendStream() override { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); ClearSource(); call_->DestroyAudioSendStream(stream_); } @@ -826,7 +829,7 @@ class WebRtcVoiceMediaChannel::WebRtcAudioSendStream } void SetRtpExtensions(const std::vector& extensions) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); config_.rtp.extensions = extensions; rtp_parameters_.header_extensions = extensions; ReconfigureAudioSendStream(); @@ -838,7 +841,7 @@ class WebRtcVoiceMediaChannel::WebRtcAudioSendStream } void SetMid(const std::string& mid) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); if (config_.rtp.mid == mid) { return; } @@ -848,14 +851,14 @@ class WebRtcVoiceMediaChannel::WebRtcAudioSendStream void SetFrameEncryptor( rtc::scoped_refptr frame_encryptor) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); config_.frame_encryptor = frame_encryptor; ReconfigureAudioSendStream(); } void SetAudioNetworkAdaptorConfig( const absl::optional& audio_network_adaptor_config) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); if (audio_network_adaptor_config_from_options_ == audio_network_adaptor_config) { return; @@ -867,7 +870,7 @@ class WebRtcVoiceMediaChannel::WebRtcAudioSendStream } bool SetMaxSendBitrate(int bps) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_DCHECK(config_.send_codec_spec); RTC_DCHECK(audio_codec_spec_); auto send_rate = ComputeSendBitrate( @@ -890,32 +893,32 @@ class WebRtcVoiceMediaChannel::WebRtcAudioSendStream int payload_freq, int event, int duration_ms) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_DCHECK(stream_); return stream_->SendTelephoneEvent(payload_type, payload_freq, event, duration_ms); } void SetSend(bool send) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); send_ = send; UpdateSendState(); } void SetMuted(bool muted) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_DCHECK(stream_); stream_->SetMuted(muted); muted_ = muted; } bool muted() const { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); return muted_; } webrtc::AudioSendStream::Stats GetStats(bool has_remote_tracks) const { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_DCHECK(stream_); return stream_->GetStats(has_remote_tracks); } @@ -925,7 +928,7 @@ class WebRtcVoiceMediaChannel::WebRtcAudioSendStream // This method is called on the libjingle worker thread. // TODO(xians): Make sure Start() is called only once. void SetSource(AudioSource* source) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_DCHECK(source); if (source_) { RTC_DCHECK(source_ == source); @@ -940,7 +943,7 @@ class WebRtcVoiceMediaChannel::WebRtcAudioSendStream // callback will be received after this method. // This method is called on the libjingle worker thread. void ClearSource() { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); if (source_) { source_->SetSink(nullptr); source_ = nullptr; @@ -976,7 +979,7 @@ class WebRtcVoiceMediaChannel::WebRtcAudioSendStream // Callback from the |source_| when it is going away. In case Start() has // never been called, this callback won't be triggered. void OnClose() override { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); // Set |source_| to nullptr to make sure no more callback will get into // the source. source_ = nullptr; @@ -1043,14 +1046,14 @@ class WebRtcVoiceMediaChannel::WebRtcAudioSendStream void SetEncoderToPacketizerFrameTransformer( rtc::scoped_refptr frame_transformer) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); config_.frame_transformer = std::move(frame_transformer); ReconfigureAudioSendStream(); } private: void UpdateSendState() { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_DCHECK(stream_); RTC_DCHECK_EQ(1UL, rtp_parameters_.encodings.size()); if (send_ && source_ != nullptr && rtp_parameters_.encodings[0].active) { @@ -1061,7 +1064,7 @@ class WebRtcVoiceMediaChannel::WebRtcAudioSendStream } void UpdateAllowedBitrateRange() { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); // The order of precedence, from lowest to highest is: // - a reasonable default of 32kbps min/max // - fixed target bitrate from codec spec @@ -1093,7 +1096,7 @@ class WebRtcVoiceMediaChannel::WebRtcAudioSendStream void UpdateSendCodecSpec( const webrtc::AudioSendStream::Config::SendCodecSpec& send_codec_spec) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); config_.send_codec_spec = send_codec_spec; auto info = config_.encoder_factory->QueryAudioEncoder(send_codec_spec.format); @@ -1136,7 +1139,7 @@ class WebRtcVoiceMediaChannel::WebRtcAudioSendStream } void ReconfigureAudioSendStream() { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_DCHECK(stream_); stream_->Reconfigure(config_); } @@ -1218,19 +1221,19 @@ class WebRtcVoiceMediaChannel::WebRtcAudioReceiveStream { WebRtcAudioReceiveStream& operator=(const WebRtcAudioReceiveStream&) = delete; ~WebRtcAudioReceiveStream() { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); call_->DestroyAudioReceiveStream(stream_); } void SetFrameDecryptor( rtc::scoped_refptr frame_decryptor) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); config_.frame_decryptor = frame_decryptor; RecreateAudioReceiveStream(); } void SetLocalSsrc(uint32_t local_ssrc) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); if (local_ssrc != config_.rtp.local_ssrc) { config_.rtp.local_ssrc = local_ssrc; RecreateAudioReceiveStream(); @@ -1239,7 +1242,7 @@ class WebRtcVoiceMediaChannel::WebRtcAudioReceiveStream { void SetUseTransportCcAndRecreateStream(bool use_transport_cc, bool use_nack) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); config_.rtp.transport_cc = use_transport_cc; config_.rtp.nack.rtp_history_ms = use_nack ? kNackRtpHistoryMs : 0; ReconfigureAudioReceiveStream(); @@ -1247,21 +1250,21 @@ class WebRtcVoiceMediaChannel::WebRtcAudioReceiveStream { void SetRtpExtensionsAndRecreateStream( const std::vector& extensions) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); config_.rtp.extensions = extensions; RecreateAudioReceiveStream(); } // Set a new payload type -> decoder map. void SetDecoderMap(const std::map& decoder_map) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); config_.decoder_map = decoder_map; ReconfigureAudioReceiveStream(); } void MaybeRecreateAudioReceiveStream( const std::vector& stream_ids) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); std::string sync_group; if (!stream_ids.empty()) { sync_group = stream_ids[0]; @@ -1277,13 +1280,13 @@ class WebRtcVoiceMediaChannel::WebRtcAudioReceiveStream { webrtc::AudioReceiveStream::Stats GetStats( bool get_and_clear_legacy_stats) const { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_DCHECK(stream_); return stream_->GetStats(get_and_clear_legacy_stats); } void SetRawAudioSink(std::unique_ptr sink) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); // Need to update the stream's sink first; once raw_audio_sink_ is // reassigned, whatever was in there before is destroyed. stream_->SetSink(sink.get()); @@ -1291,13 +1294,13 @@ class WebRtcVoiceMediaChannel::WebRtcAudioReceiveStream { } void SetOutputVolume(double volume) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); output_volume_ = volume; stream_->SetGain(volume); } void SetPlayout(bool playout) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_DCHECK(stream_); if (playout) { stream_->Start(); @@ -1307,7 +1310,7 @@ class WebRtcVoiceMediaChannel::WebRtcAudioReceiveStream { } bool SetBaseMinimumPlayoutDelayMs(int delay_ms) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_DCHECK(stream_); if (stream_->SetBaseMinimumPlayoutDelayMs(delay_ms)) { // Memorize only valid delay because during stream recreation it will be @@ -1324,13 +1327,13 @@ class WebRtcVoiceMediaChannel::WebRtcAudioReceiveStream { } int GetBaseMinimumPlayoutDelayMs() const { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_DCHECK(stream_); return stream_->GetBaseMinimumPlayoutDelayMs(); } std::vector GetSources() { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_DCHECK(stream_); return stream_->GetSources(); } @@ -1346,14 +1349,14 @@ class WebRtcVoiceMediaChannel::WebRtcAudioReceiveStream { void SetDepacketizerToDecoderFrameTransformer( rtc::scoped_refptr frame_transformer) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); config_.frame_transformer = std::move(frame_transformer); ReconfigureAudioReceiveStream(); } private: void RecreateAudioReceiveStream() { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); bool was_running = false; if (stream_) { was_running = stream_->IsRunning(); @@ -1368,7 +1371,7 @@ class WebRtcVoiceMediaChannel::WebRtcAudioReceiveStream { } void ReconfigureAudioReceiveStream() { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_DCHECK(stream_); stream_->Reconfigure(config_); } @@ -1390,12 +1393,15 @@ WebRtcVoiceMediaChannel::WebRtcVoiceMediaChannel( const webrtc::CryptoOptions& crypto_options, webrtc::Call* call) : VoiceMediaChannel(config), + worker_thread_(rtc::Thread::Current()), engine_(engine), call_(call), audio_config_(config.audio), crypto_options_(crypto_options), audio_red_for_opus_trial_enabled_( IsEnabled(call->trials(), "WebRTC-Audio-Red-For-Opus")) { + RTC_DCHECK_RUN_ON(worker_thread_); + network_thread_checker_.Detach(); RTC_LOG(LS_VERBOSE) << "WebRtcVoiceMediaChannel::WebRtcVoiceMediaChannel"; RTC_DCHECK(call); engine->RegisterChannel(this); @@ -1403,7 +1409,7 @@ WebRtcVoiceMediaChannel::WebRtcVoiceMediaChannel( } WebRtcVoiceMediaChannel::~WebRtcVoiceMediaChannel() { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_LOG(LS_VERBOSE) << "WebRtcVoiceMediaChannel::~WebRtcVoiceMediaChannel"; // TODO(solenberg): Should be able to delete the streams directly, without // going through RemoveNnStream(), once stream objects handle @@ -1420,7 +1426,7 @@ WebRtcVoiceMediaChannel::~WebRtcVoiceMediaChannel() { bool WebRtcVoiceMediaChannel::SetSendParameters( const AudioSendParameters& params) { TRACE_EVENT0("webrtc", "WebRtcVoiceMediaChannel::SetSendParameters"); - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_LOG(LS_INFO) << "WebRtcVoiceMediaChannel::SetSendParameters: " << params.ToString(); // TODO(pthatcher): Refactor this to be more clean now that we have @@ -1466,7 +1472,7 @@ bool WebRtcVoiceMediaChannel::SetSendParameters( bool WebRtcVoiceMediaChannel::SetRecvParameters( const AudioRecvParameters& params) { TRACE_EVENT0("webrtc", "WebRtcVoiceMediaChannel::SetRecvParameters"); - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_LOG(LS_INFO) << "WebRtcVoiceMediaChannel::SetRecvParameters: " << params.ToString(); // TODO(pthatcher): Refactor this to be more clean now that we have @@ -1493,7 +1499,7 @@ bool WebRtcVoiceMediaChannel::SetRecvParameters( webrtc::RtpParameters WebRtcVoiceMediaChannel::GetRtpSendParameters( uint32_t ssrc) const { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); auto it = send_streams_.find(ssrc); if (it == send_streams_.end()) { RTC_LOG(LS_WARNING) << "Attempting to get RTP send parameters for stream " @@ -1514,7 +1520,7 @@ webrtc::RtpParameters WebRtcVoiceMediaChannel::GetRtpSendParameters( webrtc::RTCError WebRtcVoiceMediaChannel::SetRtpSendParameters( uint32_t ssrc, const webrtc::RtpParameters& parameters) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); auto it = send_streams_.find(ssrc); if (it == send_streams_.end()) { RTC_LOG(LS_WARNING) << "Attempting to set RTP send parameters for stream " @@ -1569,7 +1575,7 @@ webrtc::RTCError WebRtcVoiceMediaChannel::SetRtpSendParameters( webrtc::RtpParameters WebRtcVoiceMediaChannel::GetRtpReceiveParameters( uint32_t ssrc) const { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); webrtc::RtpParameters rtp_params; auto it = recv_streams_.find(ssrc); if (it == recv_streams_.end()) { @@ -1589,7 +1595,7 @@ webrtc::RtpParameters WebRtcVoiceMediaChannel::GetRtpReceiveParameters( webrtc::RtpParameters WebRtcVoiceMediaChannel::GetDefaultRtpReceiveParameters() const { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); webrtc::RtpParameters rtp_params; if (!default_sink_) { RTC_LOG(LS_WARNING) << "Attempting to get RTP parameters for the default, " @@ -1606,7 +1612,7 @@ webrtc::RtpParameters WebRtcVoiceMediaChannel::GetDefaultRtpReceiveParameters() } bool WebRtcVoiceMediaChannel::SetOptions(const AudioOptions& options) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_LOG(LS_INFO) << "Setting voice channel options: " << options.ToString(); // We retain all of the existing options, and apply the given ones @@ -1632,7 +1638,7 @@ bool WebRtcVoiceMediaChannel::SetOptions(const AudioOptions& options) { bool WebRtcVoiceMediaChannel::SetRecvCodecs( const std::vector& codecs) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); // Set the payload types to be used for incoming media. RTC_LOG(LS_INFO) << "Setting receive voice codecs."; @@ -1714,7 +1720,7 @@ bool WebRtcVoiceMediaChannel::SetRecvCodecs( // and receive streams may be reconfigured based on the new settings. bool WebRtcVoiceMediaChannel::SetSendCodecs( const std::vector& codecs) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); dtmf_payload_type_ = absl::nullopt; dtmf_payload_freq_ = -1; @@ -1861,7 +1867,7 @@ bool WebRtcVoiceMediaChannel::SetSendCodecs( void WebRtcVoiceMediaChannel::SetPlayout(bool playout) { TRACE_EVENT0("webrtc", "WebRtcVoiceMediaChannel::SetPlayout"); - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); if (playout_ == playout) { return; } @@ -1904,7 +1910,7 @@ bool WebRtcVoiceMediaChannel::SetAudioSend(uint32_t ssrc, bool enable, const AudioOptions* options, AudioSource* source) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); // TODO(solenberg): The state change should be fully rolled back if any one of // these calls fail. if (!SetLocalSource(ssrc, source)) { @@ -1921,7 +1927,7 @@ bool WebRtcVoiceMediaChannel::SetAudioSend(uint32_t ssrc, bool WebRtcVoiceMediaChannel::AddSendStream(const StreamParams& sp) { TRACE_EVENT0("webrtc", "WebRtcVoiceMediaChannel::AddSendStream"); - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_LOG(LS_INFO) << "AddSendStream: " << sp.ToString(); uint32_t ssrc = sp.first_ssrc(); @@ -1960,7 +1966,7 @@ bool WebRtcVoiceMediaChannel::AddSendStream(const StreamParams& sp) { bool WebRtcVoiceMediaChannel::RemoveSendStream(uint32_t ssrc) { TRACE_EVENT0("webrtc", "WebRtcVoiceMediaChannel::RemoveSendStream"); - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_LOG(LS_INFO) << "RemoveSendStream: " << ssrc; auto it = send_streams_.find(ssrc); @@ -1986,7 +1992,7 @@ bool WebRtcVoiceMediaChannel::RemoveSendStream(uint32_t ssrc) { bool WebRtcVoiceMediaChannel::AddRecvStream(const StreamParams& sp) { TRACE_EVENT0("webrtc", "WebRtcVoiceMediaChannel::AddRecvStream"); - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_LOG(LS_INFO) << "AddRecvStream: " << sp.ToString(); if (!sp.has_ssrcs()) { @@ -2032,7 +2038,7 @@ bool WebRtcVoiceMediaChannel::AddRecvStream(const StreamParams& sp) { bool WebRtcVoiceMediaChannel::RemoveRecvStream(uint32_t ssrc) { TRACE_EVENT0("webrtc", "WebRtcVoiceMediaChannel::RemoveRecvStream"); - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_LOG(LS_INFO) << "RemoveRecvStream: " << ssrc; const auto it = recv_streams_.find(ssrc); @@ -2051,7 +2057,7 @@ bool WebRtcVoiceMediaChannel::RemoveRecvStream(uint32_t ssrc) { } void WebRtcVoiceMediaChannel::ResetUnsignaledRecvStream() { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_LOG(LS_INFO) << "ResetUnsignaledRecvStream."; unsignaled_stream_params_ = StreamParams(); // Create a copy since RemoveRecvStream will modify |unsignaled_recv_ssrcs_|. @@ -2085,7 +2091,7 @@ bool WebRtcVoiceMediaChannel::SetLocalSource(uint32_t ssrc, } bool WebRtcVoiceMediaChannel::SetOutputVolume(uint32_t ssrc, double volume) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_LOG(LS_INFO) << rtc::StringFormat("WRVMC::%s({ssrc=%u}, {volume=%.2f})", __func__, ssrc, volume); const auto it = recv_streams_.find(ssrc); @@ -2103,7 +2109,7 @@ bool WebRtcVoiceMediaChannel::SetOutputVolume(uint32_t ssrc, double volume) { } bool WebRtcVoiceMediaChannel::SetDefaultOutputVolume(double volume) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); default_recv_volume_ = volume; for (uint32_t ssrc : unsignaled_recv_ssrcs_) { const auto it = recv_streams_.find(ssrc); @@ -2120,7 +2126,7 @@ bool WebRtcVoiceMediaChannel::SetDefaultOutputVolume(double volume) { bool WebRtcVoiceMediaChannel::SetBaseMinimumPlayoutDelayMs(uint32_t ssrc, int delay_ms) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); std::vector ssrcs(1, ssrc); // SSRC of 0 represents the default receive stream. if (ssrc == 0) { @@ -2163,7 +2169,7 @@ bool WebRtcVoiceMediaChannel::CanInsertDtmf() { void WebRtcVoiceMediaChannel::SetFrameDecryptor( uint32_t ssrc, rtc::scoped_refptr frame_decryptor) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); auto matching_stream = recv_streams_.find(ssrc); if (matching_stream != recv_streams_.end()) { matching_stream->second->SetFrameDecryptor(frame_decryptor); @@ -2177,7 +2183,7 @@ void WebRtcVoiceMediaChannel::SetFrameDecryptor( void WebRtcVoiceMediaChannel::SetFrameEncryptor( uint32_t ssrc, rtc::scoped_refptr frame_encryptor) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); auto matching_stream = send_streams_.find(ssrc); if (matching_stream != send_streams_.end()) { matching_stream->second->SetFrameEncryptor(frame_encryptor); @@ -2187,7 +2193,7 @@ void WebRtcVoiceMediaChannel::SetFrameEncryptor( bool WebRtcVoiceMediaChannel::InsertDtmf(uint32_t ssrc, int event, int duration) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_LOG(LS_INFO) << "WebRtcVoiceMediaChannel::InsertDtmf"; if (!CanInsertDtmf()) { return false; @@ -2210,78 +2216,88 @@ bool WebRtcVoiceMediaChannel::InsertDtmf(uint32_t ssrc, void WebRtcVoiceMediaChannel::OnPacketReceived(rtc::CopyOnWriteBuffer packet, int64_t packet_time_us) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(&network_thread_checker_); + // TODO(bugs.webrtc.org/11993): This code is very similar to what + // WebRtcVideoChannel::OnPacketReceived does. For maintainability and + // consistency it would be good to move the interaction with call_->Receiver() + // to a common implementation and provide a callback on the worker thread + // for the exception case (DELIVERY_UNKNOWN_SSRC) and how retry is attempted. + worker_thread_->PostTask(ToQueuedTask(task_safety_, [this, packet, + packet_time_us] { + RTC_DCHECK_RUN_ON(worker_thread_); - webrtc::PacketReceiver::DeliveryStatus delivery_result = - call_->Receiver()->DeliverPacket(webrtc::MediaType::AUDIO, packet, - packet_time_us); + webrtc::PacketReceiver::DeliveryStatus delivery_result = + call_->Receiver()->DeliverPacket(webrtc::MediaType::AUDIO, packet, + packet_time_us); - if (delivery_result != webrtc::PacketReceiver::DELIVERY_UNKNOWN_SSRC) { - return; - } - - // Create an unsignaled receive stream for this previously not received ssrc. - // If there already is N unsignaled receive streams, delete the oldest. - // See: https://bugs.chromium.org/p/webrtc/issues/detail?id=5208 - uint32_t ssrc = 0; - if (!GetRtpSsrc(packet.cdata(), packet.size(), &ssrc)) { - return; - } - RTC_DCHECK(!absl::c_linear_search(unsignaled_recv_ssrcs_, ssrc)); - - // Add new stream. - StreamParams sp = unsignaled_stream_params_; - sp.ssrcs.push_back(ssrc); - RTC_LOG(LS_INFO) << "Creating unsignaled receive stream for SSRC=" << ssrc; - if (!AddRecvStream(sp)) { - RTC_LOG(LS_WARNING) << "Could not create unsignaled receive stream."; - return; - } - unsignaled_recv_ssrcs_.push_back(ssrc); - RTC_HISTOGRAM_COUNTS_LINEAR("WebRTC.Audio.NumOfUnsignaledStreams", - unsignaled_recv_ssrcs_.size(), 1, 100, 101); - - // Remove oldest unsignaled stream, if we have too many. - if (unsignaled_recv_ssrcs_.size() > kMaxUnsignaledRecvStreams) { - uint32_t remove_ssrc = unsignaled_recv_ssrcs_.front(); - RTC_DLOG(LS_INFO) << "Removing unsignaled receive stream with SSRC=" - << remove_ssrc; - RemoveRecvStream(remove_ssrc); - } - RTC_DCHECK_GE(kMaxUnsignaledRecvStreams, unsignaled_recv_ssrcs_.size()); - - SetOutputVolume(ssrc, default_recv_volume_); - SetBaseMinimumPlayoutDelayMs(ssrc, default_recv_base_minimum_delay_ms_); - - // The default sink can only be attached to one stream at a time, so we hook - // it up to the *latest* unsignaled stream we've seen, in order to support the - // case where the SSRC of one unsignaled stream changes. - if (default_sink_) { - for (uint32_t drop_ssrc : unsignaled_recv_ssrcs_) { - auto it = recv_streams_.find(drop_ssrc); - it->second->SetRawAudioSink(nullptr); + if (delivery_result != webrtc::PacketReceiver::DELIVERY_UNKNOWN_SSRC) { + return; } - std::unique_ptr proxy_sink( - new ProxySink(default_sink_.get())); - SetRawAudioSink(ssrc, std::move(proxy_sink)); - } - delivery_result = call_->Receiver()->DeliverPacket(webrtc::MediaType::AUDIO, - packet, packet_time_us); - RTC_DCHECK_NE(webrtc::PacketReceiver::DELIVERY_UNKNOWN_SSRC, delivery_result); + // Create an unsignaled receive stream for this previously not received + // ssrc. If there already is N unsignaled receive streams, delete the + // oldest. See: https://bugs.chromium.org/p/webrtc/issues/detail?id=5208 + uint32_t ssrc = 0; + if (!GetRtpSsrc(packet.cdata(), packet.size(), &ssrc)) { + return; + } + RTC_DCHECK(!absl::c_linear_search(unsignaled_recv_ssrcs_, ssrc)); + + // Add new stream. + StreamParams sp = unsignaled_stream_params_; + sp.ssrcs.push_back(ssrc); + RTC_LOG(LS_INFO) << "Creating unsignaled receive stream for SSRC=" << ssrc; + if (!AddRecvStream(sp)) { + RTC_LOG(LS_WARNING) << "Could not create unsignaled receive stream."; + return; + } + unsignaled_recv_ssrcs_.push_back(ssrc); + RTC_HISTOGRAM_COUNTS_LINEAR("WebRTC.Audio.NumOfUnsignaledStreams", + unsignaled_recv_ssrcs_.size(), 1, 100, 101); + + // Remove oldest unsignaled stream, if we have too many. + if (unsignaled_recv_ssrcs_.size() > kMaxUnsignaledRecvStreams) { + uint32_t remove_ssrc = unsignaled_recv_ssrcs_.front(); + RTC_DLOG(LS_INFO) << "Removing unsignaled receive stream with SSRC=" + << remove_ssrc; + RemoveRecvStream(remove_ssrc); + } + RTC_DCHECK_GE(kMaxUnsignaledRecvStreams, unsignaled_recv_ssrcs_.size()); + + SetOutputVolume(ssrc, default_recv_volume_); + SetBaseMinimumPlayoutDelayMs(ssrc, default_recv_base_minimum_delay_ms_); + + // The default sink can only be attached to one stream at a time, so we hook + // it up to the *latest* unsignaled stream we've seen, in order to support + // the case where the SSRC of one unsignaled stream changes. + if (default_sink_) { + for (uint32_t drop_ssrc : unsignaled_recv_ssrcs_) { + auto it = recv_streams_.find(drop_ssrc); + it->second->SetRawAudioSink(nullptr); + } + std::unique_ptr proxy_sink( + new ProxySink(default_sink_.get())); + SetRawAudioSink(ssrc, std::move(proxy_sink)); + } + + delivery_result = call_->Receiver()->DeliverPacket(webrtc::MediaType::AUDIO, + packet, packet_time_us); + RTC_DCHECK_NE(webrtc::PacketReceiver::DELIVERY_UNKNOWN_SSRC, + delivery_result); + })); } void WebRtcVoiceMediaChannel::OnNetworkRouteChanged( const std::string& transport_name, const rtc::NetworkRoute& network_route) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); call_->GetTransportControllerSend()->OnNetworkRouteChanged(transport_name, network_route); call_->OnAudioTransportOverheadChanged(network_route.packet_overhead); } bool WebRtcVoiceMediaChannel::MuteStream(uint32_t ssrc, bool muted) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); const auto it = send_streams_.find(ssrc); if (it == send_streams_.end()) { RTC_LOG(LS_WARNING) << "The specified ssrc " << ssrc << " is not in use."; @@ -2319,7 +2335,7 @@ bool WebRtcVoiceMediaChannel::SetMaxSendBitrate(int bps) { } void WebRtcVoiceMediaChannel::OnReadyToSend(bool ready) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_LOG(LS_VERBOSE) << "OnReadyToSend: " << (ready ? "Ready." : "Not ready."); call_->SignalChannelNetworkState( webrtc::MediaType::AUDIO, @@ -2329,7 +2345,7 @@ void WebRtcVoiceMediaChannel::OnReadyToSend(bool ready) { bool WebRtcVoiceMediaChannel::GetStats(VoiceMediaInfo* info, bool get_and_clear_legacy_stats) { TRACE_EVENT0("webrtc", "WebRtcVoiceMediaChannel::GetStats"); - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_DCHECK(info); // Get SSRC and stats for each sender. @@ -2460,7 +2476,7 @@ bool WebRtcVoiceMediaChannel::GetStats(VoiceMediaInfo* info, void WebRtcVoiceMediaChannel::SetRawAudioSink( uint32_t ssrc, std::unique_ptr sink) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_LOG(LS_VERBOSE) << "WebRtcVoiceMediaChannel::SetRawAudioSink: ssrc:" << ssrc << " " << (sink ? "(ptr)" : "NULL"); const auto it = recv_streams_.find(ssrc); @@ -2473,7 +2489,7 @@ void WebRtcVoiceMediaChannel::SetRawAudioSink( void WebRtcVoiceMediaChannel::SetDefaultRawAudioSink( std::unique_ptr sink) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_LOG(LS_VERBOSE) << "WebRtcVoiceMediaChannel::SetDefaultRawAudioSink:"; if (!unsignaled_recv_ssrcs_.empty()) { std::unique_ptr proxy_sink( @@ -2497,7 +2513,7 @@ std::vector WebRtcVoiceMediaChannel::GetSources( void WebRtcVoiceMediaChannel::SetEncoderToPacketizerFrameTransformer( uint32_t ssrc, rtc::scoped_refptr frame_transformer) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); auto matching_stream = send_streams_.find(ssrc); if (matching_stream == send_streams_.end()) { RTC_LOG(LS_INFO) << "Attempting to set frame transformer for SSRC:" << ssrc @@ -2511,7 +2527,7 @@ void WebRtcVoiceMediaChannel::SetEncoderToPacketizerFrameTransformer( void WebRtcVoiceMediaChannel::SetDepacketizerToDecoderFrameTransformer( uint32_t ssrc, rtc::scoped_refptr frame_transformer) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); auto matching_stream = recv_streams_.find(ssrc); if (matching_stream == recv_streams_.end()) { RTC_LOG(LS_INFO) << "Attempting to set frame transformer for SSRC:" << ssrc @@ -2524,7 +2540,7 @@ void WebRtcVoiceMediaChannel::SetDepacketizerToDecoderFrameTransformer( bool WebRtcVoiceMediaChannel::MaybeDeregisterUnsignaledRecvStream( uint32_t ssrc) { - RTC_DCHECK(worker_thread_checker_.IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread_); auto it = absl::c_find(unsignaled_recv_ssrcs_, ssrc); if (it != unsignaled_recv_ssrcs_.end()) { unsignaled_recv_ssrcs_.erase(it); diff --git a/media/engine/webrtc_voice_engine.h b/media/engine/webrtc_voice_engine.h index 81254e3c9b..7cf184ce91 100644 --- a/media/engine/webrtc_voice_engine.h +++ b/media/engine/webrtc_voice_engine.h @@ -29,6 +29,7 @@ #include "rtc_base/buffer.h" #include "rtc_base/network_route.h" #include "rtc_base/task_queue.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/thread_checker.h" namespace webrtc { @@ -284,7 +285,9 @@ class WebRtcVoiceMediaChannel final : public VoiceMediaChannel, // unsignaled anymore (i.e. it is now removed, or signaled), and return true. bool MaybeDeregisterUnsignaledRecvStream(uint32_t ssrc); - rtc::ThreadChecker worker_thread_checker_; + webrtc::TaskQueueBase* const worker_thread_; + webrtc::ScopedTaskSafety task_safety_; + rtc::ThreadChecker network_thread_checker_; WebRtcVoiceEngine* const engine_ = nullptr; std::vector send_codecs_; diff --git a/media/engine/webrtc_voice_engine_unittest.cc b/media/engine/webrtc_voice_engine_unittest.cc index 87678be087..7fd138736a 100644 --- a/media/engine/webrtc_voice_engine_unittest.cc +++ b/media/engine/webrtc_voice_engine_unittest.cc @@ -277,6 +277,7 @@ class WebRtcVoiceEngineTestFake : public ::testing::TestWithParam { void DeliverPacket(const void* data, int len) { rtc::CopyOnWriteBuffer packet(reinterpret_cast(data), len); channel_->OnPacketReceived(packet, /* packet_time_us */ -1); + rtc::Thread::Current()->ProcessMessages(0); } void TearDown() override { delete channel_; } @@ -3443,6 +3444,8 @@ TEST_P(WebRtcVoiceEngineTestFake, DeliverAudioPacket_Call) { call_.GetAudioReceiveStream(kAudioSsrc); EXPECT_EQ(0, s->received_packets()); channel_->OnPacketReceived(kPcmuPacket, /* packet_time_us */ -1); + rtc::Thread::Current()->ProcessMessages(0); + EXPECT_EQ(1, s->received_packets()); } diff --git a/pc/channel.cc b/pc/channel.cc index ff983d13d5..4368be3c68 100644 --- a/pc/channel.cc +++ b/pc/channel.cc @@ -155,7 +155,7 @@ BaseChannel::~BaseChannel() { // Eats any outstanding messages or packets. alive_->SetNotAlive(); - worker_thread_->Clear(this); + signaling_thread_->Clear(this); // The media channel is destroyed at the end of the destructor, since it // is a std::unique_ptr. The transport channel (rtp_transport) must outlive // the media channel. @@ -534,13 +534,7 @@ void BaseChannel::OnRtpPacket(const webrtc::RtpPacketReceived& parsed_packet) { return; } - auto packet_buffer = parsed_packet.Buffer(); - - worker_thread_->PostTask( - ToQueuedTask(alive_, [this, packet_buffer, packet_time_us] { - RTC_DCHECK_RUN_ON(worker_thread()); - media_channel_->OnPacketReceived(packet_buffer, packet_time_us); - })); + media_channel_->OnPacketReceived(parsed_packet.Buffer(), packet_time_us); } void BaseChannel::EnableMedia_w() { diff --git a/pc/channel.h b/pc/channel.h index 8240582595..3d2d67f876 100644 --- a/pc/channel.h +++ b/pc/channel.h @@ -314,8 +314,6 @@ class BaseChannel : public ChannelInterface, // ChannelInterface overrides RtpHeaderExtensions GetNegotiatedRtpHeaderExtensions() const override; - bool has_received_packet_ = false; - private: bool ConnectToRtpTransport(); void DisconnectFromRtpTransport(); @@ -333,6 +331,8 @@ class BaseChannel : public ChannelInterface, const std::string content_name_; + bool has_received_packet_ = false; + // Won't be set when using raw packet transports. SDP-specific thing. // TODO(bugs.webrtc.org/12230): Written on network thread, read on // worker thread (at least).