diff --git a/webrtc/audio/audio_receive_stream.h b/webrtc/audio/audio_receive_stream.h index 20ed4613a7..d0b5a4d1cb 100644 --- a/webrtc/audio/audio_receive_stream.h +++ b/webrtc/audio/audio_receive_stream.h @@ -19,6 +19,7 @@ #include "webrtc/base/constructormagic.h" #include "webrtc/base/thread_checker.h" #include "webrtc/call/audio_receive_stream.h" +#include "webrtc/call/rtp_demuxer.h" #include "webrtc/call/syncable.h" namespace webrtc { @@ -35,7 +36,8 @@ class AudioSendStream; class AudioReceiveStream final : public webrtc::AudioReceiveStream, public AudioMixer::Source, - public Syncable { + public Syncable, + public RtpPacketSinkInterface { public: AudioReceiveStream(PacketRouter* packet_router, const webrtc::AudioReceiveStream::Config& config, @@ -52,8 +54,8 @@ class AudioReceiveStream final : public webrtc::AudioReceiveStream, void SetGain(float gain) override; std::vector GetSources() const override; - // TODO(nisse): Intended to be part of an RtpPacketReceiver interface. - void OnRtpPacket(const RtpPacketReceived& packet); + // RtpPacketSinkInterface. + void OnRtpPacket(const RtpPacketReceived& packet) override; // AudioMixer::Source AudioFrameInfo GetAudioFrameWithInfo(int sample_rate_hz, diff --git a/webrtc/call/BUILD.gn b/webrtc/call/BUILD.gn index 1f29e61822..0d87b2f9f7 100644 --- a/webrtc/call/BUILD.gn +++ b/webrtc/call/BUILD.gn @@ -16,6 +16,7 @@ rtc_source_set("call_interfaces") { "audio_state.h", "call.h", "flexfec_receive_stream.h", + "rtp_demuxer.h", "rtp_transport_controller_send_interface.h", "syncable.cc", "syncable.h", @@ -38,6 +39,7 @@ rtc_static_library("call") { "call.cc", "flexfec_receive_stream_impl.cc", "flexfec_receive_stream_impl.h", + "rtp_demuxer.cc", "rtp_transport_controller_send.cc", "rtp_transport_controller_send.h", ] diff --git a/webrtc/call/call.cc b/webrtc/call/call.cc index d178406b29..802778eb4d 100644 --- a/webrtc/call/call.cc +++ b/webrtc/call/call.cc @@ -34,6 +34,7 @@ #include "webrtc/call/bitrate_allocator.h" #include "webrtc/call/call.h" #include "webrtc/call/flexfec_receive_stream_impl.h" +#include "webrtc/call/rtp_demuxer.h" #include "webrtc/call/rtp_transport_controller_send.h" #include "webrtc/config.h" #include "webrtc/logging/rtc_event_log/rtc_event_log.h" @@ -204,23 +205,19 @@ class Call : public webrtc::Call, std::unique_ptr receive_crit_; // Audio, Video, and FlexFEC receive streams are owned by the client that // creates them. - std::map audio_receive_ssrcs_ - GUARDED_BY(receive_crit_); - std::map video_receive_ssrcs_ + std::set audio_receive_streams_ GUARDED_BY(receive_crit_); std::set video_receive_streams_ GUARDED_BY(receive_crit_); - // Each media stream could conceivably be protected by multiple FlexFEC - // streams. - std::multimap - flexfec_receive_ssrcs_media_ GUARDED_BY(receive_crit_); - std::map - flexfec_receive_ssrcs_protection_ GUARDED_BY(receive_crit_); - std::set flexfec_receive_streams_ - GUARDED_BY(receive_crit_); + std::map sync_stream_mapping_ GUARDED_BY(receive_crit_); + // TODO(nisse): Should eventually be part of injected + // RtpTransportControllerReceive, with a single demuxer in the bundled case. + RtpDemuxer audio_rtp_demuxer_ GUARDED_BY(receive_crit_); + RtpDemuxer video_rtp_demuxer_ GUARDED_BY(receive_crit_); + // This extra map is used for receive processing which is // independent of media type. @@ -377,8 +374,7 @@ Call::~Call() { RTC_CHECK(audio_send_ssrcs_.empty()); RTC_CHECK(video_send_ssrcs_.empty()); RTC_CHECK(video_send_streams_.empty()); - RTC_CHECK(audio_receive_ssrcs_.empty()); - RTC_CHECK(video_receive_ssrcs_.empty()); + RTC_CHECK(audio_receive_streams_.empty()); RTC_CHECK(video_receive_streams_.empty()); pacer_thread_->Stop(); @@ -520,9 +516,9 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream( } { ReadLockScoped read_lock(*receive_crit_); - for (const auto& kv : audio_receive_ssrcs_) { - if (kv.second->config().rtp.local_ssrc == config.rtp.ssrc) { - kv.second->AssociateSendStream(send_stream); + for (AudioReceiveStream* stream : audio_receive_streams_) { + if (stream->config().rtp.local_ssrc == config.rtp.ssrc) { + stream->AssociateSendStream(send_stream); } } } @@ -548,9 +544,9 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) { } { ReadLockScoped read_lock(*receive_crit_); - for (const auto& kv : audio_receive_ssrcs_) { - if (kv.second->config().rtp.local_ssrc == ssrc) { - kv.second->AssociateSendStream(nullptr); + for (AudioReceiveStream* stream : audio_receive_streams_) { + if (stream->config().rtp.local_ssrc == ssrc) { + stream->AssociateSendStream(nullptr); } } } @@ -568,11 +564,10 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream( config_.audio_state, event_log_); { WriteLockScoped write_lock(*receive_crit_); - RTC_DCHECK(audio_receive_ssrcs_.find(config.rtp.remote_ssrc) == - audio_receive_ssrcs_.end()); - audio_receive_ssrcs_[config.rtp.remote_ssrc] = receive_stream; + audio_rtp_demuxer_.AddSink(config.rtp.remote_ssrc, receive_stream); receive_rtp_config_[config.rtp.remote_ssrc] = ReceiveRtpConfig(config.rtp.extensions, UseSendSideBwe(config)); + audio_receive_streams_.insert(receive_stream); ConfigureSync(config.sync_group); } @@ -601,8 +596,9 @@ void Call::DestroyAudioReceiveStream( uint32_t ssrc = config.rtp.remote_ssrc; receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) ->RemoveStream(ssrc); - size_t num_deleted = audio_receive_ssrcs_.erase(ssrc); + size_t num_deleted = audio_rtp_demuxer_.RemoveSink(audio_receive_stream); RTC_DCHECK(num_deleted == 1); + audio_receive_streams_.erase(audio_receive_stream); const std::string& sync_group = audio_receive_stream->config().sync_group; const auto it = sync_stream_mapping_.find(sync_group); if (it != sync_stream_mapping_.end() && @@ -699,11 +695,9 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream( UseSendSideBwe(config)); { WriteLockScoped write_lock(*receive_crit_); - RTC_DCHECK(video_receive_ssrcs_.find(config.rtp.remote_ssrc) == - video_receive_ssrcs_.end()); - video_receive_ssrcs_[config.rtp.remote_ssrc] = receive_stream; + video_rtp_demuxer_.AddSink(config.rtp.remote_ssrc, receive_stream); if (config.rtp.rtx_ssrc) { - video_receive_ssrcs_[config.rtp.rtx_ssrc] = receive_stream; + video_rtp_demuxer_.AddSink(config.rtp.rtx_ssrc, receive_stream); // We record identical config for the rtx stream as for the main // stream. Since the transport_send_cc negotiation is per payload // type, we may get an incorrect value for the rtx stream, but @@ -725,28 +719,22 @@ void Call::DestroyVideoReceiveStream( TRACE_EVENT0("webrtc", "Call::DestroyVideoReceiveStream"); RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread()); RTC_DCHECK(receive_stream != nullptr); - VideoReceiveStream* receive_stream_impl = nullptr; + VideoReceiveStream* receive_stream_impl = + static_cast(receive_stream); + const VideoReceiveStream::Config& config = receive_stream_impl->config(); { WriteLockScoped write_lock(*receive_crit_); // Remove all ssrcs pointing to a receive stream. As RTX retransmits on a // separate SSRC there can be either one or two. - auto it = video_receive_ssrcs_.begin(); - while (it != video_receive_ssrcs_.end()) { - if (it->second == static_cast(receive_stream)) { - if (receive_stream_impl != nullptr) - RTC_DCHECK(receive_stream_impl == it->second); - receive_stream_impl = it->second; - receive_rtp_config_.erase(it->first); - it = video_receive_ssrcs_.erase(it); - } else { - ++it; - } + size_t num_deleted = video_rtp_demuxer_.RemoveSink(receive_stream_impl); + RTC_DCHECK_GE(num_deleted, 1); + receive_rtp_config_.erase(config.rtp.remote_ssrc); + if (config.rtp.rtx_ssrc) { + receive_rtp_config_.erase(config.rtp.rtx_ssrc); } video_receive_streams_.erase(receive_stream_impl); - RTC_CHECK(receive_stream_impl != nullptr); - ConfigureSync(receive_stream_impl->config().sync_group); + ConfigureSync(config.sync_group); } - const VideoReceiveStream::Config& config = receive_stream_impl->config(); receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) ->RemoveStream(config.rtp.remote_ssrc); @@ -767,17 +755,10 @@ FlexfecReceiveStream* Call::CreateFlexfecReceiveStream( { WriteLockScoped write_lock(*receive_crit_); - - RTC_DCHECK(flexfec_receive_streams_.find(receive_stream) == - flexfec_receive_streams_.end()); - flexfec_receive_streams_.insert(receive_stream); + video_rtp_demuxer_.AddSink(config.remote_ssrc, receive_stream); for (auto ssrc : config.protected_media_ssrcs) - flexfec_receive_ssrcs_media_.insert(std::make_pair(ssrc, receive_stream)); - - RTC_DCHECK(flexfec_receive_ssrcs_protection_.find(config.remote_ssrc) == - flexfec_receive_ssrcs_protection_.end()); - flexfec_receive_ssrcs_protection_[config.remote_ssrc] = receive_stream; + video_rtp_demuxer_.AddSink(ssrc, receive_stream); RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) == receive_rtp_config_.end()); @@ -809,25 +790,9 @@ void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) { // Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be // destroyed. - auto prot_it = flexfec_receive_ssrcs_protection_.begin(); - while (prot_it != flexfec_receive_ssrcs_protection_.end()) { - if (prot_it->second == receive_stream_impl) - prot_it = flexfec_receive_ssrcs_protection_.erase(prot_it); - else - ++prot_it; - } - auto media_it = flexfec_receive_ssrcs_media_.begin(); - while (media_it != flexfec_receive_ssrcs_media_.end()) { - if (media_it->second == receive_stream_impl) - media_it = flexfec_receive_ssrcs_media_.erase(media_it); - else - ++media_it; - } - + video_rtp_demuxer_.RemoveSink(receive_stream_impl); receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) ->RemoveStream(ssrc); - - flexfec_receive_streams_.erase(receive_stream_impl); } delete receive_stream_impl; @@ -914,11 +879,11 @@ void Call::SignalChannelNetworkState(MediaType media, NetworkState state) { } { ReadLockScoped read_lock(*receive_crit_); - for (auto& kv : audio_receive_ssrcs_) { - kv.second->SignalNetworkState(audio_network_state_); + for (AudioReceiveStream* audio_receive_stream : audio_receive_streams_) { + audio_receive_stream->SignalNetworkState(audio_network_state_); } - for (auto& kv : video_receive_ssrcs_) { - kv.second->SignalNetworkState(video_network_state_); + for (VideoReceiveStream* video_receive_stream : video_receive_streams_) { + video_receive_stream->SignalNetworkState(video_network_state_); } } } @@ -1000,9 +965,9 @@ void Call::UpdateAggregateNetworkState() { } { ReadLockScoped read_lock(*receive_crit_); - if (audio_receive_ssrcs_.size() > 0) + if (audio_receive_streams_.size() > 0) have_audio = true; - if (video_receive_ssrcs_.size() > 0) + if (video_receive_streams_.size() > 0) have_video = true; } @@ -1093,15 +1058,15 @@ void Call::ConfigureSync(const std::string& sync_group) { sync_audio_stream = it->second; } else { // No configured audio stream, see if we can find one. - for (const auto& kv : audio_receive_ssrcs_) { - if (kv.second->config().sync_group == sync_group) { + for (AudioReceiveStream* stream : audio_receive_streams_) { + if (stream->config().sync_group == sync_group) { if (sync_audio_stream != nullptr) { LOG(LS_WARNING) << "Attempting to sync more than one audio stream " "within the same sync group. This is not " "supported in the current implementation."; break; } - sync_audio_stream = kv.second; + sync_audio_stream = stream; } } } @@ -1151,8 +1116,8 @@ PacketReceiver::DeliveryStatus Call::DeliverRtcp(MediaType media_type, } if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) { ReadLockScoped read_lock(*receive_crit_); - for (auto& kv : audio_receive_ssrcs_) { - if (kv.second->DeliverRtcp(packet, length)) + for (AudioReceiveStream* stream : audio_receive_streams_) { + if (stream->DeliverRtcp(packet, length)) rtcp_delivered = true; } } @@ -1196,41 +1161,17 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type, NotifyBweOfReceivedPacket(*parsed_packet, media_type); - uint32_t ssrc = parsed_packet->Ssrc(); - if (media_type == MediaType::AUDIO) { - auto it = audio_receive_ssrcs_.find(ssrc); - if (it != audio_receive_ssrcs_.end()) { + if (audio_rtp_demuxer_.OnRtpPacket(*parsed_packet)) { received_bytes_per_second_counter_.Add(static_cast(length)); received_audio_bytes_per_second_counter_.Add(static_cast(length)); - it->second->OnRtpPacket(*parsed_packet); event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length); return DELIVERY_OK; } - } - if (media_type == MediaType::VIDEO) { - auto it = video_receive_ssrcs_.find(ssrc); - if (it != video_receive_ssrcs_.end()) { + } else if (media_type == MediaType::VIDEO) { + if (video_rtp_demuxer_.OnRtpPacket(*parsed_packet)) { received_bytes_per_second_counter_.Add(static_cast(length)); received_video_bytes_per_second_counter_.Add(static_cast(length)); - it->second->OnRtpPacket(*parsed_packet); - - // Deliver media packets to FlexFEC subsystem. - auto it_bounds = flexfec_receive_ssrcs_media_.equal_range(ssrc); - for (auto it = it_bounds.first; it != it_bounds.second; ++it) - it->second->OnRtpPacket(*parsed_packet); - - event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length); - return DELIVERY_OK; - } - } - if (media_type == MediaType::VIDEO) { - received_bytes_per_second_counter_.Add(static_cast(length)); - // TODO(brandtr): Update here when FlexFEC supports protecting audio. - received_video_bytes_per_second_counter_.Add(static_cast(length)); - auto it = flexfec_receive_ssrcs_protection_.find(ssrc); - if (it != flexfec_receive_ssrcs_protection_.end()) { - it->second->OnRtpPacket(*parsed_packet); event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length); return DELIVERY_OK; } @@ -1264,11 +1205,7 @@ void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) { parsed_packet->set_recovered(true); - auto it = video_receive_ssrcs_.find(parsed_packet->Ssrc()); - if (it == video_receive_ssrcs_.end()) - return; - - it->second->OnRtpPacket(*parsed_packet); + video_rtp_demuxer_.OnRtpPacket(*parsed_packet); } void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet, diff --git a/webrtc/call/flexfec_receive_stream_impl.h b/webrtc/call/flexfec_receive_stream_impl.h index 705182906c..39fcaa7614 100644 --- a/webrtc/call/flexfec_receive_stream_impl.h +++ b/webrtc/call/flexfec_receive_stream_impl.h @@ -15,6 +15,7 @@ #include "webrtc/base/criticalsection.h" #include "webrtc/call/flexfec_receive_stream.h" +#include "webrtc/call/rtp_demuxer.h" namespace webrtc { @@ -26,7 +27,8 @@ class RtcpRttStats; class RtpPacketReceived; class RtpRtcp; -class FlexfecReceiveStreamImpl : public FlexfecReceiveStream { +class FlexfecReceiveStreamImpl : public FlexfecReceiveStream, + public RtpPacketSinkInterface { public: FlexfecReceiveStreamImpl(const Config& config, RecoveredPacketReceiver* recovered_packet_receiver, @@ -36,8 +38,8 @@ class FlexfecReceiveStreamImpl : public FlexfecReceiveStream { const Config& GetConfig() const { return config_; } - // TODO(nisse): Intended to be part of an RtpPacketReceiver interface. - void OnRtpPacket(const RtpPacketReceived& packet); + // RtpPacketSinkInterface. + void OnRtpPacket(const RtpPacketReceived& packet) override; // Implements FlexfecReceiveStream. void Start() override; diff --git a/webrtc/call/rtp_demuxer.cc b/webrtc/call/rtp_demuxer.cc new file mode 100644 index 0000000000..aea414e08d --- /dev/null +++ b/webrtc/call/rtp_demuxer.cc @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/base/checks.h" +#include "webrtc/call/rtp_demuxer.h" +#include "webrtc/modules/rtp_rtcp/source/rtp_packet_received.h" + +namespace webrtc { + +RtpDemuxer::RtpDemuxer() {} + +RtpDemuxer::~RtpDemuxer() { + RTC_DCHECK(sinks_.empty()); +} + +void RtpDemuxer::AddSink(uint32_t ssrc, RtpPacketSinkInterface* sink) { + RTC_DCHECK(sink); + sinks_.emplace(ssrc, sink); +} + +size_t RtpDemuxer::RemoveSink(const RtpPacketSinkInterface* sink) { + RTC_DCHECK(sink); + size_t count = 0; + for (auto it = sinks_.begin(); it != sinks_.end(); ) { + if (it->second == sink) { + it = sinks_.erase(it); + ++count; + } else { + ++it; + } + } + return count; +} + +bool RtpDemuxer::OnRtpPacket(const RtpPacketReceived& packet) { + bool found = false; + auto it_range = sinks_.equal_range(packet.Ssrc()); + for (auto it = it_range.first; it != it_range.second; ++it) { + found = true; + it->second->OnRtpPacket(packet); + } + return found; +} + +} // namespace webrtc diff --git a/webrtc/call/rtp_demuxer.h b/webrtc/call/rtp_demuxer.h new file mode 100644 index 0000000000..e1a81603dd --- /dev/null +++ b/webrtc/call/rtp_demuxer.h @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef WEBRTC_CALL_RTP_DEMUXER_H_ +#define WEBRTC_CALL_RTP_DEMUXER_H_ + +#include + +namespace webrtc { + +class RtpPacketReceived; + +// This class represents a receiver of an already parsed RTP packets. +class RtpPacketSinkInterface { + public: + virtual ~RtpPacketSinkInterface() {} + virtual void OnRtpPacket(const RtpPacketReceived& packet) = 0; +}; + +// This class represents the RTP demuxing, for a single RTP session (i.e., one +// ssrc space, see RFC 7656). It isn't thread aware, leaving responsibility of +// multithreading issues to the user of this class. +// TODO(nisse): Should be extended to also do MID-based demux and payload-type +// demux. +class RtpDemuxer { + public: + RtpDemuxer(); + ~RtpDemuxer(); + + // Registers a sink. The same sink can be registered for multiple ssrcs, and + // the same ssrc can have multiple sinks. Null pointer is not allowed. + void AddSink(uint32_t ssrc, RtpPacketSinkInterface* sink); + // Removes a sink. Returns deletion count (a sink may be registered + // for multiple ssrcs). Null pointer is not allowed. + size_t RemoveSink(const RtpPacketSinkInterface* sink); + + // Returns true if at least one matching sink was found, otherwise false. + bool OnRtpPacket(const RtpPacketReceived& packet); + + private: + std::multimap sinks_; +}; + +} // namespace webrtc + +#endif // WEBRTC_CALL_RTP_DEMUXER_H_ diff --git a/webrtc/modules/rtp_rtcp/source/flexfec_receiver.cc b/webrtc/modules/rtp_rtcp/source/flexfec_receiver.cc index 47534e480e..a365fcbec4 100644 --- a/webrtc/modules/rtp_rtcp/source/flexfec_receiver.cc +++ b/webrtc/modules/rtp_rtcp/source/flexfec_receiver.cc @@ -132,9 +132,11 @@ bool FlexfecReceiver::ProcessReceivedPackets() { continue; } ++packet_counter_.num_recovered_packets; + // Set this flag first, since OnRecoveredPacket may end up here + // again, with the same packet. + recovered_packet->returned = true; recovered_packet_receiver_->OnRecoveredPacket( recovered_packet->pkt->data, recovered_packet->pkt->length); - recovered_packet->returned = true; // Periodically log the incoming packets. int64_t now_ms = clock_->TimeInMilliseconds(); if (now_ms - last_recovered_packet_ms_ > kPacketLogIntervalMs) { diff --git a/webrtc/video/video_receive_stream.h b/webrtc/video/video_receive_stream.h index ca797e2a3e..6656b34963 100644 --- a/webrtc/video/video_receive_stream.h +++ b/webrtc/video/video_receive_stream.h @@ -15,6 +15,7 @@ #include #include "webrtc/base/thread_checker.h" +#include "webrtc/call/rtp_demuxer.h" #include "webrtc/call/syncable.h" #include "webrtc/common_video/include/incoming_video_stream.h" #include "webrtc/common_video/libyuv/include/webrtc_libyuv.h" @@ -46,7 +47,8 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream, public NackSender, public KeyFrameRequestSender, public video_coding::OnCompleteFrameCallback, - public Syncable { + public Syncable, + public RtpPacketSinkInterface { public: VideoReceiveStream(int num_cpu_cores, PacketRouter* packet_router, @@ -76,8 +78,8 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream, void EnableEncodedFrameRecording(rtc::PlatformFile file, size_t byte_limit) override; - // TODO(nisse): Intended to be part of an RtpPacketReceiver interface. - void OnRtpPacket(const RtpPacketReceived& packet); + // RtpPacketSinkInterface. + void OnRtpPacket(const RtpPacketReceived& packet) override; // Implements rtc::VideoSinkInterface. void OnFrame(const VideoFrame& video_frame) override;