Add GetSctpStats to PeerConnectionInternal, remove sctp_data_channels()

This removes code from DataChannelController that exposes
an internal vector of data channels and puts the onus of
returning stats for a data channel, on the data channel
object itself. This will come in handy as we make threading
changes to the data channel object.

Change-Id: Ie164cc5823cd5f9782fc5c9a63aa4c76b8229639
Bug: webrtc:11547, webrtc:11687
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/177244
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31533}
This commit is contained in:
Tomas Gunnarsson 2020-06-16 16:54:10 +02:00 committed by Commit Bot
parent 103a73ea1f
commit 2e94de596e
11 changed files with 95 additions and 57 deletions

View File

@ -315,7 +315,6 @@ bool DataChannel::Send(const DataBuffer& buffer) {
// thread. Bring buffer management etc to the network thread and keep the
// operational state management on the signaling thread.
buffered_amount_ += buffer.size();
if (state_ != kOpen) {
return false;
}
@ -327,6 +326,8 @@ bool DataChannel::Send(const DataBuffer& buffer) {
return true;
}
buffered_amount_ += buffer.size();
// If the queue is non-empty, we're waiting for SignalReadyToSend,
// so just add to the end of the queue and keep waiting.
if (!queued_send_data_.Empty()) {
@ -429,6 +430,14 @@ void DataChannel::OnTransportChannelClosed() {
CloseAbruptlyWithError(std::move(error));
}
DataChannel::Stats DataChannel::GetStats() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
Stats stats{internal_id_, id(), label(),
protocol(), state(), messages_sent(),
messages_received(), bytes_sent(), bytes_received()};
return stats;
}
// The remote peer request that this channel shall be closed.
void DataChannel::RemotePeerRequestClose() {
RTC_DCHECK(data_channel_type_ == cricket::DCT_RTP);

View File

@ -113,6 +113,18 @@ class SctpSidAllocator {
// callback and transition to kClosed.
class DataChannel : public DataChannelInterface, public sigslot::has_slots<> {
public:
struct Stats {
int internal_id;
int id;
std::string label;
std::string protocol;
DataState state;
uint32_t messages_sent;
uint32_t messages_received;
uint64_t bytes_sent;
uint64_t bytes_received;
};
static rtc::scoped_refptr<DataChannel> Create(
DataChannelProviderInterface* provider,
cricket::DataChannelType dct,
@ -205,6 +217,8 @@ class DataChannel : public DataChannelInterface, public sigslot::has_slots<> {
// to kClosed.
void OnTransportChannelClosed();
Stats GetStats() const;
/*******************************************
* The following methods are for RTP only. *
*******************************************/

View File

@ -174,6 +174,11 @@ void DataChannelController::OnTransportClosed() {
void DataChannelController::SetupDataChannelTransport_n() {
RTC_DCHECK_RUN_ON(network_thread());
data_channel_transport_invoker_ = std::make_unique<rtc::AsyncInvoker>();
// There's a new data channel transport. This needs to be signaled to the
// |sctp_data_channels_| so that they can reopen and reconnect. This is
// necessary when bundling is applied.
NotifyDataChannelsOfTransportCreated();
}
void DataChannelController::TeardownDataChannelTransport_n() {
@ -200,17 +205,21 @@ void DataChannelController::OnTransportChanged(
// There's a new data channel transport. This needs to be signaled to the
// |sctp_data_channels_| so that they can reopen and reconnect. This is
// necessary when bundling is applied.
data_channel_transport_invoker_->AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread(), [this] {
RTC_DCHECK_RUN_ON(signaling_thread());
for (const auto& channel : sctp_data_channels_) {
channel->OnTransportChannelCreated();
}
});
NotifyDataChannelsOfTransportCreated();
}
}
}
std::vector<DataChannel::Stats> DataChannelController::GetDataChannelStats()
const {
RTC_DCHECK_RUN_ON(signaling_thread());
std::vector<DataChannel::Stats> stats;
stats.reserve(sctp_data_channels_.size());
for (const auto& channel : sctp_data_channels_)
stats.push_back(channel->GetStats());
return stats;
}
bool DataChannelController::HandleOpenMessage_s(
const cricket::ReceiveDataParams& params,
const rtc::CopyOnWriteBuffer& buffer) {
@ -463,12 +472,6 @@ DataChannelController::rtp_data_channels() const {
return &rtp_data_channels_;
}
const std::vector<rtc::scoped_refptr<DataChannel>>*
DataChannelController::sctp_data_channels() const {
RTC_DCHECK_RUN_ON(signaling_thread());
return &sctp_data_channels_;
}
void DataChannelController::UpdateClosingRtpDataChannels(
const std::vector<std::string>& active_channels,
bool is_local_update) {
@ -549,6 +552,17 @@ bool DataChannelController::DataChannelSendData(
return false;
}
void DataChannelController::NotifyDataChannelsOfTransportCreated() {
RTC_DCHECK_RUN_ON(network_thread());
data_channel_transport_invoker_->AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread(), [this] {
RTC_DCHECK_RUN_ON(signaling_thread());
for (const auto& channel : sctp_data_channels_) {
channel->OnTransportChannelCreated();
}
});
}
rtc::Thread* DataChannelController::network_thread() const {
return pc_->network_thread();
}

View File

@ -64,6 +64,9 @@ class DataChannelController : public DataChannelProviderInterface,
void OnTransportChanged(
DataChannelTransportInterface* data_channel_transport);
// Called from PeerConnection::GetDataChannelStats on the signaling thread.
std::vector<DataChannel::Stats> GetDataChannelStats() const;
// Creates channel and adds it to the collection of DataChannels that will
// be offered in a SessionDescription.
rtc::scoped_refptr<DataChannel> InternalCreateDataChannel(
@ -101,8 +104,6 @@ class DataChannelController : public DataChannelProviderInterface,
void set_data_channel_transport(DataChannelTransportInterface* transport);
const std::map<std::string, rtc::scoped_refptr<DataChannel>>*
rtp_data_channels() const;
const std::vector<rtc::scoped_refptr<DataChannel>>* sctp_data_channels()
const;
sigslot::signal1<DataChannel*>& SignalDataChannelCreated() {
RTC_DCHECK_RUN_ON(signaling_thread());
@ -137,6 +138,10 @@ class DataChannelController : public DataChannelProviderInterface,
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result);
// Called when all data channels need to be notified of a transport channel
// (calls OnTransportChannelCreated on the signaling thread).
void NotifyDataChannelsOfTransportCreated();
rtc::Thread* network_thread() const;
rtc::Thread* signaling_thread() const;

View File

@ -6225,6 +6225,11 @@ cricket::IceConfig PeerConnection::ParseIceConfig(
return ice_config;
}
std::vector<DataChannel::Stats> PeerConnection::GetDataChannelStats() const {
RTC_DCHECK_RUN_ON(signaling_thread());
return data_channel_controller_.GetDataChannelStats();
}
absl::optional<std::string> PeerConnection::sctp_transport_name() const {
RTC_DCHECK_RUN_ON(signaling_thread());
if (sctp_mid_s_ && transport_controller_) {
@ -6705,12 +6710,6 @@ bool PeerConnection::CreateDataChannel(const std::string& mid) {
} else {
return false;
}
// All non-RTP data channels must initialize |sctp_data_channels_|.
for (const auto& channel :
*data_channel_controller_.sctp_data_channels()) {
channel->OnTransportChannelCreated();
}
return true;
case cricket::DCT_RTP:
default:

View File

@ -280,11 +280,7 @@ class PeerConnection : public PeerConnectionInternal,
return data_channel_controller_.rtp_data_channel();
}
std::vector<rtc::scoped_refptr<DataChannel>> sctp_data_channels()
const override {
RTC_DCHECK_RUN_ON(signaling_thread());
return *data_channel_controller_.sctp_data_channels();
}
std::vector<DataChannel::Stats> GetDataChannelStats() const override;
absl::optional<std::string> sctp_transport_name() const override;

View File

@ -46,8 +46,11 @@ class PeerConnectionInternal : public PeerConnectionInterface {
// Only valid when using deprecated RTP data channels.
virtual cricket::RtpDataChannel* rtp_data_channel() const = 0;
virtual std::vector<rtc::scoped_refptr<DataChannel>> sctp_data_channels()
const = 0;
// Call on the network thread to fetch stats for all the data channels.
// TODO(tommi): Make pure virtual after downstream updates.
virtual std::vector<DataChannel::Stats> GetDataChannelStats() const {
return {};
}
virtual absl::optional<std::string> sctp_transport_name() const = 0;

View File

@ -1275,22 +1275,21 @@ void RTCStatsCollector::ProduceCodecStats_n(
void RTCStatsCollector::ProduceDataChannelStats_s(
int64_t timestamp_us,
RTCStatsReport* report) const {
RTC_DCHECK(signaling_thread_->IsCurrent());
for (const rtc::scoped_refptr<DataChannel>& data_channel :
pc_->sctp_data_channels()) {
RTC_DCHECK_RUN_ON(signaling_thread_);
std::vector<DataChannel::Stats> data_stats = pc_->GetDataChannelStats();
for (const auto& stats : data_stats) {
std::unique_ptr<RTCDataChannelStats> data_channel_stats(
new RTCDataChannelStats(
"RTCDataChannel_" + rtc::ToString(data_channel->internal_id()),
"RTCDataChannel_" + rtc::ToString(stats.internal_id),
timestamp_us));
data_channel_stats->label = data_channel->label();
data_channel_stats->protocol = data_channel->protocol();
data_channel_stats->data_channel_identifier = data_channel->id();
data_channel_stats->state =
DataStateToRTCDataChannelState(data_channel->state());
data_channel_stats->messages_sent = data_channel->messages_sent();
data_channel_stats->bytes_sent = data_channel->bytes_sent();
data_channel_stats->messages_received = data_channel->messages_received();
data_channel_stats->bytes_received = data_channel->bytes_received();
data_channel_stats->label = std::move(stats.label);
data_channel_stats->protocol = std::move(stats.protocol);
data_channel_stats->data_channel_identifier = stats.id;
data_channel_stats->state = DataStateToRTCDataChannelState(stats.state);
data_channel_stats->messages_sent = stats.messages_sent;
data_channel_stats->bytes_sent = stats.bytes_sent;
data_channel_stats->messages_received = stats.messages_received;
data_channel_stats->bytes_received = stats.bytes_received;
report->AddStats(std::move(data_channel_stats));
}
}

View File

@ -1146,19 +1146,20 @@ void StatsCollector::ExtractDataInfo() {
rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls;
for (const auto& dc : pc_->sctp_data_channels()) {
std::vector<DataChannel::Stats> data_stats = pc_->GetDataChannelStats();
for (const auto& stats : data_stats) {
StatsReport::Id id(StatsReport::NewTypedIntId(
StatsReport::kStatsReportTypeDataChannel, dc->id()));
StatsReport::kStatsReportTypeDataChannel, stats.id));
StatsReport* report = reports_.ReplaceOrAddNew(id);
report->set_timestamp(stats_gathering_started_);
report->AddString(StatsReport::kStatsValueNameLabel, dc->label());
report->AddString(StatsReport::kStatsValueNameLabel, stats.label);
// Filter out the initial id (-1).
if (dc->id() >= 0) {
report->AddInt(StatsReport::kStatsValueNameDataChannelId, dc->id());
if (stats.id >= 0) {
report->AddInt(StatsReport::kStatsValueNameDataChannelId, stats.id);
}
report->AddString(StatsReport::kStatsValueNameProtocol, dc->protocol());
report->AddString(StatsReport::kStatsValueNameProtocol, stats.protocol);
report->AddString(StatsReport::kStatsValueNameState,
DataChannelInterface::DataStateString(dc->state()));
DataChannelInterface::DataStateString(stats.state));
}
}

View File

@ -254,11 +254,6 @@ class FakePeerConnectionBase : public PeerConnectionInternal {
cricket::RtpDataChannel* rtp_data_channel() const override { return nullptr; }
std::vector<rtc::scoped_refptr<DataChannel>> sctp_data_channels()
const override {
return {};
}
absl::optional<std::string> sctp_transport_name() const override {
return absl::nullopt;
}

View File

@ -259,9 +259,12 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase {
return transceivers_;
}
std::vector<rtc::scoped_refptr<DataChannel>> sctp_data_channels()
const override {
return sctp_data_channels_;
std::vector<DataChannel::Stats> GetDataChannelStats() const override {
RTC_DCHECK_RUN_ON(signaling_thread());
std::vector<DataChannel::Stats> stats;
for (const auto& channel : sctp_data_channels_)
stats.push_back(channel->GetStats());
return stats;
}
cricket::CandidateStatsList GetPooledCandidateStats() const override {