Remove receive_crit_ from Call.

This lock isn't needed anymore which has been clarified with recent
refactoring. Updated thread guards and related comments.

Change-Id: Ia7a1e59c45b67597264e73158654e4dffe6c4fc5
Bug: webrtc:11610
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/176120
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Markus Handell <handellm@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31351}
This commit is contained in:
Tommi 2020-05-26 11:38:36 +02:00 committed by Commit Bot
parent de2049e85a
commit 31001a610d

View File

@ -243,16 +243,18 @@ class Call final : public webrtc::Call,
private:
DeliveryStatus DeliverRtcp(MediaType media_type,
const uint8_t* packet,
size_t length);
size_t length)
RTC_EXCLUSIVE_LOCKS_REQUIRED(configuration_sequence_checker_);
DeliveryStatus DeliverRtp(MediaType media_type,
rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us);
int64_t packet_time_us)
RTC_EXCLUSIVE_LOCKS_REQUIRED(configuration_sequence_checker_);
void ConfigureSync(const std::string& sync_group)
RTC_EXCLUSIVE_LOCKS_REQUIRED(receive_crit_);
RTC_EXCLUSIVE_LOCKS_REQUIRED(configuration_sequence_checker_);
void NotifyBweOfReceivedPacket(const RtpPacketReceived& packet,
MediaType media_type)
RTC_SHARED_LOCKS_REQUIRED(receive_crit_);
RTC_SHARED_LOCKS_REQUIRED(configuration_sequence_checker_);
void UpdateSendHistograms(Timestamp first_sent_packet)
RTC_EXCLUSIVE_LOCKS_REQUIRED(&bitrate_crit_);
@ -281,16 +283,15 @@ class Call final : public webrtc::Call,
NetworkState video_network_state_;
bool aggregate_network_up_ RTC_GUARDED_BY(configuration_sequence_checker_);
std::unique_ptr<RWLockWrapper> receive_crit_;
// Audio, Video, and FlexFEC receive streams are owned by the client that
// creates them.
std::set<AudioReceiveStream*> audio_receive_streams_
RTC_GUARDED_BY(receive_crit_);
RTC_GUARDED_BY(configuration_sequence_checker_);
std::set<VideoReceiveStream2*> video_receive_streams_
RTC_GUARDED_BY(receive_crit_);
RTC_GUARDED_BY(configuration_sequence_checker_);
std::map<std::string, AudioReceiveStream*> sync_stream_mapping_
RTC_GUARDED_BY(receive_crit_);
RTC_GUARDED_BY(configuration_sequence_checker_);
// TODO(nisse): Should eventually be injected at creation,
// with a single object in the bundled case.
@ -324,7 +325,7 @@ class Call final : public webrtc::Call,
const bool use_send_side_bwe;
};
std::map<uint32_t, ReceiveRtpConfig> receive_rtp_config_
RTC_GUARDED_BY(receive_crit_);
RTC_GUARDED_BY(configuration_sequence_checker_);
std::unique_ptr<RWLockWrapper> send_crit_;
// Audio and Video send streams are owned by the client that creates them.
@ -557,7 +558,6 @@ Call::Call(Clock* clock,
audio_network_state_(kNetworkDown),
video_network_state_(kNetworkDown),
aggregate_network_up_(false),
receive_crit_(RWLockWrapper::CreateRWLock()),
send_crit_(RWLockWrapper::CreateRWLock()),
event_log_(config.event_log),
received_bytes_per_second_counter_(clock_, nullptr, true),
@ -745,14 +745,13 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream(
audio_send_ssrcs_.end());
audio_send_ssrcs_[config.rtp.ssrc] = send_stream;
}
{
ReadLockScoped read_lock(*receive_crit_);
for (AudioReceiveStream* stream : audio_receive_streams_) {
if (stream->config().rtp.local_ssrc == config.rtp.ssrc) {
stream->AssociateSendStream(send_stream);
}
for (AudioReceiveStream* stream : audio_receive_streams_) {
if (stream->config().rtp.local_ssrc == config.rtp.ssrc) {
stream->AssociateSendStream(send_stream);
}
}
UpdateAggregateNetworkState();
return send_stream;
}
@ -773,14 +772,13 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) {
size_t num_deleted = audio_send_ssrcs_.erase(ssrc);
RTC_DCHECK_EQ(1, num_deleted);
}
{
ReadLockScoped read_lock(*receive_crit_);
for (AudioReceiveStream* stream : audio_receive_streams_) {
if (stream->config().rtp.local_ssrc == ssrc) {
stream->AssociateSendStream(nullptr);
}
for (AudioReceiveStream* stream : audio_receive_streams_) {
if (stream->config().rtp.local_ssrc == ssrc) {
stream->AssociateSendStream(nullptr);
}
}
UpdateAggregateNetworkState();
delete send_stream;
}
@ -796,14 +794,12 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream(
clock_, &audio_receiver_controller_, transport_send_ptr_->packet_router(),
module_process_thread_->process_thread(), config_.neteq_factory, config,
config_.audio_state, event_log_);
{
WriteLockScoped write_lock(*receive_crit_);
receive_rtp_config_.emplace(config.rtp.remote_ssrc,
ReceiveRtpConfig(config));
audio_receive_streams_.insert(receive_stream);
ConfigureSync(config.sync_group);
}
receive_rtp_config_.emplace(config.rtp.remote_ssrc, ReceiveRtpConfig(config));
audio_receive_streams_.insert(receive_stream);
ConfigureSync(config.sync_group);
{
ReadLockScoped read_lock(*send_crit_);
auto it = audio_send_ssrcs_.find(config.rtp.local_ssrc);
@ -822,22 +818,20 @@ void Call::DestroyAudioReceiveStream(
RTC_DCHECK(receive_stream != nullptr);
webrtc::internal::AudioReceiveStream* audio_receive_stream =
static_cast<webrtc::internal::AudioReceiveStream*>(receive_stream);
{
WriteLockScoped write_lock(*receive_crit_);
const AudioReceiveStream::Config& config = audio_receive_stream->config();
uint32_t ssrc = config.rtp.remote_ssrc;
receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
->RemoveStream(ssrc);
audio_receive_streams_.erase(audio_receive_stream);
const std::string& sync_group = audio_receive_stream->config().sync_group;
const auto it = sync_stream_mapping_.find(sync_group);
if (it != sync_stream_mapping_.end() &&
it->second == audio_receive_stream) {
sync_stream_mapping_.erase(it);
ConfigureSync(sync_group);
}
receive_rtp_config_.erase(ssrc);
const AudioReceiveStream::Config& config = audio_receive_stream->config();
uint32_t ssrc = config.rtp.remote_ssrc;
receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
->RemoveStream(ssrc);
audio_receive_streams_.erase(audio_receive_stream);
const std::string& sync_group = audio_receive_stream->config().sync_group;
const auto it = sync_stream_mapping_.find(sync_group);
if (it != sync_stream_mapping_.end() && it->second == audio_receive_stream) {
sync_stream_mapping_.erase(it);
ConfigureSync(sync_group);
}
receive_rtp_config_.erase(ssrc);
UpdateAggregateNetworkState();
delete audio_receive_stream;
}
@ -955,21 +949,17 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream(
new VCMTiming(clock_));
const webrtc::VideoReceiveStream::Config& config = receive_stream->config();
{
WriteLockScoped write_lock(*receive_crit_);
if (config.rtp.rtx_ssrc) {
// We record identical config for the rtx stream as for the main
// stream. Since the transport_send_cc negotiation is per payload
// type, we may get an incorrect value for the rtx stream, but
// that is unlikely to matter in practice.
receive_rtp_config_.emplace(config.rtp.rtx_ssrc,
ReceiveRtpConfig(config));
}
receive_rtp_config_.emplace(config.rtp.remote_ssrc,
ReceiveRtpConfig(config));
video_receive_streams_.insert(receive_stream);
ConfigureSync(config.sync_group);
if (config.rtp.rtx_ssrc) {
// We record identical config for the rtx stream as for the main
// stream. Since the transport_send_cc negotiation is per payload
// type, we may get an incorrect value for the rtx stream, but
// that is unlikely to matter in practice.
receive_rtp_config_.emplace(config.rtp.rtx_ssrc, ReceiveRtpConfig(config));
}
receive_rtp_config_.emplace(config.rtp.remote_ssrc, ReceiveRtpConfig(config));
video_receive_streams_.insert(receive_stream);
ConfigureSync(config.sync_group);
receive_stream->SignalNetworkState(video_network_state_);
UpdateAggregateNetworkState();
event_log_->Log(std::make_unique<RtcEventVideoReceiveStreamConfig>(
@ -985,17 +975,15 @@ void Call::DestroyVideoReceiveStream(
VideoReceiveStream2* receive_stream_impl =
static_cast<VideoReceiveStream2*>(receive_stream);
const VideoReceiveStream::Config& config = receive_stream_impl->config();
{
WriteLockScoped write_lock(*receive_crit_);
// Remove all ssrcs pointing to a receive stream. As RTX retransmits on a
// separate SSRC there can be either one or two.
receive_rtp_config_.erase(config.rtp.remote_ssrc);
if (config.rtp.rtx_ssrc) {
receive_rtp_config_.erase(config.rtp.rtx_ssrc);
}
video_receive_streams_.erase(receive_stream_impl);
ConfigureSync(config.sync_group);
// Remove all ssrcs pointing to a receive stream. As RTX retransmits on a
// separate SSRC there can be either one or two.
receive_rtp_config_.erase(config.rtp.remote_ssrc);
if (config.rtp.rtx_ssrc) {
receive_rtp_config_.erase(config.rtp.rtx_ssrc);
}
video_receive_streams_.erase(receive_stream_impl);
ConfigureSync(config.sync_group);
receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
->RemoveStream(config.rtp.remote_ssrc);
@ -1012,26 +1000,20 @@ FlexfecReceiveStream* Call::CreateFlexfecReceiveStream(
RecoveredPacketReceiver* recovered_packet_receiver = this;
FlexfecReceiveStreamImpl* receive_stream;
{
WriteLockScoped write_lock(*receive_crit_);
// Unlike the video and audio receive streams,
// FlexfecReceiveStream implements RtpPacketSinkInterface itself,
// and hence its constructor passes its |this| pointer to
// video_receiver_controller_->CreateStream(). Calling the
// constructor while holding |receive_crit_| ensures that we don't
// call OnRtpPacket until the constructor is finished and the
// object is in a valid state.
// TODO(nisse): Fix constructor so that it can be moved outside of
// this locked scope.
receive_stream = new FlexfecReceiveStreamImpl(
clock_, &video_receiver_controller_, config, recovered_packet_receiver,
call_stats_->AsRtcpRttStats(),
module_process_thread_->process_thread());
RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) ==
receive_rtp_config_.end());
receive_rtp_config_.emplace(config.remote_ssrc, ReceiveRtpConfig(config));
}
// Unlike the video and audio receive streams, FlexfecReceiveStream implements
// RtpPacketSinkInterface itself, and hence its constructor passes its |this|
// pointer to video_receiver_controller_->CreateStream(). Calling the
// constructor while on the worker thread ensures that we don't call
// OnRtpPacket until the constructor is finished and the object is
// in a valid state, since OnRtpPacket runs on the same thread.
receive_stream = new FlexfecReceiveStreamImpl(
clock_, &video_receiver_controller_, config, recovered_packet_receiver,
call_stats_->AsRtcpRttStats(), module_process_thread_->process_thread());
RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) ==
receive_rtp_config_.end());
receive_rtp_config_.emplace(config.remote_ssrc, ReceiveRtpConfig(config));
// TODO(brandtr): Store config in RtcEventLog here.
@ -1043,18 +1025,14 @@ void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) {
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
RTC_DCHECK(receive_stream != nullptr);
{
WriteLockScoped write_lock(*receive_crit_);
const FlexfecReceiveStream::Config& config = receive_stream->GetConfig();
uint32_t ssrc = config.remote_ssrc;
receive_rtp_config_.erase(ssrc);
const FlexfecReceiveStream::Config& config = receive_stream->GetConfig();
uint32_t ssrc = config.remote_ssrc;
receive_rtp_config_.erase(ssrc);
// Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be
// destroyed.
receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
->RemoveStream(ssrc);
}
// Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be
// destroyed.
receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
->RemoveStream(ssrc);
delete receive_stream;
}
@ -1118,11 +1096,8 @@ void Call::SignalChannelNetworkState(MediaType media, NetworkState state) {
}
UpdateAggregateNetworkState();
{
ReadLockScoped read_lock(*receive_crit_);
for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) {
video_receive_stream->SignalNetworkState(video_network_state_);
}
for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) {
video_receive_stream->SignalNetworkState(video_network_state_);
}
}
@ -1145,13 +1120,12 @@ void Call::UpdateAggregateNetworkState() {
if (!video_send_ssrcs_.empty())
have_video = true;
}
{
ReadLockScoped read_lock(*receive_crit_);
if (!audio_receive_streams_.empty())
have_audio = true;
if (!video_receive_streams_.empty())
have_video = true;
}
if (!audio_receive_streams_.empty())
have_audio = true;
if (!video_receive_streams_.empty())
have_video = true;
bool aggregate_network_up =
((have_video && video_network_state_ == kNetworkUp) ||
@ -1299,14 +1273,12 @@ PacketReceiver::DeliveryStatus Call::DeliverRtcp(MediaType media_type,
}
bool rtcp_delivered = false;
if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
ReadLockScoped read_lock(*receive_crit_);
for (VideoReceiveStream2* stream : video_receive_streams_) {
if (stream->DeliverRtcp(packet, length))
rtcp_delivered = true;
}
}
if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) {
ReadLockScoped read_lock(*receive_crit_);
for (AudioReceiveStream* stream : audio_receive_streams_) {
stream->DeliverRtcp(packet, length);
rtcp_delivered = true;
@ -1364,17 +1336,15 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
RTC_DCHECK(media_type == MediaType::AUDIO || media_type == MediaType::VIDEO ||
is_keep_alive_packet);
ReadLockScoped read_lock(*receive_crit_);
auto it = receive_rtp_config_.find(parsed_packet.Ssrc());
if (it == receive_rtp_config_.end()) {
RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc "
<< parsed_packet.Ssrc();
// Destruction of the receive stream, including deregistering from the
// RtpDemuxer, is not protected by the |receive_crit_| lock. But
// deregistering in the |receive_rtp_config_| map is protected by that lock.
// So by not passing the packet on to demuxing in this case, we prevent
// incoming packets to be passed on via the demuxer to a receive stream
// which is being torned down.
// RtpDemuxer, is not protected by the |configuration_sequence_checker_|.
// But deregistering in the |receive_rtp_config_| map is. So by not passing
// the packet on to demuxing in this case, we prevent incoming packets to be
// passed on via the demuxer to a receive stream which is being torned down.
return DELIVERY_UNKNOWN_SSRC;
}
@ -1428,20 +1398,20 @@ PacketReceiver::DeliveryStatus Call::DeliverPacket(
}
void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
RtpPacketReceived parsed_packet;
if (!parsed_packet.Parse(packet, length))
return;
parsed_packet.set_recovered(true);
ReadLockScoped read_lock(*receive_crit_);
auto it = receive_rtp_config_.find(parsed_packet.Ssrc());
if (it == receive_rtp_config_.end()) {
RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc "
<< parsed_packet.Ssrc();
// Destruction of the receive stream, including deregistering from the
// RtpDemuxer, is not protected by the |receive_crit_| lock. But
// deregistering in the |receive_rtp_config_| map is protected by that lock.
// RtpDemuxer, is not protected by the |configuration_sequence_checker_|.
// But deregistering in the |receive_rtp_config_| map is.
// So by not passing the packet on to demuxing in this case, we prevent
// incoming packets to be passed on via the demuxer to a receive stream
// which is being torn down.