diff --git a/webrtc/audio/BUILD.gn b/webrtc/audio/BUILD.gn index 48a6706f2d..414a7c65bd 100644 --- a/webrtc/audio/BUILD.gn +++ b/webrtc/audio/BUILD.gn @@ -79,6 +79,7 @@ if (rtc_include_tests) { "../api:mock_audio_mixer", "../base:rtc_base_approved", "../base:rtc_task_queue", + "../call:rtp_receiver", "../modules/audio_device:mock_audio_device", "../modules/audio_mixer:audio_mixer_impl", "../modules/congestion_controller:congestion_controller", diff --git a/webrtc/audio/audio_receive_stream.cc b/webrtc/audio/audio_receive_stream.cc index cb90a68a0f..079e971786 100644 --- a/webrtc/audio/audio_receive_stream.cc +++ b/webrtc/audio/audio_receive_stream.cc @@ -20,6 +20,7 @@ #include "webrtc/base/checks.h" #include "webrtc/base/logging.h" #include "webrtc/base/timeutils.h" +#include "webrtc/call/rtp_stream_receiver_controller_interface.h" #include "webrtc/modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h" #include "webrtc/modules/rtp_rtcp/include/rtp_receiver.h" #include "webrtc/modules/rtp_rtcp/include/rtp_rtcp.h" @@ -62,12 +63,12 @@ std::string AudioReceiveStream::Config::ToString() const { namespace internal { AudioReceiveStream::AudioReceiveStream( + RtpStreamReceiverControllerInterface* receiver_controller, PacketRouter* packet_router, const webrtc::AudioReceiveStream::Config& config, const rtc::scoped_refptr& audio_state, webrtc::RtcEventLog* event_log) - : config_(config), - audio_state_(audio_state) { + : config_(config), audio_state_(audio_state) { LOG(LS_INFO) << "AudioReceiveStream: " << config_.ToString(); RTC_DCHECK_NE(config_.voe_channel_id, -1); RTC_DCHECK(audio_state_.get()); @@ -107,6 +108,11 @@ AudioReceiveStream::AudioReceiveStream( } // Configure bandwidth estimation. channel_proxy_->RegisterReceiverCongestionControlObjects(packet_router); + + // Register with transport. + rtp_stream_receiver_ = + receiver_controller->CreateReceiver(config_.rtp.remote_ssrc, + channel_proxy_.get()); } AudioReceiveStream::~AudioReceiveStream() { diff --git a/webrtc/audio/audio_receive_stream.h b/webrtc/audio/audio_receive_stream.h index 7dcc6d3c53..92c4763b51 100644 --- a/webrtc/audio/audio_receive_stream.h +++ b/webrtc/audio/audio_receive_stream.h @@ -26,6 +26,8 @@ namespace webrtc { class PacketRouter; class RtcEventLog; class RtpPacketReceived; +class RtpStreamReceiverControllerInterface; +class RtpStreamReceiverInterface; namespace voe { class ChannelProxy; @@ -36,10 +38,10 @@ class AudioSendStream; class AudioReceiveStream final : public webrtc::AudioReceiveStream, public AudioMixer::Source, - public Syncable, - public RtpPacketSinkInterface { + public Syncable { public: - AudioReceiveStream(PacketRouter* packet_router, + AudioReceiveStream(RtpStreamReceiverControllerInterface* receiver_controller, + PacketRouter* packet_router, const webrtc::AudioReceiveStream::Config& config, const rtc::scoped_refptr& audio_state, webrtc::RtcEventLog* event_log); @@ -54,8 +56,11 @@ class AudioReceiveStream final : public webrtc::AudioReceiveStream, void SetGain(float gain) override; std::vector GetSources() const override; - // RtpPacketSinkInterface. - void OnRtpPacket(const RtpPacketReceived& packet) override; + // TODO(nisse): We don't formally implement RtpPacketSinkInterface, and this + // method shouldn't be needed. But it's currently used by the + // AudioReceiveStreamTest.ReceiveRtpPacket unittest. Figure out if that test + // shuld be refactored or deleted, and then delete this method. + void OnRtpPacket(const RtpPacketReceived& packet); // AudioMixer::Source AudioFrameInfo GetAudioFrameWithInfo(int sample_rate_hz, @@ -87,6 +92,8 @@ class AudioReceiveStream final : public webrtc::AudioReceiveStream, bool playing_ ACCESS_ON(worker_thread_checker_) = false; + std::unique_ptr rtp_stream_receiver_; + RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(AudioReceiveStream); }; } // namespace internal diff --git a/webrtc/audio/audio_receive_stream_unittest.cc b/webrtc/audio/audio_receive_stream_unittest.cc index f600b857b6..84efb20b91 100644 --- a/webrtc/audio/audio_receive_stream_unittest.cc +++ b/webrtc/audio/audio_receive_stream_unittest.cc @@ -15,6 +15,7 @@ #include "webrtc/api/test/mock_audio_mixer.h" #include "webrtc/audio/audio_receive_stream.h" #include "webrtc/audio/conversion.h" +#include "webrtc/call/rtp_stream_receiver_controller.h" #include "webrtc/logging/rtc_event_log/mock/mock_rtc_event_log.h" #include "webrtc/modules/bitrate_controller/include/mock/mock_bitrate_controller.h" #include "webrtc/modules/pacing/packet_router.h" @@ -137,6 +138,9 @@ struct ConfigHelper { rtc::scoped_refptr audio_mixer() { return audio_mixer_; } MockVoiceEngine& voice_engine() { return voice_engine_; } MockVoEChannelProxy* channel_proxy() { return channel_proxy_; } + RtpStreamReceiverControllerInterface* rtp_stream_receiver_controller() { + return &rtp_stream_receiver_controller_; + } void SetupMockForGetStats() { using testing::DoAll; @@ -166,6 +170,7 @@ struct ConfigHelper { rtc::scoped_refptr audio_mixer_; AudioReceiveStream::Config stream_config_; testing::StrictMock* channel_proxy_ = nullptr; + RtpStreamReceiverController rtp_stream_receiver_controller_; }; void BuildOneByteExtension(std::vector::iterator it, @@ -238,6 +243,7 @@ TEST(AudioReceiveStreamTest, ConfigToString) { TEST(AudioReceiveStreamTest, ConstructDestruct) { ConfigHelper helper; internal::AudioReceiveStream recv_stream( + helper.rtp_stream_receiver_controller(), helper.packet_router(), helper.config(), helper.audio_state(), helper.event_log()); } @@ -246,6 +252,7 @@ TEST(AudioReceiveStreamTest, ReceiveRtpPacket) { ConfigHelper helper; helper.config().rtp.transport_cc = true; internal::AudioReceiveStream recv_stream( + helper.rtp_stream_receiver_controller(), helper.packet_router(), helper.config(), helper.audio_state(), helper.event_log()); const int kTransportSequenceNumberValue = 1234; @@ -267,6 +274,7 @@ TEST(AudioReceiveStreamTest, ReceiveRtcpPacket) { ConfigHelper helper; helper.config().rtp.transport_cc = true; internal::AudioReceiveStream recv_stream( + helper.rtp_stream_receiver_controller(), helper.packet_router(), helper.config(), helper.audio_state(), helper.event_log()); @@ -280,6 +288,7 @@ TEST(AudioReceiveStreamTest, ReceiveRtcpPacket) { TEST(AudioReceiveStreamTest, GetStats) { ConfigHelper helper; internal::AudioReceiveStream recv_stream( + helper.rtp_stream_receiver_controller(), helper.packet_router(), helper.config(), helper.audio_state(), helper.event_log()); helper.SetupMockForGetStats(); @@ -325,6 +334,7 @@ TEST(AudioReceiveStreamTest, GetStats) { TEST(AudioReceiveStreamTest, SetGain) { ConfigHelper helper; internal::AudioReceiveStream recv_stream( + helper.rtp_stream_receiver_controller(), helper.packet_router(), helper.config(), helper.audio_state(), helper.event_log()); EXPECT_CALL(*helper.channel_proxy(), @@ -335,6 +345,7 @@ TEST(AudioReceiveStreamTest, SetGain) { TEST(AudioReceiveStreamTest, StreamShouldNotBeAddedToMixerWhenVoEReturnsError) { ConfigHelper helper; internal::AudioReceiveStream recv_stream( + helper.rtp_stream_receiver_controller(), helper.packet_router(), helper.config(), helper.audio_state(), helper.event_log()); @@ -347,6 +358,7 @@ TEST(AudioReceiveStreamTest, StreamShouldNotBeAddedToMixerWhenVoEReturnsError) { TEST(AudioReceiveStreamTest, StreamShouldBeAddedToMixerOnStart) { ConfigHelper helper; internal::AudioReceiveStream recv_stream( + helper.rtp_stream_receiver_controller(), helper.packet_router(), helper.config(), helper.audio_state(), helper.event_log()); diff --git a/webrtc/call/BUILD.gn b/webrtc/call/BUILD.gn index 9807c6433e..aa98053cb7 100644 --- a/webrtc/call/BUILD.gn +++ b/webrtc/call/BUILD.gn @@ -38,6 +38,7 @@ rtc_source_set("call_interfaces") { rtc_source_set("rtp_interfaces") { sources = [ "rtp_packet_sink_interface.h", + "rtp_stream_receiver_controller_interface.h", "rtp_transport_controller_send_interface.h", ] } @@ -46,6 +47,8 @@ rtc_source_set("rtp_receiver") { sources = [ "rtp_demuxer.cc", "rtp_demuxer.h", + "rtp_stream_receiver_controller.cc", + "rtp_stream_receiver_controller.h", "rtx_receive_stream.cc", "rtx_receive_stream.h", ] diff --git a/webrtc/call/call.cc b/webrtc/call/call.cc index c0861ddc3e..b4a9456d77 100644 --- a/webrtc/call/call.cc +++ b/webrtc/call/call.cc @@ -34,7 +34,7 @@ #include "webrtc/call/bitrate_allocator.h" #include "webrtc/call/call.h" #include "webrtc/call/flexfec_receive_stream_impl.h" -#include "webrtc/call/rtp_demuxer.h" +#include "webrtc/call/rtp_stream_receiver_controller.h" #include "webrtc/call/rtp_transport_controller_send.h" #include "webrtc/config.h" #include "webrtc/logging/rtc_event_log/rtc_event_log.h" @@ -275,10 +275,10 @@ class Call : public webrtc::Call, std::map sync_stream_mapping_ GUARDED_BY(receive_crit_); - // TODO(nisse): Should eventually be part of injected - // RtpTransportControllerReceive, with a single demuxer in the bundled case. - RtpDemuxer audio_rtp_demuxer_ GUARDED_BY(receive_crit_); - RtpDemuxer video_rtp_demuxer_ GUARDED_BY(receive_crit_); + // TODO(nisse): Should eventually be injected at creation, + // with a single object in the bundled case. + RtpStreamReceiverController audio_receiver_controller; + RtpStreamReceiverController video_receiver_controller; // This extra map is used for receive processing which is // independent of media type. @@ -486,10 +486,6 @@ rtc::Optional Call::ParseRtpPacket( if (!parsed_packet.Parse(packet, length)) return rtc::Optional(); - auto it = receive_rtp_config_.find(parsed_packet.Ssrc()); - if (it != receive_rtp_config_.end()) - parsed_packet.IdentifyExtensions(it->second.extensions); - int64_t arrival_time_ms; if (packet_time && packet_time->timestamp != -1) { arrival_time_ms = (packet_time->timestamp + 500) / 1000; @@ -646,12 +642,11 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream( TRACE_EVENT0("webrtc", "Call::CreateAudioReceiveStream"); RTC_DCHECK_RUN_ON(&configuration_thread_checker_); event_log_->LogAudioReceiveStreamConfig(CreateRtcLogStreamConfig(config)); - AudioReceiveStream* receive_stream = - new AudioReceiveStream(transport_send_->packet_router(), config, - config_.audio_state, event_log_); + AudioReceiveStream* receive_stream = new AudioReceiveStream( + &audio_receiver_controller, transport_send_->packet_router(), config, + config_.audio_state, event_log_); { WriteLockScoped write_lock(*receive_crit_); - audio_rtp_demuxer_.AddSink(config.rtp.remote_ssrc, receive_stream); receive_rtp_config_[config.rtp.remote_ssrc] = ReceiveRtpConfig(config.rtp.extensions, UseSendSideBwe(config)); audio_receive_streams_.insert(receive_stream); @@ -683,8 +678,6 @@ void Call::DestroyAudioReceiveStream( uint32_t ssrc = config.rtp.remote_ssrc; receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) ->RemoveStream(ssrc); - size_t num_deleted = audio_rtp_demuxer_.RemoveSink(audio_receive_stream); - RTC_DCHECK(num_deleted == 1); audio_receive_streams_.erase(audio_receive_stream); const std::string& sync_group = audio_receive_stream->config().sync_group; const auto it = sync_stream_mapping_.find(sync_group); @@ -776,19 +769,17 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream( TRACE_EVENT0("webrtc", "Call::CreateVideoReceiveStream"); RTC_DCHECK_RUN_ON(&configuration_thread_checker_); - VideoReceiveStream* receive_stream = - new VideoReceiveStream(num_cpu_cores_, transport_send_->packet_router(), - std::move(configuration), - module_process_thread_.get(), call_stats_.get()); + VideoReceiveStream* receive_stream = new VideoReceiveStream( + &video_receiver_controller, num_cpu_cores_, + transport_send_->packet_router(), std::move(configuration), + module_process_thread_.get(), call_stats_.get()); const webrtc::VideoReceiveStream::Config& config = receive_stream->config(); ReceiveRtpConfig receive_config(config.rtp.extensions, UseSendSideBwe(config)); { WriteLockScoped write_lock(*receive_crit_); - video_rtp_demuxer_.AddSink(config.rtp.remote_ssrc, receive_stream); if (config.rtp.rtx_ssrc) { - video_rtp_demuxer_.AddSink(config.rtp.rtx_ssrc, receive_stream); // We record identical config for the rtx stream as for the main // stream. Since the transport_send_cc negotiation is per payload // type, we may get an incorrect value for the rtx stream, but @@ -817,8 +808,6 @@ void Call::DestroyVideoReceiveStream( WriteLockScoped write_lock(*receive_crit_); // Remove all ssrcs pointing to a receive stream. As RTX retransmits on a // separate SSRC there can be either one or two. - size_t num_deleted = video_rtp_demuxer_.RemoveSink(receive_stream_impl); - RTC_DCHECK_GE(num_deleted, 1); receive_rtp_config_.erase(config.rtp.remote_ssrc); if (config.rtp.rtx_ssrc) { receive_rtp_config_.erase(config.rtp.rtx_ssrc); @@ -840,16 +829,22 @@ FlexfecReceiveStream* Call::CreateFlexfecReceiveStream( RTC_DCHECK_RUN_ON(&configuration_thread_checker_); RecoveredPacketReceiver* recovered_packet_receiver = this; - FlexfecReceiveStreamImpl* receive_stream = new FlexfecReceiveStreamImpl( - config, recovered_packet_receiver, call_stats_->rtcp_rtt_stats(), - module_process_thread_.get()); + FlexfecReceiveStreamImpl* receive_stream; { WriteLockScoped write_lock(*receive_crit_); - video_rtp_demuxer_.AddSink(config.remote_ssrc, receive_stream); - - for (auto ssrc : config.protected_media_ssrcs) - video_rtp_demuxer_.AddSink(ssrc, receive_stream); + // Unlike the video and audio receive streams, + // FlexfecReceiveStream implements RtpPacketSinkInterface itself, + // and hence its constructor passes its |this| pointer to + // video_receiver_controller->CreateStream(). Calling the + // constructor while holding |receive_crit_| ensures that we don't + // call OnRtpPacket until the constructor is finished and the + // object is in a valid state. + // TODO(nisse): Fix constructor so that it can be moved outside of + // this locked scope. + receive_stream = new FlexfecReceiveStreamImpl( + &video_receiver_controller, config, recovered_packet_receiver, + call_stats_->rtcp_rtt_stats(), module_process_thread_.get()); RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) == receive_rtp_config_.end()); @@ -881,7 +876,6 @@ void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) { // Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be // destroyed. - video_rtp_demuxer_.RemoveSink(receive_stream_impl); receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) ->RemoveStream(ssrc); } @@ -1302,17 +1296,31 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type, if (!parsed_packet) return DELIVERY_PACKET_ERROR; + auto it = receive_rtp_config_.find(parsed_packet->Ssrc()); + if (it == receive_rtp_config_.end()) { + LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc " + << parsed_packet->Ssrc(); + // Destruction of the receive stream, including deregistering from the + // RtpDemuxer, is not protected by the |receive_crit_| lock. But + // deregistering in the |receive_rtp_config_| map is protected by that lock. + // So by not passing the packet on to demuxing in this case, we prevent + // incoming packets to be passed on via the demuxer to a receive stream + // which is being torned down. + return DELIVERY_UNKNOWN_SSRC; + } + parsed_packet->IdentifyExtensions(it->second.extensions); + NotifyBweOfReceivedPacket(*parsed_packet, media_type); if (media_type == MediaType::AUDIO) { - if (audio_rtp_demuxer_.OnRtpPacket(*parsed_packet)) { + if (audio_receiver_controller.OnRtpPacket(*parsed_packet)) { received_bytes_per_second_counter_.Add(static_cast(length)); received_audio_bytes_per_second_counter_.Add(static_cast(length)); event_log_->LogRtpHeader(kIncomingPacket, packet, length); return DELIVERY_OK; } } else if (media_type == MediaType::VIDEO) { - if (video_rtp_demuxer_.OnRtpPacket(*parsed_packet)) { + if (video_receiver_controller.OnRtpPacket(*parsed_packet)) { received_bytes_per_second_counter_.Add(static_cast(length)); received_video_bytes_per_second_counter_.Add(static_cast(length)); event_log_->LogRtpHeader(kIncomingPacket, packet, length); @@ -1348,7 +1356,7 @@ void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) { parsed_packet->set_recovered(true); - video_rtp_demuxer_.OnRtpPacket(*parsed_packet); + video_receiver_controller.OnRtpPacket(*parsed_packet); } void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet, diff --git a/webrtc/call/flexfec_receive_stream_impl.cc b/webrtc/call/flexfec_receive_stream_impl.cc index f010433498..c73d9e98e5 100644 --- a/webrtc/call/flexfec_receive_stream_impl.cc +++ b/webrtc/call/flexfec_receive_stream_impl.cc @@ -15,6 +15,7 @@ #include "webrtc/base/checks.h" #include "webrtc/base/location.h" #include "webrtc/base/logging.h" +#include "webrtc/call/rtp_stream_receiver_controller_interface.h" #include "webrtc/modules/rtp_rtcp/include/flexfec_receiver.h" #include "webrtc/modules/rtp_rtcp/include/receive_statistics.h" #include "webrtc/modules/rtp_rtcp/include/rtp_rtcp.h" @@ -122,6 +123,7 @@ std::unique_ptr CreateRtpRtcpModule( } // namespace FlexfecReceiveStreamImpl::FlexfecReceiveStreamImpl( + RtpStreamReceiverControllerInterface* receiver_controller, const Config& config, RecoveredPacketReceiver* recovered_packet_receiver, RtcpRttStats* rtt_stats, @@ -141,6 +143,22 @@ FlexfecReceiveStreamImpl::FlexfecReceiveStreamImpl( rtp_rtcp_->SetRTCPStatus(config_.rtcp_mode); rtp_rtcp_->SetSSRC(config_.local_ssrc); process_thread_->RegisterModule(rtp_rtcp_.get(), RTC_FROM_HERE); + + // Register with transport. + // TODO(nisse): OnRtpPacket in this class delegates all real work to + // |receiver_|. So maybe we don't need to implement RtpPacketSinkInterface + // here at all, we'd then delete the OnRtpPacket method and instead register + // |receiver_| as the RtpPacketSinkInterface for this stream. + // TODO(nisse): Passing |this| from the constructor to the RtpDemuxer, before + // the object is fully initialized, is risky. But it works in this case + // because locking in our caller, Call::CreateFlexfecReceiveStream, ensures + // that the demuxer doesn't call OnRtpPacket before this object is fully + // constructed. Registering |receiver_| instead of |this| would solve this + // problem too. + rtp_stream_receiver_ = + receiver_controller->CreateReceiver(config_.remote_ssrc, this); + for (uint32_t ssrc : config.protected_media_ssrcs) + receiver_controller->AddSink(ssrc, this); } FlexfecReceiveStreamImpl::~FlexfecReceiveStreamImpl() { diff --git a/webrtc/call/flexfec_receive_stream_impl.h b/webrtc/call/flexfec_receive_stream_impl.h index e4c22942e6..a89940f8c3 100644 --- a/webrtc/call/flexfec_receive_stream_impl.h +++ b/webrtc/call/flexfec_receive_stream_impl.h @@ -26,14 +26,18 @@ class RecoveredPacketReceiver; class RtcpRttStats; class RtpPacketReceived; class RtpRtcp; +class RtpStreamReceiverControllerInterface; +class RtpStreamReceiverInterface; class FlexfecReceiveStreamImpl : public FlexfecReceiveStream, public RtpPacketSinkInterface { public: - FlexfecReceiveStreamImpl(const Config& config, - RecoveredPacketReceiver* recovered_packet_receiver, - RtcpRttStats* rtt_stats, - ProcessThread* process_thread); + FlexfecReceiveStreamImpl( + RtpStreamReceiverControllerInterface* receiver_controller, + const Config& config, + RecoveredPacketReceiver* recovered_packet_receiver, + RtcpRttStats* rtt_stats, + ProcessThread* process_thread); ~FlexfecReceiveStreamImpl() override; const Config& GetConfig() const { return config_; } @@ -59,6 +63,8 @@ class FlexfecReceiveStreamImpl : public FlexfecReceiveStream, const std::unique_ptr rtp_receive_statistics_; const std::unique_ptr rtp_rtcp_; ProcessThread* process_thread_; + + std::unique_ptr rtp_stream_receiver_; }; } // namespace webrtc diff --git a/webrtc/call/flexfec_receive_stream_unittest.cc b/webrtc/call/flexfec_receive_stream_unittest.cc index 46bd7c382c..ba41406094 100644 --- a/webrtc/call/flexfec_receive_stream_unittest.cc +++ b/webrtc/call/flexfec_receive_stream_unittest.cc @@ -12,6 +12,7 @@ #include "webrtc/base/array_view.h" #include "webrtc/call/flexfec_receive_stream_impl.h" +#include "webrtc/call/rtp_stream_receiver_controller.h" #include "webrtc/modules/pacing/packet_router.h" #include "webrtc/modules/rtp_rtcp/include/flexfec_receiver.h" #include "webrtc/modules/rtp_rtcp/mocks/mock_recovered_packet_receiver.h" @@ -77,7 +78,8 @@ class FlexfecReceiveStreamTest : public ::testing::Test { protected: FlexfecReceiveStreamTest() : config_(CreateDefaultConfig(&rtcp_send_transport_)), - receive_stream_(config_, + receive_stream_(&rtp_stream_receiver_controller_, + config_, &recovered_packet_receiver_, &rtt_stats_, &process_thread_) {} @@ -87,7 +89,7 @@ class FlexfecReceiveStreamTest : public ::testing::Test { MockRecoveredPacketReceiver recovered_packet_receiver_; MockRtcpRttStats rtt_stats_; MockProcessThread process_thread_; - + RtpStreamReceiverController rtp_stream_receiver_controller_; FlexfecReceiveStreamImpl receive_stream_; }; @@ -134,7 +136,8 @@ TEST_F(FlexfecReceiveStreamTest, RecoversPacketWhenStarted) { // clang-format on testing::StrictMock recovered_packet_receiver; - FlexfecReceiveStreamImpl receive_stream(config_, &recovered_packet_receiver, + FlexfecReceiveStreamImpl receive_stream(&rtp_stream_receiver_controller_, + config_, &recovered_packet_receiver, &rtt_stats_, &process_thread_); // Do not call back before being started. diff --git a/webrtc/call/rtp_stream_receiver_controller.cc b/webrtc/call/rtp_stream_receiver_controller.cc new file mode 100644 index 0000000000..a4b1e36ae2 --- /dev/null +++ b/webrtc/call/rtp_stream_receiver_controller.cc @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/call/rtp_stream_receiver_controller.h" +#include "webrtc/base/ptr_util.h" + +namespace webrtc { + +RtpStreamReceiverController::Receiver::Receiver( + RtpStreamReceiverController* controller, + uint32_t ssrc, + RtpPacketSinkInterface* sink) + : controller_(controller), sink_(sink) { + controller_->AddSink(ssrc, sink_); +} + +RtpStreamReceiverController::Receiver::~Receiver() { + // Don't require return value > 0, since for RTX we currently may + // have multiple Receiver objects with the same sink. + // TODO(nisse): Consider adding a DCHECK when RtxReceiveStream is wired up. + controller_->RemoveSink(sink_); +} + +RtpStreamReceiverController::RtpStreamReceiverController() = default; +RtpStreamReceiverController::~RtpStreamReceiverController() = default; + +std::unique_ptr +RtpStreamReceiverController::CreateReceiver( + uint32_t ssrc, + RtpPacketSinkInterface* sink) { + return rtc::MakeUnique(this, ssrc, sink); +} + +bool RtpStreamReceiverController::OnRtpPacket(const RtpPacketReceived& packet) { + rtc::CritScope cs(&lock_); + return demuxer_.OnRtpPacket(packet); +} + +void RtpStreamReceiverController::AddSink(uint32_t ssrc, + RtpPacketSinkInterface* sink) { + rtc::CritScope cs(&lock_); + return demuxer_.AddSink(ssrc, sink); +} + +size_t RtpStreamReceiverController::RemoveSink( + const RtpPacketSinkInterface* sink) { + rtc::CritScope cs(&lock_); + return demuxer_.RemoveSink(sink); +} + +} // namespace webrtc diff --git a/webrtc/call/rtp_stream_receiver_controller.h b/webrtc/call/rtp_stream_receiver_controller.h new file mode 100644 index 0000000000..5c8ed6775e --- /dev/null +++ b/webrtc/call/rtp_stream_receiver_controller.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef WEBRTC_CALL_RTP_STREAM_RECEIVER_CONTROLLER_H_ +#define WEBRTC_CALL_RTP_STREAM_RECEIVER_CONTROLLER_H_ + +#include + +#include "webrtc/base/criticalsection.h" +#include "webrtc/call/rtp_demuxer.h" +#include "webrtc/call/rtp_stream_receiver_controller_interface.h" + +namespace webrtc { + +class RtpPacketReceived; + +// This class represents the RTP receive parsing and demuxing, for a +// single RTP session. +// TODO(nisse): Add RTCP processing, we should aim to terminate RTCP +// and not leave any RTCP processing to individual receive streams. +// TODO(nisse): Extract per-packet processing, including parsing and +// demuxing, into a separate class. +class RtpStreamReceiverController + : public RtpStreamReceiverControllerInterface { + public: + RtpStreamReceiverController(); + ~RtpStreamReceiverController() override; + + // Implements RtpStreamReceiverControllerInterface. + std::unique_ptr CreateReceiver( + uint32_t ssrc, + RtpPacketSinkInterface* sink) override; + + // Thread-safe wrappers for the corresponding RtpDemuxer methods. + void AddSink(uint32_t ssrc, RtpPacketSinkInterface* sink) override; + size_t RemoveSink(const RtpPacketSinkInterface* sink) override; + + // TODO(nisse): Not yet responsible for parsing. + bool OnRtpPacket(const RtpPacketReceived& packet); + + private: + class Receiver : public RtpStreamReceiverInterface { + public: + Receiver(RtpStreamReceiverController* controller, + uint32_t ssrc, + RtpPacketSinkInterface* sink); + + ~Receiver() override; + + private: + RtpStreamReceiverController* const controller_; + RtpPacketSinkInterface* const sink_; + }; + + // TODO(nisse): Move to a TaskQueue for synchronization. When used + // by Call, we expect construction and all methods but OnRtpPacket + // to be called on the same thread, and OnRtpPacket to be called + // by a single, but possibly distinct, thread. But applications not + // using Call may have use threads differently. + rtc::CriticalSection lock_; + RtpDemuxer demuxer_ GUARDED_BY(&lock_); +}; + +} // namespace webrtc + +#endif // WEBRTC_CALL_RTP_STREAM_RECEIVER_CONTROLLER_H_ diff --git a/webrtc/call/rtp_stream_receiver_controller_interface.h b/webrtc/call/rtp_stream_receiver_controller_interface.h new file mode 100644 index 0000000000..51d25a525e --- /dev/null +++ b/webrtc/call/rtp_stream_receiver_controller_interface.h @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef WEBRTC_CALL_RTP_STREAM_RECEIVER_CONTROLLER_INTERFACE_H_ +#define WEBRTC_CALL_RTP_STREAM_RECEIVER_CONTROLLER_INTERFACE_H_ + +#include + +#include "webrtc/call/rtp_packet_sink_interface.h" + +namespace webrtc { + +// An RtpStreamReceiver is responsible for the rtp-specific but +// media-independent state needed for receiving an RTP stream. +// TODO(nisse): Currently, only owns the association between ssrc and +// the stream's RtpPacketSinkInterface. Ownership of corresponding +// objects from modules/rtp_rtcp/ should move to this class (or +// rather, the corresponding implementation class). We should add +// methods for getting rtp receive stats, and for sending RTCP +// messages related to the receive stream. +class RtpStreamReceiverInterface { + public: + virtual ~RtpStreamReceiverInterface() {} +}; + +// This class acts as a factory for RtpStreamReceiver objects. +class RtpStreamReceiverControllerInterface { + public: + virtual ~RtpStreamReceiverControllerInterface() {} + + virtual std::unique_ptr CreateReceiver( + uint32_t ssrc, + RtpPacketSinkInterface* sink) = 0; + // For registering additional sinks, needed for FlexFEC. + virtual void AddSink(uint32_t ssrc, RtpPacketSinkInterface* sink) = 0; + virtual size_t RemoveSink(const RtpPacketSinkInterface* sink) = 0; +}; + +} // namespace webrtc + +#endif // WEBRTC_CALL_RTP_STREAM_RECEIVER_CONTROLLER_INTERFACE_H_ diff --git a/webrtc/video/BUILD.gn b/webrtc/video/BUILD.gn index da31bcad7f..58f58dc260 100644 --- a/webrtc/video/BUILD.gn +++ b/webrtc/video/BUILD.gn @@ -260,6 +260,7 @@ if (rtc_include_tests) { "../base:rtc_base_approved", "../base:rtc_base_tests_utils", "../call:call_interfaces", + "../call:rtp_receiver", "../common_video", "../logging:rtc_event_log_api", "../media:rtc_media", diff --git a/webrtc/video/rtp_video_stream_receiver.h b/webrtc/video/rtp_video_stream_receiver.h index 736f513697..924f1be86c 100644 --- a/webrtc/video/rtp_video_stream_receiver.h +++ b/webrtc/video/rtp_video_stream_receiver.h @@ -19,6 +19,7 @@ #include "webrtc/base/constructormagic.h" #include "webrtc/base/criticalsection.h" +#include "webrtc/call/rtp_packet_sink_interface.h" #include "webrtc/modules/include/module_common_types.h" #include "webrtc/modules/rtp_rtcp/include/receive_statistics.h" #include "webrtc/modules/rtp_rtcp/include/remote_ntp_time_estimator.h" @@ -58,6 +59,7 @@ class VideoReceiver; class RtpVideoStreamReceiver : public RtpData, public RecoveredPacketReceiver, public RtpFeedback, + public RtpPacketSinkInterface, public VCMFrameTypeCallback, public VCMPacketRequestCallback, public video_coding::OnReceivedFrameCallback, @@ -96,8 +98,8 @@ class RtpVideoStreamReceiver : public RtpData, void SignalNetworkState(NetworkState state); - // TODO(nisse): Intended to be part of an RtpPacketReceiver interface. - void OnRtpPacket(const RtpPacketReceived& packet); + // Implements RtpPacketSinkInterface. + void OnRtpPacket(const RtpPacketReceived& packet) override; // Implements RtpData. int32_t OnReceivedPayloadData(const uint8_t* payload_data, diff --git a/webrtc/video/video_receive_stream.cc b/webrtc/video/video_receive_stream.cc index 9eceb009c4..acc497b616 100644 --- a/webrtc/video/video_receive_stream.cc +++ b/webrtc/video/video_receive_stream.cc @@ -21,6 +21,7 @@ #include "webrtc/base/logging.h" #include "webrtc/base/optional.h" #include "webrtc/base/trace_event.h" +#include "webrtc/call/rtp_stream_receiver_controller_interface.h" #include "webrtc/common_types.h" #include "webrtc/common_video/h264/profile_level_id.h" #include "webrtc/common_video/libyuv/include/webrtc_libyuv.h" @@ -168,11 +169,13 @@ VideoCodec CreateDecoderVideoCodec(const VideoReceiveStream::Decoder& decoder) { namespace internal { -VideoReceiveStream::VideoReceiveStream(int num_cpu_cores, - PacketRouter* packet_router, - VideoReceiveStream::Config config, - ProcessThread* process_thread, - CallStats* call_stats) +VideoReceiveStream::VideoReceiveStream( + RtpStreamReceiverControllerInterface* receiver_controller, + int num_cpu_cores, + PacketRouter* packet_router, + VideoReceiveStream::Config config, + ProcessThread* process_thread, + CallStats* call_stats) : transport_adapter_(config.rtcp_send_transport), config_(std::move(config)), num_cpu_cores_(num_cpu_cores), @@ -222,6 +225,14 @@ VideoReceiveStream::VideoReceiveStream(int num_cpu_cores, clock_, jitter_estimator_.get(), timing_.get(), &stats_proxy_)); process_thread_->RegisterModule(&rtp_stream_sync_, RTC_FROM_HERE); + + // Register with RtpStreamReceiverController. + media_receiver_ = receiver_controller->CreateReceiver( + config_.rtp.remote_ssrc, &rtp_video_stream_receiver_); + if (config.rtp.rtx_ssrc) { + rtx_receiver_ = receiver_controller->CreateReceiver( + config_.rtp.rtx_ssrc, &rtp_video_stream_receiver_); + } } VideoReceiveStream::~VideoReceiveStream() { @@ -241,10 +252,6 @@ bool VideoReceiveStream::DeliverRtcp(const uint8_t* packet, size_t length) { return rtp_video_stream_receiver_.DeliverRtcp(packet, length); } -void VideoReceiveStream::OnRtpPacket(const RtpPacketReceived& packet) { - rtp_video_stream_receiver_.OnRtpPacket(packet); -} - void VideoReceiveStream::SetSync(Syncable* audio_syncable) { RTC_DCHECK_RUN_ON(&worker_thread_checker_); rtp_stream_sync_.ConfigureSync(audio_syncable); diff --git a/webrtc/video/video_receive_stream.h b/webrtc/video/video_receive_stream.h index 6020d3f88d..a32573de97 100644 --- a/webrtc/video/video_receive_stream.h +++ b/webrtc/video/video_receive_stream.h @@ -36,6 +36,8 @@ class CallStats; class IvfFileWriter; class ProcessThread; class RTPFragmentationHeader; +class RtpStreamReceiverInterface; +class RtpStreamReceiverControllerInterface; class VCMTiming; class VCMJitterEstimator; @@ -47,10 +49,10 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream, public NackSender, public KeyFrameRequestSender, public video_coding::OnCompleteFrameCallback, - public Syncable, - public RtpPacketSinkInterface { + public Syncable { public: - VideoReceiveStream(int num_cpu_cores, + VideoReceiveStream(RtpStreamReceiverControllerInterface* receiver_controller, + int num_cpu_cores, PacketRouter* packet_router, VideoReceiveStream::Config config, ProcessThread* process_thread, @@ -78,9 +80,6 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream, void EnableEncodedFrameRecording(rtc::PlatformFile file, size_t byte_limit) override; - // RtpPacketSinkInterface. - void OnRtpPacket(const RtpPacketReceived& packet) override; - // Implements rtc::VideoSinkInterface. void OnFrame(const VideoFrame& video_frame) override; @@ -137,6 +136,9 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream, // Members for the new jitter buffer experiment. std::unique_ptr jitter_estimator_; std::unique_ptr frame_buffer_; + + std::unique_ptr media_receiver_; + std::unique_ptr rtx_receiver_; }; } // namespace internal } // namespace webrtc diff --git a/webrtc/video/video_receive_stream_unittest.cc b/webrtc/video/video_receive_stream_unittest.cc index 237eed40b8..a1b644e666 100644 --- a/webrtc/video/video_receive_stream_unittest.cc +++ b/webrtc/video/video_receive_stream_unittest.cc @@ -16,6 +16,7 @@ #include "webrtc/api/video_codecs/video_decoder.h" #include "webrtc/base/criticalsection.h" #include "webrtc/base/event.h" +#include "webrtc/call/rtp_stream_receiver_controller.h" #include "webrtc/media/base/fakevideorenderer.h" #include "webrtc/modules/pacing/packet_router.h" #include "webrtc/modules/rtp_rtcp/source/rtp_packet_to_send.h" @@ -25,15 +26,14 @@ #include "webrtc/system_wrappers/include/clock.h" #include "webrtc/test/field_trial.h" +namespace webrtc { +namespace { + using testing::_; using testing::Invoke; constexpr int kDefaultTimeOutMs = 50; -namespace webrtc { - -namespace { - const char kNewJitterBufferFieldTrialEnabled[] = "WebRTC-NewVideoJitterBuffer/Enabled/"; @@ -91,7 +91,7 @@ class VideoReceiveStreamTest : public testing::Test { config_.decoders.push_back(null_decoder); video_receive_stream_.reset(new webrtc::internal::VideoReceiveStream( - kDefaultNumCpuCores, + &rtp_stream_receiver_controller_, kDefaultNumCpuCores, &packet_router_, config_.Copy(), process_thread_.get(), &call_stats_)); } @@ -105,6 +105,7 @@ class VideoReceiveStreamTest : public testing::Test { MockTransport mock_transport_; PacketRouter packet_router_; std::unique_ptr process_thread_; + RtpStreamReceiverController rtp_stream_receiver_controller_; std::unique_ptr video_receive_stream_; }; @@ -130,9 +131,10 @@ TEST_F(VideoReceiveStreamTest, CreateFrameFromH264FmtpSpropAndIdr) { EXPECT_CALL(mock_h264_video_decoder_, Decode(_, false, _, _, _)); RtpPacketReceived parsed_packet; ASSERT_TRUE(parsed_packet.Parse(rtppacket.data(), rtppacket.size())); - video_receive_stream_->OnRtpPacket(parsed_packet); + rtp_stream_receiver_controller_.OnRtpPacket(parsed_packet); EXPECT_CALL(mock_h264_video_decoder_, Release()); // Make sure the decoder thread had a chance to run. init_decode_event_.Wait(kDefaultTimeOutMs); } + } // namespace webrtc diff --git a/webrtc/voice_engine/channel_proxy.h b/webrtc/voice_engine/channel_proxy.h index 35d0df5360..f5417f254a 100644 --- a/webrtc/voice_engine/channel_proxy.h +++ b/webrtc/voice_engine/channel_proxy.h @@ -17,6 +17,7 @@ #include "webrtc/base/constructormagic.h" #include "webrtc/base/race_checker.h" #include "webrtc/base/thread_checker.h" +#include "webrtc/call/rtp_packet_sink_interface.h" #include "webrtc/voice_engine/channel_manager.h" #include "webrtc/voice_engine/include/voe_rtp_rtcp.h" @@ -50,7 +51,7 @@ class Channel; // voe::Channel class. // 2. Provide a refined interface for the stream classes, including assumptions // on return values and input adaptation. -class ChannelProxy { +class ChannelProxy : public RtpPacketSinkInterface { public: ChannelProxy(); explicit ChannelProxy(const ChannelOwner& channel_owner); @@ -94,7 +95,9 @@ class ChannelProxy { virtual void SetInputMute(bool muted); virtual void RegisterExternalTransport(Transport* transport); virtual void DeRegisterExternalTransport(); - virtual void OnRtpPacket(const RtpPacketReceived& packet); + + // Implements RtpPacketSinkInterface + void OnRtpPacket(const RtpPacketReceived& packet) override; virtual bool ReceivedRTCPPacket(const uint8_t* packet, size_t length); virtual const rtc::scoped_refptr& GetAudioDecoderFactory() const;