diff --git a/api/datagram_transport_interface.h b/api/datagram_transport_interface.h index 6205f0043f..d84090a0c4 100644 --- a/api/datagram_transport_interface.h +++ b/api/datagram_transport_interface.h @@ -98,6 +98,9 @@ class DatagramTransportInterface { // Datagrams larger than GetLargestDatagramSize() will fail and return error. // // Datagrams are sent in FIFO order. + // + // |datagram_id| is only used in ACK/LOST notifications in + // DatagramSinkInterface and does not need to be unique. virtual RTCError SendDatagram(rtc::ArrayView data, DatagramId datagram_id) = 0; diff --git a/pc/BUILD.gn b/pc/BUILD.gn index 7ecc259bbe..8c54f7d5d9 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -97,6 +97,7 @@ rtc_static_library("rtc_pc_base") { "../media:rtc_h264_profile_id", "../media:rtc_media_base", "../media:rtc_media_config", + "../modules/rtp_rtcp:rtp_rtcp", "../modules/rtp_rtcp:rtp_rtcp_format", "../p2p:rtc_p2p", "../rtc_base", diff --git a/pc/datagram_dtls_adaptor.cc b/pc/datagram_dtls_adaptor.cc index 302a7425ce..0f20bf50f4 100644 --- a/pc/datagram_dtls_adaptor.cc +++ b/pc/datagram_dtls_adaptor.cc @@ -16,15 +16,20 @@ #include "absl/memory/memory.h" #include "absl/strings/string_view.h" +#include "absl/types/optional.h" #include "api/rtc_error.h" #include "logging/rtc_event_log/events/rtc_event_dtls_transport_state.h" #include "logging/rtc_event_log/events/rtc_event_dtls_writable_state.h" #include "logging/rtc_event_log/rtc_event_log.h" +#include "modules/rtp_rtcp/include/rtp_header_parser.h" +#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h" +#include "modules/rtp_rtcp/source/rtp_packet.h" #include "p2p/base/dtls_transport_internal.h" #include "p2p/base/packet_transport_internal.h" #include "rtc_base/buffer.h" #include "rtc_base/checks.h" #include "rtc_base/dscp.h" +#include "rtc_base/flags.h" #include "rtc_base/logging.h" #include "rtc_base/message_queue.h" #include "rtc_base/rtc_certificate.h" @@ -42,7 +47,18 @@ constexpr bool kBypassDatagramDtlsTestOnly = false; namespace cricket { +// For RTCP packets we are not storing SentPacketInfo and not interested in +// Acks, so we will use special datagram id for RTCP packets to filter out +// datagram notifications coming from RTCP packets. +constexpr webrtc::DatagramId kRtcpDatagramId = -1; + +// Maximum packet size of RTCP feedback packet for allocation. We re-create RTCP +// feedback packets when we get ACK notifications from datagram transport. Our +// rtcp feedback packets contain only 1 ACK, so they are much smaller than 1250. +constexpr size_t kMaxRtcpFeedbackPacketSize = 1250; + DatagramDtlsAdaptor::DatagramDtlsAdaptor( + const std::vector& rtp_header_extensions, IceTransportInternal* ice_transport, webrtc::DatagramTransportInterface* datagram_transport, const webrtc::CryptoOptions& crypto_options, @@ -51,6 +67,20 @@ DatagramDtlsAdaptor::DatagramDtlsAdaptor( ice_transport_(ice_transport), datagram_transport_(datagram_transport), event_log_(event_log) { + // Save extension map for parsing RTP packets (we only need transport + // sequence numbers). + const webrtc::RtpExtension* transport_sequence_number_extension = + webrtc::RtpExtension::FindHeaderExtensionByUri( + rtp_header_extensions, webrtc::TransportSequenceNumber::kUri); + + if (transport_sequence_number_extension != nullptr) { + rtp_header_extension_map_.Register( + transport_sequence_number_extension->id); + } else { + RTC_LOG(LS_ERROR) << "Transport sequence numbers are not supported in " + "datagram transport connection"; + } + RTC_DCHECK(ice_transport_); RTC_DCHECK(datagram_transport_); ConnectToIceTransport(); @@ -63,17 +93,14 @@ void DatagramDtlsAdaptor::ConnectToIceTransport() { this, &DatagramDtlsAdaptor::OnReadyToSend); ice_transport_->SignalReceivingState.connect( this, &DatagramDtlsAdaptor::OnReceivingState); - // Datagram transport does not propagate network route change. ice_transport_->SignalNetworkRouteChanged.connect( this, &DatagramDtlsAdaptor::OnNetworkRouteChanged); - if (kBypassDatagramDtlsTestOnly) { // In bypass mode we have to subscribe to ICE read and sent events. // Test only case to use ICE directly instead of data transport. ice_transport_->SignalReadPacket.connect( this, &DatagramDtlsAdaptor::OnReadPacket); - ice_transport_->SignalSentPacket.connect( this, &DatagramDtlsAdaptor::OnSentPacket); } else { @@ -97,20 +124,77 @@ int DatagramDtlsAdaptor::SendPacket(const char* data, size_t len, const rtc::PacketOptions& options, int flags) { + RTC_DCHECK_RUN_ON(&thread_checker_); + // TODO(sukhanov): Handle options and flags. if (kBypassDatagramDtlsTestOnly) { // In bypass mode sent directly to ICE. return ice_transport_->SendPacket(data, len, options); } - // Send datagram with id equal to options.packet_id, so we get it back - // in DatagramDtlsAdaptor::OnDatagramSent() and propagate notification - // up. - webrtc::RTCError error = datagram_transport_->SendDatagram( - rtc::MakeArrayView(reinterpret_cast(data), len), - /*datagram_id=*/options.packet_id); + rtc::ArrayView original_data( + reinterpret_cast(data), len); + // RTCP packets are sent as is and they do not require datagram_id. + if (webrtc::RtpHeaderParser::IsRtcp(original_data.data(), + original_data.size())) { + return SendDatagram(original_data, /*datagram_id=*/kRtcpDatagramId); + } - return (error.ok() ? len : -1); + // Assign and increment datagram_id. + webrtc::DatagramId datagram_id = current_datagram_id_; + current_datagram_id_++; + + // Parse RTP packet. + webrtc::RtpPacket rtp_packet(&rtp_header_extension_map_); + if (!rtp_packet.Parse(original_data)) { + RTC_NOTREACHED() << "Failed to parse outgoing RtpPacket, len=" << len + << ", options.packet_id=" << options.packet_id; + return -1; + } + + // Try to get transport sequence number. + uint16_t transport_senquence_number; + if (!rtp_packet.GetExtension( + &transport_senquence_number)) { + // Save packet info without transport sequence number. + sent_rtp_packet_map_[datagram_id] = SentPacketInfo( + rtp_packet.Ssrc(), + /*transport_sequence_number=*/absl::nullopt, options.packet_id); + + RTC_LOG(LS_VERBOSE) + << "Sending rtp packet without transport sequence number, packet=" + << rtp_packet.ToString(); + + return SendDatagram(original_data, datagram_id); + } + + // Save packet info with sequence number. + sent_rtp_packet_map_[datagram_id] = SentPacketInfo( + rtp_packet.Ssrc(), transport_senquence_number, options.packet_id); + + // Since datagram transport provides feedback and timestamps, we do not need + // to send transport sequence number, so we remove it from RTP packet. Later + // when we get Ack for sent datagram, we will re-create RTCP feedback packet. + if (!rtp_packet.RemoveExtension(webrtc::TransportSequenceNumber::kId)) { + RTC_NOTREACHED() << "Failed to remove transport sequence number, packet=" + << rtp_packet.ToString(); + return -1; + } + + RTC_LOG(LS_VERBOSE) << "Removed transport_senquence_number=" + << transport_senquence_number + << " from packet=" << rtp_packet.ToString() + << ", saved bytes=" << len - rtp_packet.size(); + + return SendDatagram( + rtc::ArrayView(rtp_packet.data(), rtp_packet.size()), + datagram_id); +} + +int DatagramDtlsAdaptor::SendDatagram(rtc::ArrayView data, + webrtc::DatagramId datagram_id) { + webrtc::RTCError error = datagram_transport_->SendDatagram(data, datagram_id); + return (error.ok() ? data.size() : -1); } void DatagramDtlsAdaptor::OnReadPacket(rtc::PacketTransportInternal* transport, @@ -145,17 +229,134 @@ void DatagramDtlsAdaptor::OnDatagramReceived( } void DatagramDtlsAdaptor::OnDatagramSent(webrtc::DatagramId datagram_id) { - // When we called DatagramTransportInterface::SendDatagram, we passed - // packet_id as datagram_id, so we simply need to set it in sent_packet - // and propagate notification up the stack. + RTC_DCHECK_RUN_ON(&thread_checker_); + + // Sent notifications are not needed for RTCP packets. + if (datagram_id == kRtcpDatagramId) { + return; + } + + // Find packet_id and propagate OnPacketSent notification. + const auto& it = sent_rtp_packet_map_.find(datagram_id); + if (it == sent_rtp_packet_map_.end()) { + RTC_NOTREACHED() << "Did not find sent packet info for sent datagram_id=" + << datagram_id; + return; + } // Also see how DatagramDtlsAdaptor::OnSentPacket handles OnSentPacket // notification from ICE in bypass mode. - rtc::SentPacket sent_packet(/*packet_id=*/datagram_id, rtc::TimeMillis()); + rtc::SentPacket sent_packet(/*packet_id=*/it->second.packet_id, + rtc::TimeMillis()); PropagateOnSentNotification(sent_packet); } +bool DatagramDtlsAdaptor::GetAndRemoveSentPacketInfo( + webrtc::DatagramId datagram_id, + SentPacketInfo* sent_packet_info) { + RTC_DCHECK_NE(datagram_id, kRtcpDatagramId); + RTC_CHECK(sent_packet_info != nullptr); + + const auto& it = sent_rtp_packet_map_.find(datagram_id); + if (it == sent_rtp_packet_map_.end()) { + return false; + } + + *sent_packet_info = it->second; + sent_rtp_packet_map_.erase(it); + return true; +} + +void DatagramDtlsAdaptor::OnDatagramAcked(const webrtc::DatagramAck& ack) { + RTC_DCHECK_RUN_ON(&thread_checker_); + + // ACK notifications are not needed for RTCP packets and RTCP packets are not + // stored in SentPacketInfo map. + if (ack.datagram_id == kRtcpDatagramId) { + return; + } + + SentPacketInfo sent_packet_info; + if (!GetAndRemoveSentPacketInfo(ack.datagram_id, &sent_packet_info)) { + // TODO(sukhanov): If OnDatagramAck() can come after OnDatagramLost(), + // datagram_id is already deleted and we may need to relax the CHECK below. + // It's probably OK to ignore such datagrams, because it's been a few RTTs + // anyway since they were sent. + RTC_NOTREACHED() << "Did not find sent packet info for datagram_id=" + << ack.datagram_id; + return; + } + + RTC_LOG(LS_VERBOSE) << "Datagram acked, datagram_id=" << ack.datagram_id + << ", transport_sequence_number=" + << sent_packet_info.transport_sequence_number.value_or(-1) + << ", ssrc=" << sent_packet_info.ssrc + << ", receive_timestamp_ms=" + << ack.receive_timestamp.ms(); + + // If transport sequence number was not present in RTP packet, we do not need + // to propagate RTCP feedback. + if (!sent_packet_info.transport_sequence_number) { + return; + } + + // TODO(sukhanov): We noticed that datagram transport implementations can + // return zero timestamps in the middle of the call. This is workaround to + // avoid propagating zero timestamps, but we need to understand why we have + // them in the first place. + int64_t receive_timestamp_us = ack.receive_timestamp.us(); + + if (receive_timestamp_us == 0) { + receive_timestamp_us = previous_nonzero_timestamp_us_; + } else { + previous_nonzero_timestamp_us_ = receive_timestamp_us; + } + + // Recreate RTCP feedback packet. + webrtc::rtcp::TransportFeedback feedback_packet; + feedback_packet.SetMediaSsrc(sent_packet_info.ssrc); + + const uint16_t transport_sequence_number = + sent_packet_info.transport_sequence_number.value(); + + feedback_packet.SetBase(transport_sequence_number, receive_timestamp_us); + feedback_packet.AddReceivedPacket(transport_sequence_number, + receive_timestamp_us); + + rtc::Buffer buffer(kMaxRtcpFeedbackPacketSize); + size_t index = 0; + if (!feedback_packet.Create(buffer.data(), &index, buffer.capacity(), + nullptr)) { + RTC_NOTREACHED() << "Failed to create RTCP feedback packet"; + return; + } + + RTC_CHECK_GT(index, 0); + RTC_CHECK_LE(index, kMaxRtcpFeedbackPacketSize); + + // Propagage created RTCP packet as normal incoming packet. + buffer.SetSize(index); + PropagateReadPacket(buffer, /*packet_time_us=*/-1); +} + +void DatagramDtlsAdaptor::OnDatagramLost(webrtc::DatagramId datagram_id) { + RTC_DCHECK_RUN_ON(&thread_checker_); + + // RTCP packets are not stored in SentPacketInfo map. + if (datagram_id == kRtcpDatagramId) { + return; + } + + RTC_LOG(LS_INFO) << "Datagram lost, datagram_id=" << datagram_id; + + SentPacketInfo sent_packet_info; + if (!GetAndRemoveSentPacketInfo(datagram_id, &sent_packet_info)) { + RTC_NOTREACHED() << "Did not find sent packet info for lost datagram_id=" + << datagram_id; + } +} + void DatagramDtlsAdaptor::OnSentPacket(rtc::PacketTransportInternal* transport, const rtc::SentPacket& sent_packet) { // Only used in bypass mode. @@ -256,7 +457,7 @@ void DatagramDtlsAdaptor::OnWritableState( rtc::PacketTransportInternal* transport) { RTC_DCHECK_RUN_ON(&thread_checker_); RTC_DCHECK(transport == ice_transport_); - RTC_LOG(LS_VERBOSE) << ": ice_transport writable state changed to " + RTC_LOG(LS_VERBOSE) << "ice_transport writable state changed to " << ice_transport_->writable(); if (kBypassDatagramDtlsTestOnly) { diff --git a/pc/datagram_dtls_adaptor.h b/pc/datagram_dtls_adaptor.h index e027c7635b..c4b15e0081 100644 --- a/pc/datagram_dtls_adaptor.h +++ b/pc/datagram_dtls_adaptor.h @@ -11,12 +11,15 @@ #ifndef PC_DATAGRAM_DTLS_ADAPTOR_H_ #define PC_DATAGRAM_DTLS_ADAPTOR_H_ +#include #include #include #include #include "api/crypto/crypto_options.h" #include "api/datagram_transport_interface.h" +#include "modules/rtp_rtcp/include/rtp_header_extension_map.h" +#include "modules/rtp_rtcp/source/rtp_header_extensions.h" #include "p2p/base/dtls_transport_internal.h" #include "p2p/base/ice_transport_internal.h" #include "p2p/base/packet_transport_internal.h" @@ -42,10 +45,12 @@ class DatagramDtlsAdaptor : public DtlsTransportInternal, // TODO(sukhanov): Taking crypto options, because DtlsTransportInternal // has a virtual getter crypto_options(). Consider removing getter and // removing crypto_options from DatagramDtlsAdaptor. - DatagramDtlsAdaptor(IceTransportInternal* ice_transport, - webrtc::DatagramTransportInterface* datagram_transport, - const webrtc::CryptoOptions& crypto_options, - webrtc::RtcEventLog* event_log); + DatagramDtlsAdaptor( + const std::vector& rtp_header_extensions, + IceTransportInternal* ice_transport, + webrtc::DatagramTransportInterface* datagram_transport, + const webrtc::CryptoOptions& crypto_options, + webrtc::RtcEventLog* event_log); ~DatagramDtlsAdaptor() override; @@ -60,6 +65,10 @@ class DatagramDtlsAdaptor : public DtlsTransportInternal, void OnDatagramSent(webrtc::DatagramId datagram_id) override; + void OnDatagramAcked(const webrtc::DatagramAck& ack) override; + + void OnDatagramLost(webrtc::DatagramId datagram_id) override; + void OnStateChanged(webrtc::MediaTransportState state) override; // ===================================================== @@ -94,6 +103,37 @@ class DatagramDtlsAdaptor : public DtlsTransportInternal, bool receiving() const override; private: + // Stored for each sent RTP packet. + struct SentPacketInfo { + SentPacketInfo(uint32_t ssrc, + absl::optional transport_sequence_number, + int64_t packet_id) + : ssrc(ssrc), + transport_sequence_number(transport_sequence_number), + packet_id(packet_id) {} + + SentPacketInfo() = default; + + uint32_t ssrc = 0; + + // Transport sequence number (if it was provided in outgoing RTP packet). + // It is used to re-create RTCP feedback packets from datagram ACKs. + absl::optional transport_sequence_number = 0; + + // Packet id from rtc::PacketOptions. It is required to propagage sent + // notification up the stack (SignalSentPacket). + int64_t packet_id = 0; + }; + + // Finds SentPacketInfo for given |datagram_id| and removes map entry. + // Returns false if entry was not found. + bool GetAndRemoveSentPacketInfo(webrtc::DatagramId datagram_id, + SentPacketInfo* sent_packet_info); + + // Sends datagram to datagram_transport. + int SendDatagram(rtc::ArrayView data, + webrtc::DatagramId datagram_id); + void set_receiving(bool receiving); void set_writable(bool writable); void set_dtls_state(DtlsTransportState state); @@ -145,6 +185,22 @@ class DatagramDtlsAdaptor : public DtlsTransportInternal, DtlsTransportState dtls_state_ = DTLS_TRANSPORT_NEW; webrtc::RtcEventLog* const event_log_; + + // Extension map for parsing transport sequence numbers. + webrtc::RtpHeaderExtensionMap rtp_header_extension_map_; + + // Keeps information about sent RTP packet until they are Acked or Lost. + std::map sent_rtp_packet_map_; + + // Current datagram_id, incremented after each sent RTP packets. + // Datagram id is passed to datagram transport when we send datagram and we + // get it back in notifications about Sent, Acked and Lost datagrams. + int64_t current_datagram_id_ = 0; + + // TODO(sukhanov): Previous nonzero timestamp is required for workaround for + // zero timestamps received, which sometimes are received from datagram + // transport. Investigate if we can eliminate zero timestamps. + int64_t previous_nonzero_timestamp_us_ = 0; }; } // namespace cricket diff --git a/pc/jsep_transport_controller.cc b/pc/jsep_transport_controller.cc index 02006082ea..948b9fcfab 100644 --- a/pc/jsep_transport_controller.cc +++ b/pc/jsep_transport_controller.cc @@ -474,6 +474,7 @@ JsepTransportController::CreateIceTransport(const std::string transport_name, std::unique_ptr JsepTransportController::CreateDtlsTransport( + const cricket::ContentInfo& content_info, cricket::IceTransportInternal* ice, DatagramTransportInterface* datagram_transport) { RTC_DCHECK(network_thread_->IsCurrent()); @@ -485,7 +486,8 @@ JsepTransportController::CreateDtlsTransport( // Create DTLS wrapper around DatagramTransportInterface. dtls = absl::make_unique( - ice, datagram_transport, config_.crypto_options, config_.event_log); + content_info.media_description()->rtp_header_extensions(), ice, + datagram_transport, config_.crypto_options, config_.event_log); } else if (config_.media_transport_factory && config_.use_media_transport_for_media && config_.use_media_transport_for_data_channels) { @@ -1164,11 +1166,11 @@ RTCError JsepTransportController::MaybeCreateJsepTransport( if (datagram_transport) { datagram_transport->Connect(ice.get()); datagram_dtls_transport = - CreateDtlsTransport(ice.get(), datagram_transport.get()); + CreateDtlsTransport(content_info, ice.get(), datagram_transport.get()); } std::unique_ptr rtp_dtls_transport = - CreateDtlsTransport(ice.get(), nullptr); + CreateDtlsTransport(content_info, ice.get(), nullptr); std::unique_ptr rtcp_dtls_transport; std::unique_ptr unencrypted_rtp_transport; @@ -1183,7 +1185,7 @@ RTCError JsepTransportController::MaybeCreateJsepTransport( RTC_DCHECK(media_transport == nullptr); RTC_DCHECK(datagram_transport == nullptr); rtcp_ice = CreateIceTransport(content_info.name, /*rtcp=*/true); - rtcp_dtls_transport = CreateDtlsTransport(rtcp_ice.get(), + rtcp_dtls_transport = CreateDtlsTransport(content_info, rtcp_ice.get(), /*datagram_transport=*/nullptr); } diff --git a/pc/jsep_transport_controller.h b/pc/jsep_transport_controller.h index 23d4485a6d..70795b0de6 100644 --- a/pc/jsep_transport_controller.h +++ b/pc/jsep_transport_controller.h @@ -355,6 +355,7 @@ class JsepTransportController : public sigslot::has_slots<> { bool local); std::unique_ptr CreateDtlsTransport( + const cricket::ContentInfo& content_info, cricket::IceTransportInternal* ice, DatagramTransportInterface* datagram_transport); std::unique_ptr CreateIceTransport(