diff --git a/logging/BUILD.gn b/logging/BUILD.gn index eefb83661a..0269485b8f 100644 --- a/logging/BUILD.gn +++ b/logging/BUILD.gn @@ -266,6 +266,7 @@ if (rtc_enable_protobuf) { "rtc_event_log/rtc_event_log_parser.h", "rtc_event_log/rtc_event_log_parser_new.cc", "rtc_event_log/rtc_event_log_parser_new.h", + "rtc_event_log/rtc_event_processor.h", ] deps = [ @@ -300,6 +301,7 @@ if (rtc_enable_protobuf) { "rtc_event_log/rtc_event_log_unittest.cc", "rtc_event_log/rtc_event_log_unittest_helper.cc", "rtc_event_log/rtc_event_log_unittest_helper.h", + "rtc_event_log/rtc_event_processor_unittest.cc", ] deps = [ ":ice_log", @@ -351,6 +353,8 @@ if (rtc_enable_protobuf) { "../system_wrappers:field_trial_default", "../system_wrappers:metrics_default", "../test:rtp_test_utils", + "//third_party/abseil-cpp/absl/memory:memory", + "//third_party/abseil-cpp/absl/types:optional", ] if (!build_with_chromium && is_clang) { # Suppress warnings from the Chromium Clang plugin (bugs.webrtc.org/163). diff --git a/logging/rtc_event_log/rtc_event_log2rtp_dump.cc b/logging/rtc_event_log/rtc_event_log2rtp_dump.cc index b62759b0d5..a3d3456a7b 100644 --- a/logging/rtc_event_log/rtc_event_log2rtp_dump.cc +++ b/logging/rtc_event_log/rtc_event_log2rtp_dump.cc @@ -15,9 +15,14 @@ #include #include +#include "absl/memory/memory.h" +#include "absl/types/optional.h" #include "logging/rtc_event_log/rtc_event_log.h" #include "logging/rtc_event_log/rtc_event_log_parser_new.h" +#include "logging/rtc_event_log/rtc_event_processor.h" #include "modules/rtp_rtcp/source/byte_io.h" +#include "modules/rtp_rtcp/source/rtp_header_extensions.h" +#include "modules/rtp_rtcp/source/rtp_packet.h" #include "modules/rtp_rtcp/source/rtp_utility.h" #include "rtc_base/checks.h" #include "rtc_base/flags.h" @@ -59,8 +64,9 @@ DEFINE_bool(help, false, "Prints this message."); // The empty string must be validated as true, because it is the default value // of the command-line flag. In this case, no value is written to the output // variable. -bool ParseSsrc(std::string str, uint32_t* ssrc) { +absl::optional ParseSsrc(std::string str) { // If the input string starts with 0x or 0X it indicates a hexadecimal number. + uint32_t ssrc; auto read_mode = std::dec; if (str.size() > 2 && (str.substr(0, 2) == "0x" || str.substr(0, 2) == "0X")) { @@ -68,8 +74,79 @@ bool ParseSsrc(std::string str, uint32_t* ssrc) { str = str.substr(2); } std::stringstream ss(str); - ss >> read_mode >> *ssrc; - return str.empty() || (!ss.fail() && ss.eof()); + ss >> read_mode >> ssrc; + if (str.empty() || (!ss.fail() && ss.eof())) + return ssrc; + return absl::nullopt; +} + +bool ShouldSkipStream(MediaType media_type, + uint32_t ssrc, + absl::optional ssrc_filter) { + if (!FLAG_audio && media_type == MediaType::AUDIO) + return true; + if (!FLAG_video && media_type == MediaType::VIDEO) + return true; + if (!FLAG_data && media_type == MediaType::DATA) + return true; + if (ssrc_filter.has_value() && ssrc != *ssrc_filter) + return true; + return false; +} + +// Convert a LoggedRtpPacketIncoming to a test::RtpPacket. Header extension IDs +// are allocated according to the provided extension map. This might not match +// the extension map used in the actual call. +void ConvertRtpPacket(const webrtc::LoggedRtpPacketIncoming& incoming, + const webrtc::RtpHeaderExtensionMap default_extension_map, + webrtc::test::RtpPacket* packet) { + webrtc::RtpPacket reconstructed_packet(&default_extension_map); + + reconstructed_packet.SetMarker(incoming.rtp.header.markerBit); + reconstructed_packet.SetPayloadType(incoming.rtp.header.payloadType); + reconstructed_packet.SetSequenceNumber(incoming.rtp.header.sequenceNumber); + reconstructed_packet.SetTimestamp(incoming.rtp.header.timestamp); + reconstructed_packet.SetSsrc(incoming.rtp.header.ssrc); + if (incoming.rtp.header.numCSRCs > 0) { + reconstructed_packet.SetCsrcs(rtc::ArrayView( + incoming.rtp.header.arrOfCSRCs, incoming.rtp.header.numCSRCs)); + } + + // Set extensions. + if (incoming.rtp.header.extension.hasTransmissionTimeOffset) + reconstructed_packet.SetExtension( + incoming.rtp.header.extension.transmissionTimeOffset); + if (incoming.rtp.header.extension.hasAbsoluteSendTime) + reconstructed_packet.SetExtension( + incoming.rtp.header.extension.absoluteSendTime); + if (incoming.rtp.header.extension.hasTransportSequenceNumber) + reconstructed_packet.SetExtension( + incoming.rtp.header.extension.transportSequenceNumber); + if (incoming.rtp.header.extension.hasAudioLevel) + reconstructed_packet.SetExtension( + incoming.rtp.header.extension.voiceActivity, + incoming.rtp.header.extension.audioLevel); + if (incoming.rtp.header.extension.hasVideoRotation) + reconstructed_packet.SetExtension( + incoming.rtp.header.extension.videoRotation); + if (incoming.rtp.header.extension.hasVideoContentType) + reconstructed_packet.SetExtension( + incoming.rtp.header.extension.videoContentType); + if (incoming.rtp.header.extension.has_video_timing) + reconstructed_packet.SetExtension( + incoming.rtp.header.extension.video_timing); + + RTC_DCHECK_EQ(reconstructed_packet.size(), incoming.rtp.header_length); + RTC_DCHECK_EQ(reconstructed_packet.headers_size(), + incoming.rtp.header_length); + memcpy(packet->data, reconstructed_packet.data(), + reconstructed_packet.headers_size()); + packet->length = reconstructed_packet.headers_size(); + packet->original_length = incoming.rtp.total_length; + packet->time_ms = incoming.log_time_ms(); + // Set padding bit. + if (incoming.rtp.header.paddingLength > 0) + packet->data[0] = packet->data[0] | 0x20; } } // namespace @@ -97,10 +174,11 @@ int main(int argc, char* argv[]) { std::string input_file = argv[1]; std::string output_file = argv[2]; - uint32_t ssrc_filter = 0; - if (strlen(FLAG_ssrc) > 0) - RTC_CHECK(ParseSsrc(FLAG_ssrc, &ssrc_filter)) - << "Flag verification has failed."; + absl::optional ssrc_filter; + if (strlen(FLAG_ssrc) > 0) { + ssrc_filter = ParseSsrc(FLAG_ssrc); + RTC_CHECK(ssrc_filter.has_value()) << "Failed to read SSRC filter flag."; + } webrtc::ParsedRtcEventLogNew parsed_stream; if (!parsed_stream.ParseFile(input_file)) { @@ -122,84 +200,57 @@ int main(int argc, char* argv[]) { << " events in the input file." << std::endl; int rtp_counter = 0, rtcp_counter = 0; bool header_only = false; - for (size_t i = 0; i < parsed_stream.GetNumberOfEvents(); i++) { - // The parsed_stream will assert if the protobuf event is missing - // some required fields and we attempt to access them. We could consider - // a softer failure option, but it does not seem useful to generate - // RTP dumps based on broken event logs. - if (FLAG_rtp && parsed_stream.GetEventType(i) == - webrtc::ParsedRtcEventLogNew::EventType::RTP_EVENT) { - webrtc::test::RtpPacket packet; - webrtc::PacketDirection direction; - parsed_stream.GetRtpHeader(i, &direction, packet.data, &packet.length, - &packet.original_length, nullptr); - if (packet.original_length > packet.length) - header_only = true; - packet.time_ms = parsed_stream.GetTimestamp(i) / 1000; - webrtc::RtpUtility::RtpHeaderParser rtp_parser(packet.data, - packet.length); + webrtc::RtpHeaderExtensionMap default_extension_map = + webrtc::ParsedRtcEventLogNew::GetDefaultHeaderExtensionMap(); + auto handle_rtp = [&default_extension_map, &rtp_writer, &rtp_counter]( + const webrtc::LoggedRtpPacketIncoming& incoming) { + webrtc::test::RtpPacket packet; + ConvertRtpPacket(incoming, default_extension_map, &packet); - // TODO(terelius): Maybe add a flag to dump outgoing traffic instead? - if (direction == webrtc::kOutgoingPacket) - continue; + rtp_writer->WritePacket(&packet); + rtp_counter++; + }; - webrtc::RTPHeader parsed_header; - rtp_parser.Parse(&parsed_header); - MediaType media_type = - parsed_stream.GetMediaType(parsed_header.ssrc, direction); - if (!FLAG_audio && media_type == MediaType::AUDIO) - continue; - if (!FLAG_video && media_type == MediaType::VIDEO) - continue; - if (!FLAG_data && media_type == MediaType::DATA) - continue; - if (strlen(FLAG_ssrc) > 0) { - const uint32_t packet_ssrc = - webrtc::ByteReader::ReadBigEndian( - reinterpret_cast(packet.data + 8)); - if (packet_ssrc != ssrc_filter) - continue; - } + auto handle_rtcp = [&rtp_writer, &rtcp_counter]( + const webrtc::LoggedRtcpPacketIncoming& incoming) { + webrtc::test::RtpPacket packet; + memcpy(packet.data, incoming.rtcp.raw_data.data(), + incoming.rtcp.raw_data.size()); + packet.length = incoming.rtcp.raw_data.size(); + // For RTCP packets the original_length should be set to 0 in the + // RTPdump format. + packet.original_length = 0; + packet.time_ms = incoming.log_time_ms(); - rtp_writer->WritePacket(&packet); - rtp_counter++; - } - if (FLAG_rtcp && parsed_stream.GetEventType(i) == - webrtc::ParsedRtcEventLogNew::EventType::RTCP_EVENT) { - webrtc::test::RtpPacket packet; - webrtc::PacketDirection direction; - parsed_stream.GetRtcpPacket(i, &direction, packet.data, &packet.length); - // For RTCP packets the original_length should be set to 0 in the - // RTPdump format. - packet.original_length = 0; - packet.time_ms = parsed_stream.GetTimestamp(i) / 1000; + rtp_writer->WritePacket(&packet); + rtcp_counter++; + }; - // TODO(terelius): Maybe add a flag to dump outgoing traffic instead? - if (direction == webrtc::kOutgoingPacket) - continue; - - // Note that |packet_ssrc| is the sender SSRC. An RTCP message may contain - // report blocks for many streams, thus several SSRCs and they doen't - // necessarily have to be of the same media type. - const uint32_t packet_ssrc = webrtc::ByteReader::ReadBigEndian( - reinterpret_cast(packet.data + 4)); - MediaType media_type = parsed_stream.GetMediaType(packet_ssrc, direction); - if (!FLAG_audio && media_type == MediaType::AUDIO) - continue; - if (!FLAG_video && media_type == MediaType::VIDEO) - continue; - if (!FLAG_data && media_type == MediaType::DATA) - continue; - if (strlen(FLAG_ssrc) > 0) { - if (packet_ssrc != ssrc_filter) - continue; - } - - rtp_writer->WritePacket(&packet); - rtcp_counter++; - } + webrtc::RtcEventProcessor event_processor; + for (const auto& stream : parsed_stream.incoming_rtp_packets_by_ssrc()) { + MediaType media_type = + parsed_stream.GetMediaType(stream.ssrc, webrtc::kIncomingPacket); + if (ShouldSkipStream(media_type, stream.ssrc, ssrc_filter)) + continue; + auto rtp_view = absl::make_unique< + webrtc::ProcessableEventList>( + stream.incoming_packets.begin(), stream.incoming_packets.end(), + handle_rtp); + event_processor.AddEvents(std::move(rtp_view)); } + // Note that |packet_ssrc| is the sender SSRC. An RTCP message may contain + // report blocks for many streams, thus several SSRCs and they don't + // necessarily have to be of the same media type. We therefore don't + // support filtering of RTCP based on SSRC and media type. + auto rtcp_view = absl::make_unique< + webrtc::ProcessableEventList>( + parsed_stream.incoming_rtcp_packets().begin(), + parsed_stream.incoming_rtcp_packets().end(), handle_rtcp); + event_processor.AddEvents(std::move(rtcp_view)); + + event_processor.ProcessEventsInOrder(); + std::cout << "Wrote " << rtp_counter << (header_only ? " header-only" : "") << " RTP packets and " << rtcp_counter << " RTCP packets to the " << "output file." << std::endl; diff --git a/logging/rtc_event_log/rtc_event_log_parser_new.cc b/logging/rtc_event_log/rtc_event_log_parser_new.cc index 9fbf0d0c74..4a75dc35fa 100644 --- a/logging/rtc_event_log/rtc_event_log_parser_new.cc +++ b/logging/rtc_event_log/rtc_event_log_parser_new.cc @@ -210,33 +210,6 @@ IceCandidatePairEventType GetRuntimeIceCandidatePairEventType( return IceCandidatePairEventType::kCheckSent; } -// Return default values for header extensions, to use on streams without stored -// mapping data. Currently this only applies to audio streams, since the mapping -// is not stored in the event log. -// TODO(ivoc): Remove this once this mapping is stored in the event log for -// audio streams. Tracking bug: webrtc:6399 -webrtc::RtpHeaderExtensionMap GetDefaultHeaderExtensionMap() { - webrtc::RtpHeaderExtensionMap default_map; - default_map.Register(webrtc::RtpExtension::kAudioLevelDefaultId); - default_map.Register( - webrtc::RtpExtension::kTimestampOffsetDefaultId); - default_map.Register( - webrtc::RtpExtension::kAbsSendTimeDefaultId); - default_map.Register( - webrtc::RtpExtension::kVideoRotationDefaultId); - default_map.Register( - webrtc::RtpExtension::kVideoContentTypeDefaultId); - default_map.Register( - webrtc::RtpExtension::kVideoTimingDefaultId); - default_map.Register( - webrtc::RtpExtension::kFrameMarkingDefaultId); - default_map.Register( - webrtc::RtpExtension::kTransportSequenceNumberDefaultId); - default_map.Register( - webrtc::RtpExtension::kPlayoutDelayDefaultId); - return default_map; -} - std::pair ParseVarInt( std::istream& stream) { // no-presubmit-check TODO(webrtc:8982) uint64_t varint = 0; @@ -328,6 +301,32 @@ ParsedRtcEventLogNew::LoggedRtpStreamView::LoggedRtpStreamView( ParsedRtcEventLogNew::LoggedRtpStreamView::LoggedRtpStreamView( const LoggedRtpStreamView&) = default; +// Return default values for header extensions, to use on streams without stored +// mapping data. Currently this only applies to audio streams, since the mapping +// is not stored in the event log. +// TODO(ivoc): Remove this once this mapping is stored in the event log for +// audio streams. Tracking bug: webrtc:6399 +webrtc::RtpHeaderExtensionMap +ParsedRtcEventLogNew::GetDefaultHeaderExtensionMap() { + webrtc::RtpHeaderExtensionMap default_map; + default_map.Register(webrtc::RtpExtension::kAudioLevelDefaultId); + default_map.Register( + webrtc::RtpExtension::kTimestampOffsetDefaultId); + default_map.Register( + webrtc::RtpExtension::kAbsSendTimeDefaultId); + default_map.Register( + webrtc::RtpExtension::kVideoRotationDefaultId); + default_map.Register( + webrtc::RtpExtension::kVideoContentTypeDefaultId); + default_map.Register( + webrtc::RtpExtension::kVideoTimingDefaultId); + default_map.Register( + webrtc::RtpExtension::kTransportSequenceNumberDefaultId); + default_map.Register( + webrtc::RtpExtension::kPlayoutDelayDefaultId); + return default_map; +} + ParsedRtcEventLogNew::ParsedRtcEventLogNew( UnconfiguredHeaderExtensions parse_unconfigured_header_extensions) : parse_unconfigured_header_extensions_( diff --git a/logging/rtc_event_log/rtc_event_log_parser_new.h b/logging/rtc_event_log/rtc_event_log_parser_new.h index bf35ae9440..8f7cc8fdcf 100644 --- a/logging/rtc_event_log/rtc_event_log_parser_new.h +++ b/logging/rtc_event_log/rtc_event_log_parser_new.h @@ -49,6 +49,11 @@ namespace webrtc { enum class BandwidthUsage; struct AudioEncoderRuntimeConfig; +// The different event types are deliberately POD. Analysis of large logs is +// already resource intensive. The code simplifications that would be possible +// possible by having a base class (containing e.g. the log time) are not +// considered to outweigh the added memory and runtime overhead incurred by +// adding a vptr. struct LoggedAlrStateEvent { int64_t timestamp_us; bool in_alr; @@ -244,7 +249,7 @@ struct LoggedRtcpPacketTransportFeedback { }; struct LoggedStartEvent { - explicit LoggedStartEvent(uint64_t timestamp_us) + explicit LoggedStartEvent(int64_t timestamp_us) : timestamp_us(timestamp_us) {} int64_t timestamp_us; int64_t log_time_us() const { return timestamp_us; } @@ -252,8 +257,7 @@ struct LoggedStartEvent { }; struct LoggedStopEvent { - explicit LoggedStopEvent(uint64_t timestamp_us) - : timestamp_us(timestamp_us) {} + explicit LoggedStopEvent(int64_t timestamp_us) : timestamp_us(timestamp_us) {} int64_t timestamp_us; int64_t log_time_us() const { return timestamp_us; } int64_t log_time_ms() const { return timestamp_us / 1000; } @@ -531,6 +535,8 @@ class ParsedRtcEventLogNew { PacketView packet_view; }; + static webrtc::RtpHeaderExtensionMap GetDefaultHeaderExtensionMap(); + explicit ParsedRtcEventLogNew( UnconfiguredHeaderExtensions parse_unconfigured_header_extensions = UnconfiguredHeaderExtensions::kDontParse); diff --git a/logging/rtc_event_log/rtc_event_processor.h b/logging/rtc_event_log/rtc_event_processor.h new file mode 100644 index 0000000000..8bd81d32ab --- /dev/null +++ b/logging/rtc_event_log/rtc_event_processor.h @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2018 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef LOGGING_RTC_EVENT_LOG_RTC_EVENT_PROCESSOR_H_ +#define LOGGING_RTC_EVENT_LOG_RTC_EVENT_PROCESSOR_H_ + +#include +#include +#include +#include + +#include "rtc_base/function_view.h" + +namespace webrtc { + +// This file contains helper class used to process the elements of two or more +// sorted lists in timestamp order. The effect is the same as doing a merge step +// in the merge-sort algorithm but without copying the elements or modifying the +// lists. + +// Interface to allow "merging" lists of different types. ProcessNext() +// processes the next unprocesses element in the list. IsEmpty() checks if all +// elements have been processed. GetNextTime returns the timestamp of the next +// unprocessed element. +class ProcessableEventListInterface { + public: + virtual ~ProcessableEventListInterface() = default; + virtual void ProcessNext() = 0; + virtual bool IsEmpty() const = 0; + virtual int64_t GetNextTime() const = 0; +}; + +// ProcessableEventList encapsulates a list of events and a function that will +// be applied to each element of the list. +template +class ProcessableEventList : public ProcessableEventListInterface { + public: + // N.B. |f| is not owned by ProcessableEventList. The caller must ensure that + // the function object or lambda outlives ProcessableEventList and + // RtcEventProcessor. The same thing applies to the iterators (begin, end); + // the vector must outlive ProcessableEventList and must not be modified until + // processing has finished. + ProcessableEventList(typename std::vector::const_iterator begin, + typename std::vector::const_iterator end, + rtc::FunctionView f) + : begin_(begin), end_(end), f_(f) {} + + void ProcessNext() override { + RTC_DCHECK(!IsEmpty()); + f_(*begin_); + ++begin_; + } + + bool IsEmpty() const override { return begin_ == end_; } + + int64_t GetNextTime() const override { + RTC_DCHECK(!IsEmpty()); + return begin_->log_time_us(); + } + + private: + typename std::vector::const_iterator begin_; + typename std::vector::const_iterator end_; + rtc::FunctionView f_; +}; + +// Helper class used to "merge" two or more lists of ordered RtcEventLog events +// so that they can be treated as a single ordered list. Since the individual +// lists may have different types, we need to access the lists via pointers to +// the common base class. +// +// Usage example: +// ParsedRtcEventLogNew log; +// auto incoming_handler = [] (LoggedRtcpPacketIncoming elem) { ... }; +// auto incoming_rtcp = +// absl::make_unique>( +// log.incoming_rtcp_packets().begin(), +// log.incoming_rtcp_packets().end(), +// incoming_handler); +// auto outgoing_handler = [] (LoggedRtcpPacketOutgoing elem) { ... }; +// auto outgoing_rtcp = +// absl::make_unique>( +// log.outgoing_rtcp_packets().begin(), +// log.outgoing_rtcp_packets().end(), +// outgoing_handler); +// +// RtcEventProcessor processor; +// processor.AddEvents(std::move(incoming_rtcp)); +// processor.AddEvents(std::move(outgoing_rtcp)); +// processor.ProcessEventsInOrder(); +class RtcEventProcessor { + public: + // The elements of each list is processed in the index order. To process all + // elements in all lists in timestamp order, each lists need to be sorted in + // timestamp order prior to insertion. Otherwise, + void AddEvents(std::unique_ptr events) { + if (!events->IsEmpty()) { + event_lists_.push_back(std::move(events)); + std::push_heap(event_lists_.begin(), event_lists_.end(), Cmp); + } + } + + void ProcessEventsInOrder() { + // |event_lists_| is a min-heap of lists ordered by the timestamp of the + // first element in the list. We therefore process the first element of the + // first list, then reinsert the remainder of that list into the heap + // if the list still contains unprocessed elements. + while (!event_lists_.empty()) { + event_lists_.front()->ProcessNext(); + std::pop_heap(event_lists_.begin(), event_lists_.end(), Cmp); + if (event_lists_.back()->IsEmpty()) { + event_lists_.pop_back(); + } else { + std::push_heap(event_lists_.begin(), event_lists_.end(), Cmp); + } + } + } + + private: + using ListPtrType = std::unique_ptr; + std::vector event_lists_; + // Comparison function to make |event_lists_| into a min heap. + static bool Cmp(const ListPtrType& a, const ListPtrType& b) { + return a->GetNextTime() > b->GetNextTime(); + } +}; + +} // namespace webrtc + +#endif // LOGGING_RTC_EVENT_LOG_RTC_EVENT_PROCESSOR_H_ diff --git a/logging/rtc_event_log/rtc_event_processor_unittest.cc b/logging/rtc_event_log/rtc_event_processor_unittest.cc new file mode 100644 index 0000000000..f2764e7ad9 --- /dev/null +++ b/logging/rtc_event_log/rtc_event_processor_unittest.cc @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2018 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "logging/rtc_event_log/rtc_event_processor.h" + +#include +#include + +#include "absl/memory/memory.h" +#include "logging/rtc_event_log/rtc_event_log_parser_new.h" +#include "rtc_base/checks.h" +#include "rtc_base/random.h" +#include "test/gtest.h" + +namespace webrtc { + +namespace { +std::vector CreateEventList( + std::initializer_list timestamp_list) { + std::vector v; + for (int64_t timestamp_ms : timestamp_list) { + v.emplace_back(timestamp_ms * 1000); // Convert ms to us. + } + return v; +} + +using OrderedEventView = ProcessableEventList; + +std::vector> +CreateRandomEventLists(size_t num_lists, size_t num_elements, uint64_t seed) { + Random prng(seed); + std::vector> lists(num_lists); + for (size_t elem = 0; elem < num_elements; elem++) { + uint32_t i = prng.Rand(0u, num_lists - 1); + int64_t timestamp_ms = elem; + lists[i].emplace_back(timestamp_ms * 1000); + } + return lists; +} +} // namespace + +TEST(RtcEventProcessor, NoList) { + RtcEventProcessor processor; + processor.ProcessEventsInOrder(); // Don't crash but do nothing. +} + +TEST(RtcEventProcessor, EmptyList) { + auto not_called = [](LoggedStartEvent /*elem*/) { EXPECT_TRUE(false); }; + std::vector events; + RtcEventProcessor processor; + + processor.AddEvents(absl::make_unique( + events.begin(), events.end(), not_called)); + processor.ProcessEventsInOrder(); // Don't crash but do nothing. +} + +TEST(RtcEventProcessor, OneList) { + std::vector result; + auto f = [&result](LoggedStartEvent elem) { result.push_back(elem); }; + + std::vector events(CreateEventList({1, 2, 3, 4})); + RtcEventProcessor processor; + processor.AddEvents( + absl::make_unique(events.begin(), events.end(), f)); + processor.ProcessEventsInOrder(); + + std::vector expected_results{1, 2, 3, 4}; + ASSERT_EQ(result.size(), expected_results.size()); + for (size_t i = 0; i < expected_results.size(); i++) { + EXPECT_EQ(result[i].log_time_ms(), expected_results[i]); + } +} + +TEST(RtcEventProcessor, MergeTwoLists) { + std::vector result; + auto f = [&result](LoggedStartEvent elem) { result.push_back(elem); }; + + std::vector events1(CreateEventList({1, 2, 4, 7, 8, 9})); + std::vector events2(CreateEventList({3, 5, 6, 10})); + RtcEventProcessor processor; + processor.AddEvents( + absl::make_unique(events1.begin(), events1.end(), f)); + processor.AddEvents( + absl::make_unique(events2.begin(), events2.end(), f)); + processor.ProcessEventsInOrder(); + + std::vector expected_results{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + ASSERT_EQ(result.size(), expected_results.size()); + for (size_t i = 0; i < expected_results.size(); i++) { + EXPECT_EQ(result[i].log_time_ms(), expected_results[i]); + } +} + +TEST(RtcEventProcessor, MergeTwoListsWithDuplicatedElements) { + std::vector result; + auto f = [&result](LoggedStartEvent elem) { result.push_back(elem); }; + + std::vector events1(CreateEventList({1, 2, 2, 3, 5, 5})); + std::vector events2(CreateEventList({1, 3, 4, 4})); + RtcEventProcessor processor; + processor.AddEvents( + absl::make_unique(events1.begin(), events1.end(), f)); + processor.AddEvents( + absl::make_unique(events2.begin(), events2.end(), f)); + processor.ProcessEventsInOrder(); + + std::vector expected_results{1, 1, 2, 2, 3, 3, 4, 4, 5, 5}; + ASSERT_EQ(result.size(), expected_results.size()); + for (size_t i = 0; i < expected_results.size(); i++) { + EXPECT_EQ(result[i].log_time_ms(), expected_results[i]); + } +} + +TEST(RtcEventProcessor, MergeManyLists) { + std::vector result; + auto f = [&result](LoggedStartEvent elem) { result.push_back(elem); }; + + constexpr size_t kNumLists = 5; + constexpr size_t kNumElems = 30; + constexpr uint64_t kSeed = 0xF3C6B91F; + std::vector> lists( + CreateRandomEventLists(kNumLists, kNumElems, kSeed)); + RTC_DCHECK_EQ(lists.size(), kNumLists); + RtcEventProcessor processor; + for (const auto& list : lists) { + processor.AddEvents( + absl::make_unique(list.begin(), list.end(), f)); + } + processor.ProcessEventsInOrder(); + + std::vector expected_results(kNumElems); + std::iota(expected_results.begin(), expected_results.end(), 0); + ASSERT_EQ(result.size(), expected_results.size()); + for (size_t i = 0; i < expected_results.size(); i++) { + EXPECT_EQ(result[i].log_time_ms(), expected_results[i]); + } +} + +TEST(RtcEventProcessor, DifferentTypes) { + std::vector result; + auto f1 = [&result](LoggedStartEvent elem) { + result.push_back(elem.log_time_ms()); + }; + auto f2 = [&result](LoggedStopEvent elem) { + result.push_back(elem.log_time_ms()); + }; + + std::vector events1{LoggedStartEvent(2000)}; + std::vector events2{LoggedStopEvent(1000)}; + RtcEventProcessor processor; + processor.AddEvents(absl::make_unique>( + events1.begin(), events1.end(), f1)); + processor.AddEvents(absl::make_unique>( + events2.begin(), events2.end(), f2)); + processor.ProcessEventsInOrder(); + + std::vector expected_results{1, 2}; + ASSERT_EQ(result.size(), expected_results.size()); + for (size_t i = 0; i < expected_results.size(); i++) { + EXPECT_EQ(result[i], expected_results[i]); + } +} + +} // namespace webrtc