Add helper class to process RtcEventLog events in order.

Add helper class to process RtcEventLog events in order.
Use helper class to migrate rtc_event_log2rtp_dump.cc
to new parser API.

Bug: webrtc:8111
Change-Id: I7cbc220dad1f50be3a985ed44de27b38e5f20476
Reviewed-on: https://webrtc-review.googlesource.com/98601
Commit-Queue: Björn Terelius <terelius@webrtc.org>
Reviewed-by: Elad Alon <eladalon@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#24806}
This commit is contained in:
Bjorn Terelius 2018-09-24 17:10:27 +02:00 committed by Commit Bot
parent 9931ddbbc8
commit 60d629f3fc
6 changed files with 477 additions and 109 deletions

View File

@ -266,6 +266,7 @@ if (rtc_enable_protobuf) {
"rtc_event_log/rtc_event_log_parser.h",
"rtc_event_log/rtc_event_log_parser_new.cc",
"rtc_event_log/rtc_event_log_parser_new.h",
"rtc_event_log/rtc_event_processor.h",
]
deps = [
@ -300,6 +301,7 @@ if (rtc_enable_protobuf) {
"rtc_event_log/rtc_event_log_unittest.cc",
"rtc_event_log/rtc_event_log_unittest_helper.cc",
"rtc_event_log/rtc_event_log_unittest_helper.h",
"rtc_event_log/rtc_event_processor_unittest.cc",
]
deps = [
":ice_log",
@ -351,6 +353,8 @@ if (rtc_enable_protobuf) {
"../system_wrappers:field_trial_default",
"../system_wrappers:metrics_default",
"../test:rtp_test_utils",
"//third_party/abseil-cpp/absl/memory:memory",
"//third_party/abseil-cpp/absl/types:optional",
]
if (!build_with_chromium && is_clang) {
# Suppress warnings from the Chromium Clang plugin (bugs.webrtc.org/163).

View File

@ -15,9 +15,14 @@
#include <sstream>
#include <string>
#include "absl/memory/memory.h"
#include "absl/types/optional.h"
#include "logging/rtc_event_log/rtc_event_log.h"
#include "logging/rtc_event_log/rtc_event_log_parser_new.h"
#include "logging/rtc_event_log/rtc_event_processor.h"
#include "modules/rtp_rtcp/source/byte_io.h"
#include "modules/rtp_rtcp/source/rtp_header_extensions.h"
#include "modules/rtp_rtcp/source/rtp_packet.h"
#include "modules/rtp_rtcp/source/rtp_utility.h"
#include "rtc_base/checks.h"
#include "rtc_base/flags.h"
@ -59,8 +64,9 @@ DEFINE_bool(help, false, "Prints this message.");
// The empty string must be validated as true, because it is the default value
// of the command-line flag. In this case, no value is written to the output
// variable.
bool ParseSsrc(std::string str, uint32_t* ssrc) {
absl::optional<uint32_t> ParseSsrc(std::string str) {
// If the input string starts with 0x or 0X it indicates a hexadecimal number.
uint32_t ssrc;
auto read_mode = std::dec;
if (str.size() > 2 &&
(str.substr(0, 2) == "0x" || str.substr(0, 2) == "0X")) {
@ -68,8 +74,79 @@ bool ParseSsrc(std::string str, uint32_t* ssrc) {
str = str.substr(2);
}
std::stringstream ss(str);
ss >> read_mode >> *ssrc;
return str.empty() || (!ss.fail() && ss.eof());
ss >> read_mode >> ssrc;
if (str.empty() || (!ss.fail() && ss.eof()))
return ssrc;
return absl::nullopt;
}
bool ShouldSkipStream(MediaType media_type,
uint32_t ssrc,
absl::optional<uint32_t> ssrc_filter) {
if (!FLAG_audio && media_type == MediaType::AUDIO)
return true;
if (!FLAG_video && media_type == MediaType::VIDEO)
return true;
if (!FLAG_data && media_type == MediaType::DATA)
return true;
if (ssrc_filter.has_value() && ssrc != *ssrc_filter)
return true;
return false;
}
// Convert a LoggedRtpPacketIncoming to a test::RtpPacket. Header extension IDs
// are allocated according to the provided extension map. This might not match
// the extension map used in the actual call.
void ConvertRtpPacket(const webrtc::LoggedRtpPacketIncoming& incoming,
const webrtc::RtpHeaderExtensionMap default_extension_map,
webrtc::test::RtpPacket* packet) {
webrtc::RtpPacket reconstructed_packet(&default_extension_map);
reconstructed_packet.SetMarker(incoming.rtp.header.markerBit);
reconstructed_packet.SetPayloadType(incoming.rtp.header.payloadType);
reconstructed_packet.SetSequenceNumber(incoming.rtp.header.sequenceNumber);
reconstructed_packet.SetTimestamp(incoming.rtp.header.timestamp);
reconstructed_packet.SetSsrc(incoming.rtp.header.ssrc);
if (incoming.rtp.header.numCSRCs > 0) {
reconstructed_packet.SetCsrcs(rtc::ArrayView<const uint32_t>(
incoming.rtp.header.arrOfCSRCs, incoming.rtp.header.numCSRCs));
}
// Set extensions.
if (incoming.rtp.header.extension.hasTransmissionTimeOffset)
reconstructed_packet.SetExtension<webrtc::TransmissionOffset>(
incoming.rtp.header.extension.transmissionTimeOffset);
if (incoming.rtp.header.extension.hasAbsoluteSendTime)
reconstructed_packet.SetExtension<webrtc::AbsoluteSendTime>(
incoming.rtp.header.extension.absoluteSendTime);
if (incoming.rtp.header.extension.hasTransportSequenceNumber)
reconstructed_packet.SetExtension<webrtc::TransportSequenceNumber>(
incoming.rtp.header.extension.transportSequenceNumber);
if (incoming.rtp.header.extension.hasAudioLevel)
reconstructed_packet.SetExtension<webrtc::AudioLevel>(
incoming.rtp.header.extension.voiceActivity,
incoming.rtp.header.extension.audioLevel);
if (incoming.rtp.header.extension.hasVideoRotation)
reconstructed_packet.SetExtension<webrtc::VideoOrientation>(
incoming.rtp.header.extension.videoRotation);
if (incoming.rtp.header.extension.hasVideoContentType)
reconstructed_packet.SetExtension<webrtc::VideoContentTypeExtension>(
incoming.rtp.header.extension.videoContentType);
if (incoming.rtp.header.extension.has_video_timing)
reconstructed_packet.SetExtension<webrtc::VideoTimingExtension>(
incoming.rtp.header.extension.video_timing);
RTC_DCHECK_EQ(reconstructed_packet.size(), incoming.rtp.header_length);
RTC_DCHECK_EQ(reconstructed_packet.headers_size(),
incoming.rtp.header_length);
memcpy(packet->data, reconstructed_packet.data(),
reconstructed_packet.headers_size());
packet->length = reconstructed_packet.headers_size();
packet->original_length = incoming.rtp.total_length;
packet->time_ms = incoming.log_time_ms();
// Set padding bit.
if (incoming.rtp.header.paddingLength > 0)
packet->data[0] = packet->data[0] | 0x20;
}
} // namespace
@ -97,10 +174,11 @@ int main(int argc, char* argv[]) {
std::string input_file = argv[1];
std::string output_file = argv[2];
uint32_t ssrc_filter = 0;
if (strlen(FLAG_ssrc) > 0)
RTC_CHECK(ParseSsrc(FLAG_ssrc, &ssrc_filter))
<< "Flag verification has failed.";
absl::optional<uint32_t> ssrc_filter;
if (strlen(FLAG_ssrc) > 0) {
ssrc_filter = ParseSsrc(FLAG_ssrc);
RTC_CHECK(ssrc_filter.has_value()) << "Failed to read SSRC filter flag.";
}
webrtc::ParsedRtcEventLogNew parsed_stream;
if (!parsed_stream.ParseFile(input_file)) {
@ -122,84 +200,57 @@ int main(int argc, char* argv[]) {
<< " events in the input file." << std::endl;
int rtp_counter = 0, rtcp_counter = 0;
bool header_only = false;
for (size_t i = 0; i < parsed_stream.GetNumberOfEvents(); i++) {
// The parsed_stream will assert if the protobuf event is missing
// some required fields and we attempt to access them. We could consider
// a softer failure option, but it does not seem useful to generate
// RTP dumps based on broken event logs.
if (FLAG_rtp && parsed_stream.GetEventType(i) ==
webrtc::ParsedRtcEventLogNew::EventType::RTP_EVENT) {
webrtc::test::RtpPacket packet;
webrtc::PacketDirection direction;
parsed_stream.GetRtpHeader(i, &direction, packet.data, &packet.length,
&packet.original_length, nullptr);
if (packet.original_length > packet.length)
header_only = true;
packet.time_ms = parsed_stream.GetTimestamp(i) / 1000;
webrtc::RtpUtility::RtpHeaderParser rtp_parser(packet.data,
packet.length);
webrtc::RtpHeaderExtensionMap default_extension_map =
webrtc::ParsedRtcEventLogNew::GetDefaultHeaderExtensionMap();
auto handle_rtp = [&default_extension_map, &rtp_writer, &rtp_counter](
const webrtc::LoggedRtpPacketIncoming& incoming) {
webrtc::test::RtpPacket packet;
ConvertRtpPacket(incoming, default_extension_map, &packet);
// TODO(terelius): Maybe add a flag to dump outgoing traffic instead?
if (direction == webrtc::kOutgoingPacket)
continue;
rtp_writer->WritePacket(&packet);
rtp_counter++;
};
webrtc::RTPHeader parsed_header;
rtp_parser.Parse(&parsed_header);
MediaType media_type =
parsed_stream.GetMediaType(parsed_header.ssrc, direction);
if (!FLAG_audio && media_type == MediaType::AUDIO)
continue;
if (!FLAG_video && media_type == MediaType::VIDEO)
continue;
if (!FLAG_data && media_type == MediaType::DATA)
continue;
if (strlen(FLAG_ssrc) > 0) {
const uint32_t packet_ssrc =
webrtc::ByteReader<uint32_t>::ReadBigEndian(
reinterpret_cast<const uint8_t*>(packet.data + 8));
if (packet_ssrc != ssrc_filter)
continue;
}
auto handle_rtcp = [&rtp_writer, &rtcp_counter](
const webrtc::LoggedRtcpPacketIncoming& incoming) {
webrtc::test::RtpPacket packet;
memcpy(packet.data, incoming.rtcp.raw_data.data(),
incoming.rtcp.raw_data.size());
packet.length = incoming.rtcp.raw_data.size();
// For RTCP packets the original_length should be set to 0 in the
// RTPdump format.
packet.original_length = 0;
packet.time_ms = incoming.log_time_ms();
rtp_writer->WritePacket(&packet);
rtp_counter++;
}
if (FLAG_rtcp && parsed_stream.GetEventType(i) ==
webrtc::ParsedRtcEventLogNew::EventType::RTCP_EVENT) {
webrtc::test::RtpPacket packet;
webrtc::PacketDirection direction;
parsed_stream.GetRtcpPacket(i, &direction, packet.data, &packet.length);
// For RTCP packets the original_length should be set to 0 in the
// RTPdump format.
packet.original_length = 0;
packet.time_ms = parsed_stream.GetTimestamp(i) / 1000;
rtp_writer->WritePacket(&packet);
rtcp_counter++;
};
// TODO(terelius): Maybe add a flag to dump outgoing traffic instead?
if (direction == webrtc::kOutgoingPacket)
continue;
// Note that |packet_ssrc| is the sender SSRC. An RTCP message may contain
// report blocks for many streams, thus several SSRCs and they doen't
// necessarily have to be of the same media type.
const uint32_t packet_ssrc = webrtc::ByteReader<uint32_t>::ReadBigEndian(
reinterpret_cast<const uint8_t*>(packet.data + 4));
MediaType media_type = parsed_stream.GetMediaType(packet_ssrc, direction);
if (!FLAG_audio && media_type == MediaType::AUDIO)
continue;
if (!FLAG_video && media_type == MediaType::VIDEO)
continue;
if (!FLAG_data && media_type == MediaType::DATA)
continue;
if (strlen(FLAG_ssrc) > 0) {
if (packet_ssrc != ssrc_filter)
continue;
}
rtp_writer->WritePacket(&packet);
rtcp_counter++;
}
webrtc::RtcEventProcessor event_processor;
for (const auto& stream : parsed_stream.incoming_rtp_packets_by_ssrc()) {
MediaType media_type =
parsed_stream.GetMediaType(stream.ssrc, webrtc::kIncomingPacket);
if (ShouldSkipStream(media_type, stream.ssrc, ssrc_filter))
continue;
auto rtp_view = absl::make_unique<
webrtc::ProcessableEventList<webrtc::LoggedRtpPacketIncoming>>(
stream.incoming_packets.begin(), stream.incoming_packets.end(),
handle_rtp);
event_processor.AddEvents(std::move(rtp_view));
}
// Note that |packet_ssrc| is the sender SSRC. An RTCP message may contain
// report blocks for many streams, thus several SSRCs and they don't
// necessarily have to be of the same media type. We therefore don't
// support filtering of RTCP based on SSRC and media type.
auto rtcp_view = absl::make_unique<
webrtc::ProcessableEventList<webrtc::LoggedRtcpPacketIncoming>>(
parsed_stream.incoming_rtcp_packets().begin(),
parsed_stream.incoming_rtcp_packets().end(), handle_rtcp);
event_processor.AddEvents(std::move(rtcp_view));
event_processor.ProcessEventsInOrder();
std::cout << "Wrote " << rtp_counter << (header_only ? " header-only" : "")
<< " RTP packets and " << rtcp_counter << " RTCP packets to the "
<< "output file." << std::endl;

View File

@ -210,33 +210,6 @@ IceCandidatePairEventType GetRuntimeIceCandidatePairEventType(
return IceCandidatePairEventType::kCheckSent;
}
// Return default values for header extensions, to use on streams without stored
// mapping data. Currently this only applies to audio streams, since the mapping
// is not stored in the event log.
// TODO(ivoc): Remove this once this mapping is stored in the event log for
// audio streams. Tracking bug: webrtc:6399
webrtc::RtpHeaderExtensionMap GetDefaultHeaderExtensionMap() {
webrtc::RtpHeaderExtensionMap default_map;
default_map.Register<AudioLevel>(webrtc::RtpExtension::kAudioLevelDefaultId);
default_map.Register<TransmissionOffset>(
webrtc::RtpExtension::kTimestampOffsetDefaultId);
default_map.Register<AbsoluteSendTime>(
webrtc::RtpExtension::kAbsSendTimeDefaultId);
default_map.Register<VideoOrientation>(
webrtc::RtpExtension::kVideoRotationDefaultId);
default_map.Register<VideoContentTypeExtension>(
webrtc::RtpExtension::kVideoContentTypeDefaultId);
default_map.Register<VideoTimingExtension>(
webrtc::RtpExtension::kVideoTimingDefaultId);
default_map.Register<FrameMarkingExtension>(
webrtc::RtpExtension::kFrameMarkingDefaultId);
default_map.Register<TransportSequenceNumber>(
webrtc::RtpExtension::kTransportSequenceNumberDefaultId);
default_map.Register<PlayoutDelayLimits>(
webrtc::RtpExtension::kPlayoutDelayDefaultId);
return default_map;
}
std::pair<uint64_t, bool> ParseVarInt(
std::istream& stream) { // no-presubmit-check TODO(webrtc:8982)
uint64_t varint = 0;
@ -328,6 +301,32 @@ ParsedRtcEventLogNew::LoggedRtpStreamView::LoggedRtpStreamView(
ParsedRtcEventLogNew::LoggedRtpStreamView::LoggedRtpStreamView(
const LoggedRtpStreamView&) = default;
// Return default values for header extensions, to use on streams without stored
// mapping data. Currently this only applies to audio streams, since the mapping
// is not stored in the event log.
// TODO(ivoc): Remove this once this mapping is stored in the event log for
// audio streams. Tracking bug: webrtc:6399
webrtc::RtpHeaderExtensionMap
ParsedRtcEventLogNew::GetDefaultHeaderExtensionMap() {
webrtc::RtpHeaderExtensionMap default_map;
default_map.Register<AudioLevel>(webrtc::RtpExtension::kAudioLevelDefaultId);
default_map.Register<TransmissionOffset>(
webrtc::RtpExtension::kTimestampOffsetDefaultId);
default_map.Register<AbsoluteSendTime>(
webrtc::RtpExtension::kAbsSendTimeDefaultId);
default_map.Register<VideoOrientation>(
webrtc::RtpExtension::kVideoRotationDefaultId);
default_map.Register<VideoContentTypeExtension>(
webrtc::RtpExtension::kVideoContentTypeDefaultId);
default_map.Register<VideoTimingExtension>(
webrtc::RtpExtension::kVideoTimingDefaultId);
default_map.Register<TransportSequenceNumber>(
webrtc::RtpExtension::kTransportSequenceNumberDefaultId);
default_map.Register<PlayoutDelayLimits>(
webrtc::RtpExtension::kPlayoutDelayDefaultId);
return default_map;
}
ParsedRtcEventLogNew::ParsedRtcEventLogNew(
UnconfiguredHeaderExtensions parse_unconfigured_header_extensions)
: parse_unconfigured_header_extensions_(

View File

@ -49,6 +49,11 @@ namespace webrtc {
enum class BandwidthUsage;
struct AudioEncoderRuntimeConfig;
// The different event types are deliberately POD. Analysis of large logs is
// already resource intensive. The code simplifications that would be possible
// possible by having a base class (containing e.g. the log time) are not
// considered to outweigh the added memory and runtime overhead incurred by
// adding a vptr.
struct LoggedAlrStateEvent {
int64_t timestamp_us;
bool in_alr;
@ -244,7 +249,7 @@ struct LoggedRtcpPacketTransportFeedback {
};
struct LoggedStartEvent {
explicit LoggedStartEvent(uint64_t timestamp_us)
explicit LoggedStartEvent(int64_t timestamp_us)
: timestamp_us(timestamp_us) {}
int64_t timestamp_us;
int64_t log_time_us() const { return timestamp_us; }
@ -252,8 +257,7 @@ struct LoggedStartEvent {
};
struct LoggedStopEvent {
explicit LoggedStopEvent(uint64_t timestamp_us)
: timestamp_us(timestamp_us) {}
explicit LoggedStopEvent(int64_t timestamp_us) : timestamp_us(timestamp_us) {}
int64_t timestamp_us;
int64_t log_time_us() const { return timestamp_us; }
int64_t log_time_ms() const { return timestamp_us / 1000; }
@ -531,6 +535,8 @@ class ParsedRtcEventLogNew {
PacketView<const LoggedRtpPacket> packet_view;
};
static webrtc::RtpHeaderExtensionMap GetDefaultHeaderExtensionMap();
explicit ParsedRtcEventLogNew(
UnconfiguredHeaderExtensions parse_unconfigured_header_extensions =
UnconfiguredHeaderExtensions::kDontParse);

View File

@ -0,0 +1,137 @@
/*
* Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef LOGGING_RTC_EVENT_LOG_RTC_EVENT_PROCESSOR_H_
#define LOGGING_RTC_EVENT_LOG_RTC_EVENT_PROCESSOR_H_
#include <algorithm>
#include <memory>
#include <utility>
#include <vector>
#include "rtc_base/function_view.h"
namespace webrtc {
// This file contains helper class used to process the elements of two or more
// sorted lists in timestamp order. The effect is the same as doing a merge step
// in the merge-sort algorithm but without copying the elements or modifying the
// lists.
// Interface to allow "merging" lists of different types. ProcessNext()
// processes the next unprocesses element in the list. IsEmpty() checks if all
// elements have been processed. GetNextTime returns the timestamp of the next
// unprocessed element.
class ProcessableEventListInterface {
public:
virtual ~ProcessableEventListInterface() = default;
virtual void ProcessNext() = 0;
virtual bool IsEmpty() const = 0;
virtual int64_t GetNextTime() const = 0;
};
// ProcessableEventList encapsulates a list of events and a function that will
// be applied to each element of the list.
template <typename T>
class ProcessableEventList : public ProcessableEventListInterface {
public:
// N.B. |f| is not owned by ProcessableEventList. The caller must ensure that
// the function object or lambda outlives ProcessableEventList and
// RtcEventProcessor. The same thing applies to the iterators (begin, end);
// the vector must outlive ProcessableEventList and must not be modified until
// processing has finished.
ProcessableEventList(typename std::vector<T>::const_iterator begin,
typename std::vector<T>::const_iterator end,
rtc::FunctionView<void(const T&)> f)
: begin_(begin), end_(end), f_(f) {}
void ProcessNext() override {
RTC_DCHECK(!IsEmpty());
f_(*begin_);
++begin_;
}
bool IsEmpty() const override { return begin_ == end_; }
int64_t GetNextTime() const override {
RTC_DCHECK(!IsEmpty());
return begin_->log_time_us();
}
private:
typename std::vector<T>::const_iterator begin_;
typename std::vector<T>::const_iterator end_;
rtc::FunctionView<void(const T&)> f_;
};
// Helper class used to "merge" two or more lists of ordered RtcEventLog events
// so that they can be treated as a single ordered list. Since the individual
// lists may have different types, we need to access the lists via pointers to
// the common base class.
//
// Usage example:
// ParsedRtcEventLogNew log;
// auto incoming_handler = [] (LoggedRtcpPacketIncoming elem) { ... };
// auto incoming_rtcp =
// absl::make_unique<ProcessableEventList<LoggedRtcpPacketIncoming>>(
// log.incoming_rtcp_packets().begin(),
// log.incoming_rtcp_packets().end(),
// incoming_handler);
// auto outgoing_handler = [] (LoggedRtcpPacketOutgoing elem) { ... };
// auto outgoing_rtcp =
// absl::make_unique<ProcessableEventList<LoggedRtcpPacketOutgoing>>(
// log.outgoing_rtcp_packets().begin(),
// log.outgoing_rtcp_packets().end(),
// outgoing_handler);
//
// RtcEventProcessor processor;
// processor.AddEvents(std::move(incoming_rtcp));
// processor.AddEvents(std::move(outgoing_rtcp));
// processor.ProcessEventsInOrder();
class RtcEventProcessor {
public:
// The elements of each list is processed in the index order. To process all
// elements in all lists in timestamp order, each lists need to be sorted in
// timestamp order prior to insertion. Otherwise,
void AddEvents(std::unique_ptr<ProcessableEventListInterface> events) {
if (!events->IsEmpty()) {
event_lists_.push_back(std::move(events));
std::push_heap(event_lists_.begin(), event_lists_.end(), Cmp);
}
}
void ProcessEventsInOrder() {
// |event_lists_| is a min-heap of lists ordered by the timestamp of the
// first element in the list. We therefore process the first element of the
// first list, then reinsert the remainder of that list into the heap
// if the list still contains unprocessed elements.
while (!event_lists_.empty()) {
event_lists_.front()->ProcessNext();
std::pop_heap(event_lists_.begin(), event_lists_.end(), Cmp);
if (event_lists_.back()->IsEmpty()) {
event_lists_.pop_back();
} else {
std::push_heap(event_lists_.begin(), event_lists_.end(), Cmp);
}
}
}
private:
using ListPtrType = std::unique_ptr<ProcessableEventListInterface>;
std::vector<ListPtrType> event_lists_;
// Comparison function to make |event_lists_| into a min heap.
static bool Cmp(const ListPtrType& a, const ListPtrType& b) {
return a->GetNextTime() > b->GetNextTime();
}
};
} // namespace webrtc
#endif // LOGGING_RTC_EVENT_LOG_RTC_EVENT_PROCESSOR_H_

View File

@ -0,0 +1,171 @@
/*
* Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "logging/rtc_event_log/rtc_event_processor.h"
#include <initializer_list>
#include <numeric>
#include "absl/memory/memory.h"
#include "logging/rtc_event_log/rtc_event_log_parser_new.h"
#include "rtc_base/checks.h"
#include "rtc_base/random.h"
#include "test/gtest.h"
namespace webrtc {
namespace {
std::vector<LoggedStartEvent> CreateEventList(
std::initializer_list<int64_t> timestamp_list) {
std::vector<LoggedStartEvent> v;
for (int64_t timestamp_ms : timestamp_list) {
v.emplace_back(timestamp_ms * 1000); // Convert ms to us.
}
return v;
}
using OrderedEventView = ProcessableEventList<LoggedStartEvent>;
std::vector<std::vector<LoggedStartEvent>>
CreateRandomEventLists(size_t num_lists, size_t num_elements, uint64_t seed) {
Random prng(seed);
std::vector<std::vector<LoggedStartEvent>> lists(num_lists);
for (size_t elem = 0; elem < num_elements; elem++) {
uint32_t i = prng.Rand(0u, num_lists - 1);
int64_t timestamp_ms = elem;
lists[i].emplace_back(timestamp_ms * 1000);
}
return lists;
}
} // namespace
TEST(RtcEventProcessor, NoList) {
RtcEventProcessor processor;
processor.ProcessEventsInOrder(); // Don't crash but do nothing.
}
TEST(RtcEventProcessor, EmptyList) {
auto not_called = [](LoggedStartEvent /*elem*/) { EXPECT_TRUE(false); };
std::vector<LoggedStartEvent> events;
RtcEventProcessor processor;
processor.AddEvents(absl::make_unique<OrderedEventView>(
events.begin(), events.end(), not_called));
processor.ProcessEventsInOrder(); // Don't crash but do nothing.
}
TEST(RtcEventProcessor, OneList) {
std::vector<LoggedStartEvent> result;
auto f = [&result](LoggedStartEvent elem) { result.push_back(elem); };
std::vector<LoggedStartEvent> events(CreateEventList({1, 2, 3, 4}));
RtcEventProcessor processor;
processor.AddEvents(
absl::make_unique<OrderedEventView>(events.begin(), events.end(), f));
processor.ProcessEventsInOrder();
std::vector<int64_t> expected_results{1, 2, 3, 4};
ASSERT_EQ(result.size(), expected_results.size());
for (size_t i = 0; i < expected_results.size(); i++) {
EXPECT_EQ(result[i].log_time_ms(), expected_results[i]);
}
}
TEST(RtcEventProcessor, MergeTwoLists) {
std::vector<LoggedStartEvent> result;
auto f = [&result](LoggedStartEvent elem) { result.push_back(elem); };
std::vector<LoggedStartEvent> events1(CreateEventList({1, 2, 4, 7, 8, 9}));
std::vector<LoggedStartEvent> events2(CreateEventList({3, 5, 6, 10}));
RtcEventProcessor processor;
processor.AddEvents(
absl::make_unique<OrderedEventView>(events1.begin(), events1.end(), f));
processor.AddEvents(
absl::make_unique<OrderedEventView>(events2.begin(), events2.end(), f));
processor.ProcessEventsInOrder();
std::vector<int64_t> expected_results{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
ASSERT_EQ(result.size(), expected_results.size());
for (size_t i = 0; i < expected_results.size(); i++) {
EXPECT_EQ(result[i].log_time_ms(), expected_results[i]);
}
}
TEST(RtcEventProcessor, MergeTwoListsWithDuplicatedElements) {
std::vector<LoggedStartEvent> result;
auto f = [&result](LoggedStartEvent elem) { result.push_back(elem); };
std::vector<LoggedStartEvent> events1(CreateEventList({1, 2, 2, 3, 5, 5}));
std::vector<LoggedStartEvent> events2(CreateEventList({1, 3, 4, 4}));
RtcEventProcessor processor;
processor.AddEvents(
absl::make_unique<OrderedEventView>(events1.begin(), events1.end(), f));
processor.AddEvents(
absl::make_unique<OrderedEventView>(events2.begin(), events2.end(), f));
processor.ProcessEventsInOrder();
std::vector<int64_t> expected_results{1, 1, 2, 2, 3, 3, 4, 4, 5, 5};
ASSERT_EQ(result.size(), expected_results.size());
for (size_t i = 0; i < expected_results.size(); i++) {
EXPECT_EQ(result[i].log_time_ms(), expected_results[i]);
}
}
TEST(RtcEventProcessor, MergeManyLists) {
std::vector<LoggedStartEvent> result;
auto f = [&result](LoggedStartEvent elem) { result.push_back(elem); };
constexpr size_t kNumLists = 5;
constexpr size_t kNumElems = 30;
constexpr uint64_t kSeed = 0xF3C6B91F;
std::vector<std::vector<LoggedStartEvent>> lists(
CreateRandomEventLists(kNumLists, kNumElems, kSeed));
RTC_DCHECK_EQ(lists.size(), kNumLists);
RtcEventProcessor processor;
for (const auto& list : lists) {
processor.AddEvents(
absl::make_unique<OrderedEventView>(list.begin(), list.end(), f));
}
processor.ProcessEventsInOrder();
std::vector<int64_t> expected_results(kNumElems);
std::iota(expected_results.begin(), expected_results.end(), 0);
ASSERT_EQ(result.size(), expected_results.size());
for (size_t i = 0; i < expected_results.size(); i++) {
EXPECT_EQ(result[i].log_time_ms(), expected_results[i]);
}
}
TEST(RtcEventProcessor, DifferentTypes) {
std::vector<int64_t> result;
auto f1 = [&result](LoggedStartEvent elem) {
result.push_back(elem.log_time_ms());
};
auto f2 = [&result](LoggedStopEvent elem) {
result.push_back(elem.log_time_ms());
};
std::vector<LoggedStartEvent> events1{LoggedStartEvent(2000)};
std::vector<LoggedStopEvent> events2{LoggedStopEvent(1000)};
RtcEventProcessor processor;
processor.AddEvents(absl::make_unique<ProcessableEventList<LoggedStartEvent>>(
events1.begin(), events1.end(), f1));
processor.AddEvents(absl::make_unique<ProcessableEventList<LoggedStopEvent>>(
events2.begin(), events2.end(), f2));
processor.ProcessEventsInOrder();
std::vector<int64_t> expected_results{1, 2};
ASSERT_EQ(result.size(), expected_results.size());
for (size_t i = 0; i < expected_results.size(); i++) {
EXPECT_EQ(result[i], expected_results[i]);
}
}
} // namespace webrtc