From 236d7e7e46dd5bc0151f24909fde649b77382f4a Mon Sep 17 00:00:00 2001 From: Tommi Date: Wed, 26 Jan 2022 11:11:06 +0100 Subject: [PATCH] Factor out access to `Call::receive_rtp_config_` This CL adds a SequenceChecker, receive_11993_checker_, specifically for variables that need to move to the network thread. Once migrated, the checker will be replaced with a check for the network thread. In the meantime, the checker will match with one of worker [x]or network threads. As a first step, this checker is used to isolate access to `receive_rtp_config_` which is used from object factory paths (Create/ Destroy routines) as well as paths that handle network packets. Bug: webrtc:11993 Change-Id: Ia58423583cf99492018f218eb1640535e3919193 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/249080 Reviewed-by: Niels Moller Commit-Queue: Tomas Gunnarsson Cr-Commit-Position: refs/heads/main@{#35800} --- call/call.cc | 111 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 66 insertions(+), 45 deletions(-) diff --git a/call/call.cc b/call/call.cc index d86acdf263..3b7c071e75 100644 --- a/call/call.cc +++ b/call/call.cc @@ -349,9 +349,15 @@ class Call final : public webrtc::Call, void ConfigureSync(const std::string& sync_group) RTC_RUN_ON(worker_thread_); void NotifyBweOfReceivedPacket(const RtpPacketReceived& packet, - MediaType media_type) + MediaType media_type, + bool use_send_side_bwe) RTC_RUN_ON(worker_thread_); + bool IdentifyReceivedPacket(RtpPacketReceived& packet, + bool* use_send_side_bwe = nullptr); + bool RegisterReceiveStream(uint32_t ssrc, ReceiveStream* stream); + bool UnregisterReceiveStream(uint32_t ssrc); + void UpdateAggregateNetworkState(); // Ensure that necessary process threads are started, and any required @@ -402,10 +408,12 @@ class Call final : public webrtc::Call, // This extra map is used for receive processing which is // independent of media type. + RTC_NO_UNIQUE_ADDRESS SequenceChecker receive_11993_checker_; + // TODO(bugs.webrtc.org/11993): Move receive_rtp_config_ over to the // network thread. std::map receive_rtp_config_ - RTC_GUARDED_BY(worker_thread_); + RTC_GUARDED_BY(&receive_11993_checker_); // Audio and Video send streams are owned by the client that creates them. std::map audio_send_ssrcs_ @@ -468,6 +476,8 @@ class Call final : public webrtc::Call, bool is_started_ RTC_GUARDED_BY(worker_thread_) = false; + // Sequence checker for outgoing network traffic. Could be the network thread. + // Could also be a pacer owned thread or TQ such as the TaskQueuePacedSender. RTC_NO_UNIQUE_ADDRESS SequenceChecker sent_packet_sequence_checker_; absl::optional last_sent_packet_ RTC_GUARDED_BY(sent_packet_sequence_checker_); @@ -818,6 +828,7 @@ Call::Call(Clock* clock, RTC_DCHECK(network_thread_); RTC_DCHECK(worker_thread_->IsCurrent()); + receive_11993_checker_.Detach(); send_transport_sequence_checker_.Detach(); sent_packet_sequence_checker_.Detach(); @@ -965,7 +976,7 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream( // TODO(bugs.webrtc.org/11993): Update the below on the network thread. // We could possibly set up the audio_receiver_controller_ association up // as part of the async setup. - receive_rtp_config_.emplace(config.rtp.remote_ssrc, receive_stream); + RegisterReceiveStream(config.rtp.remote_ssrc, receive_stream); ConfigureSync(config.sync_group); @@ -1003,7 +1014,7 @@ void Call::DestroyAudioReceiveStream( sync_stream_mapping_.erase(it); ConfigureSync(config.sync_group); } - receive_rtp_config_.erase(ssrc); + UnregisterReceiveStream(ssrc); UpdateAggregateNetworkState(); // TODO(bugs.webrtc.org/11993): Consider if deleting `audio_receive_stream` @@ -1148,9 +1159,9 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream( // stream. Since the transport_send_cc negotiation is per payload // type, we may get an incorrect value for the rtx stream, but // that is unlikely to matter in practice. - receive_rtp_config_.emplace(rtp.rtx_ssrc, receive_stream); + RegisterReceiveStream(rtp.rtx_ssrc, receive_stream); } - receive_rtp_config_.emplace(rtp.remote_ssrc, receive_stream); + RegisterReceiveStream(rtp.remote_ssrc, receive_stream); video_receive_streams_.insert(receive_stream); ConfigureSync(receive_stream->sync_group()); @@ -1175,9 +1186,9 @@ void Call::DestroyVideoReceiveStream( // Remove all ssrcs pointing to a receive stream. As RTX retransmits on a // separate SSRC there can be either one or two. - receive_rtp_config_.erase(rtp.remote_ssrc); + UnregisterReceiveStream(rtp.remote_ssrc); if (rtp.rtx_ssrc) { - receive_rtp_config_.erase(rtp.rtx_ssrc); + UnregisterReceiveStream(rtp.rtx_ssrc); } video_receive_streams_.erase(receive_stream_impl); ConfigureSync(receive_stream_impl->sync_group()); @@ -1210,10 +1221,7 @@ FlexfecReceiveStream* Call::CreateFlexfecReceiveStream( // TODO(bugs.webrtc.org/11993): Set this up asynchronously on the network // thread. receive_stream->RegisterWithTransport(&video_receiver_controller_); - - RTC_DCHECK(receive_rtp_config_.find(config.rtp.remote_ssrc) == - receive_rtp_config_.end()); - receive_rtp_config_.emplace(config.rtp.remote_ssrc, receive_stream); + RegisterReceiveStream(config.rtp.remote_ssrc, receive_stream); // TODO(brandtr): Store config in RtcEventLog here. @@ -1231,7 +1239,7 @@ void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) { RTC_DCHECK(receive_stream != nullptr); const FlexfecReceiveStream::RtpConfig& rtp = receive_stream->rtp_config(); - receive_rtp_config_.erase(rtp.remote_ssrc); + UnregisterReceiveStream(rtp.remote_ssrc); // Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be // destroyed. @@ -1583,22 +1591,11 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type, RTC_DCHECK(media_type == MediaType::AUDIO || media_type == MediaType::VIDEO || is_keep_alive_packet); - auto it = receive_rtp_config_.find(parsed_packet.Ssrc()); - if (it == receive_rtp_config_.end()) { - RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc " - << parsed_packet.Ssrc(); - // Destruction of the receive stream, including deregistering from the - // RtpDemuxer, is not protected by the `worker_thread_`. - // But deregistering in the `receive_rtp_config_` map is. So by not passing - // the packet on to demuxing in this case, we prevent incoming packets to be - // passed on via the demuxer to a receive stream which is being torned down. + bool use_send_side_bwe = false; + if (!IdentifyReceivedPacket(parsed_packet, &use_send_side_bwe)) return DELIVERY_UNKNOWN_SSRC; - } - parsed_packet.IdentifyExtensions( - RtpHeaderExtensionMap(it->second->rtp_config().extensions)); - - NotifyBweOfReceivedPacket(parsed_packet, media_type); + NotifyBweOfReceivedPacket(parsed_packet, media_type, use_send_side_bwe); // RateCounters expect input parameter as int, save it as int, // instead of converting each time it is passed to RateCounter::Add below. @@ -1649,20 +1646,8 @@ void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) { parsed_packet.set_recovered(true); - auto it = receive_rtp_config_.find(parsed_packet.Ssrc()); - if (it == receive_rtp_config_.end()) { - RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc " - << parsed_packet.Ssrc(); - // Destruction of the receive stream, including deregistering from the - // RtpDemuxer, is not protected by the `worker_thread_`. - // But deregistering in the `receive_rtp_config_` map is. - // So by not passing the packet on to demuxing in this case, we prevent - // incoming packets to be passed on via the demuxer to a receive stream - // which is being torn down. + if (!IdentifyReceivedPacket(parsed_packet)) return; - } - parsed_packet.IdentifyExtensions( - RtpHeaderExtensionMap(it->second->rtp_config().extensions)); // TODO(brandtr): Update here when we support protecting audio packets too. parsed_packet.set_payload_type_frequency(kVideoPayloadTypeFrequency); @@ -1671,11 +1656,8 @@ void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) { // RTC_RUN_ON(worker_thread_) void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet, - MediaType media_type) { - auto it = receive_rtp_config_.find(packet.Ssrc()); - bool use_send_side_bwe = (it != receive_rtp_config_.end()) && - UseSendSideBwe(it->second->rtp_config()); - + MediaType media_type, + bool use_send_side_bwe) { RTPHeader header; packet.GetHeader(&header); @@ -1706,6 +1688,45 @@ void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet, } } +bool Call::IdentifyReceivedPacket(RtpPacketReceived& packet, + bool* use_send_side_bwe /*= nullptr*/) { + RTC_DCHECK_RUN_ON(&receive_11993_checker_); + auto it = receive_rtp_config_.find(packet.Ssrc()); + if (it == receive_rtp_config_.end()) { + RTC_DLOG(LS_WARNING) << "receive_rtp_config_ lookup failed for ssrc " + << packet.Ssrc(); + return false; + } + + packet.IdentifyExtensions( + RtpHeaderExtensionMap(it->second->rtp_config().extensions)); + + if (use_send_side_bwe) { + *use_send_side_bwe = UseSendSideBwe(it->second->rtp_config()); + } + + return true; +} + +bool Call::RegisterReceiveStream(uint32_t ssrc, ReceiveStream* stream) { + RTC_DCHECK_RUN_ON(&receive_11993_checker_); + RTC_DCHECK(stream); + auto inserted = receive_rtp_config_.emplace(ssrc, stream); + if (!inserted.second) { + RTC_DLOG(LS_WARNING) << "ssrc already registered: " << ssrc; + } + return inserted.second; +} + +bool Call::UnregisterReceiveStream(uint32_t ssrc) { + RTC_DCHECK_RUN_ON(&receive_11993_checker_); + size_t erased = receive_rtp_config_.erase(ssrc); + if (!erased) { + RTC_DLOG(LS_WARNING) << "ssrc wasn't registered: " << ssrc; + } + return erased != 0u; +} + } // namespace internal } // namespace webrtc