From 949c2f04b4156095090e02f3f13613aadacce88d Mon Sep 17 00:00:00 2001 From: mflodman Date: Fri, 16 Oct 2015 02:31:11 -0700 Subject: [PATCH] Move ownership of send ViEChannels and ViEEncoder to VideoSendStream. This is the first CL to get ready for adapting audio bitrate based on BWE. I've kept this CL as small as possible and had to add a few getters to ChannelManager. The next CL will do the same for receive ViEChannels. The getters are a bit uggly, but is an in-between-state. Let's discuss future ownership of the different modules and what do do with ChannelGroup. BUG=5079 Review URL: https://codereview.webrtc.org/1394243006 Cr-Commit-Position: refs/heads/master@{#10298} --- webrtc/video/video_send_stream.cc | 67 +++++++++-- webrtc/video/video_send_stream.h | 5 +- webrtc/video_engine/vie_channel.cc | 5 +- webrtc/video_engine/vie_channel.h | 2 +- webrtc/video_engine/vie_channel_group.cc | 137 +++++++---------------- webrtc/video_engine/vie_channel_group.h | 28 ++--- 6 files changed, 117 insertions(+), 127 deletions(-) diff --git a/webrtc/video/video_send_stream.cc b/webrtc/video/video_send_stream.cc index af6ae8e4aa..33605162b4 100644 --- a/webrtc/video/video_send_stream.cc +++ b/webrtc/video/video_send_stream.cc @@ -21,6 +21,8 @@ #include "webrtc/system_wrappers/interface/logging.h" #include "webrtc/system_wrappers/interface/trace_event.h" #include "webrtc/video/video_capture_input.h" +#include "webrtc/video_engine/call_stats.h" +#include "webrtc/video_engine/payload_router.h" #include "webrtc/video_engine/vie_channel.h" #include "webrtc/video_engine/vie_channel_group.h" #include "webrtc/video_engine/vie_defines.h" @@ -28,6 +30,12 @@ #include "webrtc/video_send_stream.h" namespace webrtc { + +class BitrateAllocator; +class PacedSender; +class RtcpIntraFrameObserver; +class TransportFeedbackObserver; + std::string VideoSendStream::Config::EncoderSettings::ToString() const { std::stringstream ss; @@ -114,16 +122,45 @@ VideoSendStream::VideoSendStream( suspended_ssrcs_(suspended_ssrcs), module_process_thread_(module_process_thread), channel_group_(channel_group), - channel_id_(channel_id), use_config_bitrate_(true), stats_proxy_(Clock::GetRealTimeClock(), config) { LOG(LS_INFO) << "VideoSendStream: " << config_.ToString(); RTC_DCHECK(!config_.rtp.ssrcs.empty()); - RTC_CHECK(channel_group->CreateSendChannel( - channel_id_, &transport_adapter_, &stats_proxy_, - config.pre_encode_callback, num_cpu_cores, config_)); - vie_channel_ = channel_group_->GetChannel(channel_id_); - vie_encoder_ = channel_group_->GetEncoder(channel_id_); + + // Set up Call-wide sequence numbers, if configured for this send stream. + TransportFeedbackObserver* transport_feedback_observer = nullptr; + for (const RtpExtension& extension : config.rtp.extensions) { + if (extension.name == RtpExtension::kTransportSequenceNumber) { + transport_feedback_observer = + channel_group_->GetTransportFeedbackObserver(); + break; + } + } + + const std::vector& ssrcs = config.rtp.ssrcs; + + vie_encoder_.reset(new ViEEncoder( + channel_id, num_cpu_cores, module_process_thread_, &stats_proxy_, + config.pre_encode_callback, channel_group_->pacer(), + channel_group_->bitrate_allocator())); + RTC_CHECK(vie_encoder_->Init()); + + vie_channel_.reset(new ViEChannel( + num_cpu_cores, config.send_transport, module_process_thread_, + channel_group_->GetRtcpIntraFrameObserver(), + channel_group_->GetBitrateController()->CreateRtcpBandwidthObserver(), + transport_feedback_observer, + channel_group_->GetRemoteBitrateEstimator(), + channel_group_->GetCallStats()->rtcp_rtt_stats(), channel_group_->pacer(), + channel_group_->packet_router(), ssrcs.size(), true)); + RTC_CHECK(vie_channel_->Init() == 0); + + vie_encoder_->StartThreadsAndSetSharedMembers( + vie_channel_->send_payload_router(), + vie_channel_->vcm_protection_callback()); + + std::vector first_ssrc(1, ssrcs[0]); + vie_encoder_->SetSsrcs(first_ssrc); for (size_t i = 0; i < config_.rtp.extensions.size(); ++i) { const std::string& extension = config_.rtp.extensions[i].name; @@ -145,7 +182,7 @@ VideoSendStream::VideoSendStream( } // TODO(pbos): Consider configuring REMB in Call. - channel_group_->SetChannelRembStatus(true, false, vie_channel_); + channel_group_->SetChannelRembStatus(true, false, vie_channel_.get()); // Enable NACK, FEC or both. const bool enable_protection_nack = config_.rtp.nack.rtp_history_ms > 0; @@ -162,7 +199,7 @@ VideoSendStream::VideoSendStream( vie_channel_->SetRTCPCName(config_.rtp.c_name.c_str()); input_.reset(new internal::VideoCaptureInput( - module_process_thread_, vie_encoder_, config_.local_renderer, + module_process_thread_, vie_encoder_.get(), config_.local_renderer, &stats_proxy_, this, config_.encoding_time_observer)); // 28 to match packet overhead in ModuleRtpRtcpImpl. @@ -187,6 +224,8 @@ VideoSendStream::VideoSendStream( if (config_.suspend_below_min_bitrate) vie_encoder_->SuspendBelowMinBitrate(); + channel_group_->AddEncoder(ssrcs, vie_encoder_.get()); + vie_channel_->RegisterSendChannelRtcpStatisticsCallback(&stats_proxy_); vie_channel_->RegisterSendChannelRtpStatisticsCallback(&stats_proxy_); vie_channel_->RegisterRtcpPacketTypeCounterObserver(&stats_proxy_); @@ -209,7 +248,17 @@ VideoSendStream::~VideoSendStream() { vie_encoder_->DeRegisterExternalEncoder( config_.encoder_settings.payload_type); - channel_group_->DeleteChannel(channel_id_); + channel_group_->GetCallStats()->DeregisterStatsObserver( + vie_channel_->GetStatsObserver()); + channel_group_->SetChannelRembStatus(false, false, vie_channel_.get()); + + // Remove the feedback, stop all encoding threads and processing. This must be + // done before deleting the channel. + channel_group_->RemoveEncoder(vie_encoder_.get()); + vie_encoder_->StopThreadsAndRemoveSharedMembers(); + + uint32_t remote_ssrc = vie_channel_->GetRemoteSSRC(); + channel_group_->GetRemoteBitrateEstimator()->RemoveStream(remote_ssrc); } VideoCaptureInput* VideoSendStream::Input() { diff --git a/webrtc/video/video_send_stream.h b/webrtc/video/video_send_stream.h index 94f5f5a5b8..874d00ff05 100644 --- a/webrtc/video/video_send_stream.h +++ b/webrtc/video/video_send_stream.h @@ -78,11 +78,10 @@ class VideoSendStream : public webrtc::VideoSendStream, ProcessThread* const module_process_thread_; ChannelGroup* const channel_group_; - const int channel_id_; rtc::scoped_ptr input_; - ViEChannel* vie_channel_; - ViEEncoder* vie_encoder_; + rtc::scoped_ptr vie_channel_; + rtc::scoped_ptr vie_encoder_; // Used as a workaround to indicate that we should be using the configured // start bitrate initially, instead of the one reported by VideoEngine (which diff --git a/webrtc/video_engine/vie_channel.cc b/webrtc/video_engine/vie_channel.cc index bd722cfeda..ba6d524b77 100644 --- a/webrtc/video_engine/vie_channel.cc +++ b/webrtc/video_engine/vie_channel.cc @@ -725,9 +725,8 @@ int32_t ViEChannel::GetLocalSSRC(uint8_t idx, unsigned int* ssrc) { return 0; } -int32_t ViEChannel::GetRemoteSSRC(uint32_t* ssrc) { - *ssrc = vie_receiver_.GetRemoteSsrc(); - return 0; +uint32_t ViEChannel::GetRemoteSSRC() { + return vie_receiver_.GetRemoteSsrc(); } int ViEChannel::SetRtxSendPayloadType(int payload_type, diff --git a/webrtc/video_engine/vie_channel.h b/webrtc/video_engine/vie_channel.h index b36ce2149e..488923d7e9 100644 --- a/webrtc/video_engine/vie_channel.h +++ b/webrtc/video_engine/vie_channel.h @@ -130,7 +130,7 @@ class ViEChannel : public VCMFrameTypeCallback, int32_t GetLocalSSRC(uint8_t idx, unsigned int* ssrc); // Gets SSRC for the incoming stream. - int32_t GetRemoteSSRC(uint32_t* ssrc); + uint32_t GetRemoteSSRC(); int SetRtxSendPayloadType(int payload_type, int associated_payload_type); void SetRtxReceivePayloadType(int payload_type, int associated_payload_type); diff --git a/webrtc/video_engine/vie_channel_group.cc b/webrtc/video_engine/vie_channel_group.cc index a76c50ac99..51ef83cbcb 100644 --- a/webrtc/video_engine/vie_channel_group.cc +++ b/webrtc/video_engine/vie_channel_group.cc @@ -193,62 +193,7 @@ ChannelGroup::~ChannelGroup() { call_stats_->DeregisterStatsObserver(transport_feedback_adapter_.get()); RTC_DCHECK(channel_map_.empty()); RTC_DCHECK(!remb_->InUse()); - RTC_DCHECK(vie_encoder_map_.empty()); -} - -bool ChannelGroup::CreateSendChannel(int channel_id, - Transport* transport, - SendStatisticsProxy* stats_proxy, - I420FrameCallback* pre_encode_callback, - int number_of_cores, - const VideoSendStream::Config& config) { - TransportFeedbackObserver* transport_feedback_observer = nullptr; - bool transport_seq_enabled = false; - for (const RtpExtension& extension : config.rtp.extensions) { - if (extension.name == RtpExtension::kTransportSequenceNumber) { - transport_seq_enabled = true; - break; - } - } - if (transport_seq_enabled) { - if (transport_feedback_adapter_.get() == nullptr) { - transport_feedback_adapter_.reset(new TransportFeedbackAdapter( - bitrate_controller_->CreateRtcpBandwidthObserver(), - Clock::GetRealTimeClock(), process_thread_)); - transport_feedback_adapter_->SetBitrateEstimator( - new RemoteBitrateEstimatorAbsSendTime( - transport_feedback_adapter_.get(), Clock::GetRealTimeClock())); - transport_feedback_adapter_->GetBitrateEstimator()->SetMinBitrate( - min_bitrate_bps_); - call_stats_->RegisterStatsObserver(transport_feedback_adapter_.get()); - } - transport_feedback_observer = transport_feedback_adapter_.get(); - } - - const std::vector& ssrcs = config.rtp.ssrcs; - RTC_DCHECK(!ssrcs.empty()); - rtc::scoped_ptr vie_encoder(new ViEEncoder( - channel_id, number_of_cores, process_thread_, stats_proxy, - pre_encode_callback, pacer_.get(), bitrate_allocator_.get())); - if (!vie_encoder->Init()) { - return false; - } - ViEEncoder* encoder = vie_encoder.get(); - if (!CreateChannel(channel_id, transport, number_of_cores, - vie_encoder.release(), ssrcs.size(), true, - remote_bitrate_estimator_.get(), - transport_feedback_observer)) { - return false; - } - ViEChannel* channel = channel_map_[channel_id]; - // Connect the encoder with the send packet router, to enable sending. - encoder->StartThreadsAndSetSharedMembers(channel->send_payload_router(), - channel->vcm_protection_callback()); - - encoder_state_feedback_->AddEncoder(ssrcs, encoder); - std::vector first_ssrc(1, ssrcs[0]); - encoder->SetSsrcs(first_ssrc); - return true; + RTC_DCHECK(encoders_.empty()); } bool ChannelGroup::CreateReceiveChannel( @@ -270,14 +215,13 @@ bool ChannelGroup::CreateReceiveChannel( } else { bitrate_estimator = remote_bitrate_estimator_.get(); } - return CreateChannel(channel_id, transport, number_of_cores, nullptr, 1, - false, bitrate_estimator, nullptr); + return CreateChannel(channel_id, transport, number_of_cores, 1, false, + bitrate_estimator, nullptr); } bool ChannelGroup::CreateChannel(int channel_id, Transport* transport, int number_of_cores, - ViEEncoder* vie_encoder, size_t max_rtp_streams, bool sender, RemoteBitrateEstimator* bitrate_estimator, @@ -295,46 +239,23 @@ bool ChannelGroup::CreateChannel(int channel_id, // Register the channel to receive stats updates. call_stats_->RegisterStatsObserver(channel->GetStatsObserver()); - // Store the channel, add it to the channel group and save the vie_encoder. + // Store the channel and add it to the channel group. channel_map_[channel_id] = channel.release(); - if (vie_encoder) { - rtc::CritScope lock(&encoder_map_crit_); - vie_encoder_map_[channel_id] = vie_encoder; - } - return true; } void ChannelGroup::DeleteChannel(int channel_id) { ViEChannel* vie_channel = PopChannel(channel_id); - ViEEncoder* vie_encoder = GetEncoder(channel_id); - call_stats_->DeregisterStatsObserver(vie_channel->GetStatsObserver()); SetChannelRembStatus(false, false, vie_channel); - // If we're a sender, remove the feedback and stop all encoding threads and - // processing. This must be done before deleting the channel. - if (vie_encoder) { - encoder_state_feedback_->RemoveEncoder(vie_encoder); - vie_encoder->StopThreadsAndRemoveSharedMembers(); - } - - unsigned int remote_ssrc = 0; - vie_channel->GetRemoteSSRC(&remote_ssrc); + unsigned int remote_ssrc = vie_channel->GetRemoteSSRC(); channel_map_.erase(channel_id); remote_bitrate_estimator_->RemoveStream(remote_ssrc); delete vie_channel; - if (vie_encoder) { - { - rtc::CritScope lock(&encoder_map_crit_); - vie_encoder_map_.erase(vie_encoder_map_.find(channel_id)); - } - delete vie_encoder; - } - LOG(LS_VERBOSE) << "Channel deleted " << channel_id; } @@ -347,12 +268,22 @@ ViEChannel* ChannelGroup::GetChannel(int channel_id) const { return it->second; } -ViEEncoder* ChannelGroup::GetEncoder(int channel_id) const { - rtc::CritScope lock(&encoder_map_crit_); - EncoderMap::const_iterator it = vie_encoder_map_.find(channel_id); - if (it == vie_encoder_map_.end()) - return nullptr; - return it->second; +void ChannelGroup::AddEncoder(const std::vector& ssrcs, + ViEEncoder* encoder) { + encoder_state_feedback_->AddEncoder(ssrcs, encoder); + rtc::CritScope lock(&encoder_crit_); + encoders_.push_back(encoder); +} + +void ChannelGroup::RemoveEncoder(ViEEncoder* encoder) { + encoder_state_feedback_->RemoveEncoder(encoder); + rtc::CritScope lock(&encoder_crit_); + for (auto it = encoders_.begin(); it != encoders_.end(); ++it) { + if (*it == encoder) { + encoders_.erase(it); + return; + } + } } ViEChannel* ChannelGroup::PopChannel(int channel_id) { @@ -395,8 +326,23 @@ CallStats* ChannelGroup::GetCallStats() const { return call_stats_.get(); } -EncoderStateFeedback* ChannelGroup::GetEncoderStateFeedback() const { - return encoder_state_feedback_.get(); +TransportFeedbackObserver* ChannelGroup::GetTransportFeedbackObserver() { + if (transport_feedback_adapter_.get() == nullptr) { + transport_feedback_adapter_.reset(new TransportFeedbackAdapter( + bitrate_controller_->CreateRtcpBandwidthObserver(), + Clock::GetRealTimeClock(), process_thread_)); + transport_feedback_adapter_->SetBitrateEstimator( + new RemoteBitrateEstimatorAbsSendTime( + transport_feedback_adapter_.get(), Clock::GetRealTimeClock())); + transport_feedback_adapter_->GetBitrateEstimator()->SetMinBitrate( + min_bitrate_bps_); + call_stats_->RegisterStatsObserver(transport_feedback_adapter_.get()); + } + return transport_feedback_adapter_.get(); +} + +RtcpIntraFrameObserver* ChannelGroup::GetRtcpIntraFrameObserver() const { + return encoder_state_feedback_->GetRtcpIntraFrameObserver(); } int64_t ChannelGroup::GetPacerQueuingDelayMs() const { @@ -430,15 +376,16 @@ void ChannelGroup::SignalNetworkState(NetworkState state) { } } +// TODO(mflodman): Move this logic out from ChannelGroup. void ChannelGroup::OnNetworkChanged(uint32_t target_bitrate_bps, uint8_t fraction_loss, int64_t rtt) { bitrate_allocator_->OnNetworkChanged(target_bitrate_bps, fraction_loss, rtt); int pad_up_to_bitrate_bps = 0; { - rtc::CritScope lock(&encoder_map_crit_); - for (const auto& encoder : vie_encoder_map_) - pad_up_to_bitrate_bps += encoder.second->GetPaddingNeededBps(); + rtc::CritScope lock(&encoder_crit_); + for (const auto& encoder : encoders_) + pad_up_to_bitrate_bps += encoder->GetPaddingNeededBps(); } pacer_->UpdateBitrate( target_bitrate_bps / 1000, diff --git a/webrtc/video_engine/vie_channel_group.h b/webrtc/video_engine/vie_channel_group.h index bb1a08ef85..90c6bdd535 100644 --- a/webrtc/video_engine/vie_channel_group.h +++ b/webrtc/video_engine/vie_channel_group.h @@ -42,27 +42,20 @@ class ViEEncoder; class VieRemb; class VoEVideoSync; -typedef std::list ChannelList; - // Channel group contains data common for several channels. All channels in the // group are assumed to send/receive data to the same end-point. class ChannelGroup : public BitrateObserver { public: explicit ChannelGroup(ProcessThread* process_thread); ~ChannelGroup(); - bool CreateSendChannel(int channel_id, - Transport* transport, - SendStatisticsProxy* stats_proxy, - I420FrameCallback* pre_encode_callback, - int number_of_cores, - const VideoSendStream::Config& config); bool CreateReceiveChannel(int channel_id, Transport* transport, int number_of_cores, const VideoReceiveStream::Config& config); void DeleteChannel(int channel_id); ViEChannel* GetChannel(int channel_id) const; - ViEEncoder* GetEncoder(int channel_id) const; + void AddEncoder(const std::vector& ssrcs, ViEEncoder* encoder); + void RemoveEncoder(ViEEncoder* encoder); void SetSyncInterface(VoEVideoSync* sync_interface); void SetBweBitrates(int min_bitrate_bps, int start_bitrate_bps, @@ -73,10 +66,15 @@ class ChannelGroup : public BitrateObserver { void SignalNetworkState(NetworkState state); BitrateController* GetBitrateController() const; - CallStats* GetCallStats() const; RemoteBitrateEstimator* GetRemoteBitrateEstimator() const; - EncoderStateFeedback* GetEncoderStateFeedback() const; + CallStats* GetCallStats() const; int64_t GetPacerQueuingDelayMs() const; + PacedSender* pacer() const { return pacer_.get(); } + PacketRouter* packet_router() const { return packet_router_.get(); } + BitrateAllocator* bitrate_allocator() const { + return bitrate_allocator_.get(); } + TransportFeedbackObserver* GetTransportFeedbackObserver(); + RtcpIntraFrameObserver* GetRtcpIntraFrameObserver() const; // Implements BitrateObserver. void OnNetworkChanged(uint32_t target_bitrate_bps, @@ -87,12 +85,10 @@ class ChannelGroup : public BitrateObserver { private: typedef std::map ChannelMap; - typedef std::map EncoderMap; bool CreateChannel(int channel_id, Transport* transport, int number_of_cores, - ViEEncoder* vie_encoder, size_t max_rtp_streams, bool sender, RemoteBitrateEstimator* bitrate_estimator, @@ -108,9 +104,9 @@ class ChannelGroup : public BitrateObserver { rtc::scoped_ptr remote_estimator_proxy_; rtc::scoped_ptr encoder_state_feedback_; ChannelMap channel_map_; - // Maps Channel id -> ViEEncoder. - mutable rtc::CriticalSection encoder_map_crit_; - EncoderMap vie_encoder_map_ GUARDED_BY(encoder_map_crit_); + + mutable rtc::CriticalSection encoder_crit_; + std::vector encoders_ GUARDED_BY(encoder_crit_); // Registered at construct time and assumed to outlive this class. ProcessThread* const process_thread_;