From 2e94de596ef3b2a51de3ba60e5212405e9df26fb Mon Sep 17 00:00:00 2001 From: Tomas Gunnarsson Date: Tue, 16 Jun 2020 16:54:10 +0200 Subject: [PATCH] Add GetSctpStats to PeerConnectionInternal, remove sctp_data_channels() This removes code from DataChannelController that exposes an internal vector of data channels and puts the onus of returning stats for a data channel, on the data channel object itself. This will come in handy as we make threading changes to the data channel object. Change-Id: Ie164cc5823cd5f9782fc5c9a63aa4c76b8229639 Bug: webrtc:11547, webrtc:11687 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/177244 Commit-Queue: Tommi Reviewed-by: Harald Alvestrand Cr-Commit-Position: refs/heads/master@{#31533} --- pc/data_channel.cc | 11 ++++++- pc/data_channel.h | 14 +++++++++ pc/data_channel_controller.cc | 40 ++++++++++++++++-------- pc/data_channel_controller.h | 9 ++++-- pc/peer_connection.cc | 11 +++---- pc/peer_connection.h | 6 +--- pc/peer_connection_internal.h | 7 +++-- pc/rtc_stats_collector.cc | 25 +++++++-------- pc/stats_collector.cc | 15 ++++----- pc/test/fake_peer_connection_base.h | 5 --- pc/test/fake_peer_connection_for_stats.h | 9 ++++-- 11 files changed, 95 insertions(+), 57 deletions(-) diff --git a/pc/data_channel.cc b/pc/data_channel.cc index e4f658cbec..22655107ca 100644 --- a/pc/data_channel.cc +++ b/pc/data_channel.cc @@ -315,7 +315,6 @@ bool DataChannel::Send(const DataBuffer& buffer) { // thread. Bring buffer management etc to the network thread and keep the // operational state management on the signaling thread. - buffered_amount_ += buffer.size(); if (state_ != kOpen) { return false; } @@ -327,6 +326,8 @@ bool DataChannel::Send(const DataBuffer& buffer) { return true; } + buffered_amount_ += buffer.size(); + // If the queue is non-empty, we're waiting for SignalReadyToSend, // so just add to the end of the queue and keep waiting. if (!queued_send_data_.Empty()) { @@ -429,6 +430,14 @@ void DataChannel::OnTransportChannelClosed() { CloseAbruptlyWithError(std::move(error)); } +DataChannel::Stats DataChannel::GetStats() const { + RTC_DCHECK_RUN_ON(signaling_thread_); + Stats stats{internal_id_, id(), label(), + protocol(), state(), messages_sent(), + messages_received(), bytes_sent(), bytes_received()}; + return stats; +} + // The remote peer request that this channel shall be closed. void DataChannel::RemotePeerRequestClose() { RTC_DCHECK(data_channel_type_ == cricket::DCT_RTP); diff --git a/pc/data_channel.h b/pc/data_channel.h index e84325022d..c1e855d879 100644 --- a/pc/data_channel.h +++ b/pc/data_channel.h @@ -113,6 +113,18 @@ class SctpSidAllocator { // callback and transition to kClosed. class DataChannel : public DataChannelInterface, public sigslot::has_slots<> { public: + struct Stats { + int internal_id; + int id; + std::string label; + std::string protocol; + DataState state; + uint32_t messages_sent; + uint32_t messages_received; + uint64_t bytes_sent; + uint64_t bytes_received; + }; + static rtc::scoped_refptr Create( DataChannelProviderInterface* provider, cricket::DataChannelType dct, @@ -205,6 +217,8 @@ class DataChannel : public DataChannelInterface, public sigslot::has_slots<> { // to kClosed. void OnTransportChannelClosed(); + Stats GetStats() const; + /******************************************* * The following methods are for RTP only. * *******************************************/ diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc index 9891d5025f..a8a1491b7f 100644 --- a/pc/data_channel_controller.cc +++ b/pc/data_channel_controller.cc @@ -174,6 +174,11 @@ void DataChannelController::OnTransportClosed() { void DataChannelController::SetupDataChannelTransport_n() { RTC_DCHECK_RUN_ON(network_thread()); data_channel_transport_invoker_ = std::make_unique(); + + // There's a new data channel transport. This needs to be signaled to the + // |sctp_data_channels_| so that they can reopen and reconnect. This is + // necessary when bundling is applied. + NotifyDataChannelsOfTransportCreated(); } void DataChannelController::TeardownDataChannelTransport_n() { @@ -200,17 +205,21 @@ void DataChannelController::OnTransportChanged( // There's a new data channel transport. This needs to be signaled to the // |sctp_data_channels_| so that they can reopen and reconnect. This is // necessary when bundling is applied. - data_channel_transport_invoker_->AsyncInvoke( - RTC_FROM_HERE, signaling_thread(), [this] { - RTC_DCHECK_RUN_ON(signaling_thread()); - for (const auto& channel : sctp_data_channels_) { - channel->OnTransportChannelCreated(); - } - }); + NotifyDataChannelsOfTransportCreated(); } } } +std::vector DataChannelController::GetDataChannelStats() + const { + RTC_DCHECK_RUN_ON(signaling_thread()); + std::vector stats; + stats.reserve(sctp_data_channels_.size()); + for (const auto& channel : sctp_data_channels_) + stats.push_back(channel->GetStats()); + return stats; +} + bool DataChannelController::HandleOpenMessage_s( const cricket::ReceiveDataParams& params, const rtc::CopyOnWriteBuffer& buffer) { @@ -463,12 +472,6 @@ DataChannelController::rtp_data_channels() const { return &rtp_data_channels_; } -const std::vector>* -DataChannelController::sctp_data_channels() const { - RTC_DCHECK_RUN_ON(signaling_thread()); - return &sctp_data_channels_; -} - void DataChannelController::UpdateClosingRtpDataChannels( const std::vector& active_channels, bool is_local_update) { @@ -549,6 +552,17 @@ bool DataChannelController::DataChannelSendData( return false; } +void DataChannelController::NotifyDataChannelsOfTransportCreated() { + RTC_DCHECK_RUN_ON(network_thread()); + data_channel_transport_invoker_->AsyncInvoke( + RTC_FROM_HERE, signaling_thread(), [this] { + RTC_DCHECK_RUN_ON(signaling_thread()); + for (const auto& channel : sctp_data_channels_) { + channel->OnTransportChannelCreated(); + } + }); +} + rtc::Thread* DataChannelController::network_thread() const { return pc_->network_thread(); } diff --git a/pc/data_channel_controller.h b/pc/data_channel_controller.h index 156bbe557b..c3e64aba95 100644 --- a/pc/data_channel_controller.h +++ b/pc/data_channel_controller.h @@ -64,6 +64,9 @@ class DataChannelController : public DataChannelProviderInterface, void OnTransportChanged( DataChannelTransportInterface* data_channel_transport); + // Called from PeerConnection::GetDataChannelStats on the signaling thread. + std::vector GetDataChannelStats() const; + // Creates channel and adds it to the collection of DataChannels that will // be offered in a SessionDescription. rtc::scoped_refptr InternalCreateDataChannel( @@ -101,8 +104,6 @@ class DataChannelController : public DataChannelProviderInterface, void set_data_channel_transport(DataChannelTransportInterface* transport); const std::map>* rtp_data_channels() const; - const std::vector>* sctp_data_channels() - const; sigslot::signal1& SignalDataChannelCreated() { RTC_DCHECK_RUN_ON(signaling_thread()); @@ -137,6 +138,10 @@ class DataChannelController : public DataChannelProviderInterface, const rtc::CopyOnWriteBuffer& payload, cricket::SendDataResult* result); + // Called when all data channels need to be notified of a transport channel + // (calls OnTransportChannelCreated on the signaling thread). + void NotifyDataChannelsOfTransportCreated(); + rtc::Thread* network_thread() const; rtc::Thread* signaling_thread() const; diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index 76f87f270e..9b3b760f21 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -6225,6 +6225,11 @@ cricket::IceConfig PeerConnection::ParseIceConfig( return ice_config; } +std::vector PeerConnection::GetDataChannelStats() const { + RTC_DCHECK_RUN_ON(signaling_thread()); + return data_channel_controller_.GetDataChannelStats(); +} + absl::optional PeerConnection::sctp_transport_name() const { RTC_DCHECK_RUN_ON(signaling_thread()); if (sctp_mid_s_ && transport_controller_) { @@ -6705,12 +6710,6 @@ bool PeerConnection::CreateDataChannel(const std::string& mid) { } else { return false; } - - // All non-RTP data channels must initialize |sctp_data_channels_|. - for (const auto& channel : - *data_channel_controller_.sctp_data_channels()) { - channel->OnTransportChannelCreated(); - } return true; case cricket::DCT_RTP: default: diff --git a/pc/peer_connection.h b/pc/peer_connection.h index 3bb962bb1d..4425e1c4d2 100644 --- a/pc/peer_connection.h +++ b/pc/peer_connection.h @@ -280,11 +280,7 @@ class PeerConnection : public PeerConnectionInternal, return data_channel_controller_.rtp_data_channel(); } - std::vector> sctp_data_channels() - const override { - RTC_DCHECK_RUN_ON(signaling_thread()); - return *data_channel_controller_.sctp_data_channels(); - } + std::vector GetDataChannelStats() const override; absl::optional sctp_transport_name() const override; diff --git a/pc/peer_connection_internal.h b/pc/peer_connection_internal.h index 52ffe85c2c..66d585b592 100644 --- a/pc/peer_connection_internal.h +++ b/pc/peer_connection_internal.h @@ -46,8 +46,11 @@ class PeerConnectionInternal : public PeerConnectionInterface { // Only valid when using deprecated RTP data channels. virtual cricket::RtpDataChannel* rtp_data_channel() const = 0; - virtual std::vector> sctp_data_channels() - const = 0; + // Call on the network thread to fetch stats for all the data channels. + // TODO(tommi): Make pure virtual after downstream updates. + virtual std::vector GetDataChannelStats() const { + return {}; + } virtual absl::optional sctp_transport_name() const = 0; diff --git a/pc/rtc_stats_collector.cc b/pc/rtc_stats_collector.cc index 5d6792ceb3..f66be30dd3 100644 --- a/pc/rtc_stats_collector.cc +++ b/pc/rtc_stats_collector.cc @@ -1275,22 +1275,21 @@ void RTCStatsCollector::ProduceCodecStats_n( void RTCStatsCollector::ProduceDataChannelStats_s( int64_t timestamp_us, RTCStatsReport* report) const { - RTC_DCHECK(signaling_thread_->IsCurrent()); - for (const rtc::scoped_refptr& data_channel : - pc_->sctp_data_channels()) { + RTC_DCHECK_RUN_ON(signaling_thread_); + std::vector data_stats = pc_->GetDataChannelStats(); + for (const auto& stats : data_stats) { std::unique_ptr data_channel_stats( new RTCDataChannelStats( - "RTCDataChannel_" + rtc::ToString(data_channel->internal_id()), + "RTCDataChannel_" + rtc::ToString(stats.internal_id), timestamp_us)); - data_channel_stats->label = data_channel->label(); - data_channel_stats->protocol = data_channel->protocol(); - data_channel_stats->data_channel_identifier = data_channel->id(); - data_channel_stats->state = - DataStateToRTCDataChannelState(data_channel->state()); - data_channel_stats->messages_sent = data_channel->messages_sent(); - data_channel_stats->bytes_sent = data_channel->bytes_sent(); - data_channel_stats->messages_received = data_channel->messages_received(); - data_channel_stats->bytes_received = data_channel->bytes_received(); + data_channel_stats->label = std::move(stats.label); + data_channel_stats->protocol = std::move(stats.protocol); + data_channel_stats->data_channel_identifier = stats.id; + data_channel_stats->state = DataStateToRTCDataChannelState(stats.state); + data_channel_stats->messages_sent = stats.messages_sent; + data_channel_stats->bytes_sent = stats.bytes_sent; + data_channel_stats->messages_received = stats.messages_received; + data_channel_stats->bytes_received = stats.bytes_received; report->AddStats(std::move(data_channel_stats)); } } diff --git a/pc/stats_collector.cc b/pc/stats_collector.cc index 0509c6dc19..317e4443d4 100644 --- a/pc/stats_collector.cc +++ b/pc/stats_collector.cc @@ -1146,19 +1146,20 @@ void StatsCollector::ExtractDataInfo() { rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls; - for (const auto& dc : pc_->sctp_data_channels()) { + std::vector data_stats = pc_->GetDataChannelStats(); + for (const auto& stats : data_stats) { StatsReport::Id id(StatsReport::NewTypedIntId( - StatsReport::kStatsReportTypeDataChannel, dc->id())); + StatsReport::kStatsReportTypeDataChannel, stats.id)); StatsReport* report = reports_.ReplaceOrAddNew(id); report->set_timestamp(stats_gathering_started_); - report->AddString(StatsReport::kStatsValueNameLabel, dc->label()); + report->AddString(StatsReport::kStatsValueNameLabel, stats.label); // Filter out the initial id (-1). - if (dc->id() >= 0) { - report->AddInt(StatsReport::kStatsValueNameDataChannelId, dc->id()); + if (stats.id >= 0) { + report->AddInt(StatsReport::kStatsValueNameDataChannelId, stats.id); } - report->AddString(StatsReport::kStatsValueNameProtocol, dc->protocol()); + report->AddString(StatsReport::kStatsValueNameProtocol, stats.protocol); report->AddString(StatsReport::kStatsValueNameState, - DataChannelInterface::DataStateString(dc->state())); + DataChannelInterface::DataStateString(stats.state)); } } diff --git a/pc/test/fake_peer_connection_base.h b/pc/test/fake_peer_connection_base.h index f4b27f03e1..e1663e6d9f 100644 --- a/pc/test/fake_peer_connection_base.h +++ b/pc/test/fake_peer_connection_base.h @@ -254,11 +254,6 @@ class FakePeerConnectionBase : public PeerConnectionInternal { cricket::RtpDataChannel* rtp_data_channel() const override { return nullptr; } - std::vector> sctp_data_channels() - const override { - return {}; - } - absl::optional sctp_transport_name() const override { return absl::nullopt; } diff --git a/pc/test/fake_peer_connection_for_stats.h b/pc/test/fake_peer_connection_for_stats.h index f459552170..175a1ede15 100644 --- a/pc/test/fake_peer_connection_for_stats.h +++ b/pc/test/fake_peer_connection_for_stats.h @@ -259,9 +259,12 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase { return transceivers_; } - std::vector> sctp_data_channels() - const override { - return sctp_data_channels_; + std::vector GetDataChannelStats() const override { + RTC_DCHECK_RUN_ON(signaling_thread()); + std::vector stats; + for (const auto& channel : sctp_data_channels_) + stats.push_back(channel->GetStats()); + return stats; } cricket::CandidateStatsList GetPooledCandidateStats() const override {