Factor out access to Call::receive_rtp_config_

This CL adds a SequenceChecker, receive_11993_checker_, specifically for
variables that need to move to the network thread. Once migrated,
the checker will be replaced with a check for the network thread.

In the meantime, the checker will match with one of worker [x]or
network threads.

As a first step, this checker is used to isolate access to
`receive_rtp_config_` which is used from object factory paths (Create/
Destroy routines) as well as paths that handle network packets.

Bug: webrtc:11993
Change-Id: Ia58423583cf99492018f218eb1640535e3919193
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/249080
Reviewed-by: Niels Moller <nisse@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35800}
This commit is contained in:
Tommi 2022-01-26 11:11:06 +01:00 committed by WebRTC LUCI CQ
parent e20865b3ef
commit 236d7e7e46

View File

@ -349,9 +349,15 @@ class Call final : public webrtc::Call,
void ConfigureSync(const std::string& sync_group) RTC_RUN_ON(worker_thread_);
void NotifyBweOfReceivedPacket(const RtpPacketReceived& packet,
MediaType media_type)
MediaType media_type,
bool use_send_side_bwe)
RTC_RUN_ON(worker_thread_);
bool IdentifyReceivedPacket(RtpPacketReceived& packet,
bool* use_send_side_bwe = nullptr);
bool RegisterReceiveStream(uint32_t ssrc, ReceiveStream* stream);
bool UnregisterReceiveStream(uint32_t ssrc);
void UpdateAggregateNetworkState();
// Ensure that necessary process threads are started, and any required
@ -402,10 +408,12 @@ class Call final : public webrtc::Call,
// This extra map is used for receive processing which is
// independent of media type.
RTC_NO_UNIQUE_ADDRESS SequenceChecker receive_11993_checker_;
// TODO(bugs.webrtc.org/11993): Move receive_rtp_config_ over to the
// network thread.
std::map<uint32_t, ReceiveStream*> receive_rtp_config_
RTC_GUARDED_BY(worker_thread_);
RTC_GUARDED_BY(&receive_11993_checker_);
// Audio and Video send streams are owned by the client that creates them.
std::map<uint32_t, AudioSendStream*> audio_send_ssrcs_
@ -468,6 +476,8 @@ class Call final : public webrtc::Call,
bool is_started_ RTC_GUARDED_BY(worker_thread_) = false;
// Sequence checker for outgoing network traffic. Could be the network thread.
// Could also be a pacer owned thread or TQ such as the TaskQueuePacedSender.
RTC_NO_UNIQUE_ADDRESS SequenceChecker sent_packet_sequence_checker_;
absl::optional<rtc::SentPacket> last_sent_packet_
RTC_GUARDED_BY(sent_packet_sequence_checker_);
@ -818,6 +828,7 @@ Call::Call(Clock* clock,
RTC_DCHECK(network_thread_);
RTC_DCHECK(worker_thread_->IsCurrent());
receive_11993_checker_.Detach();
send_transport_sequence_checker_.Detach();
sent_packet_sequence_checker_.Detach();
@ -965,7 +976,7 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream(
// TODO(bugs.webrtc.org/11993): Update the below on the network thread.
// We could possibly set up the audio_receiver_controller_ association up
// as part of the async setup.
receive_rtp_config_.emplace(config.rtp.remote_ssrc, receive_stream);
RegisterReceiveStream(config.rtp.remote_ssrc, receive_stream);
ConfigureSync(config.sync_group);
@ -1003,7 +1014,7 @@ void Call::DestroyAudioReceiveStream(
sync_stream_mapping_.erase(it);
ConfigureSync(config.sync_group);
}
receive_rtp_config_.erase(ssrc);
UnregisterReceiveStream(ssrc);
UpdateAggregateNetworkState();
// TODO(bugs.webrtc.org/11993): Consider if deleting `audio_receive_stream`
@ -1148,9 +1159,9 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream(
// stream. Since the transport_send_cc negotiation is per payload
// type, we may get an incorrect value for the rtx stream, but
// that is unlikely to matter in practice.
receive_rtp_config_.emplace(rtp.rtx_ssrc, receive_stream);
RegisterReceiveStream(rtp.rtx_ssrc, receive_stream);
}
receive_rtp_config_.emplace(rtp.remote_ssrc, receive_stream);
RegisterReceiveStream(rtp.remote_ssrc, receive_stream);
video_receive_streams_.insert(receive_stream);
ConfigureSync(receive_stream->sync_group());
@ -1175,9 +1186,9 @@ void Call::DestroyVideoReceiveStream(
// Remove all ssrcs pointing to a receive stream. As RTX retransmits on a
// separate SSRC there can be either one or two.
receive_rtp_config_.erase(rtp.remote_ssrc);
UnregisterReceiveStream(rtp.remote_ssrc);
if (rtp.rtx_ssrc) {
receive_rtp_config_.erase(rtp.rtx_ssrc);
UnregisterReceiveStream(rtp.rtx_ssrc);
}
video_receive_streams_.erase(receive_stream_impl);
ConfigureSync(receive_stream_impl->sync_group());
@ -1210,10 +1221,7 @@ FlexfecReceiveStream* Call::CreateFlexfecReceiveStream(
// TODO(bugs.webrtc.org/11993): Set this up asynchronously on the network
// thread.
receive_stream->RegisterWithTransport(&video_receiver_controller_);
RTC_DCHECK(receive_rtp_config_.find(config.rtp.remote_ssrc) ==
receive_rtp_config_.end());
receive_rtp_config_.emplace(config.rtp.remote_ssrc, receive_stream);
RegisterReceiveStream(config.rtp.remote_ssrc, receive_stream);
// TODO(brandtr): Store config in RtcEventLog here.
@ -1231,7 +1239,7 @@ void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) {
RTC_DCHECK(receive_stream != nullptr);
const FlexfecReceiveStream::RtpConfig& rtp = receive_stream->rtp_config();
receive_rtp_config_.erase(rtp.remote_ssrc);
UnregisterReceiveStream(rtp.remote_ssrc);
// Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be
// destroyed.
@ -1583,22 +1591,11 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
RTC_DCHECK(media_type == MediaType::AUDIO || media_type == MediaType::VIDEO ||
is_keep_alive_packet);
auto it = receive_rtp_config_.find(parsed_packet.Ssrc());
if (it == receive_rtp_config_.end()) {
RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc "
<< parsed_packet.Ssrc();
// Destruction of the receive stream, including deregistering from the
// RtpDemuxer, is not protected by the `worker_thread_`.
// But deregistering in the `receive_rtp_config_` map is. So by not passing
// the packet on to demuxing in this case, we prevent incoming packets to be
// passed on via the demuxer to a receive stream which is being torned down.
bool use_send_side_bwe = false;
if (!IdentifyReceivedPacket(parsed_packet, &use_send_side_bwe))
return DELIVERY_UNKNOWN_SSRC;
}
parsed_packet.IdentifyExtensions(
RtpHeaderExtensionMap(it->second->rtp_config().extensions));
NotifyBweOfReceivedPacket(parsed_packet, media_type);
NotifyBweOfReceivedPacket(parsed_packet, media_type, use_send_side_bwe);
// RateCounters expect input parameter as int, save it as int,
// instead of converting each time it is passed to RateCounter::Add below.
@ -1649,20 +1646,8 @@ void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
parsed_packet.set_recovered(true);
auto it = receive_rtp_config_.find(parsed_packet.Ssrc());
if (it == receive_rtp_config_.end()) {
RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc "
<< parsed_packet.Ssrc();
// Destruction of the receive stream, including deregistering from the
// RtpDemuxer, is not protected by the `worker_thread_`.
// But deregistering in the `receive_rtp_config_` map is.
// So by not passing the packet on to demuxing in this case, we prevent
// incoming packets to be passed on via the demuxer to a receive stream
// which is being torn down.
if (!IdentifyReceivedPacket(parsed_packet))
return;
}
parsed_packet.IdentifyExtensions(
RtpHeaderExtensionMap(it->second->rtp_config().extensions));
// TODO(brandtr): Update here when we support protecting audio packets too.
parsed_packet.set_payload_type_frequency(kVideoPayloadTypeFrequency);
@ -1671,11 +1656,8 @@ void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
// RTC_RUN_ON(worker_thread_)
void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet,
MediaType media_type) {
auto it = receive_rtp_config_.find(packet.Ssrc());
bool use_send_side_bwe = (it != receive_rtp_config_.end()) &&
UseSendSideBwe(it->second->rtp_config());
MediaType media_type,
bool use_send_side_bwe) {
RTPHeader header;
packet.GetHeader(&header);
@ -1706,6 +1688,45 @@ void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet,
}
}
bool Call::IdentifyReceivedPacket(RtpPacketReceived& packet,
bool* use_send_side_bwe /*= nullptr*/) {
RTC_DCHECK_RUN_ON(&receive_11993_checker_);
auto it = receive_rtp_config_.find(packet.Ssrc());
if (it == receive_rtp_config_.end()) {
RTC_DLOG(LS_WARNING) << "receive_rtp_config_ lookup failed for ssrc "
<< packet.Ssrc();
return false;
}
packet.IdentifyExtensions(
RtpHeaderExtensionMap(it->second->rtp_config().extensions));
if (use_send_side_bwe) {
*use_send_side_bwe = UseSendSideBwe(it->second->rtp_config());
}
return true;
}
bool Call::RegisterReceiveStream(uint32_t ssrc, ReceiveStream* stream) {
RTC_DCHECK_RUN_ON(&receive_11993_checker_);
RTC_DCHECK(stream);
auto inserted = receive_rtp_config_.emplace(ssrc, stream);
if (!inserted.second) {
RTC_DLOG(LS_WARNING) << "ssrc already registered: " << ssrc;
}
return inserted.second;
}
bool Call::UnregisterReceiveStream(uint32_t ssrc) {
RTC_DCHECK_RUN_ON(&receive_11993_checker_);
size_t erased = receive_rtp_config_.erase(ssrc);
if (!erased) {
RTC_DLOG(LS_WARNING) << "ssrc wasn't registered: " << ssrc;
}
return erased != 0u;
}
} // namespace internal
} // namespace webrtc