/* * Copyright 2019 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. An additional intellectual property rights grant can be found * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ // This file contains the implementation of the class // webrtc::PeerConnection::DataChannelController. // // The intent is that this should be webrtc::DataChannelController, but // as a migration stage, it is simpler to have it as an inner class, // declared in the header file pc/peer_connection.h #include "pc/peer_connection.h" #include "pc/sctp_utils.h" namespace webrtc { bool PeerConnection::DataChannelController::SendData( const cricket::SendDataParams& params, const rtc::CopyOnWriteBuffer& payload, cricket::SendDataResult* result) { // RTC_DCHECK_RUN_ON(signaling_thread()); if (data_channel_transport()) { SendDataParams send_params; send_params.type = ToWebrtcDataMessageType(params.type); send_params.ordered = params.ordered; if (params.max_rtx_count >= 0) { send_params.max_rtx_count = params.max_rtx_count; } else if (params.max_rtx_ms >= 0) { send_params.max_rtx_ms = params.max_rtx_ms; } RTCError error = network_thread()->Invoke( RTC_FROM_HERE, [this, params, send_params, payload] { return data_channel_transport()->SendData(params.sid, send_params, payload); }); if (error.ok()) { *result = cricket::SendDataResult::SDR_SUCCESS; return true; } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) { // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked. // TODO(mellem): Stop using RTCError here and get rid of the mapping. *result = cricket::SendDataResult::SDR_BLOCK; return false; } *result = cricket::SendDataResult::SDR_ERROR; return false; } else if (rtp_data_channel()) { return rtp_data_channel()->SendData(params, payload, result); } RTC_LOG(LS_ERROR) << "SendData called before transport is ready"; return false; } bool PeerConnection::DataChannelController::ConnectDataChannel( DataChannel* webrtc_data_channel) { RTC_DCHECK_RUN_ON(signaling_thread()); if (!rtp_data_channel() && !data_channel_transport()) { // Don't log an error here, because DataChannels are expected to call // ConnectDataChannel in this state. It's the only way to initially tell // whether or not the underlying transport is ready. return false; } if (data_channel_transport()) { SignalDataChannelTransportWritable_s.connect(webrtc_data_channel, &DataChannel::OnChannelReady); SignalDataChannelTransportReceivedData_s.connect( webrtc_data_channel, &DataChannel::OnDataReceived); SignalDataChannelTransportChannelClosing_s.connect( webrtc_data_channel, &DataChannel::OnClosingProcedureStartedRemotely); SignalDataChannelTransportChannelClosed_s.connect( webrtc_data_channel, &DataChannel::OnClosingProcedureComplete); } if (rtp_data_channel()) { rtp_data_channel()->SignalReadyToSendData.connect( webrtc_data_channel, &DataChannel::OnChannelReady); rtp_data_channel()->SignalDataReceived.connect( webrtc_data_channel, &DataChannel::OnDataReceived); } return true; } void PeerConnection::DataChannelController::DisconnectDataChannel( DataChannel* webrtc_data_channel) { RTC_DCHECK_RUN_ON(signaling_thread()); if (!rtp_data_channel() && !data_channel_transport()) { RTC_LOG(LS_ERROR) << "DisconnectDataChannel called when rtp_data_channel_ and " "sctp_transport_ are NULL."; return; } if (data_channel_transport()) { SignalDataChannelTransportWritable_s.disconnect(webrtc_data_channel); SignalDataChannelTransportReceivedData_s.disconnect(webrtc_data_channel); SignalDataChannelTransportChannelClosing_s.disconnect(webrtc_data_channel); SignalDataChannelTransportChannelClosed_s.disconnect(webrtc_data_channel); } if (rtp_data_channel()) { rtp_data_channel()->SignalReadyToSendData.disconnect(webrtc_data_channel); rtp_data_channel()->SignalDataReceived.disconnect(webrtc_data_channel); } } void PeerConnection::DataChannelController::AddSctpDataStream(int sid) { if (data_channel_transport()) { network_thread()->Invoke(RTC_FROM_HERE, [this, sid] { if (data_channel_transport()) { data_channel_transport()->OpenChannel(sid); } }); } } void PeerConnection::DataChannelController::RemoveSctpDataStream(int sid) { if (data_channel_transport()) { network_thread()->Invoke(RTC_FROM_HERE, [this, sid] { if (data_channel_transport()) { data_channel_transport()->CloseChannel(sid); } }); } } bool PeerConnection::DataChannelController::ReadyToSendData() const { RTC_DCHECK_RUN_ON(signaling_thread()); return (rtp_data_channel() && rtp_data_channel()->ready_to_send_data()) || (data_channel_transport() && data_channel_transport_ready_to_send_); } void PeerConnection::DataChannelController::OnDataReceived( int channel_id, DataMessageType type, const rtc::CopyOnWriteBuffer& buffer) { RTC_DCHECK_RUN_ON(network_thread()); cricket::ReceiveDataParams params; params.sid = channel_id; params.type = ToCricketDataMessageType(type); data_channel_transport_invoker_->AsyncInvoke( RTC_FROM_HERE, signaling_thread(), [this, params, buffer] { RTC_DCHECK_RUN_ON(signaling_thread()); if (!HandleOpenMessage_s(params, buffer)) { SignalDataChannelTransportReceivedData_s(params, buffer); } }); } void PeerConnection::DataChannelController::OnChannelClosing(int channel_id) { RTC_DCHECK_RUN_ON(network_thread()); data_channel_transport_invoker_->AsyncInvoke( RTC_FROM_HERE, signaling_thread(), [this, channel_id] { RTC_DCHECK_RUN_ON(signaling_thread()); SignalDataChannelTransportChannelClosing_s(channel_id); }); } void PeerConnection::DataChannelController::OnChannelClosed(int channel_id) { RTC_DCHECK_RUN_ON(network_thread()); data_channel_transport_invoker_->AsyncInvoke( RTC_FROM_HERE, signaling_thread(), [this, channel_id] { RTC_DCHECK_RUN_ON(signaling_thread()); SignalDataChannelTransportChannelClosed_s(channel_id); }); } void PeerConnection::DataChannelController::OnReadyToSend() { RTC_DCHECK_RUN_ON(network_thread()); data_channel_transport_invoker_->AsyncInvoke( RTC_FROM_HERE, signaling_thread(), [this] { RTC_DCHECK_RUN_ON(signaling_thread()); data_channel_transport_ready_to_send_ = true; SignalDataChannelTransportWritable_s( data_channel_transport_ready_to_send_); }); } void PeerConnection::DataChannelController::SetupDataChannelTransport_n() { RTC_DCHECK_RUN_ON(network_thread()); data_channel_transport_invoker_ = std::make_unique(); } void PeerConnection::DataChannelController::TeardownDataChannelTransport_n() { RTC_DCHECK_RUN_ON(network_thread()); data_channel_transport_invoker_ = nullptr; if (data_channel_transport()) { data_channel_transport()->SetDataSink(nullptr); } set_data_channel_transport(nullptr); } void PeerConnection::DataChannelController::OnTransportChanged( DataChannelTransportInterface* new_data_channel_transport) { RTC_DCHECK_RUN_ON(network_thread()); if (data_channel_transport() && data_channel_transport() != new_data_channel_transport) { // Changed which data channel transport is used for |sctp_mid_| (eg. now // it's bundled). data_channel_transport()->SetDataSink(nullptr); set_data_channel_transport(new_data_channel_transport); if (new_data_channel_transport) { new_data_channel_transport->SetDataSink(this); // There's a new data channel transport. This needs to be signaled to the // |sctp_data_channels_| so that they can reopen and reconnect. This is // necessary when bundling is applied. data_channel_transport_invoker_->AsyncInvoke( RTC_FROM_HERE, signaling_thread(), [this] { RTC_DCHECK_RUN_ON(pc_->signaling_thread()); for (auto channel : pc_->sctp_data_channels_) { channel->OnTransportChannelCreated(); } }); } } } bool PeerConnection::DataChannelController::HandleOpenMessage_s( const cricket::ReceiveDataParams& params, const rtc::CopyOnWriteBuffer& buffer) { if (params.type == cricket::DMT_CONTROL && IsOpenMessage(buffer)) { // Received OPEN message; parse and signal that a new data channel should // be created. std::string label; InternalDataChannelInit config; config.id = params.ssrc; if (!ParseDataChannelOpenMessage(buffer, &label, &config)) { RTC_LOG(LS_WARNING) << "Failed to parse the OPEN message for ssrc " << params.ssrc; return true; } config.open_handshake_role = InternalDataChannelInit::kAcker; OnDataChannelOpenMessage(label, config); return true; } return false; } void PeerConnection::DataChannelController::OnDataChannelOpenMessage( const std::string& label, const InternalDataChannelInit& config) { rtc::scoped_refptr channel( InternalCreateDataChannel(label, &config)); if (!channel.get()) { RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message."; return; } rtc::scoped_refptr proxy_channel = DataChannelProxy::Create(signaling_thread(), channel); { RTC_DCHECK_RUN_ON(pc_->signaling_thread()); pc_->Observer()->OnDataChannel(std::move(proxy_channel)); pc_->NoteUsageEvent(UsageEvent::DATA_ADDED); } } rtc::scoped_refptr PeerConnection::DataChannelController::InternalCreateDataChannel( const std::string& label, const InternalDataChannelInit* config) { RTC_DCHECK_RUN_ON(pc_->signaling_thread()); if (pc_->IsClosed()) { return nullptr; } if (pc_->data_channel_type() == cricket::DCT_NONE) { RTC_LOG(LS_ERROR) << "InternalCreateDataChannel: Data is not supported in this call."; return nullptr; } InternalDataChannelInit new_config = config ? (*config) : InternalDataChannelInit(); if (DataChannel::IsSctpLike(pc_->data_channel_type_)) { if (new_config.id < 0) { rtc::SSLRole role; if ((pc_->GetSctpSslRole(&role)) && !sid_allocator_.AllocateSid(role, &new_config.id)) { RTC_LOG(LS_ERROR) << "No id can be allocated for the SCTP data channel."; return nullptr; } } else if (!sid_allocator_.ReserveSid(new_config.id)) { RTC_LOG(LS_ERROR) << "Failed to create a SCTP data channel " "because the id is already in use or out of range."; return nullptr; } } rtc::scoped_refptr channel( DataChannel::Create(this, pc_->data_channel_type(), label, new_config)); if (!channel) { sid_allocator_.ReleaseSid(new_config.id); return nullptr; } if (channel->data_channel_type() == cricket::DCT_RTP) { if (pc_->rtp_data_channels_.find(channel->label()) != pc_->rtp_data_channels_.end()) { RTC_LOG(LS_ERROR) << "DataChannel with label " << channel->label() << " already exists."; return nullptr; } pc_->rtp_data_channels_[channel->label()] = channel; } else { RTC_DCHECK(DataChannel::IsSctpLike(pc_->data_channel_type_)); pc_->sctp_data_channels_.push_back(channel); channel->SignalClosed.connect(pc_, &PeerConnection::OnSctpDataChannelClosed); } pc_->SignalDataChannelCreated_(channel.get()); return channel; } void PeerConnection::DataChannelController::AllocateSctpSids( rtc::SSLRole role) { RTC_DCHECK_RUN_ON(pc_->signaling_thread()); std::vector> channels_to_close; for (const auto& channel : pc_->sctp_data_channels_) { if (channel->id() < 0) { int sid; if (!sid_allocator_.AllocateSid(role, &sid)) { RTC_LOG(LS_ERROR) << "Failed to allocate SCTP sid, closing channel."; channels_to_close.push_back(channel); continue; } channel->SetSctpSid(sid); } } // Since closing modifies the list of channels, we have to do the actual // closing outside the loop. for (const auto& channel : channels_to_close) { channel->CloseAbruptly(); } } void PeerConnection::DataChannelController::OnSctpDataChannelClosed( DataChannel* channel) { RTC_DCHECK_RUN_ON(pc_->signaling_thread()); for (auto it = pc_->sctp_data_channels_.begin(); it != pc_->sctp_data_channels_.end(); ++it) { if (it->get() == channel) { if (channel->id() >= 0) { // After the closing procedure is done, it's safe to use this ID for // another data channel. sid_allocator_.ReleaseSid(channel->id()); } // Since this method is triggered by a signal from the DataChannel, // we can't free it directly here; we need to free it asynchronously. pc_->sctp_data_channels_to_free_.push_back(*it); pc_->sctp_data_channels_.erase(it); pc_->SignalFreeDataChannels(); return; } } } } // namespace webrtc