Refactored CL for moving the output to a separate thread.

The logging thread is always active. The main thread uses SwapQueues to pass events to the logging thread. The logging thread moves the events to either a RingBuffer history in memory, or to a string which is written to disc.

RtcEventLogImpl constructor takes a clock for easier testing.

BUG=webrtc:4741

Review URL: https://codereview.webrtc.org/1687703002

Cr-Commit-Position: refs/heads/master@{#12476}
This commit is contained in:
terelius 2016-04-22 12:40:37 -07:00 committed by Commit bot
parent 1086ed6469
commit 4311ba59d8
12 changed files with 977 additions and 330 deletions

View File

@ -259,6 +259,8 @@ source_set("rtc_event_log") {
sources = [
"call/rtc_event_log.cc",
"call/rtc_event_log.h",
"call/rtc_event_log_helper_thread.cc",
"call/rtc_event_log_helper_thread.h",
]
defines = []

View File

@ -21,12 +21,11 @@ namespace webrtc {
class MockRtcEventLog : public RtcEventLog {
public:
MOCK_METHOD1(SetBufferDuration, void(int64_t buffer_duration_us));
MOCK_METHOD2(StartLogging,
bool(const std::string& file_name, int64_t max_size_bytes));
MOCK_METHOD2(StartLogging,
void(const std::string& file_name, int duration_ms));
MOCK_METHOD1(StartLogging, bool(rtc::PlatformFile log_file));
bool(rtc::PlatformFile log_file, int64_t max_size_bytes));
MOCK_METHOD0(StopLogging, void());

99
webrtc/call/ringbuffer.h Normal file
View File

@ -0,0 +1,99 @@
/*
* Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*
*/
#ifndef WEBRTC_CALL_RINGBUFFER_H_
#define WEBRTC_CALL_RINGBUFFER_H_
#include <memory>
#include <utility>
#include "webrtc/base/checks.h"
namespace webrtc {
// A RingBuffer works like a fixed size queue which starts discarding
// the oldest elements when it becomes full.
template <typename T>
class RingBuffer {
public:
// Creates a RingBuffer with space for |capacity| elements.
explicit RingBuffer(size_t capacity)
: // We allocate space for one extra sentinel element.
data_(new T[capacity + 1]) {
RTC_DCHECK(capacity > 0);
end_ = data_.get() + (capacity + 1);
front_ = data_.get();
back_ = data_.get();
}
~RingBuffer() {
// The unique_ptr will free the memory.
}
// Removes an element from the front of the queue.
void pop_front() {
RTC_DCHECK(!empty());
++front_;
if (front_ == end_) {
front_ = data_.get();
}
}
// Appends an element to the back of the queue (and removes an
// element from the front if there is no space at the back of the queue).
void push_back(const T& elem) {
*back_ = elem;
++back_;
if (back_ == end_) {
back_ = data_.get();
}
if (back_ == front_) {
++front_;
}
if (front_ == end_) {
front_ = data_.get();
}
}
// Appends an element to the back of the queue (and removes an
// element from the front if there is no space at the back of the queue).
void push_back(T&& elem) {
*back_ = std::move(elem);
++back_;
if (back_ == end_) {
back_ = data_.get();
}
if (back_ == front_) {
++front_;
}
if (front_ == end_) {
front_ = data_.get();
}
}
T& front() { return *front_; }
const T& front() const { return *front_; }
bool empty() const { return (front_ == back_); }
private:
std::unique_ptr<T[]> data_;
T* end_;
T* front_;
T* back_;
RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RingBuffer);
};
} // namespace webrtc
#endif // WEBRTC_CALL_RINGBUFFER_H_

View File

@ -0,0 +1,170 @@
/*
* Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*
*/
#include <list>
#include "testing/gtest/include/gtest/gtest.h"
#include "webrtc/base/random.h"
#include "webrtc/call/ringbuffer.h"
namespace {
template <typename T>
class MovableType {
public:
MovableType() : value_(), moved_from_(false), moved_to_(false) {}
explicit MovableType(T value)
: value_(value), moved_from_(false), moved_to_(false) {}
MovableType(const MovableType<T>& other)
: value_(other.value_), moved_from_(false), moved_to_(false) {}
MovableType(MovableType<T>&& other)
: value_(other.value_), moved_from_(false), moved_to_(true) {
other.moved_from_ = true;
}
MovableType& operator=(const MovableType<T>& other) {
value_ = other.value_;
moved_from_ = false;
moved_to_ = false;
return *this;
}
MovableType& operator=(MovableType<T>&& other) {
value_ = other.value_;
moved_from_ = false;
moved_to_ = true;
other.moved_from_ = true;
return *this;
}
T Value() { return value_; }
bool IsMovedFrom() { return moved_from_; }
bool IsMovedTo() { return moved_to_; }
private:
T value_;
bool moved_from_;
bool moved_to_;
};
} // namespace
namespace webrtc {
// Verify that the ring buffer works as a simple queue.
TEST(RingBufferTest, SimpleQueue) {
size_t capacity = 100;
RingBuffer<size_t> q(capacity);
EXPECT_TRUE(q.empty());
for (size_t i = 0; i < capacity; i++) {
q.push_back(i);
EXPECT_FALSE(q.empty());
}
for (size_t i = 0; i < capacity; i++) {
EXPECT_FALSE(q.empty());
EXPECT_EQ(i, q.front());
q.pop_front();
}
EXPECT_TRUE(q.empty());
}
// Do a "random" sequence of queue operations and verify that the
// result is consistent with the same operation performed on a std::list.
TEST(RingBufferTest, ConsistentWithStdList) {
Random prng(987654321ull);
size_t capacity = 10;
RingBuffer<int> q(capacity);
std::list<int> l;
EXPECT_TRUE(q.empty());
for (size_t i = 0; i < 100 * capacity; i++) {
bool insert = prng.Rand<bool>();
if ((insert && l.size() < capacity) || l.size() == 0) {
int x = prng.Rand<int>();
l.push_back(x);
q.push_back(x);
EXPECT_FALSE(q.empty());
} else {
EXPECT_FALSE(q.empty());
EXPECT_EQ(l.front(), q.front());
l.pop_front();
q.pop_front();
}
}
while (!l.empty()) {
EXPECT_FALSE(q.empty());
EXPECT_EQ(l.front(), q.front());
l.pop_front();
q.pop_front();
}
EXPECT_TRUE(q.empty());
}
// Test that the ringbuffer starts reusing elements from the front
// when the queue becomes full.
TEST(RingBufferTest, OverwriteOldElements) {
size_t capacity = 100;
size_t insertions = 3 * capacity + 25;
RingBuffer<size_t> q(capacity);
EXPECT_TRUE(q.empty());
for (size_t i = 0; i < insertions; i++) {
q.push_back(i);
EXPECT_FALSE(q.empty());
}
for (size_t i = insertions - capacity; i < insertions; i++) {
EXPECT_FALSE(q.empty());
EXPECT_EQ(i, q.front());
q.pop_front();
}
EXPECT_TRUE(q.empty());
}
// Test that the ringbuffer uses std::move when pushing an rvalue reference.
TEST(RingBufferTest, MoveSemanticsForPushBack) {
size_t capacity = 100;
size_t insertions = 3 * capacity + 25;
RingBuffer<MovableType<size_t>> q(capacity);
EXPECT_TRUE(q.empty());
for (size_t i = 0; i < insertions; i++) {
MovableType<size_t> tmp(i);
EXPECT_FALSE(tmp.IsMovedFrom());
EXPECT_FALSE(tmp.IsMovedTo());
q.push_back(std::move(tmp));
EXPECT_TRUE(tmp.IsMovedFrom());
EXPECT_FALSE(tmp.IsMovedTo());
EXPECT_FALSE(q.empty());
}
for (size_t i = insertions - capacity; i < insertions; i++) {
EXPECT_FALSE(q.empty());
EXPECT_EQ(i, q.front().Value());
EXPECT_FALSE(q.front().IsMovedFrom());
EXPECT_TRUE(q.front().IsMovedTo());
q.pop_front();
}
EXPECT_TRUE(q.empty());
}
TEST(RingBufferTest, SmallCapacity) {
size_t capacity = 1;
RingBuffer<int> q(capacity);
EXPECT_TRUE(q.empty());
q.push_back(4711);
EXPECT_FALSE(q.empty());
EXPECT_EQ(4711, q.front());
q.push_back(1024);
EXPECT_FALSE(q.empty());
EXPECT_EQ(1024, q.front());
q.pop_front();
EXPECT_TRUE(q.empty());
}
} // namespace webrtc

View File

@ -10,18 +10,22 @@
#include "webrtc/call/rtc_event_log.h"
#include <deque>
#include <limits>
#include <vector>
#include "webrtc/base/checks.h"
#include "webrtc/base/criticalsection.h"
#include "webrtc/base/thread_annotations.h"
#include "webrtc/base/constructormagic.h"
#include "webrtc/base/event.h"
#include "webrtc/base/swap_queue.h"
#include "webrtc/base/thread_checker.h"
#include "webrtc/call.h"
#include "webrtc/call/rtc_event_log_helper_thread.h"
#include "webrtc/modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "webrtc/modules/rtp_rtcp/source/byte_io.h"
#include "webrtc/modules/rtp_rtcp/source/rtcp_utility.h"
#include "webrtc/system_wrappers/include/clock.h"
#include "webrtc/system_wrappers/include/file_wrapper.h"
#include "webrtc/system_wrappers/include/logging.h"
#ifdef ENABLE_RTC_EVENT_LOG
// Files generated at build-time by the protobuf compiler.
@ -37,12 +41,17 @@ namespace webrtc {
#ifndef ENABLE_RTC_EVENT_LOG
// No-op implementation if flag is not set.
class RtcEventLogImpl final : public RtcEventLog {
class RtcEventLogNullImpl final : public RtcEventLog {
public:
void SetBufferDuration(int64_t buffer_duration_us) override {}
void StartLogging(const std::string& file_name, int duration_ms) override {}
bool StartLogging(rtc::PlatformFile log_file) override { return false; }
void StopLogging(void) override {}
bool StartLogging(const std::string& file_name,
int64_t max_size_bytes) override {
return false;
}
bool StartLogging(rtc::PlatformFile platform_file,
int64_t max_size_bytes) override {
return false;
}
void StopLogging() override {}
void LogVideoReceiveStreamConfig(
const VideoReceiveStream::Config& config) override {}
void LogVideoSendStreamConfig(
@ -65,11 +74,13 @@ class RtcEventLogImpl final : public RtcEventLog {
class RtcEventLogImpl final : public RtcEventLog {
public:
RtcEventLogImpl();
explicit RtcEventLogImpl(const Clock* clock);
~RtcEventLogImpl() override;
void SetBufferDuration(int64_t buffer_duration_us) override;
void StartLogging(const std::string& file_name, int duration_ms) override;
bool StartLogging(rtc::PlatformFile log_file) override;
bool StartLogging(const std::string& file_name,
int64_t max_size_bytes) override;
bool StartLogging(rtc::PlatformFile platform_file,
int64_t max_size_bytes) override;
void StopLogging() override;
void LogVideoReceiveStreamConfig(
const VideoReceiveStream::Config& config) override;
@ -88,37 +99,21 @@ class RtcEventLogImpl final : public RtcEventLog {
int32_t total_packets) override;
private:
// Starts logging. This function assumes the file_ has been opened succesfully
// and that the start_time_us_ and _duration_us_ have been set.
void StartLoggingLocked() EXCLUSIVE_LOCKS_REQUIRED(crit_);
// Stops logging and clears the stored data and buffers.
void StopLoggingLocked() EXCLUSIVE_LOCKS_REQUIRED(crit_);
// Adds a new event to the logfile if logging is active, or adds it to the
// list of recent log events otherwise.
void HandleEvent(rtclog::Event* event) EXCLUSIVE_LOCKS_REQUIRED(crit_);
// Writes the event to the file. Note that this will destroy the state of the
// input argument.
void StoreToFile(rtclog::Event* event) EXCLUSIVE_LOCKS_REQUIRED(crit_);
// Adds the event to the list of recent events, and removes any events that
// are too old and no longer fall in the time window.
void AddRecentEvent(const rtclog::Event& event)
EXCLUSIVE_LOCKS_REQUIRED(crit_);
// Message queue for passing control messages to the logging thread.
SwapQueue<RtcEventLogHelperThread::ControlMessage> message_queue_;
rtc::CriticalSection crit_;
std::unique_ptr<FileWrapper> file_ GUARDED_BY(crit_) =
std::unique_ptr<FileWrapper>(FileWrapper::Create());
rtc::PlatformFile platform_file_ GUARDED_BY(crit_) =
rtc::kInvalidPlatformFileValue;
rtclog::EventStream stream_ GUARDED_BY(crit_);
std::deque<rtclog::Event> recent_log_events_ GUARDED_BY(crit_);
std::vector<rtclog::Event> config_events_ GUARDED_BY(crit_);
// Message queue for passing events to the logging thread.
SwapQueue<std::unique_ptr<rtclog::Event> > event_queue_;
rtc::Event wake_up_;
rtc::Event stopped_;
// Microseconds to record log events, before starting the actual log.
int64_t buffer_duration_us_ GUARDED_BY(crit_);
bool currently_logging_ GUARDED_BY(crit_);
int64_t start_time_us_ GUARDED_BY(crit_);
int64_t duration_us_ GUARDED_BY(crit_);
const Clock* const clock_;
RtcEventLogHelperThread helper_thread_;
rtc::ThreadChecker thread_checker_;
RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RtcEventLogImpl);
};
namespace {
@ -126,10 +121,6 @@ namespace {
// that the rest of the WebRtc project can use, to the corresponding
// serialized enum which is defined by the protobuf.
// Do not add default return values to the conversion functions in this
// unnamed namespace. The intention is to make the compiler warn if anyone
// adds unhandled new events/modes/etc.
rtclog::VideoReceiveConfig_RtcpMode ConvertRtcpMode(RtcpMode rtcp_mode) {
switch (rtcp_mode) {
case RtcpMode::kCompound:
@ -159,114 +150,104 @@ rtclog::MediaType ConvertMediaType(MediaType media_type) {
return rtclog::ANY;
}
} // namespace
namespace {
bool IsConfigEvent(const rtclog::Event& event) {
rtclog::Event_EventType event_type = event.type();
return event_type == rtclog::Event::VIDEO_RECEIVER_CONFIG_EVENT ||
event_type == rtclog::Event::VIDEO_SENDER_CONFIG_EVENT ||
event_type == rtclog::Event::AUDIO_RECEIVER_CONFIG_EVENT ||
event_type == rtclog::Event::AUDIO_SENDER_CONFIG_EVENT;
}
// The RTP and RTCP buffers reserve space for twice the expected number of
// sent packets because they also contain received packets.
static const int kEventsPerSecond = 1000;
static const int kControlMessagesPerSecond = 10;
} // namespace
// RtcEventLogImpl member functions.
RtcEventLogImpl::RtcEventLogImpl()
: file_(FileWrapper::Create()),
stream_(),
buffer_duration_us_(10000000),
currently_logging_(false),
start_time_us_(0),
duration_us_(0),
clock_(Clock::GetRealTimeClock()) {
RtcEventLogImpl::RtcEventLogImpl(const Clock* clock)
// Allocate buffers for roughly one second of history.
: message_queue_(kControlMessagesPerSecond),
event_queue_(kEventsPerSecond),
wake_up_(false, false),
stopped_(false, false),
clock_(clock),
helper_thread_(&message_queue_,
&event_queue_,
&wake_up_,
&stopped_,
clock),
thread_checker_() {
thread_checker_.DetachFromThread();
}
void RtcEventLogImpl::SetBufferDuration(int64_t buffer_duration_us) {
rtc::CritScope lock(&crit_);
buffer_duration_us_ = buffer_duration_us;
RtcEventLogImpl::~RtcEventLogImpl() {
// The RtcEventLogHelperThread destructor closes the file
// and waits for the thread to terminate.
}
void RtcEventLogImpl::StartLogging(const std::string& file_name,
int duration_ms) {
rtc::CritScope lock(&crit_);
if (currently_logging_) {
StopLoggingLocked();
}
if (file_->OpenFile(file_name.c_str(), false) != 0) {
return;
}
start_time_us_ = clock_->TimeInMicroseconds();
duration_us_ = static_cast<int64_t>(duration_ms) * 1000;
StartLoggingLocked();
}
bool RtcEventLogImpl::StartLogging(rtc::PlatformFile log_file) {
rtc::CritScope lock(&crit_);
if (currently_logging_) {
StopLoggingLocked();
}
RTC_DCHECK(platform_file_ == rtc::kInvalidPlatformFileValue);
FILE* file_stream = rtc::FdopenPlatformFileForWriting(log_file);
if (!file_stream) {
rtc::ClosePlatformFile(log_file);
bool RtcEventLogImpl::StartLogging(const std::string& file_name,
int64_t max_size_bytes) {
RTC_DCHECK(thread_checker_.CalledOnValidThread());
RtcEventLogHelperThread::ControlMessage message;
message.message_type = RtcEventLogHelperThread::ControlMessage::START_FILE;
message.max_size_bytes = max_size_bytes;
message.start_time = clock_->TimeInMicroseconds();
message.stop_time = std::numeric_limits<int64_t>::max();
message.file.reset(FileWrapper::Create());
if (message.file->OpenFile(file_name.c_str(), false) != 0) {
return false;
}
if (file_->OpenFromFileHandle(file_stream, true, false) != 0) {
rtc::ClosePlatformFile(log_file);
if (!message_queue_.Insert(&message)) {
LOG(LS_WARNING) << "Message queue full. Can't start logging.";
return false;
}
platform_file_ = log_file;
// Set the start time and duration to keep logging for 10 minutes.
start_time_us_ = clock_->TimeInMicroseconds();
duration_us_ = 10 * 60 * 1000000;
StartLoggingLocked();
return true;
}
void RtcEventLogImpl::StartLoggingLocked() {
currently_logging_ = true;
// Write all old configuration events to the log file.
for (auto& event : config_events_) {
StoreToFile(&event);
bool RtcEventLogImpl::StartLogging(rtc::PlatformFile platform_file,
int64_t max_size_bytes) {
RTC_DCHECK(thread_checker_.CalledOnValidThread());
RtcEventLogHelperThread::ControlMessage message;
message.message_type = RtcEventLogHelperThread::ControlMessage::START_FILE;
message.max_size_bytes = max_size_bytes;
message.start_time = clock_->TimeInMicroseconds();
message.stop_time = std::numeric_limits<int64_t>::max();
message.file.reset(FileWrapper::Create());
FILE* file_handle = rtc::FdopenPlatformFileForWriting(platform_file);
if (!file_handle) {
return false;
}
// Write all recent configuration events to the log file, and
// write all other recent events to the log file, ignoring any old events.
for (auto& event : recent_log_events_) {
if (IsConfigEvent(event)) {
StoreToFile(&event);
config_events_.push_back(event);
} else if (event.timestamp_us() >= start_time_us_ - buffer_duration_us_) {
StoreToFile(&event);
}
if (message.file->OpenFromFileHandle(file_handle, true, false) != 0) {
return false;
}
recent_log_events_.clear();
// Write a LOG_START event to the file.
rtclog::Event start_event;
start_event.set_timestamp_us(start_time_us_);
start_event.set_type(rtclog::Event::LOG_START);
StoreToFile(&start_event);
if (!message_queue_.Insert(&message)) {
LOG(LS_WARNING) << "Message queue full. Can't start logging.";
return false;
}
return true;
}
void RtcEventLogImpl::StopLogging() {
rtc::CritScope lock(&crit_);
StopLoggingLocked();
RTC_DCHECK(thread_checker_.CalledOnValidThread());
RtcEventLogHelperThread::ControlMessage message;
message.message_type = RtcEventLogHelperThread::ControlMessage::STOP_FILE;
message.stop_time = clock_->TimeInMicroseconds();
while (!message_queue_.Insert(&message)) {
// TODO(terelius): We would like to have a blocking Insert function in the
// SwapQueue, but for the time being we will just clear any previous
// messages.
// Since StopLogging waits for the thread, it is essential that we don't
// clear any STOP_FILE messages. To ensure that there is only one call at a
// time, we require that all calls to StopLogging are made on the same
// thread.
LOG(LS_WARNING) << "Message queue full. Clearing queue to stop logging.";
message_queue_.Clear();
}
wake_up_.Set(); // Request the output thread to wake up.
stopped_.Wait(rtc::Event::kForever); // Wait for the log to stop.
}
void RtcEventLogImpl::LogVideoReceiveStreamConfig(
const VideoReceiveStream::Config& config) {
rtc::CritScope lock(&crit_);
rtclog::Event event;
event.set_timestamp_us(clock_->TimeInMicroseconds());
event.set_type(rtclog::Event::VIDEO_RECEIVER_CONFIG_EVENT);
std::unique_ptr<rtclog::Event> event(new rtclog::Event());
event->set_timestamp_us(clock_->TimeInMicroseconds());
event->set_type(rtclog::Event::VIDEO_RECEIVER_CONFIG_EVENT);
rtclog::VideoReceiveConfig* receiver_config =
event.mutable_video_receiver_config();
event->mutable_video_receiver_config();
receiver_config->set_remote_ssrc(config.rtp.remote_ssrc);
receiver_config->set_local_ssrc(config.rtp.local_ssrc);
@ -292,18 +273,18 @@ void RtcEventLogImpl::LogVideoReceiveStreamConfig(
decoder->set_name(d.payload_name);
decoder->set_payload_type(d.payload_type);
}
HandleEvent(&event);
if (!event_queue_.Insert(&event)) {
LOG(LS_WARNING) << "Config queue full. Not logging config event.";
}
}
void RtcEventLogImpl::LogVideoSendStreamConfig(
const VideoSendStream::Config& config) {
rtc::CritScope lock(&crit_);
std::unique_ptr<rtclog::Event> event(new rtclog::Event());
event->set_timestamp_us(clock_->TimeInMicroseconds());
event->set_type(rtclog::Event::VIDEO_SENDER_CONFIG_EVENT);
rtclog::Event event;
event.set_timestamp_us(clock_->TimeInMicroseconds());
event.set_type(rtclog::Event::VIDEO_SENDER_CONFIG_EVENT);
rtclog::VideoSendConfig* sender_config = event.mutable_video_sender_config();
rtclog::VideoSendConfig* sender_config = event->mutable_video_sender_config();
for (const auto& ssrc : config.rtp.ssrcs) {
sender_config->add_ssrcs(ssrc);
@ -324,7 +305,9 @@ void RtcEventLogImpl::LogVideoSendStreamConfig(
rtclog::EncoderConfig* encoder = sender_config->mutable_encoder();
encoder->set_name(config.encoder_settings.payload_name);
encoder->set_payload_type(config.encoder_settings.payload_type);
HandleEvent(&event);
if (!event_queue_.Insert(&event)) {
LOG(LS_WARNING) << "Config queue full. Not logging config event.";
}
}
void RtcEventLogImpl::LogRtpHeader(PacketDirection direction,
@ -347,27 +330,27 @@ void RtcEventLogImpl::LogRtpHeader(PacketDirection direction,
header_length += (x_len + 1) * 4;
}
rtc::CritScope lock(&crit_);
rtclog::Event rtp_event;
rtp_event.set_timestamp_us(clock_->TimeInMicroseconds());
rtp_event.set_type(rtclog::Event::RTP_EVENT);
rtp_event.mutable_rtp_packet()->set_incoming(direction == kIncomingPacket);
rtp_event.mutable_rtp_packet()->set_type(ConvertMediaType(media_type));
rtp_event.mutable_rtp_packet()->set_packet_length(packet_length);
rtp_event.mutable_rtp_packet()->set_header(header, header_length);
HandleEvent(&rtp_event);
std::unique_ptr<rtclog::Event> rtp_event(new rtclog::Event());
rtp_event->set_timestamp_us(clock_->TimeInMicroseconds());
rtp_event->set_type(rtclog::Event::RTP_EVENT);
rtp_event->mutable_rtp_packet()->set_incoming(direction == kIncomingPacket);
rtp_event->mutable_rtp_packet()->set_type(ConvertMediaType(media_type));
rtp_event->mutable_rtp_packet()->set_packet_length(packet_length);
rtp_event->mutable_rtp_packet()->set_header(header, header_length);
if (!event_queue_.Insert(&rtp_event)) {
LOG(LS_WARNING) << "RTP queue full. Not logging RTP packet.";
}
}
void RtcEventLogImpl::LogRtcpPacket(PacketDirection direction,
MediaType media_type,
const uint8_t* packet,
size_t length) {
rtc::CritScope lock(&crit_);
rtclog::Event rtcp_event;
rtcp_event.set_timestamp_us(clock_->TimeInMicroseconds());
rtcp_event.set_type(rtclog::Event::RTCP_EVENT);
rtcp_event.mutable_rtcp_packet()->set_incoming(direction == kIncomingPacket);
rtcp_event.mutable_rtcp_packet()->set_type(ConvertMediaType(media_type));
std::unique_ptr<rtclog::Event> rtcp_event(new rtclog::Event());
rtcp_event->set_timestamp_us(clock_->TimeInMicroseconds());
rtcp_event->set_type(rtclog::Event::RTCP_EVENT);
rtcp_event->mutable_rtcp_packet()->set_incoming(direction == kIncomingPacket);
rtcp_event->mutable_rtcp_packet()->set_type(ConvertMediaType(media_type));
RTCPUtility::RtcpCommonHeader header;
const uint8_t* block_begin = packet;
@ -413,87 +396,35 @@ void RtcEventLogImpl::LogRtcpPacket(PacketDirection direction,
block_begin += block_size;
}
rtcp_event.mutable_rtcp_packet()->set_packet_data(buffer, buffer_length);
HandleEvent(&rtcp_event);
rtcp_event->mutable_rtcp_packet()->set_packet_data(buffer, buffer_length);
if (!event_queue_.Insert(&rtcp_event)) {
LOG(LS_WARNING) << "RTCP queue full. Not logging RTCP packet.";
}
}
void RtcEventLogImpl::LogAudioPlayout(uint32_t ssrc) {
rtc::CritScope lock(&crit_);
rtclog::Event event;
event.set_timestamp_us(clock_->TimeInMicroseconds());
event.set_type(rtclog::Event::AUDIO_PLAYOUT_EVENT);
auto playout_event = event.mutable_audio_playout_event();
std::unique_ptr<rtclog::Event> event(new rtclog::Event());
event->set_timestamp_us(clock_->TimeInMicroseconds());
event->set_type(rtclog::Event::AUDIO_PLAYOUT_EVENT);
auto playout_event = event->mutable_audio_playout_event();
playout_event->set_local_ssrc(ssrc);
HandleEvent(&event);
if (!event_queue_.Insert(&event)) {
LOG(LS_WARNING) << "Playout queue full. Not logging ACM playout.";
}
}
void RtcEventLogImpl::LogBwePacketLossEvent(int32_t bitrate,
uint8_t fraction_loss,
int32_t total_packets) {
rtc::CritScope lock(&crit_);
rtclog::Event event;
event.set_timestamp_us(clock_->TimeInMicroseconds());
event.set_type(rtclog::Event::BWE_PACKET_LOSS_EVENT);
auto bwe_event = event.mutable_bwe_packet_loss_event();
std::unique_ptr<rtclog::Event> event(new rtclog::Event());
event->set_timestamp_us(clock_->TimeInMicroseconds());
event->set_type(rtclog::Event::BWE_PACKET_LOSS_EVENT);
auto bwe_event = event->mutable_bwe_packet_loss_event();
bwe_event->set_bitrate(bitrate);
bwe_event->set_fraction_loss(fraction_loss);
bwe_event->set_total_packets(total_packets);
HandleEvent(&event);
}
void RtcEventLogImpl::StopLoggingLocked() {
if (currently_logging_) {
currently_logging_ = false;
// Create a LogEnd event
rtclog::Event event;
event.set_timestamp_us(clock_->TimeInMicroseconds());
event.set_type(rtclog::Event::LOG_END);
// Store the event and close the file
RTC_DCHECK(file_->Open());
StoreToFile(&event);
file_->CloseFile();
if (platform_file_ != rtc::kInvalidPlatformFileValue) {
rtc::ClosePlatformFile(platform_file_);
platform_file_ = rtc::kInvalidPlatformFileValue;
}
}
RTC_DCHECK(!file_->Open());
stream_.Clear();
}
void RtcEventLogImpl::HandleEvent(rtclog::Event* event) {
if (currently_logging_) {
if (clock_->TimeInMicroseconds() < start_time_us_ + duration_us_) {
StoreToFile(event);
return;
}
StopLoggingLocked();
}
AddRecentEvent(*event);
}
void RtcEventLogImpl::StoreToFile(rtclog::Event* event) {
// Reuse the same object at every log event.
if (stream_.stream_size() < 1) {
stream_.add_stream();
}
RTC_DCHECK_EQ(stream_.stream_size(), 1);
stream_.mutable_stream(0)->Swap(event);
// TODO(terelius): Doesn't this create a new EventStream per event?
// Is this guaranteed to work e.g. in future versions of protobuf?
std::string dump_buffer;
stream_.SerializeToString(&dump_buffer);
file_->Write(dump_buffer.data(), dump_buffer.size());
}
void RtcEventLogImpl::AddRecentEvent(const rtclog::Event& event) {
recent_log_events_.push_back(event);
while (recent_log_events_.front().timestamp_us() <
event.timestamp_us() - buffer_duration_us_) {
if (IsConfigEvent(recent_log_events_.front())) {
config_events_.push_back(recent_log_events_.front());
}
recent_log_events_.pop_front();
if (!event_queue_.Insert(&event)) {
LOG(LS_WARNING) << "BWE loss queue full. Not logging BWE update.";
}
}
@ -516,8 +447,12 @@ bool RtcEventLog::ParseRtcEventLog(const std::string& file_name,
#endif // ENABLE_RTC_EVENT_LOG
// RtcEventLog member functions.
std::unique_ptr<RtcEventLog> RtcEventLog::Create() {
return std::unique_ptr<RtcEventLog>(new RtcEventLogImpl());
std::unique_ptr<RtcEventLog> RtcEventLog::Create(const Clock* clock) {
#ifdef ENABLE_RTC_EVENT_LOG
return std::unique_ptr<RtcEventLog>(new RtcEventLogImpl(clock));
#else
return std::unique_ptr<RtcEventLog>(new RtcEventLogNullImpl());
#endif // ENABLE_RTC_EVENT_LOG
}
} // namespace webrtc

View File

@ -26,6 +26,7 @@ namespace rtclog {
class EventStream;
} // namespace rtclog
class Clock;
class RtcEventLogImpl;
enum class MediaType;
@ -36,30 +37,40 @@ class RtcEventLog {
public:
virtual ~RtcEventLog() {}
static std::unique_ptr<RtcEventLog> Create();
// Factory method to create an RtcEventLog object.
static std::unique_ptr<RtcEventLog> Create(const Clock* clock);
// Sets the time that events are stored in the internal event buffer
// before the user calls StartLogging. The default is 10 000 000 us = 10 s
virtual void SetBufferDuration(int64_t buffer_duration_us) = 0;
// Starts logging for the specified duration to the specified file.
// The logging will stop automatically after the specified duration.
// Starts logging a maximum of max_size_bytes bytes to the specified file.
// If the file already exists it will be overwritten.
// If the file cannot be opened, the RtcEventLog will not start logging.
virtual void StartLogging(const std::string& file_name, int duration_ms) = 0;
// If max_size_bytes <= 0, logging will be active until StopLogging is called.
// The function has no effect and returns false if we can't start a new log
// e.g. because we are already logging or the file cannot be opened.
virtual bool StartLogging(const std::string& file_name,
int64_t max_size_bytes) = 0;
// Starts logging until either the 10 minute timer runs out or the StopLogging
// function is called. The RtcEventLog takes ownership of the supplied
// rtc::PlatformFile.
virtual bool StartLogging(rtc::PlatformFile log_file) = 0;
// Same as above. The RtcEventLog takes ownership of the file if the call
// is successful, i.e. if it returns true.
virtual bool StartLogging(rtc::PlatformFile platform_file,
int64_t max_size_bytes) = 0;
// Deprecated. Pass an explicit file size limit.
bool StartLogging(const std::string& file_name) {
return StartLogging(file_name, 10000000);
}
// Deprecated. Pass an explicit file size limit.
bool StartLogging(rtc::PlatformFile platform_file) {
return StartLogging(platform_file, 10000000);
}
// Stops logging to file and waits until the thread has finished.
virtual void StopLogging() = 0;
// Logs configuration information for webrtc::VideoReceiveStream
// Logs configuration information for webrtc::VideoReceiveStream.
virtual void LogVideoReceiveStreamConfig(
const webrtc::VideoReceiveStream::Config& config) = 0;
// Logs configuration information for webrtc::VideoSendStream
// Logs configuration information for webrtc::VideoSendStream.
virtual void LogVideoSendStreamConfig(
const webrtc::VideoSendStream::Config& config) = 0;
@ -76,7 +87,7 @@ class RtcEventLog {
const uint8_t* packet,
size_t length) = 0;
// Logs an audio playout event
// Logs an audio playout event.
virtual void LogAudioPlayout(uint32_t ssrc) = 0;
// Logs a bitrate update from the bandwidth estimator based on packet loss.
@ -86,6 +97,11 @@ class RtcEventLog {
// Reads an RtcEventLog file and returns true when reading was successful.
// The result is stored in the given EventStream object.
// The order of the events in the EventStream is implementation defined.
// The current implementation writes a LOG_START event, then the old
// configurations, then the remaining events in timestamp order and finally
// a LOG_END event. However, this might change without further notice.
// TODO(terelius): Change result type to a vector?
static bool ParseRtcEventLog(const std::string& file_name,
rtclog::EventStream* result);
};

View File

@ -0,0 +1,285 @@
/*
* Copyright (c) 2016 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "webrtc/call/rtc_event_log_helper_thread.h"
#include <algorithm>
#include "webrtc/base/checks.h"
#include "webrtc/system_wrappers/include/logging.h"
#ifdef ENABLE_RTC_EVENT_LOG
namespace webrtc {
namespace {
const int kEventsInHistory = 10000;
bool IsConfigEvent(const rtclog::Event& event) {
rtclog::Event_EventType event_type = event.type();
return event_type == rtclog::Event::VIDEO_RECEIVER_CONFIG_EVENT ||
event_type == rtclog::Event::VIDEO_SENDER_CONFIG_EVENT ||
event_type == rtclog::Event::AUDIO_RECEIVER_CONFIG_EVENT ||
event_type == rtclog::Event::AUDIO_SENDER_CONFIG_EVENT;
}
} // namespace
// RtcEventLogImpl member functions.
RtcEventLogHelperThread::RtcEventLogHelperThread(
SwapQueue<ControlMessage>* message_queue,
SwapQueue<std::unique_ptr<rtclog::Event>>* event_queue,
rtc::Event* wake_up,
rtc::Event* stopped,
const Clock* const clock)
: message_queue_(message_queue),
event_queue_(event_queue),
history_(kEventsInHistory),
config_history_(),
file_(FileWrapper::Create()),
thread_(&ThreadOutputFunction, this, "RtcEventLog thread"),
max_size_bytes_(std::numeric_limits<int64_t>::max()),
written_bytes_(0),
start_time_(0),
stop_time_(std::numeric_limits<int64_t>::max()),
has_recent_event_(false),
most_recent_event_(),
output_string_(),
wake_up_(wake_up),
stopped_(stopped),
clock_(clock) {
RTC_DCHECK(message_queue_);
RTC_DCHECK(event_queue_);
RTC_DCHECK(wake_up_);
RTC_DCHECK(stopped_);
RTC_DCHECK(clock_);
thread_.Start();
}
RtcEventLogHelperThread::~RtcEventLogHelperThread() {
ControlMessage message;
message.message_type = ControlMessage::TERMINATE_THREAD;
message.stop_time = clock_->TimeInMicroseconds();
while (!message_queue_->Insert(&message)) {
// We can't destroy the event log until we have stopped the thread,
// so clear the message queue and try again. Note that if we clear
// any STOP_FILE events, then the threads calling StopLogging would likely
// wait indefinitely. However, there should not be any such calls as we
// are executing the destructor.
LOG(LS_WARNING) << "Clearing message queue to terminate thread.";
message_queue_->Clear();
}
wake_up_->Set(); // Wake up the output thread.
thread_.Stop(); // Wait for the thread to terminate.
}
bool RtcEventLogHelperThread::AppendEventToString(rtclog::Event* event) {
rtclog::EventStream event_stream;
event_stream.add_stream();
event_stream.mutable_stream(0)->Swap(event);
// We create a new event stream per event but because of the way protobufs
// are encoded, events can be merged by concatenating them. Therefore,
// it will look like a single stream when we read it back from file.
bool stop = true;
if (written_bytes_ + static_cast<int64_t>(output_string_.size()) +
event_stream.ByteSize() <=
max_size_bytes_) {
event_stream.AppendToString(&output_string_);
stop = false;
}
// Swap the event back so that we don't mix event types in the queues.
event_stream.mutable_stream(0)->Swap(event);
return stop;
}
void RtcEventLogHelperThread::LogToMemory() {
RTC_DCHECK(!file_->Open());
// Process each event earlier than the current time and append it to the
// appropriate history_.
int64_t current_time = clock_->TimeInMicroseconds();
if (!has_recent_event_) {
has_recent_event_ = event_queue_->Remove(&most_recent_event_);
}
while (has_recent_event_ &&
most_recent_event_->timestamp_us() <= current_time) {
if (IsConfigEvent(*most_recent_event_)) {
config_history_.push_back(std::move(most_recent_event_));
} else {
history_.push_back(std::move(most_recent_event_));
}
has_recent_event_ = event_queue_->Remove(&most_recent_event_);
}
}
void RtcEventLogHelperThread::StartLogFile() {
RTC_DCHECK(file_->Open());
bool stop = false;
output_string_.clear();
// Create and serialize the LOG_START event.
rtclog::Event start_event;
start_event.set_timestamp_us(start_time_);
start_event.set_type(rtclog::Event::LOG_START);
AppendEventToString(&start_event);
// Serialize the config information for all old streams.
for (auto& event : config_history_) {
AppendEventToString(event.get());
}
// Serialize the events in the event queue.
while (!history_.empty() && !stop) {
stop = AppendEventToString(history_.front().get());
if (!stop) {
history_.pop_front();
}
}
// Write to file.
file_->Write(output_string_.data(), output_string_.size());
written_bytes_ += output_string_.size();
// Free the allocated memory since we probably won't need this amount of
// space again.
output_string_.clear();
output_string_.shrink_to_fit();
if (stop) {
RTC_DCHECK(file_->Open());
StopLogFile();
}
}
void RtcEventLogHelperThread::LogToFile() {
RTC_DCHECK(file_->Open());
output_string_.clear();
// Append each event older than both the current time and the stop time
// to the output_string_.
int64_t current_time = clock_->TimeInMicroseconds();
int64_t time_limit = std::min(current_time, stop_time_);
if (!has_recent_event_) {
has_recent_event_ = event_queue_->Remove(&most_recent_event_);
}
bool stop = false;
while (!stop && has_recent_event_ &&
most_recent_event_->timestamp_us() <= time_limit) {
stop = AppendEventToString(most_recent_event_.get());
if (!stop) {
if (IsConfigEvent(*most_recent_event_)) {
config_history_.push_back(std::move(most_recent_event_));
}
has_recent_event_ = event_queue_->Remove(&most_recent_event_);
}
}
// Write string to file.
file_->Write(output_string_.data(), output_string_.size());
written_bytes_ += output_string_.size();
if (!file_->Open()) {
LOG(LS_WARNING) << "WebRTC event log file closed by FileWrapper.";
}
// We want to stop logging if we have reached the file size limit. We also
// want to stop logging if the remaining events are more recent than the
// time limit, or in other words if we have terminated the loop despite
// having more events in the queue.
if ((has_recent_event_ && most_recent_event_->timestamp_us() > stop_time_) ||
stop) {
RTC_DCHECK(file_->Open());
StopLogFile();
}
}
void RtcEventLogHelperThread::StopLogFile() {
RTC_DCHECK(file_->Open());
output_string_.clear();
rtclog::Event end_event;
end_event.set_timestamp_us(stop_time_);
end_event.set_type(rtclog::Event::LOG_END);
AppendEventToString(&end_event);
if (written_bytes_ + static_cast<int64_t>(output_string_.size()) <=
max_size_bytes_) {
file_->Write(output_string_.data(), output_string_.size());
written_bytes_ += output_string_.size();
}
max_size_bytes_ = std::numeric_limits<int64_t>::max();
written_bytes_ = 0;
start_time_ = 0;
stop_time_ = std::numeric_limits<int64_t>::max();
output_string_.clear();
file_->CloseFile();
RTC_DCHECK(!file_->Open());
}
void RtcEventLogHelperThread::WriteLog() {
ControlMessage message;
while (true) {
// Process control messages.
while (message_queue_->Remove(&message)) {
switch (message.message_type) {
case ControlMessage::START_FILE:
if (!file_->Open()) {
max_size_bytes_ = message.max_size_bytes;
start_time_ = message.start_time;
stop_time_ = message.stop_time;
file_.swap(message.file);
StartLogFile();
} else {
// Already started. Ignore message and close file handle.
message.file->CloseFile();
}
break;
case ControlMessage::STOP_FILE:
if (file_->Open()) {
stop_time_ = message.stop_time;
LogToFile(); // Log remaining events from message queues.
}
// LogToFile might stop on it's own so we need to recheck the state.
if (file_->Open()) {
StopLogFile();
}
stopped_->Set();
break;
case ControlMessage::TERMINATE_THREAD:
if (file_->Open()) {
StopLogFile();
}
return;
}
}
// Write events to file or memory
if (file_->Open()) {
LogToFile();
} else {
LogToMemory();
}
// Accumulate a new batch of events instead of processing them one at a
// time.
wake_up_->Wait(50);
}
}
bool RtcEventLogHelperThread::ThreadOutputFunction(void* obj) {
RtcEventLogHelperThread* helper = static_cast<RtcEventLogHelperThread*>(obj);
helper->WriteLog();
return false;
}
} // namespace webrtc
#endif // ENABLE_RTC_EVENT_LOG

View File

@ -0,0 +1,123 @@
/*
* Copyright (c) 2016 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef WEBRTC_CALL_RTC_EVENT_LOG_HELPER_THREAD_H_
#define WEBRTC_CALL_RTC_EVENT_LOG_HELPER_THREAD_H_
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "webrtc/base/constructormagic.h"
#include "webrtc/base/event.h"
#include "webrtc/base/platform_thread.h"
#include "webrtc/base/swap_queue.h"
#include "webrtc/call/ringbuffer.h"
#include "webrtc/system_wrappers/include/clock.h"
#include "webrtc/system_wrappers/include/file_wrapper.h"
#ifdef ENABLE_RTC_EVENT_LOG
// Files generated at build-time by the protobuf compiler.
#ifdef WEBRTC_ANDROID_PLATFORM_BUILD
#include "external/webrtc/webrtc/call/rtc_event_log.pb.h"
#else
#include "webrtc/call/rtc_event_log.pb.h"
#endif
#endif
#ifdef ENABLE_RTC_EVENT_LOG
namespace webrtc {
class RtcEventLogHelperThread final {
public:
struct ControlMessage {
ControlMessage()
: message_type(STOP_FILE),
file(nullptr),
max_size_bytes(0),
start_time(0),
stop_time(0) {}
enum { START_FILE, STOP_FILE, TERMINATE_THREAD } message_type;
std::unique_ptr<FileWrapper> file; // Only used with START_FILE.
int64_t max_size_bytes; // Only used with START_FILE.
int64_t start_time; // Only used with START_FILE.
int64_t stop_time; // Used with all 3 message types.
friend void swap(ControlMessage& lhs, ControlMessage& rhs) {
using std::swap;
swap(lhs.message_type, rhs.message_type);
lhs.file.swap(rhs.file);
swap(lhs.max_size_bytes, rhs.max_size_bytes);
swap(lhs.start_time, rhs.start_time);
swap(lhs.stop_time, rhs.stop_time);
}
};
RtcEventLogHelperThread(
SwapQueue<ControlMessage>* message_queue,
SwapQueue<std::unique_ptr<rtclog::Event>>* event_queue,
rtc::Event* wake_up,
rtc::Event* file_finished,
const Clock* const clock);
~RtcEventLogHelperThread();
private:
static bool ThreadOutputFunction(void* obj);
void TerminateThread();
bool AppendEventToString(rtclog::Event* event);
void AppendEventToHistory(const rtclog::Event& event);
void LogToMemory();
void StartLogFile();
void LogToFile();
void StopLogFile();
void WriteLog();
// Message queues for passing events to the logging thread.
SwapQueue<ControlMessage>* message_queue_;
SwapQueue<std::unique_ptr<rtclog::Event>>* event_queue_;
// History containing the most recent events (~ 10 s).
RingBuffer<std::unique_ptr<rtclog::Event>> history_;
// History containing all past configuration events.
std::vector<std::unique_ptr<rtclog::Event>> config_history_;
std::unique_ptr<FileWrapper> file_;
rtc::PlatformThread thread_;
int64_t max_size_bytes_;
int64_t written_bytes_;
int64_t start_time_;
int64_t stop_time_;
bool has_recent_event_;
std::unique_ptr<rtclog::Event> most_recent_event_;
// Temporary space for serializing profobuf data.
std::string output_string_;
rtc::Event* wake_up_;
rtc::Event* stopped_;
const Clock* const clock_;
RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RtcEventLogHelperThread);
};
} // namespace webrtc
#endif // ENABLE_RTC_EVENT_LOG
#endif // WEBRTC_CALL_RTC_EVENT_LOG_HELPER_THREAD_H_

View File

@ -10,6 +10,7 @@
#ifdef ENABLE_RTC_EVENT_LOG
#include <map>
#include <memory>
#include <string>
#include <utility>
@ -224,7 +225,7 @@ void VerifySendStreamConfig(const rtclog::Event& event,
}
void VerifyRtpEvent(const rtclog::Event& event,
bool incoming,
PacketDirection direction,
MediaType media_type,
const uint8_t* header,
size_t header_size,
@ -233,7 +234,7 @@ void VerifyRtpEvent(const rtclog::Event& event,
ASSERT_EQ(rtclog::Event::RTP_EVENT, event.type());
const rtclog::RtpPacket& rtp_packet = event.rtp_packet();
ASSERT_TRUE(rtp_packet.has_incoming());
EXPECT_EQ(incoming, rtp_packet.incoming());
EXPECT_EQ(direction == kIncomingPacket, rtp_packet.incoming());
ASSERT_TRUE(rtp_packet.has_type());
EXPECT_EQ(media_type, GetRuntimeMediaType(rtp_packet.type()));
ASSERT_TRUE(rtp_packet.has_packet_length());
@ -246,7 +247,7 @@ void VerifyRtpEvent(const rtclog::Event& event,
}
void VerifyRtcpEvent(const rtclog::Event& event,
bool incoming,
PacketDirection direction,
MediaType media_type,
const uint8_t* packet,
size_t total_size) {
@ -254,7 +255,7 @@ void VerifyRtcpEvent(const rtclog::Event& event,
ASSERT_EQ(rtclog::Event::RTCP_EVENT, event.type());
const rtclog::RtcpPacket& rtcp_packet = event.rtcp_packet();
ASSERT_TRUE(rtcp_packet.has_incoming());
EXPECT_EQ(incoming, rtcp_packet.incoming());
EXPECT_EQ(direction == kIncomingPacket, rtcp_packet.incoming());
ASSERT_TRUE(rtcp_packet.has_type());
EXPECT_EQ(media_type, GetRuntimeMediaType(rtcp_packet.type()));
ASSERT_TRUE(rtcp_packet.has_packet_data());
@ -292,6 +293,11 @@ void VerifyLogStartEvent(const rtclog::Event& event) {
EXPECT_EQ(rtclog::Event::LOG_START, event.type());
}
void VerifyLogEndEvent(const rtclog::Event& event) {
ASSERT_TRUE(IsValidBasicEvent(event));
EXPECT_EQ(rtclog::Event::LOG_END, event.type());
}
/*
* Bit number i of extension_bitvector is set to indicate the
* presence of extension number i from kExtensionTypes / kExtensionNames.
@ -472,9 +478,12 @@ void LogSessionAndReadBack(size_t rtp_count,
// When log_dumper goes out of scope, it causes the log file to be flushed
// to disk.
{
std::unique_ptr<RtcEventLog> log_dumper(RtcEventLog::Create());
SimulatedClock fake_clock(prng.Rand<uint32_t>());
std::unique_ptr<RtcEventLog> log_dumper(RtcEventLog::Create(&fake_clock));
log_dumper->LogVideoReceiveStreamConfig(receiver_config);
fake_clock.AdvanceTimeMicroseconds(prng.Rand(1, 1000));
log_dumper->LogVideoSendStreamConfig(sender_config);
fake_clock.AdvanceTimeMicroseconds(prng.Rand(1, 1000));
size_t rtcp_index = 1;
size_t playout_index = 1;
size_t bwe_loss_index = 1;
@ -483,6 +492,7 @@ void LogSessionAndReadBack(size_t rtp_count,
(i % 2 == 0) ? kIncomingPacket : kOutgoingPacket,
(i % 3 == 0) ? MediaType::AUDIO : MediaType::VIDEO,
rtp_packets[i - 1].data(), rtp_packets[i - 1].size());
fake_clock.AdvanceTimeMicroseconds(prng.Rand(1, 1000));
if (i * rtcp_count >= rtcp_index * rtp_count) {
log_dumper->LogRtcpPacket(
(rtcp_index % 2 == 0) ? kIncomingPacket : kOutgoingPacket,
@ -490,21 +500,26 @@ void LogSessionAndReadBack(size_t rtp_count,
rtcp_packets[rtcp_index - 1].data(),
rtcp_packets[rtcp_index - 1].size());
rtcp_index++;
fake_clock.AdvanceTimeMicroseconds(prng.Rand(1, 1000));
}
if (i * playout_count >= playout_index * rtp_count) {
log_dumper->LogAudioPlayout(playout_ssrcs[playout_index - 1]);
playout_index++;
fake_clock.AdvanceTimeMicroseconds(prng.Rand(1, 1000));
}
if (i * bwe_loss_count >= bwe_loss_index * rtp_count) {
log_dumper->LogBwePacketLossEvent(
bwe_loss_updates[bwe_loss_index - 1].first,
bwe_loss_updates[bwe_loss_index - 1].second, i);
bwe_loss_index++;
fake_clock.AdvanceTimeMicroseconds(prng.Rand(1, 1000));
}
if (i == rtp_count / 2) {
log_dumper->StartLogging(temp_filename, 10000000);
fake_clock.AdvanceTimeMicroseconds(prng.Rand(1, 1000));
}
}
log_dumper->StopLogging();
}
// Read the generated file from disk.
@ -516,24 +531,66 @@ void LogSessionAndReadBack(size_t rtp_count,
// what we wrote down. For RTCP we log the full packets, but for
// RTP we should only log the header.
const int event_count = config_count + playout_count + bwe_loss_count +
rtcp_count + rtp_count + 1;
rtcp_count + rtp_count + 2;
EXPECT_GE(1000, event_count); // The events must fit in the message queue.
EXPECT_EQ(event_count, parsed_stream.stream_size());
VerifyReceiveStreamConfig(parsed_stream.stream(0), receiver_config);
VerifySendStreamConfig(parsed_stream.stream(1), sender_config);
size_t event_index = config_count;
if (event_count != parsed_stream.stream_size()) {
// Print the expected and actual event types for easier debugging.
std::map<int, size_t> actual_event_counts;
for (size_t i = 0; i < static_cast<size_t>(parsed_stream.stream_size());
i++) {
actual_event_counts[parsed_stream.stream(i).type()]++;
}
printf("Actual events: ");
for (auto kv : actual_event_counts) {
printf("%d_count = %zu, ", kv.first, kv.second);
}
printf("\n");
for (size_t i = 0; i < static_cast<size_t>(parsed_stream.stream_size());
i++) {
printf("%4d ", parsed_stream.stream(i).type());
}
printf("\n");
printf(
"Expected events: rtp_count = %zu, rtcp_count = %zu,"
"playout_count = %zu, bwe_loss_count = %zu\n",
rtp_count, rtcp_count, playout_count, bwe_loss_count);
size_t rtcp_index = 1, playout_index = 1, bwe_loss_index = 1;
printf("strt cfg cfg ");
for (size_t i = 1; i <= rtp_count; i++) {
printf(" rtp ");
if (i * rtcp_count >= rtcp_index * rtp_count) {
printf("rtcp ");
rtcp_index++;
}
if (i * playout_count >= playout_index * rtp_count) {
printf("play ");
playout_index++;
}
if (i * bwe_loss_count >= bwe_loss_index * rtp_count) {
printf("loss ");
bwe_loss_index++;
}
}
printf("\n");
}
VerifyLogStartEvent(parsed_stream.stream(0));
VerifyReceiveStreamConfig(parsed_stream.stream(1), receiver_config);
VerifySendStreamConfig(parsed_stream.stream(2), sender_config);
size_t event_index = config_count + 1;
size_t rtcp_index = 1;
size_t playout_index = 1;
size_t bwe_loss_index = 1;
for (size_t i = 1; i <= rtp_count; i++) {
VerifyRtpEvent(parsed_stream.stream(event_index),
(i % 2 == 0), // Every second packet is incoming.
(i % 2 == 0) ? kIncomingPacket : kOutgoingPacket,
(i % 3 == 0) ? MediaType::AUDIO : MediaType::VIDEO,
rtp_packets[i - 1].data(), rtp_header_sizes[i - 1],
rtp_packets[i - 1].size());
event_index++;
if (i * rtcp_count >= rtcp_index * rtp_count) {
VerifyRtcpEvent(parsed_stream.stream(event_index),
rtcp_index % 2 == 0, // Every second packet is incoming.
(rtcp_index % 2 == 0) ? kIncomingPacket : kOutgoingPacket,
rtcp_index % 3 == 0 ? MediaType::AUDIO : MediaType::VIDEO,
rtcp_packets[rtcp_index - 1].data(),
rtcp_packets[rtcp_index - 1].size());
@ -553,10 +610,6 @@ void LogSessionAndReadBack(size_t rtp_count,
event_index++;
bwe_loss_index++;
}
if (i == rtp_count / 2) {
VerifyLogStartEvent(parsed_stream.stream(event_index));
event_index++;
}
}
// Clean up temporary file - can be pretty slow.
@ -596,39 +649,15 @@ TEST(RtcEventLogTest, LogSessionAndReadBack) {
}
}
// Tests that the event queue works correctly, i.e. drops old RTP, RTCP and
// debug events, but keeps config events even if they are older than the limit.
void DropOldEvents(uint32_t extensions_bitvector,
uint32_t csrcs_count,
unsigned int random_seed) {
rtc::Buffer old_rtp_packet;
rtc::Buffer recent_rtp_packet;
rtc::Buffer old_rtcp_packet;
rtc::Buffer recent_rtcp_packet;
TEST(RtcEventLogTest, LogEventAndReadBack) {
Random prng(987654321);
VideoReceiveStream::Config receiver_config(nullptr);
VideoSendStream::Config sender_config(nullptr);
Random prng(random_seed);
// Create two RTP packets containing random data.
// Create one RTP and one RTCP packet containing random data.
size_t packet_size = prng.Rand(1000, 1100);
old_rtp_packet.SetSize(packet_size);
GenerateRtpPacket(extensions_bitvector, csrcs_count, old_rtp_packet.data(),
packet_size, &prng);
packet_size = prng.Rand(1000, 1100);
recent_rtp_packet.SetSize(packet_size);
size_t recent_header_size =
GenerateRtpPacket(extensions_bitvector, csrcs_count,
recent_rtp_packet.data(), packet_size, &prng);
// Create two RTCP packets containing random data.
old_rtcp_packet = GenerateRtcpPacket(&prng);
recent_rtcp_packet = GenerateRtcpPacket(&prng);
// Create configurations for the video streams.
GenerateVideoReceiveConfig(extensions_bitvector, &receiver_config, &prng);
GenerateVideoSendConfig(extensions_bitvector, &sender_config, &prng);
rtc::Buffer rtp_packet(packet_size);
size_t header_size =
GenerateRtpPacket(0, 0, rtp_packet.data(), packet_size, &prng);
rtc::Buffer rtcp_packet = GenerateRtcpPacket(&prng);
// Find the name of the current test, in order to use it as a temporary
// filename.
@ -636,58 +665,44 @@ void DropOldEvents(uint32_t extensions_bitvector,
const std::string temp_filename =
test::OutputPath() + test_info->test_case_name() + test_info->name();
// The log file will be flushed to disk when the log_dumper goes out of scope.
{
std::unique_ptr<RtcEventLog> log_dumper(RtcEventLog::Create());
// Reduce the time old events are stored to 50 ms.
log_dumper->SetBufferDuration(50000);
log_dumper->LogVideoReceiveStreamConfig(receiver_config);
log_dumper->LogVideoSendStreamConfig(sender_config);
log_dumper->LogRtpHeader(kOutgoingPacket, MediaType::AUDIO,
old_rtp_packet.data(), old_rtp_packet.size());
log_dumper->LogRtcpPacket(kIncomingPacket, MediaType::AUDIO,
old_rtcp_packet.data(),
old_rtcp_packet.size());
// Sleep 55 ms to let old events be removed from the queue.
rtc::Thread::SleepMs(55);
log_dumper->StartLogging(temp_filename, 10000000);
log_dumper->LogRtpHeader(kIncomingPacket, MediaType::VIDEO,
recent_rtp_packet.data(),
recent_rtp_packet.size());
log_dumper->LogRtcpPacket(kOutgoingPacket, MediaType::VIDEO,
recent_rtcp_packet.data(),
recent_rtcp_packet.size());
}
// Add RTP, start logging, add RTCP and then stop logging
SimulatedClock fake_clock(prng.Rand<uint32_t>());
std::unique_ptr<RtcEventLog> log_dumper(RtcEventLog::Create(&fake_clock));
log_dumper->LogRtpHeader(kIncomingPacket, MediaType::VIDEO, rtp_packet.data(),
rtp_packet.size());
fake_clock.AdvanceTimeMicroseconds(prng.Rand(1, 1000));
log_dumper->StartLogging(temp_filename, 10000000);
fake_clock.AdvanceTimeMicroseconds(prng.Rand(1, 1000));
log_dumper->LogRtcpPacket(kOutgoingPacket, MediaType::VIDEO,
rtcp_packet.data(), rtcp_packet.size());
fake_clock.AdvanceTimeMicroseconds(prng.Rand(1, 1000));
log_dumper->StopLogging();
// Read the generated file from disk.
rtclog::EventStream parsed_stream;
ASSERT_TRUE(RtcEventLog::ParseRtcEventLog(temp_filename, &parsed_stream));
// Verify that what we read back from the event log is the same as
// what we wrote. Old RTP and RTCP events should have been discarded,
// but old configuration events should still be available.
EXPECT_EQ(5, parsed_stream.stream_size());
VerifyReceiveStreamConfig(parsed_stream.stream(0), receiver_config);
VerifySendStreamConfig(parsed_stream.stream(1), sender_config);
VerifyLogStartEvent(parsed_stream.stream(2));
VerifyRtpEvent(parsed_stream.stream(3), true, MediaType::VIDEO,
recent_rtp_packet.data(), recent_header_size,
recent_rtp_packet.size());
VerifyRtcpEvent(parsed_stream.stream(4), false, MediaType::VIDEO,
recent_rtcp_packet.data(), recent_rtcp_packet.size());
// what we wrote down.
EXPECT_EQ(4, parsed_stream.stream_size());
VerifyLogStartEvent(parsed_stream.stream(0));
VerifyRtpEvent(parsed_stream.stream(1), kIncomingPacket, MediaType::VIDEO,
rtp_packet.data(), header_size, rtp_packet.size());
VerifyRtcpEvent(parsed_stream.stream(2), kOutgoingPacket, MediaType::VIDEO,
rtcp_packet.data(), rtcp_packet.size());
VerifyLogEndEvent(parsed_stream.stream(3));
// Clean up temporary file - can be pretty slow.
remove(temp_filename.c_str());
}
TEST(RtcEventLogTest, DropOldEvents) {
// Enable all header extensions
uint32_t extensions = (1u << kNumExtensions) - 1;
uint32_t csrcs_count = 2;
DropOldEvents(extensions, csrcs_count, 141421356);
DropOldEvents(extensions, csrcs_count, 173205080);
}
} // namespace webrtc
#endif // ENABLE_RTC_EVENT_LOG

View File

@ -49,7 +49,7 @@ ChannelManager::ChannelManager(uint32_t instance_id, const Config& config)
: instance_id_(instance_id),
last_channel_id_(-1),
config_(config),
event_log_(RtcEventLog::Create()) {}
event_log_(RtcEventLog::Create(Clock::GetRealTimeClock())) {}
ChannelOwner ChannelManager::CreateChannel() {
return CreateChannelInternal(config_);

View File

@ -141,6 +141,8 @@
'sources': [
'call/rtc_event_log.cc',
'call/rtc_event_log.h',
'call/rtc_event_log_helper_thread.cc',
'call/rtc_event_log_helper_thread.h',
],
'conditions': [
# If enable_protobuf is defined, we want to compile the protobuf

View File

@ -161,6 +161,7 @@
'call/bitrate_estimator_tests.cc',
'call/call_unittest.cc',
'call/packet_injection_tests.cc',
'call/ringbuffer_unittest.cc',
'test/common_unittest.cc',
'test/testsupport/metrics/video_metrics_unittest.cc',
'video/call_stats_unittest.cc',