Integrate datagram feedback loop

This change removes RTCP Feedback loop if we are using datagram transport by removing transport sequence numbers from RTP packets and recreating RTCP Feedback from Datagram ACKs and Timestamps.

- For outgoing RTP packets, remove transport sequence number and store it with datagram_id. Note that removing transport sequence numbers does not only save 4-8 bytes per packet, but also prevents generation of feedback packets on the receiver side.

- When datagram ACKs, we re-created RTCP feedback with timestamp.

- Replacing previous assumption that datagram_id was the same as packet_id by storing incremental counter of datagram ids (I noticed some packets come without packet_id, which is a bit strange, but easy to support and it's also good not to rely on packet_ids being unique across multiple ssrcs).

Bug: webrtc:9719
Change-Id: Iecfe938ecea1a74e7c9e1484f0e985d72643d4a1
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/145269
Commit-Queue: Anton Sukhanov <sukhanov@webrtc.org>
Reviewed-by: Steve Anton <steveanton@webrtc.org>
Reviewed-by: Bjorn Mellem <mellem@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#28542}
This commit is contained in:
Anton Sukhanov 2019-07-10 15:44:56 -07:00 committed by Commit Bot
parent 1e19db1d7e
commit ac6c09634f
6 changed files with 287 additions and 23 deletions

View File

@ -98,6 +98,9 @@ class DatagramTransportInterface {
// Datagrams larger than GetLargestDatagramSize() will fail and return error.
//
// Datagrams are sent in FIFO order.
//
// |datagram_id| is only used in ACK/LOST notifications in
// DatagramSinkInterface and does not need to be unique.
virtual RTCError SendDatagram(rtc::ArrayView<const uint8_t> data,
DatagramId datagram_id) = 0;

View File

@ -97,6 +97,7 @@ rtc_static_library("rtc_pc_base") {
"../media:rtc_h264_profile_id",
"../media:rtc_media_base",
"../media:rtc_media_config",
"../modules/rtp_rtcp:rtp_rtcp",
"../modules/rtp_rtcp:rtp_rtcp_format",
"../p2p:rtc_p2p",
"../rtc_base",

View File

@ -16,15 +16,20 @@
#include "absl/memory/memory.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "api/rtc_error.h"
#include "logging/rtc_event_log/events/rtc_event_dtls_transport_state.h"
#include "logging/rtc_event_log/events/rtc_event_dtls_writable_state.h"
#include "logging/rtc_event_log/rtc_event_log.h"
#include "modules/rtp_rtcp/include/rtp_header_parser.h"
#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
#include "modules/rtp_rtcp/source/rtp_packet.h"
#include "p2p/base/dtls_transport_internal.h"
#include "p2p/base/packet_transport_internal.h"
#include "rtc_base/buffer.h"
#include "rtc_base/checks.h"
#include "rtc_base/dscp.h"
#include "rtc_base/flags.h"
#include "rtc_base/logging.h"
#include "rtc_base/message_queue.h"
#include "rtc_base/rtc_certificate.h"
@ -42,7 +47,18 @@ constexpr bool kBypassDatagramDtlsTestOnly = false;
namespace cricket {
// For RTCP packets we are not storing SentPacketInfo and not interested in
// Acks, so we will use special datagram id for RTCP packets to filter out
// datagram notifications coming from RTCP packets.
constexpr webrtc::DatagramId kRtcpDatagramId = -1;
// Maximum packet size of RTCP feedback packet for allocation. We re-create RTCP
// feedback packets when we get ACK notifications from datagram transport. Our
// rtcp feedback packets contain only 1 ACK, so they are much smaller than 1250.
constexpr size_t kMaxRtcpFeedbackPacketSize = 1250;
DatagramDtlsAdaptor::DatagramDtlsAdaptor(
const std::vector<webrtc::RtpExtension>& rtp_header_extensions,
IceTransportInternal* ice_transport,
webrtc::DatagramTransportInterface* datagram_transport,
const webrtc::CryptoOptions& crypto_options,
@ -51,6 +67,20 @@ DatagramDtlsAdaptor::DatagramDtlsAdaptor(
ice_transport_(ice_transport),
datagram_transport_(datagram_transport),
event_log_(event_log) {
// Save extension map for parsing RTP packets (we only need transport
// sequence numbers).
const webrtc::RtpExtension* transport_sequence_number_extension =
webrtc::RtpExtension::FindHeaderExtensionByUri(
rtp_header_extensions, webrtc::TransportSequenceNumber::kUri);
if (transport_sequence_number_extension != nullptr) {
rtp_header_extension_map_.Register<webrtc::TransportSequenceNumber>(
transport_sequence_number_extension->id);
} else {
RTC_LOG(LS_ERROR) << "Transport sequence numbers are not supported in "
"datagram transport connection";
}
RTC_DCHECK(ice_transport_);
RTC_DCHECK(datagram_transport_);
ConnectToIceTransport();
@ -63,17 +93,14 @@ void DatagramDtlsAdaptor::ConnectToIceTransport() {
this, &DatagramDtlsAdaptor::OnReadyToSend);
ice_transport_->SignalReceivingState.connect(
this, &DatagramDtlsAdaptor::OnReceivingState);
// Datagram transport does not propagate network route change.
ice_transport_->SignalNetworkRouteChanged.connect(
this, &DatagramDtlsAdaptor::OnNetworkRouteChanged);
if (kBypassDatagramDtlsTestOnly) {
// In bypass mode we have to subscribe to ICE read and sent events.
// Test only case to use ICE directly instead of data transport.
ice_transport_->SignalReadPacket.connect(
this, &DatagramDtlsAdaptor::OnReadPacket);
ice_transport_->SignalSentPacket.connect(
this, &DatagramDtlsAdaptor::OnSentPacket);
} else {
@ -97,20 +124,77 @@ int DatagramDtlsAdaptor::SendPacket(const char* data,
size_t len,
const rtc::PacketOptions& options,
int flags) {
RTC_DCHECK_RUN_ON(&thread_checker_);
// TODO(sukhanov): Handle options and flags.
if (kBypassDatagramDtlsTestOnly) {
// In bypass mode sent directly to ICE.
return ice_transport_->SendPacket(data, len, options);
}
// Send datagram with id equal to options.packet_id, so we get it back
// in DatagramDtlsAdaptor::OnDatagramSent() and propagate notification
// up.
webrtc::RTCError error = datagram_transport_->SendDatagram(
rtc::MakeArrayView(reinterpret_cast<const uint8_t*>(data), len),
/*datagram_id=*/options.packet_id);
rtc::ArrayView<const uint8_t> original_data(
reinterpret_cast<const uint8_t*>(data), len);
// RTCP packets are sent as is and they do not require datagram_id.
if (webrtc::RtpHeaderParser::IsRtcp(original_data.data(),
original_data.size())) {
return SendDatagram(original_data, /*datagram_id=*/kRtcpDatagramId);
}
return (error.ok() ? len : -1);
// Assign and increment datagram_id.
webrtc::DatagramId datagram_id = current_datagram_id_;
current_datagram_id_++;
// Parse RTP packet.
webrtc::RtpPacket rtp_packet(&rtp_header_extension_map_);
if (!rtp_packet.Parse(original_data)) {
RTC_NOTREACHED() << "Failed to parse outgoing RtpPacket, len=" << len
<< ", options.packet_id=" << options.packet_id;
return -1;
}
// Try to get transport sequence number.
uint16_t transport_senquence_number;
if (!rtp_packet.GetExtension<webrtc::TransportSequenceNumber>(
&transport_senquence_number)) {
// Save packet info without transport sequence number.
sent_rtp_packet_map_[datagram_id] = SentPacketInfo(
rtp_packet.Ssrc(),
/*transport_sequence_number=*/absl::nullopt, options.packet_id);
RTC_LOG(LS_VERBOSE)
<< "Sending rtp packet without transport sequence number, packet="
<< rtp_packet.ToString();
return SendDatagram(original_data, datagram_id);
}
// Save packet info with sequence number.
sent_rtp_packet_map_[datagram_id] = SentPacketInfo(
rtp_packet.Ssrc(), transport_senquence_number, options.packet_id);
// Since datagram transport provides feedback and timestamps, we do not need
// to send transport sequence number, so we remove it from RTP packet. Later
// when we get Ack for sent datagram, we will re-create RTCP feedback packet.
if (!rtp_packet.RemoveExtension(webrtc::TransportSequenceNumber::kId)) {
RTC_NOTREACHED() << "Failed to remove transport sequence number, packet="
<< rtp_packet.ToString();
return -1;
}
RTC_LOG(LS_VERBOSE) << "Removed transport_senquence_number="
<< transport_senquence_number
<< " from packet=" << rtp_packet.ToString()
<< ", saved bytes=" << len - rtp_packet.size();
return SendDatagram(
rtc::ArrayView<const uint8_t>(rtp_packet.data(), rtp_packet.size()),
datagram_id);
}
int DatagramDtlsAdaptor::SendDatagram(rtc::ArrayView<const uint8_t> data,
webrtc::DatagramId datagram_id) {
webrtc::RTCError error = datagram_transport_->SendDatagram(data, datagram_id);
return (error.ok() ? data.size() : -1);
}
void DatagramDtlsAdaptor::OnReadPacket(rtc::PacketTransportInternal* transport,
@ -145,17 +229,134 @@ void DatagramDtlsAdaptor::OnDatagramReceived(
}
void DatagramDtlsAdaptor::OnDatagramSent(webrtc::DatagramId datagram_id) {
// When we called DatagramTransportInterface::SendDatagram, we passed
// packet_id as datagram_id, so we simply need to set it in sent_packet
// and propagate notification up the stack.
RTC_DCHECK_RUN_ON(&thread_checker_);
// Sent notifications are not needed for RTCP packets.
if (datagram_id == kRtcpDatagramId) {
return;
}
// Find packet_id and propagate OnPacketSent notification.
const auto& it = sent_rtp_packet_map_.find(datagram_id);
if (it == sent_rtp_packet_map_.end()) {
RTC_NOTREACHED() << "Did not find sent packet info for sent datagram_id="
<< datagram_id;
return;
}
// Also see how DatagramDtlsAdaptor::OnSentPacket handles OnSentPacket
// notification from ICE in bypass mode.
rtc::SentPacket sent_packet(/*packet_id=*/datagram_id, rtc::TimeMillis());
rtc::SentPacket sent_packet(/*packet_id=*/it->second.packet_id,
rtc::TimeMillis());
PropagateOnSentNotification(sent_packet);
}
bool DatagramDtlsAdaptor::GetAndRemoveSentPacketInfo(
webrtc::DatagramId datagram_id,
SentPacketInfo* sent_packet_info) {
RTC_DCHECK_NE(datagram_id, kRtcpDatagramId);
RTC_CHECK(sent_packet_info != nullptr);
const auto& it = sent_rtp_packet_map_.find(datagram_id);
if (it == sent_rtp_packet_map_.end()) {
return false;
}
*sent_packet_info = it->second;
sent_rtp_packet_map_.erase(it);
return true;
}
void DatagramDtlsAdaptor::OnDatagramAcked(const webrtc::DatagramAck& ack) {
RTC_DCHECK_RUN_ON(&thread_checker_);
// ACK notifications are not needed for RTCP packets and RTCP packets are not
// stored in SentPacketInfo map.
if (ack.datagram_id == kRtcpDatagramId) {
return;
}
SentPacketInfo sent_packet_info;
if (!GetAndRemoveSentPacketInfo(ack.datagram_id, &sent_packet_info)) {
// TODO(sukhanov): If OnDatagramAck() can come after OnDatagramLost(),
// datagram_id is already deleted and we may need to relax the CHECK below.
// It's probably OK to ignore such datagrams, because it's been a few RTTs
// anyway since they were sent.
RTC_NOTREACHED() << "Did not find sent packet info for datagram_id="
<< ack.datagram_id;
return;
}
RTC_LOG(LS_VERBOSE) << "Datagram acked, datagram_id=" << ack.datagram_id
<< ", transport_sequence_number="
<< sent_packet_info.transport_sequence_number.value_or(-1)
<< ", ssrc=" << sent_packet_info.ssrc
<< ", receive_timestamp_ms="
<< ack.receive_timestamp.ms();
// If transport sequence number was not present in RTP packet, we do not need
// to propagate RTCP feedback.
if (!sent_packet_info.transport_sequence_number) {
return;
}
// TODO(sukhanov): We noticed that datagram transport implementations can
// return zero timestamps in the middle of the call. This is workaround to
// avoid propagating zero timestamps, but we need to understand why we have
// them in the first place.
int64_t receive_timestamp_us = ack.receive_timestamp.us();
if (receive_timestamp_us == 0) {
receive_timestamp_us = previous_nonzero_timestamp_us_;
} else {
previous_nonzero_timestamp_us_ = receive_timestamp_us;
}
// Recreate RTCP feedback packet.
webrtc::rtcp::TransportFeedback feedback_packet;
feedback_packet.SetMediaSsrc(sent_packet_info.ssrc);
const uint16_t transport_sequence_number =
sent_packet_info.transport_sequence_number.value();
feedback_packet.SetBase(transport_sequence_number, receive_timestamp_us);
feedback_packet.AddReceivedPacket(transport_sequence_number,
receive_timestamp_us);
rtc::Buffer buffer(kMaxRtcpFeedbackPacketSize);
size_t index = 0;
if (!feedback_packet.Create(buffer.data(), &index, buffer.capacity(),
nullptr)) {
RTC_NOTREACHED() << "Failed to create RTCP feedback packet";
return;
}
RTC_CHECK_GT(index, 0);
RTC_CHECK_LE(index, kMaxRtcpFeedbackPacketSize);
// Propagage created RTCP packet as normal incoming packet.
buffer.SetSize(index);
PropagateReadPacket(buffer, /*packet_time_us=*/-1);
}
void DatagramDtlsAdaptor::OnDatagramLost(webrtc::DatagramId datagram_id) {
RTC_DCHECK_RUN_ON(&thread_checker_);
// RTCP packets are not stored in SentPacketInfo map.
if (datagram_id == kRtcpDatagramId) {
return;
}
RTC_LOG(LS_INFO) << "Datagram lost, datagram_id=" << datagram_id;
SentPacketInfo sent_packet_info;
if (!GetAndRemoveSentPacketInfo(datagram_id, &sent_packet_info)) {
RTC_NOTREACHED() << "Did not find sent packet info for lost datagram_id="
<< datagram_id;
}
}
void DatagramDtlsAdaptor::OnSentPacket(rtc::PacketTransportInternal* transport,
const rtc::SentPacket& sent_packet) {
// Only used in bypass mode.
@ -256,7 +457,7 @@ void DatagramDtlsAdaptor::OnWritableState(
rtc::PacketTransportInternal* transport) {
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK(transport == ice_transport_);
RTC_LOG(LS_VERBOSE) << ": ice_transport writable state changed to "
RTC_LOG(LS_VERBOSE) << "ice_transport writable state changed to "
<< ice_transport_->writable();
if (kBypassDatagramDtlsTestOnly) {

View File

@ -11,12 +11,15 @@
#ifndef PC_DATAGRAM_DTLS_ADAPTOR_H_
#define PC_DATAGRAM_DTLS_ADAPTOR_H_
#include <map>
#include <memory>
#include <string>
#include <vector>
#include "api/crypto/crypto_options.h"
#include "api/datagram_transport_interface.h"
#include "modules/rtp_rtcp/include/rtp_header_extension_map.h"
#include "modules/rtp_rtcp/source/rtp_header_extensions.h"
#include "p2p/base/dtls_transport_internal.h"
#include "p2p/base/ice_transport_internal.h"
#include "p2p/base/packet_transport_internal.h"
@ -42,10 +45,12 @@ class DatagramDtlsAdaptor : public DtlsTransportInternal,
// TODO(sukhanov): Taking crypto options, because DtlsTransportInternal
// has a virtual getter crypto_options(). Consider removing getter and
// removing crypto_options from DatagramDtlsAdaptor.
DatagramDtlsAdaptor(IceTransportInternal* ice_transport,
webrtc::DatagramTransportInterface* datagram_transport,
const webrtc::CryptoOptions& crypto_options,
webrtc::RtcEventLog* event_log);
DatagramDtlsAdaptor(
const std::vector<webrtc::RtpExtension>& rtp_header_extensions,
IceTransportInternal* ice_transport,
webrtc::DatagramTransportInterface* datagram_transport,
const webrtc::CryptoOptions& crypto_options,
webrtc::RtcEventLog* event_log);
~DatagramDtlsAdaptor() override;
@ -60,6 +65,10 @@ class DatagramDtlsAdaptor : public DtlsTransportInternal,
void OnDatagramSent(webrtc::DatagramId datagram_id) override;
void OnDatagramAcked(const webrtc::DatagramAck& ack) override;
void OnDatagramLost(webrtc::DatagramId datagram_id) override;
void OnStateChanged(webrtc::MediaTransportState state) override;
// =====================================================
@ -94,6 +103,37 @@ class DatagramDtlsAdaptor : public DtlsTransportInternal,
bool receiving() const override;
private:
// Stored for each sent RTP packet.
struct SentPacketInfo {
SentPacketInfo(uint32_t ssrc,
absl::optional<uint16_t> transport_sequence_number,
int64_t packet_id)
: ssrc(ssrc),
transport_sequence_number(transport_sequence_number),
packet_id(packet_id) {}
SentPacketInfo() = default;
uint32_t ssrc = 0;
// Transport sequence number (if it was provided in outgoing RTP packet).
// It is used to re-create RTCP feedback packets from datagram ACKs.
absl::optional<uint16_t> transport_sequence_number = 0;
// Packet id from rtc::PacketOptions. It is required to propagage sent
// notification up the stack (SignalSentPacket).
int64_t packet_id = 0;
};
// Finds SentPacketInfo for given |datagram_id| and removes map entry.
// Returns false if entry was not found.
bool GetAndRemoveSentPacketInfo(webrtc::DatagramId datagram_id,
SentPacketInfo* sent_packet_info);
// Sends datagram to datagram_transport.
int SendDatagram(rtc::ArrayView<const uint8_t> data,
webrtc::DatagramId datagram_id);
void set_receiving(bool receiving);
void set_writable(bool writable);
void set_dtls_state(DtlsTransportState state);
@ -145,6 +185,22 @@ class DatagramDtlsAdaptor : public DtlsTransportInternal,
DtlsTransportState dtls_state_ = DTLS_TRANSPORT_NEW;
webrtc::RtcEventLog* const event_log_;
// Extension map for parsing transport sequence numbers.
webrtc::RtpHeaderExtensionMap rtp_header_extension_map_;
// Keeps information about sent RTP packet until they are Acked or Lost.
std::map<webrtc::DatagramId, SentPacketInfo> sent_rtp_packet_map_;
// Current datagram_id, incremented after each sent RTP packets.
// Datagram id is passed to datagram transport when we send datagram and we
// get it back in notifications about Sent, Acked and Lost datagrams.
int64_t current_datagram_id_ = 0;
// TODO(sukhanov): Previous nonzero timestamp is required for workaround for
// zero timestamps received, which sometimes are received from datagram
// transport. Investigate if we can eliminate zero timestamps.
int64_t previous_nonzero_timestamp_us_ = 0;
};
} // namespace cricket

View File

@ -474,6 +474,7 @@ JsepTransportController::CreateIceTransport(const std::string transport_name,
std::unique_ptr<cricket::DtlsTransportInternal>
JsepTransportController::CreateDtlsTransport(
const cricket::ContentInfo& content_info,
cricket::IceTransportInternal* ice,
DatagramTransportInterface* datagram_transport) {
RTC_DCHECK(network_thread_->IsCurrent());
@ -485,7 +486,8 @@ JsepTransportController::CreateDtlsTransport(
// Create DTLS wrapper around DatagramTransportInterface.
dtls = absl::make_unique<cricket::DatagramDtlsAdaptor>(
ice, datagram_transport, config_.crypto_options, config_.event_log);
content_info.media_description()->rtp_header_extensions(), ice,
datagram_transport, config_.crypto_options, config_.event_log);
} else if (config_.media_transport_factory &&
config_.use_media_transport_for_media &&
config_.use_media_transport_for_data_channels) {
@ -1164,11 +1166,11 @@ RTCError JsepTransportController::MaybeCreateJsepTransport(
if (datagram_transport) {
datagram_transport->Connect(ice.get());
datagram_dtls_transport =
CreateDtlsTransport(ice.get(), datagram_transport.get());
CreateDtlsTransport(content_info, ice.get(), datagram_transport.get());
}
std::unique_ptr<cricket::DtlsTransportInternal> rtp_dtls_transport =
CreateDtlsTransport(ice.get(), nullptr);
CreateDtlsTransport(content_info, ice.get(), nullptr);
std::unique_ptr<cricket::DtlsTransportInternal> rtcp_dtls_transport;
std::unique_ptr<RtpTransport> unencrypted_rtp_transport;
@ -1183,7 +1185,7 @@ RTCError JsepTransportController::MaybeCreateJsepTransport(
RTC_DCHECK(media_transport == nullptr);
RTC_DCHECK(datagram_transport == nullptr);
rtcp_ice = CreateIceTransport(content_info.name, /*rtcp=*/true);
rtcp_dtls_transport = CreateDtlsTransport(rtcp_ice.get(),
rtcp_dtls_transport = CreateDtlsTransport(content_info, rtcp_ice.get(),
/*datagram_transport=*/nullptr);
}

View File

@ -355,6 +355,7 @@ class JsepTransportController : public sigslot::has_slots<> {
bool local);
std::unique_ptr<cricket::DtlsTransportInternal> CreateDtlsTransport(
const cricket::ContentInfo& content_info,
cricket::IceTransportInternal* ice,
DatagramTransportInterface* datagram_transport);
std::unique_ptr<cricket::IceTransportInternal> CreateIceTransport(