diff --git a/logging/BUILD.gn b/logging/BUILD.gn index e1d5289cb0..2258a7356a 100644 --- a/logging/BUILD.gn +++ b/logging/BUILD.gn @@ -335,8 +335,8 @@ if (rtc_enable_protobuf) { "../api/units:time_delta", "../api/units:timestamp", "../call:video_stream_api", + "../modules:module_api", "../modules/audio_coding:audio_network_adaptor", - "../modules/congestion_controller/rtp:transport_feedback", "../modules/remote_bitrate_estimator", "../modules/rtp_rtcp", "../modules/rtp_rtcp:rtp_rtcp_format", diff --git a/logging/rtc_event_log/logged_events.h b/logging/rtc_event_log/logged_events.h index afe5127cfa..3a82d626aa 100644 --- a/logging/rtc_event_log/logged_events.h +++ b/logging/rtc_event_log/logged_events.h @@ -392,7 +392,9 @@ struct LoggedRtcpPacketPli { }; struct LoggedRtcpPacketTransportFeedback { - LoggedRtcpPacketTransportFeedback() = default; + LoggedRtcpPacketTransportFeedback() + : transport_feedback(/*include_timestamps=*/true, /*include_lost*/ true) { + } LoggedRtcpPacketTransportFeedback( int64_t timestamp_us, const rtcp::TransportFeedback& transport_feedback) diff --git a/logging/rtc_event_log/rtc_event_log_parser.cc b/logging/rtc_event_log/rtc_event_log_parser.cc index 289744da27..c66ecfdc1b 100644 --- a/logging/rtc_event_log/rtc_event_log_parser.cc +++ b/logging/rtc_event_log/rtc_event_log_parser.cc @@ -30,7 +30,7 @@ #include "logging/rtc_event_log/rtc_event_log.h" #include "logging/rtc_event_log/rtc_event_processor.h" #include "modules/audio_coding/audio_network_adaptor/include/audio_network_adaptor.h" -#include "modules/congestion_controller/rtp/transport_feedback_adapter.h" +#include "modules/include/module_common_types.h" #include "modules/remote_bitrate_estimator/include/bwe_defines.h" #include "modules/rtp_rtcp/include/rtp_cvo.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" @@ -1918,7 +1918,6 @@ std::vector ParsedRtcEventLog::GetPacketInfos( AddSendStreamInfos(&streams, video_send_configs(), LoggedMediaType::kVideo); } - TransportFeedbackAdapter feedback_adapter; std::vector overheads = GetOverheadChangingEvents(GetRouteChanges(), direction); auto overhead_iter = overheads.begin(); @@ -1926,6 +1925,7 @@ std::vector ParsedRtcEventLog::GetPacketInfos( std::map indices; uint16_t current_overhead = kDefaultOverhead; Timestamp last_log_time = Timestamp::Zero(); + SequenceNumberUnwrapper seq_num_unwrapper; auto advance_time = [&](Timestamp new_log_time) { if (overhead_iter != overheads.end() && @@ -1959,72 +1959,82 @@ std::vector ParsedRtcEventLog::GetPacketInfos( } LoggedPacketInfo logged(rtp, stream->media_type, stream->rtx, capture_time); logged.overhead = current_overhead; - if (rtp.header.extension.hasTransportSequenceNumber) { + if (logged.has_transport_seq_no) { logged.log_feedback_time = Timestamp::PlusInfinity(); - - RtpPacketSendInfo packet_info; - packet_info.ssrc = rtp.header.ssrc; - packet_info.transport_sequence_number = - rtp.header.extension.transportSequenceNumber; - packet_info.rtp_sequence_number = rtp.header.sequenceNumber; - packet_info.has_rtp_sequence_number = true; - packet_info.length = rtp.total_length; - feedback_adapter.AddPacket(packet_info, - 0u, // Should this be current_overhead? - Timestamp::ms(rtp.log_time_ms())); - - rtc::SentPacket sent_packet; - sent_packet.send_time_ms = rtp.log_time_ms(); - sent_packet.info.packet_size_bytes = rtp.total_length; - sent_packet.info.included_in_feedback = true; - sent_packet.packet_id = rtp.header.extension.transportSequenceNumber; - auto sent_packet_msg = feedback_adapter.ProcessSentPacket(sent_packet); - RTC_CHECK(sent_packet_msg); - indices[sent_packet_msg->sequence_number] = packets.size(); + int64_t unwrapped_seq_num = + seq_num_unwrapper.Unwrap(logged.transport_seq_no); + indices[unwrapped_seq_num] = packets.size(); } packets.push_back(logged); }; - auto feedback_handler = [&](const LoggedRtcpPacketTransportFeedback& logged) { - advance_time(Timestamp::ms(logged.log_time_ms())); - auto msg = feedback_adapter.ProcessTransportFeedback( - logged.transport_feedback, Timestamp::ms(logged.log_time_ms())); - if (!msg.has_value() || msg->packet_feedbacks.empty()) - return; + Timestamp feedback_base_time = Timestamp::MinusInfinity(); + absl::optional last_feedback_base_time_us; - auto& last_fb = msg->packet_feedbacks.back(); - Timestamp last_recv_time = last_fb.receive_time; - // This can happen if send time info is missing for the real last packet in - // the feedback, allowing the reported last packet to med indicated as lost. - if (last_recv_time.IsInfinite()) - RTC_LOG(LS_WARNING) << "No receive time for last packet in feedback."; - - for (auto& fb : msg->packet_feedbacks) { - if (indices.find(fb.sent_packet.sequence_number) == indices.end()) { - RTC_LOG(LS_ERROR) << "Received feedback for unknown packet: " - << fb.sent_packet.sequence_number; - continue; - } - LoggedPacketInfo* sent = - &packets[indices[fb.sent_packet.sequence_number]]; - sent->reported_recv_time = fb.receive_time; - // If we have received feedback with a valid receive time for this packet - // before, we keep the previous values. - if (sent->log_feedback_time.IsFinite() && - sent->reported_recv_time.IsFinite()) - continue; - sent->log_feedback_time = msg->feedback_time; - if (last_recv_time.IsFinite()) { - if (direction == PacketDirection::kOutgoingPacket) { - sent->feedback_hold_duration = last_recv_time - fb.receive_time; + auto feedback_handler = + [&](const LoggedRtcpPacketTransportFeedback& logged_rtcp) { + auto log_feedback_time = Timestamp::ms(logged_rtcp.log_time_ms()); + advance_time(log_feedback_time); + const auto& feedback = logged_rtcp.transport_feedback; + // Add timestamp deltas to a local time base selected on first packet + // arrival. This won't be the true time base, but makes it easier to + // manually inspect time stamps. + if (!last_feedback_base_time_us) { + feedback_base_time = log_feedback_time; } else { - sent->feedback_hold_duration = - Timestamp::ms(logged.log_time_ms()) - sent->log_packet_time; + feedback_base_time += TimeDelta::us( + feedback.GetBaseDeltaUs(*last_feedback_base_time_us)); } - } - sent->last_in_feedback = (&fb == &last_fb); - } - }; + last_feedback_base_time_us = feedback.GetBaseTimeUs(); + + std::vector packet_feedbacks; + packet_feedbacks.reserve(feedback.GetAllPackets().size()); + Timestamp receive_timestamp = feedback_base_time; + for (const auto& packet : feedback.GetAllPackets()) { + int64_t unwrapped_seq_num = + seq_num_unwrapper.Unwrap(packet.sequence_number()); + auto it = indices.find(unwrapped_seq_num); + if (it == indices.end()) { + RTC_LOG(LS_WARNING) << "Received feedback for unknown packet: " + << unwrapped_seq_num; + continue; + } + LoggedPacketInfo* sent = &packets[it->second]; + if (log_feedback_time - sent->log_packet_time > + TimeDelta::seconds(60)) { + RTC_LOG(LS_WARNING) + << "Received very late feedback, possibly due to wraparound."; + continue; + } + if (packet.received()) { + receive_timestamp += TimeDelta::us(packet.delta_us()); + if (sent->reported_recv_time.IsInfinite()) { + sent->reported_recv_time = Timestamp::ms(receive_timestamp.ms()); + sent->log_feedback_time = log_feedback_time; + } + } else { + if (sent->reported_recv_time.IsInfinite() && + sent->log_feedback_time.IsInfinite()) { + sent->reported_recv_time = Timestamp::PlusInfinity(); + sent->log_feedback_time = log_feedback_time; + } + } + packet_feedbacks.push_back(sent); + } + if (packet_feedbacks.empty()) + return; + LoggedPacketInfo* last = packet_feedbacks.back(); + last->last_in_feedback = true; + for (LoggedPacketInfo* fb : packet_feedbacks) { + if (direction == PacketDirection::kOutgoingPacket) { + fb->feedback_hold_duration = + last->reported_recv_time - fb->reported_recv_time; + } else { + fb->feedback_hold_duration = + log_feedback_time - fb->log_packet_time; + } + } + }; RtcEventProcessor process; for (const auto& rtp_packets : rtp_packets_by_ssrc(direction)) { diff --git a/modules/congestion_controller/rtp/transport_feedback_adapter.cc b/modules/congestion_controller/rtp/transport_feedback_adapter.cc index bb2425db85..02e2088f14 100644 --- a/modules/congestion_controller/rtp/transport_feedback_adapter.cc +++ b/modules/congestion_controller/rtp/transport_feedback_adapter.cc @@ -43,9 +43,6 @@ PacketResult NetworkPacketFeedbackFromRtpPacketFeedback( } // namespace const int64_t kNoTimestamp = -1; const int64_t kSendTimeHistoryWindowMs = 60000; -const int64_t kBaseTimestampScaleFactor = - rtcp::TransportFeedback::kDeltaScaleFactor * (1 << 8); -const int64_t kBaseTimestampRangeSizeUs = kBaseTimestampScaleFactor * (1 << 24); TransportFeedbackAdapter::TransportFeedbackAdapter() : allow_duplicates_(field_trial::IsEnabled( @@ -191,26 +188,15 @@ DataSize TransportFeedbackAdapter::GetOutstandingData() const { std::vector TransportFeedbackAdapter::GetPacketFeedbackVector( const rtcp::TransportFeedback& feedback, Timestamp feedback_time) { - int64_t timestamp_us = feedback.GetBaseTimeUs(); - // Add timestamp deltas to a local time base selected on first packet arrival. // This won't be the true time base, but makes it easier to manually inspect // time stamps. if (last_timestamp_us_ == kNoTimestamp) { current_offset_ms_ = feedback_time.ms(); } else { - int64_t delta = timestamp_us - last_timestamp_us_; - - // Detect and compensate for wrap-arounds in base time. - if (std::abs(delta - kBaseTimestampRangeSizeUs) < std::abs(delta)) { - delta -= kBaseTimestampRangeSizeUs; // Wrap backwards. - } else if (std::abs(delta + kBaseTimestampRangeSizeUs) < std::abs(delta)) { - delta += kBaseTimestampRangeSizeUs; // Wrap forwards. - } - - current_offset_ms_ += delta / 1000; + current_offset_ms_ += feedback.GetBaseDeltaUs(last_timestamp_us_) / 1000; } - last_timestamp_us_ = timestamp_us; + last_timestamp_us_ = feedback.GetBaseTimeUs(); std::vector packet_feedback_vector; if (feedback.GetPacketStatusCount() == 0) { diff --git a/modules/rtp_rtcp/source/rtcp_packet/transport_feedback.cc b/modules/rtp_rtcp/source/rtcp_packet/transport_feedback.cc index ee8b93a6d7..4382f32c22 100644 --- a/modules/rtp_rtcp/source/rtcp_packet/transport_feedback.cc +++ b/modules/rtp_rtcp/source/rtcp_packet/transport_feedback.cc @@ -262,10 +262,11 @@ void TransportFeedback::LastChunk::DecodeRunLength(uint16_t chunk, } TransportFeedback::TransportFeedback() - : TransportFeedback(/*include_timestamps=*/true) {} + : TransportFeedback(/*include_timestamps=*/true, /*include_lost*/ false) {} -TransportFeedback::TransportFeedback(bool include_timestamps) - : base_seq_no_(0), +TransportFeedback::TransportFeedback(bool include_timestamps, bool include_lost) + : include_lost_(include_lost), + base_seq_no_(0), num_seq_no_(0), base_time_ticks_(0), feedback_seq_(0), @@ -276,13 +277,15 @@ TransportFeedback::TransportFeedback(bool include_timestamps) TransportFeedback::TransportFeedback(const TransportFeedback&) = default; TransportFeedback::TransportFeedback(TransportFeedback&& other) - : base_seq_no_(other.base_seq_no_), + : include_lost_(other.include_lost_), + base_seq_no_(other.base_seq_no_), num_seq_no_(other.num_seq_no_), base_time_ticks_(other.base_time_ticks_), feedback_seq_(other.feedback_seq_), include_timestamps_(other.include_timestamps_), last_timestamp_us_(other.last_timestamp_us_), - packets_(std::move(other.packets_)), + received_packets_(std::move(other.received_packets_)), + all_packets_(std::move(other.all_packets_)), encoded_chunks_(std::move(other.encoded_chunks_)), last_chunk_(other.last_chunk_), size_bytes_(other.size_bytes_) { @@ -341,7 +344,7 @@ bool TransportFeedback::AddReceivedPacket(uint16_t sequence_number, if (!AddDeltaSize(delta_size)) return false; - packets_.emplace_back(sequence_number, delta); + received_packets_.emplace_back(sequence_number, delta); last_timestamp_us_ += delta * kDeltaScaleFactor; if (include_timestamps_) { size_bytes_ += delta_size; @@ -351,7 +354,13 @@ bool TransportFeedback::AddReceivedPacket(uint16_t sequence_number, const std::vector& TransportFeedback::GetReceivedPackets() const { - return packets_; + return received_packets_; +} + +const std::vector& +TransportFeedback::GetAllPackets() const { + RTC_DCHECK(include_lost_); + return all_packets_; } uint16_t TransportFeedback::GetBaseSequence() const { @@ -362,6 +371,18 @@ int64_t TransportFeedback::GetBaseTimeUs() const { return static_cast(base_time_ticks_) * kBaseScaleFactor; } +int64_t TransportFeedback::GetBaseDeltaUs(int64_t prev_timestamp_us) const { + int64_t delta = GetBaseTimeUs() - prev_timestamp_us; + + // Detect and compensate for wrap-arounds in base time. + if (std::abs(delta - kTimeWrapPeriodUs) < std::abs(delta)) { + delta -= kTimeWrapPeriodUs; // Wrap backwards. + } else if (std::abs(delta + kTimeWrapPeriodUs) < std::abs(delta)) { + delta += kTimeWrapPeriodUs; // Wrap forwards. + } + return delta; +} + // De-serialize packet. bool TransportFeedback::Parse(const CommonHeader& packet) { RTC_DCHECK_EQ(packet.type(), kPacketType); @@ -428,17 +449,23 @@ bool TransportFeedback::Parse(const CommonHeader& packet) { } switch (delta_size) { case 0: + if (include_lost_) + all_packets_.emplace_back(seq_no); break; case 1: { int16_t delta = payload[index]; - packets_.emplace_back(seq_no, delta); + received_packets_.emplace_back(seq_no, delta); + if (include_lost_) + all_packets_.emplace_back(seq_no, delta); last_timestamp_us_ += delta * kDeltaScaleFactor; index += delta_size; break; } case 2: { int16_t delta = ByteReader::ReadBigEndian(&payload[index]); - packets_.emplace_back(seq_no, delta); + received_packets_.emplace_back(seq_no, delta); + if (include_lost_) + all_packets_.emplace_back(seq_no, delta); last_timestamp_us_ += delta * kDeltaScaleFactor; index += delta_size; break; @@ -460,7 +487,14 @@ bool TransportFeedback::Parse(const CommonHeader& packet) { for (size_t delta_size : delta_sizes) { // Use delta sizes to detect if packet was received. if (delta_size > 0) { - packets_.emplace_back(seq_no, 0); + received_packets_.emplace_back(seq_no, 0); + } + if (include_lost_) { + if (delta_size > 0) { + all_packets_.emplace_back(seq_no, 0); + } else { + all_packets_.emplace_back(seq_no); + } } ++seq_no; } @@ -503,11 +537,11 @@ bool TransportFeedback::IsConsistent() const { return false; } int64_t timestamp_us = base_time_ticks_ * kBaseScaleFactor; - auto packet_it = packets_.begin(); + auto packet_it = received_packets_.begin(); uint16_t seq_no = base_seq_no_; for (DeltaSize delta_size : delta_sizes) { if (delta_size > 0) { - if (packet_it == packets_.end()) { + if (packet_it == received_packets_.end()) { RTC_LOG(LS_ERROR) << "Failed to find delta for seq_no " << seq_no; return false; } @@ -532,7 +566,7 @@ bool TransportFeedback::IsConsistent() const { } ++seq_no; } - if (packet_it != packets_.end()) { + if (packet_it != received_packets_.end()) { RTC_LOG(LS_ERROR) << "Unencoded delta for seq_no " << packet_it->sequence_number(); return false; @@ -601,7 +635,7 @@ bool TransportFeedback::Create(uint8_t* packet, } if (include_timestamps_) { - for (const auto& received_packet : packets_) { + for (const auto& received_packet : received_packets_) { int16_t delta = received_packet.delta_ticks(); if (delta >= 0 && delta <= 0xFF) { packet[(*position)++] = delta; @@ -625,7 +659,8 @@ bool TransportFeedback::Create(uint8_t* packet, void TransportFeedback::Clear() { num_seq_no_ = 0; last_timestamp_us_ = GetBaseTimeUs(); - packets_.clear(); + received_packets_.clear(); + all_packets_.clear(); encoded_chunks_.clear(); last_chunk_.Clear(); size_bytes_ = kTransportFeedbackHeaderSizeBytes; diff --git a/modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h b/modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h index 174ef6bcb5..00c649663d 100644 --- a/modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h +++ b/modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h @@ -25,17 +25,23 @@ class TransportFeedback : public Rtpfb { class ReceivedPacket { public: ReceivedPacket(uint16_t sequence_number, int16_t delta_ticks) - : sequence_number_(sequence_number), delta_ticks_(delta_ticks) {} + : sequence_number_(sequence_number), + delta_ticks_(delta_ticks), + received_(true) {} + explicit ReceivedPacket(uint16_t sequence_number) + : sequence_number_(sequence_number), received_(false) {} ReceivedPacket(const ReceivedPacket&) = default; ReceivedPacket& operator=(const ReceivedPacket&) = default; uint16_t sequence_number() const { return sequence_number_; } int16_t delta_ticks() const { return delta_ticks_; } int32_t delta_us() const { return delta_ticks_ * kDeltaScaleFactor; } + bool received() const { return received_; } private: uint16_t sequence_number_; int16_t delta_ticks_; + bool received_; }; // TODO(sprang): IANA reg? static constexpr uint8_t kFeedbackMessageType = 15; @@ -45,10 +51,11 @@ class TransportFeedback : public Rtpfb { static constexpr size_t kMaxReportedPackets = 0xffff; TransportFeedback(); - explicit TransportFeedback( - bool include_timestamps); // If |include_timestamps| is set to false, the - // created packet will not contain the receive - // delta block. + + // If |include_timestamps| is set to false, the created packet will not + // contain the receive delta block. + explicit TransportFeedback(bool include_timestamps, + bool include_lost = false); TransportFeedback(const TransportFeedback&); TransportFeedback(TransportFeedback&&); @@ -60,6 +67,7 @@ class TransportFeedback : public Rtpfb { // NOTE: This method requires increasing sequence numbers (excepting wraps). bool AddReceivedPacket(uint16_t sequence_number, int64_t timestamp_us); const std::vector& GetReceivedPackets() const; + const std::vector& GetAllPackets() const; uint16_t GetBaseSequence() const; @@ -69,6 +77,9 @@ class TransportFeedback : public Rtpfb { // Get the reference time in microseconds, including any precision loss. int64_t GetBaseTimeUs() const; + // Get the unwrapped delta between current base time and |prev_timestamp_us|. + int64_t GetBaseDeltaUs(int64_t prev_timestamp_us) const; + // Does the feedback packet contain timestamp information? bool IncludeTimestamps() const { return include_timestamps_; } @@ -144,6 +155,7 @@ class TransportFeedback : public Rtpfb { bool AddDeltaSize(DeltaSize delta_size); + const bool include_lost_; uint16_t base_seq_no_; uint16_t num_seq_no_; int32_t base_time_ticks_; @@ -151,7 +163,8 @@ class TransportFeedback : public Rtpfb { bool include_timestamps_; int64_t last_timestamp_us_; - std::vector packets_; + std::vector received_packets_; + std::vector all_packets_; // All but last encoded packet chunks. std::vector encoded_chunks_; LastChunk last_chunk_; diff --git a/modules/rtp_rtcp/source/rtcp_packet/transport_feedback_unittest.cc b/modules/rtp_rtcp/source/rtcp_packet/transport_feedback_unittest.cc index 0bb2d475f5..83f28b0506 100644 --- a/modules/rtp_rtcp/source/rtcp_packet/transport_feedback_unittest.cc +++ b/modules/rtp_rtcp/source/rtcp_packet/transport_feedback_unittest.cc @@ -15,6 +15,7 @@ #include #include "modules/rtp_rtcp/source/byte_io.h" +#include "modules/rtp_rtcp/source/rtcp_packet/common_header.h" #include "test/gmock.h" #include "test/gtest.h" @@ -555,5 +556,51 @@ TEST(RtcpPacketTest, TransportFeedbackMoveConstructor) { EXPECT_EQ(moved.Build(), feedback_copy.Build()); } +TEST(TransportFeedbackTest, ReportsMissingPackets) { + const uint16_t kBaseSeqNo = 1000; + const int64_t kBaseTimestampUs = 10000; + const uint8_t kFeedbackSeqNo = 90; + TransportFeedback feedback_builder(/*include_timestamps*/ true); + feedback_builder.SetBase(kBaseSeqNo, kBaseTimestampUs); + feedback_builder.SetFeedbackSequenceNumber(kFeedbackSeqNo); + feedback_builder.AddReceivedPacket(kBaseSeqNo + 0, kBaseTimestampUs); + // Packet losses indicated by jump in sequence number. + feedback_builder.AddReceivedPacket(kBaseSeqNo + 3, kBaseTimestampUs + 2000); + rtc::Buffer coded = feedback_builder.Build(); + + rtcp::CommonHeader header; + header.Parse(coded.data(), coded.size()); + TransportFeedback feedback(/*include_timestamps*/ true, + /*include_lost*/ true); + feedback.Parse(header); + auto packets = feedback.GetAllPackets(); + EXPECT_TRUE(packets[0].received()); + EXPECT_FALSE(packets[1].received()); + EXPECT_FALSE(packets[2].received()); + EXPECT_TRUE(packets[3].received()); +} + +TEST(TransportFeedbackTest, ReportsMissingPacketsWithoutTimestamps) { + const uint16_t kBaseSeqNo = 1000; + const uint8_t kFeedbackSeqNo = 90; + TransportFeedback feedback_builder(/*include_timestamps*/ false); + feedback_builder.SetBase(kBaseSeqNo, 10000); + feedback_builder.SetFeedbackSequenceNumber(kFeedbackSeqNo); + feedback_builder.AddReceivedPacket(kBaseSeqNo + 0, /*timestamp_us*/ 0); + // Packet losses indicated by jump in sequence number. + feedback_builder.AddReceivedPacket(kBaseSeqNo + 3, /*timestamp_us*/ 0); + rtc::Buffer coded = feedback_builder.Build(); + + rtcp::CommonHeader header; + header.Parse(coded.data(), coded.size()); + TransportFeedback feedback(/*include_timestamps*/ true, + /*include_lost*/ true); + feedback.Parse(header); + auto packets = feedback.GetAllPackets(); + EXPECT_TRUE(packets[0].received()); + EXPECT_FALSE(packets[1].received()); + EXPECT_FALSE(packets[2].received()); + EXPECT_TRUE(packets[3].received()); +} } // namespace } // namespace webrtc