From 4311ba59d8186400b892eafff2b71529a7e25a85 Mon Sep 17 00:00:00 2001 From: terelius Date: Fri, 22 Apr 2016 12:40:37 -0700 Subject: [PATCH] Refactored CL for moving the output to a separate thread. The logging thread is always active. The main thread uses SwapQueues to pass events to the logging thread. The logging thread moves the events to either a RingBuffer history in memory, or to a string which is written to disc. RtcEventLogImpl constructor takes a clock for easier testing. BUG=webrtc:4741 Review URL: https://codereview.webrtc.org/1687703002 Cr-Commit-Position: refs/heads/master@{#12476} --- webrtc/BUILD.gn | 2 + webrtc/call/mock/mock_rtc_event_log.h | 7 +- webrtc/call/ringbuffer.h | 99 ++++++ webrtc/call/ringbuffer_unittest.cc | 170 ++++++++++ webrtc/call/rtc_event_log.cc | 377 +++++++++------------ webrtc/call/rtc_event_log.h | 48 ++- webrtc/call/rtc_event_log_helper_thread.cc | 285 ++++++++++++++++ webrtc/call/rtc_event_log_helper_thread.h | 123 +++++++ webrtc/call/rtc_event_log_unittest.cc | 191 ++++++----- webrtc/voice_engine/channel_manager.cc | 2 +- webrtc/webrtc.gyp | 2 + webrtc/webrtc_tests.gypi | 1 + 12 files changed, 977 insertions(+), 330 deletions(-) create mode 100644 webrtc/call/ringbuffer.h create mode 100644 webrtc/call/ringbuffer_unittest.cc create mode 100644 webrtc/call/rtc_event_log_helper_thread.cc create mode 100644 webrtc/call/rtc_event_log_helper_thread.h diff --git a/webrtc/BUILD.gn b/webrtc/BUILD.gn index 03785fba71..50df44a967 100644 --- a/webrtc/BUILD.gn +++ b/webrtc/BUILD.gn @@ -259,6 +259,8 @@ source_set("rtc_event_log") { sources = [ "call/rtc_event_log.cc", "call/rtc_event_log.h", + "call/rtc_event_log_helper_thread.cc", + "call/rtc_event_log_helper_thread.h", ] defines = [] diff --git a/webrtc/call/mock/mock_rtc_event_log.h b/webrtc/call/mock/mock_rtc_event_log.h index f523105d0e..8ca73a3dc7 100644 --- a/webrtc/call/mock/mock_rtc_event_log.h +++ b/webrtc/call/mock/mock_rtc_event_log.h @@ -21,12 +21,11 @@ namespace webrtc { class MockRtcEventLog : public RtcEventLog { public: - MOCK_METHOD1(SetBufferDuration, void(int64_t buffer_duration_us)); + MOCK_METHOD2(StartLogging, + bool(const std::string& file_name, int64_t max_size_bytes)); MOCK_METHOD2(StartLogging, - void(const std::string& file_name, int duration_ms)); - - MOCK_METHOD1(StartLogging, bool(rtc::PlatformFile log_file)); + bool(rtc::PlatformFile log_file, int64_t max_size_bytes)); MOCK_METHOD0(StopLogging, void()); diff --git a/webrtc/call/ringbuffer.h b/webrtc/call/ringbuffer.h new file mode 100644 index 0000000000..7ff8215461 --- /dev/null +++ b/webrtc/call/ringbuffer.h @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + * + */ + +#ifndef WEBRTC_CALL_RINGBUFFER_H_ +#define WEBRTC_CALL_RINGBUFFER_H_ + +#include +#include + +#include "webrtc/base/checks.h" + +namespace webrtc { + +// A RingBuffer works like a fixed size queue which starts discarding +// the oldest elements when it becomes full. +template +class RingBuffer { + public: + // Creates a RingBuffer with space for |capacity| elements. + explicit RingBuffer(size_t capacity) + : // We allocate space for one extra sentinel element. + data_(new T[capacity + 1]) { + RTC_DCHECK(capacity > 0); + end_ = data_.get() + (capacity + 1); + front_ = data_.get(); + back_ = data_.get(); + } + + ~RingBuffer() { + // The unique_ptr will free the memory. + } + + // Removes an element from the front of the queue. + void pop_front() { + RTC_DCHECK(!empty()); + ++front_; + if (front_ == end_) { + front_ = data_.get(); + } + } + + // Appends an element to the back of the queue (and removes an + // element from the front if there is no space at the back of the queue). + void push_back(const T& elem) { + *back_ = elem; + ++back_; + if (back_ == end_) { + back_ = data_.get(); + } + if (back_ == front_) { + ++front_; + } + if (front_ == end_) { + front_ = data_.get(); + } + } + + // Appends an element to the back of the queue (and removes an + // element from the front if there is no space at the back of the queue). + void push_back(T&& elem) { + *back_ = std::move(elem); + ++back_; + if (back_ == end_) { + back_ = data_.get(); + } + if (back_ == front_) { + ++front_; + } + if (front_ == end_) { + front_ = data_.get(); + } + } + + T& front() { return *front_; } + + const T& front() const { return *front_; } + + bool empty() const { return (front_ == back_); } + + private: + std::unique_ptr data_; + T* end_; + T* front_; + T* back_; + + RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RingBuffer); +}; + +} // namespace webrtc + +#endif // WEBRTC_CALL_RINGBUFFER_H_ diff --git a/webrtc/call/ringbuffer_unittest.cc b/webrtc/call/ringbuffer_unittest.cc new file mode 100644 index 0000000000..e2b5a41461 --- /dev/null +++ b/webrtc/call/ringbuffer_unittest.cc @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + * + */ + +#include + +#include "testing/gtest/include/gtest/gtest.h" +#include "webrtc/base/random.h" +#include "webrtc/call/ringbuffer.h" + +namespace { +template +class MovableType { + public: + MovableType() : value_(), moved_from_(false), moved_to_(false) {} + explicit MovableType(T value) + : value_(value), moved_from_(false), moved_to_(false) {} + MovableType(const MovableType& other) + : value_(other.value_), moved_from_(false), moved_to_(false) {} + MovableType(MovableType&& other) + : value_(other.value_), moved_from_(false), moved_to_(true) { + other.moved_from_ = true; + } + + MovableType& operator=(const MovableType& other) { + value_ = other.value_; + moved_from_ = false; + moved_to_ = false; + return *this; + } + + MovableType& operator=(MovableType&& other) { + value_ = other.value_; + moved_from_ = false; + moved_to_ = true; + other.moved_from_ = true; + return *this; + } + + T Value() { return value_; } + bool IsMovedFrom() { return moved_from_; } + bool IsMovedTo() { return moved_to_; } + + private: + T value_; + bool moved_from_; + bool moved_to_; +}; + +} // namespace + +namespace webrtc { + +// Verify that the ring buffer works as a simple queue. +TEST(RingBufferTest, SimpleQueue) { + size_t capacity = 100; + RingBuffer q(capacity); + EXPECT_TRUE(q.empty()); + for (size_t i = 0; i < capacity; i++) { + q.push_back(i); + EXPECT_FALSE(q.empty()); + } + + for (size_t i = 0; i < capacity; i++) { + EXPECT_FALSE(q.empty()); + EXPECT_EQ(i, q.front()); + q.pop_front(); + } + EXPECT_TRUE(q.empty()); +} + +// Do a "random" sequence of queue operations and verify that the +// result is consistent with the same operation performed on a std::list. +TEST(RingBufferTest, ConsistentWithStdList) { + Random prng(987654321ull); + size_t capacity = 10; + RingBuffer q(capacity); + std::list l; + EXPECT_TRUE(q.empty()); + for (size_t i = 0; i < 100 * capacity; i++) { + bool insert = prng.Rand(); + if ((insert && l.size() < capacity) || l.size() == 0) { + int x = prng.Rand(); + l.push_back(x); + q.push_back(x); + EXPECT_FALSE(q.empty()); + } else { + EXPECT_FALSE(q.empty()); + EXPECT_EQ(l.front(), q.front()); + l.pop_front(); + q.pop_front(); + } + } + while (!l.empty()) { + EXPECT_FALSE(q.empty()); + EXPECT_EQ(l.front(), q.front()); + l.pop_front(); + q.pop_front(); + } + EXPECT_TRUE(q.empty()); +} + +// Test that the ringbuffer starts reusing elements from the front +// when the queue becomes full. +TEST(RingBufferTest, OverwriteOldElements) { + size_t capacity = 100; + size_t insertions = 3 * capacity + 25; + RingBuffer q(capacity); + EXPECT_TRUE(q.empty()); + for (size_t i = 0; i < insertions; i++) { + q.push_back(i); + EXPECT_FALSE(q.empty()); + } + + for (size_t i = insertions - capacity; i < insertions; i++) { + EXPECT_FALSE(q.empty()); + EXPECT_EQ(i, q.front()); + q.pop_front(); + } + EXPECT_TRUE(q.empty()); +} + +// Test that the ringbuffer uses std::move when pushing an rvalue reference. +TEST(RingBufferTest, MoveSemanticsForPushBack) { + size_t capacity = 100; + size_t insertions = 3 * capacity + 25; + RingBuffer> q(capacity); + EXPECT_TRUE(q.empty()); + for (size_t i = 0; i < insertions; i++) { + MovableType tmp(i); + EXPECT_FALSE(tmp.IsMovedFrom()); + EXPECT_FALSE(tmp.IsMovedTo()); + q.push_back(std::move(tmp)); + EXPECT_TRUE(tmp.IsMovedFrom()); + EXPECT_FALSE(tmp.IsMovedTo()); + EXPECT_FALSE(q.empty()); + } + + for (size_t i = insertions - capacity; i < insertions; i++) { + EXPECT_FALSE(q.empty()); + EXPECT_EQ(i, q.front().Value()); + EXPECT_FALSE(q.front().IsMovedFrom()); + EXPECT_TRUE(q.front().IsMovedTo()); + q.pop_front(); + } + EXPECT_TRUE(q.empty()); +} + +TEST(RingBufferTest, SmallCapacity) { + size_t capacity = 1; + RingBuffer q(capacity); + EXPECT_TRUE(q.empty()); + q.push_back(4711); + EXPECT_FALSE(q.empty()); + EXPECT_EQ(4711, q.front()); + q.push_back(1024); + EXPECT_FALSE(q.empty()); + EXPECT_EQ(1024, q.front()); + q.pop_front(); + EXPECT_TRUE(q.empty()); +} + +} // namespace webrtc diff --git a/webrtc/call/rtc_event_log.cc b/webrtc/call/rtc_event_log.cc index ce4d6ef39f..c929bc7e57 100644 --- a/webrtc/call/rtc_event_log.cc +++ b/webrtc/call/rtc_event_log.cc @@ -10,18 +10,22 @@ #include "webrtc/call/rtc_event_log.h" -#include +#include #include #include "webrtc/base/checks.h" -#include "webrtc/base/criticalsection.h" -#include "webrtc/base/thread_annotations.h" +#include "webrtc/base/constructormagic.h" +#include "webrtc/base/event.h" +#include "webrtc/base/swap_queue.h" +#include "webrtc/base/thread_checker.h" #include "webrtc/call.h" +#include "webrtc/call/rtc_event_log_helper_thread.h" #include "webrtc/modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "webrtc/modules/rtp_rtcp/source/byte_io.h" #include "webrtc/modules/rtp_rtcp/source/rtcp_utility.h" #include "webrtc/system_wrappers/include/clock.h" #include "webrtc/system_wrappers/include/file_wrapper.h" +#include "webrtc/system_wrappers/include/logging.h" #ifdef ENABLE_RTC_EVENT_LOG // Files generated at build-time by the protobuf compiler. @@ -37,12 +41,17 @@ namespace webrtc { #ifndef ENABLE_RTC_EVENT_LOG // No-op implementation if flag is not set. -class RtcEventLogImpl final : public RtcEventLog { +class RtcEventLogNullImpl final : public RtcEventLog { public: - void SetBufferDuration(int64_t buffer_duration_us) override {} - void StartLogging(const std::string& file_name, int duration_ms) override {} - bool StartLogging(rtc::PlatformFile log_file) override { return false; } - void StopLogging(void) override {} + bool StartLogging(const std::string& file_name, + int64_t max_size_bytes) override { + return false; + } + bool StartLogging(rtc::PlatformFile platform_file, + int64_t max_size_bytes) override { + return false; + } + void StopLogging() override {} void LogVideoReceiveStreamConfig( const VideoReceiveStream::Config& config) override {} void LogVideoSendStreamConfig( @@ -65,11 +74,13 @@ class RtcEventLogImpl final : public RtcEventLog { class RtcEventLogImpl final : public RtcEventLog { public: - RtcEventLogImpl(); + explicit RtcEventLogImpl(const Clock* clock); + ~RtcEventLogImpl() override; - void SetBufferDuration(int64_t buffer_duration_us) override; - void StartLogging(const std::string& file_name, int duration_ms) override; - bool StartLogging(rtc::PlatformFile log_file) override; + bool StartLogging(const std::string& file_name, + int64_t max_size_bytes) override; + bool StartLogging(rtc::PlatformFile platform_file, + int64_t max_size_bytes) override; void StopLogging() override; void LogVideoReceiveStreamConfig( const VideoReceiveStream::Config& config) override; @@ -88,37 +99,21 @@ class RtcEventLogImpl final : public RtcEventLog { int32_t total_packets) override; private: - // Starts logging. This function assumes the file_ has been opened succesfully - // and that the start_time_us_ and _duration_us_ have been set. - void StartLoggingLocked() EXCLUSIVE_LOCKS_REQUIRED(crit_); - // Stops logging and clears the stored data and buffers. - void StopLoggingLocked() EXCLUSIVE_LOCKS_REQUIRED(crit_); - // Adds a new event to the logfile if logging is active, or adds it to the - // list of recent log events otherwise. - void HandleEvent(rtclog::Event* event) EXCLUSIVE_LOCKS_REQUIRED(crit_); - // Writes the event to the file. Note that this will destroy the state of the - // input argument. - void StoreToFile(rtclog::Event* event) EXCLUSIVE_LOCKS_REQUIRED(crit_); - // Adds the event to the list of recent events, and removes any events that - // are too old and no longer fall in the time window. - void AddRecentEvent(const rtclog::Event& event) - EXCLUSIVE_LOCKS_REQUIRED(crit_); + // Message queue for passing control messages to the logging thread. + SwapQueue message_queue_; - rtc::CriticalSection crit_; - std::unique_ptr file_ GUARDED_BY(crit_) = - std::unique_ptr(FileWrapper::Create()); - rtc::PlatformFile platform_file_ GUARDED_BY(crit_) = - rtc::kInvalidPlatformFileValue; - rtclog::EventStream stream_ GUARDED_BY(crit_); - std::deque recent_log_events_ GUARDED_BY(crit_); - std::vector config_events_ GUARDED_BY(crit_); + // Message queue for passing events to the logging thread. + SwapQueue > event_queue_; + + rtc::Event wake_up_; + rtc::Event stopped_; - // Microseconds to record log events, before starting the actual log. - int64_t buffer_duration_us_ GUARDED_BY(crit_); - bool currently_logging_ GUARDED_BY(crit_); - int64_t start_time_us_ GUARDED_BY(crit_); - int64_t duration_us_ GUARDED_BY(crit_); const Clock* const clock_; + + RtcEventLogHelperThread helper_thread_; + rtc::ThreadChecker thread_checker_; + + RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RtcEventLogImpl); }; namespace { @@ -126,10 +121,6 @@ namespace { // that the rest of the WebRtc project can use, to the corresponding // serialized enum which is defined by the protobuf. -// Do not add default return values to the conversion functions in this -// unnamed namespace. The intention is to make the compiler warn if anyone -// adds unhandled new events/modes/etc. - rtclog::VideoReceiveConfig_RtcpMode ConvertRtcpMode(RtcpMode rtcp_mode) { switch (rtcp_mode) { case RtcpMode::kCompound: @@ -159,114 +150,104 @@ rtclog::MediaType ConvertMediaType(MediaType media_type) { return rtclog::ANY; } -} // namespace - -namespace { -bool IsConfigEvent(const rtclog::Event& event) { - rtclog::Event_EventType event_type = event.type(); - return event_type == rtclog::Event::VIDEO_RECEIVER_CONFIG_EVENT || - event_type == rtclog::Event::VIDEO_SENDER_CONFIG_EVENT || - event_type == rtclog::Event::AUDIO_RECEIVER_CONFIG_EVENT || - event_type == rtclog::Event::AUDIO_SENDER_CONFIG_EVENT; -} +// The RTP and RTCP buffers reserve space for twice the expected number of +// sent packets because they also contain received packets. +static const int kEventsPerSecond = 1000; +static const int kControlMessagesPerSecond = 10; } // namespace // RtcEventLogImpl member functions. -RtcEventLogImpl::RtcEventLogImpl() - : file_(FileWrapper::Create()), - stream_(), - buffer_duration_us_(10000000), - currently_logging_(false), - start_time_us_(0), - duration_us_(0), - clock_(Clock::GetRealTimeClock()) { +RtcEventLogImpl::RtcEventLogImpl(const Clock* clock) + // Allocate buffers for roughly one second of history. + : message_queue_(kControlMessagesPerSecond), + event_queue_(kEventsPerSecond), + wake_up_(false, false), + stopped_(false, false), + clock_(clock), + helper_thread_(&message_queue_, + &event_queue_, + &wake_up_, + &stopped_, + clock), + thread_checker_() { + thread_checker_.DetachFromThread(); } -void RtcEventLogImpl::SetBufferDuration(int64_t buffer_duration_us) { - rtc::CritScope lock(&crit_); - buffer_duration_us_ = buffer_duration_us; +RtcEventLogImpl::~RtcEventLogImpl() { + // The RtcEventLogHelperThread destructor closes the file + // and waits for the thread to terminate. } -void RtcEventLogImpl::StartLogging(const std::string& file_name, - int duration_ms) { - rtc::CritScope lock(&crit_); - if (currently_logging_) { - StopLoggingLocked(); - } - if (file_->OpenFile(file_name.c_str(), false) != 0) { - return; - } - start_time_us_ = clock_->TimeInMicroseconds(); - duration_us_ = static_cast(duration_ms) * 1000; - StartLoggingLocked(); -} - -bool RtcEventLogImpl::StartLogging(rtc::PlatformFile log_file) { - rtc::CritScope lock(&crit_); - - if (currently_logging_) { - StopLoggingLocked(); - } - RTC_DCHECK(platform_file_ == rtc::kInvalidPlatformFileValue); - - FILE* file_stream = rtc::FdopenPlatformFileForWriting(log_file); - if (!file_stream) { - rtc::ClosePlatformFile(log_file); +bool RtcEventLogImpl::StartLogging(const std::string& file_name, + int64_t max_size_bytes) { + RTC_DCHECK(thread_checker_.CalledOnValidThread()); + RtcEventLogHelperThread::ControlMessage message; + message.message_type = RtcEventLogHelperThread::ControlMessage::START_FILE; + message.max_size_bytes = max_size_bytes; + message.start_time = clock_->TimeInMicroseconds(); + message.stop_time = std::numeric_limits::max(); + message.file.reset(FileWrapper::Create()); + if (message.file->OpenFile(file_name.c_str(), false) != 0) { return false; } - - if (file_->OpenFromFileHandle(file_stream, true, false) != 0) { - rtc::ClosePlatformFile(log_file); + if (!message_queue_.Insert(&message)) { + LOG(LS_WARNING) << "Message queue full. Can't start logging."; return false; } - platform_file_ = log_file; - // Set the start time and duration to keep logging for 10 minutes. - start_time_us_ = clock_->TimeInMicroseconds(); - duration_us_ = 10 * 60 * 1000000; - StartLoggingLocked(); return true; } -void RtcEventLogImpl::StartLoggingLocked() { - currently_logging_ = true; - - // Write all old configuration events to the log file. - for (auto& event : config_events_) { - StoreToFile(&event); +bool RtcEventLogImpl::StartLogging(rtc::PlatformFile platform_file, + int64_t max_size_bytes) { + RTC_DCHECK(thread_checker_.CalledOnValidThread()); + RtcEventLogHelperThread::ControlMessage message; + message.message_type = RtcEventLogHelperThread::ControlMessage::START_FILE; + message.max_size_bytes = max_size_bytes; + message.start_time = clock_->TimeInMicroseconds(); + message.stop_time = std::numeric_limits::max(); + message.file.reset(FileWrapper::Create()); + FILE* file_handle = rtc::FdopenPlatformFileForWriting(platform_file); + if (!file_handle) { + return false; } - // Write all recent configuration events to the log file, and - // write all other recent events to the log file, ignoring any old events. - for (auto& event : recent_log_events_) { - if (IsConfigEvent(event)) { - StoreToFile(&event); - config_events_.push_back(event); - } else if (event.timestamp_us() >= start_time_us_ - buffer_duration_us_) { - StoreToFile(&event); - } + if (message.file->OpenFromFileHandle(file_handle, true, false) != 0) { + return false; } - recent_log_events_.clear(); - // Write a LOG_START event to the file. - rtclog::Event start_event; - start_event.set_timestamp_us(start_time_us_); - start_event.set_type(rtclog::Event::LOG_START); - StoreToFile(&start_event); + if (!message_queue_.Insert(&message)) { + LOG(LS_WARNING) << "Message queue full. Can't start logging."; + return false; + } + return true; } void RtcEventLogImpl::StopLogging() { - rtc::CritScope lock(&crit_); - StopLoggingLocked(); + RTC_DCHECK(thread_checker_.CalledOnValidThread()); + RtcEventLogHelperThread::ControlMessage message; + message.message_type = RtcEventLogHelperThread::ControlMessage::STOP_FILE; + message.stop_time = clock_->TimeInMicroseconds(); + while (!message_queue_.Insert(&message)) { + // TODO(terelius): We would like to have a blocking Insert function in the + // SwapQueue, but for the time being we will just clear any previous + // messages. + // Since StopLogging waits for the thread, it is essential that we don't + // clear any STOP_FILE messages. To ensure that there is only one call at a + // time, we require that all calls to StopLogging are made on the same + // thread. + LOG(LS_WARNING) << "Message queue full. Clearing queue to stop logging."; + message_queue_.Clear(); + } + wake_up_.Set(); // Request the output thread to wake up. + stopped_.Wait(rtc::Event::kForever); // Wait for the log to stop. } void RtcEventLogImpl::LogVideoReceiveStreamConfig( const VideoReceiveStream::Config& config) { - rtc::CritScope lock(&crit_); - - rtclog::Event event; - event.set_timestamp_us(clock_->TimeInMicroseconds()); - event.set_type(rtclog::Event::VIDEO_RECEIVER_CONFIG_EVENT); + std::unique_ptr event(new rtclog::Event()); + event->set_timestamp_us(clock_->TimeInMicroseconds()); + event->set_type(rtclog::Event::VIDEO_RECEIVER_CONFIG_EVENT); rtclog::VideoReceiveConfig* receiver_config = - event.mutable_video_receiver_config(); + event->mutable_video_receiver_config(); receiver_config->set_remote_ssrc(config.rtp.remote_ssrc); receiver_config->set_local_ssrc(config.rtp.local_ssrc); @@ -292,18 +273,18 @@ void RtcEventLogImpl::LogVideoReceiveStreamConfig( decoder->set_name(d.payload_name); decoder->set_payload_type(d.payload_type); } - HandleEvent(&event); + if (!event_queue_.Insert(&event)) { + LOG(LS_WARNING) << "Config queue full. Not logging config event."; + } } void RtcEventLogImpl::LogVideoSendStreamConfig( const VideoSendStream::Config& config) { - rtc::CritScope lock(&crit_); + std::unique_ptr event(new rtclog::Event()); + event->set_timestamp_us(clock_->TimeInMicroseconds()); + event->set_type(rtclog::Event::VIDEO_SENDER_CONFIG_EVENT); - rtclog::Event event; - event.set_timestamp_us(clock_->TimeInMicroseconds()); - event.set_type(rtclog::Event::VIDEO_SENDER_CONFIG_EVENT); - - rtclog::VideoSendConfig* sender_config = event.mutable_video_sender_config(); + rtclog::VideoSendConfig* sender_config = event->mutable_video_sender_config(); for (const auto& ssrc : config.rtp.ssrcs) { sender_config->add_ssrcs(ssrc); @@ -324,7 +305,9 @@ void RtcEventLogImpl::LogVideoSendStreamConfig( rtclog::EncoderConfig* encoder = sender_config->mutable_encoder(); encoder->set_name(config.encoder_settings.payload_name); encoder->set_payload_type(config.encoder_settings.payload_type); - HandleEvent(&event); + if (!event_queue_.Insert(&event)) { + LOG(LS_WARNING) << "Config queue full. Not logging config event."; + } } void RtcEventLogImpl::LogRtpHeader(PacketDirection direction, @@ -347,27 +330,27 @@ void RtcEventLogImpl::LogRtpHeader(PacketDirection direction, header_length += (x_len + 1) * 4; } - rtc::CritScope lock(&crit_); - rtclog::Event rtp_event; - rtp_event.set_timestamp_us(clock_->TimeInMicroseconds()); - rtp_event.set_type(rtclog::Event::RTP_EVENT); - rtp_event.mutable_rtp_packet()->set_incoming(direction == kIncomingPacket); - rtp_event.mutable_rtp_packet()->set_type(ConvertMediaType(media_type)); - rtp_event.mutable_rtp_packet()->set_packet_length(packet_length); - rtp_event.mutable_rtp_packet()->set_header(header, header_length); - HandleEvent(&rtp_event); + std::unique_ptr rtp_event(new rtclog::Event()); + rtp_event->set_timestamp_us(clock_->TimeInMicroseconds()); + rtp_event->set_type(rtclog::Event::RTP_EVENT); + rtp_event->mutable_rtp_packet()->set_incoming(direction == kIncomingPacket); + rtp_event->mutable_rtp_packet()->set_type(ConvertMediaType(media_type)); + rtp_event->mutable_rtp_packet()->set_packet_length(packet_length); + rtp_event->mutable_rtp_packet()->set_header(header, header_length); + if (!event_queue_.Insert(&rtp_event)) { + LOG(LS_WARNING) << "RTP queue full. Not logging RTP packet."; + } } void RtcEventLogImpl::LogRtcpPacket(PacketDirection direction, MediaType media_type, const uint8_t* packet, size_t length) { - rtc::CritScope lock(&crit_); - rtclog::Event rtcp_event; - rtcp_event.set_timestamp_us(clock_->TimeInMicroseconds()); - rtcp_event.set_type(rtclog::Event::RTCP_EVENT); - rtcp_event.mutable_rtcp_packet()->set_incoming(direction == kIncomingPacket); - rtcp_event.mutable_rtcp_packet()->set_type(ConvertMediaType(media_type)); + std::unique_ptr rtcp_event(new rtclog::Event()); + rtcp_event->set_timestamp_us(clock_->TimeInMicroseconds()); + rtcp_event->set_type(rtclog::Event::RTCP_EVENT); + rtcp_event->mutable_rtcp_packet()->set_incoming(direction == kIncomingPacket); + rtcp_event->mutable_rtcp_packet()->set_type(ConvertMediaType(media_type)); RTCPUtility::RtcpCommonHeader header; const uint8_t* block_begin = packet; @@ -413,87 +396,35 @@ void RtcEventLogImpl::LogRtcpPacket(PacketDirection direction, block_begin += block_size; } - rtcp_event.mutable_rtcp_packet()->set_packet_data(buffer, buffer_length); - HandleEvent(&rtcp_event); + rtcp_event->mutable_rtcp_packet()->set_packet_data(buffer, buffer_length); + if (!event_queue_.Insert(&rtcp_event)) { + LOG(LS_WARNING) << "RTCP queue full. Not logging RTCP packet."; + } } void RtcEventLogImpl::LogAudioPlayout(uint32_t ssrc) { - rtc::CritScope lock(&crit_); - rtclog::Event event; - event.set_timestamp_us(clock_->TimeInMicroseconds()); - event.set_type(rtclog::Event::AUDIO_PLAYOUT_EVENT); - auto playout_event = event.mutable_audio_playout_event(); + std::unique_ptr event(new rtclog::Event()); + event->set_timestamp_us(clock_->TimeInMicroseconds()); + event->set_type(rtclog::Event::AUDIO_PLAYOUT_EVENT); + auto playout_event = event->mutable_audio_playout_event(); playout_event->set_local_ssrc(ssrc); - HandleEvent(&event); + if (!event_queue_.Insert(&event)) { + LOG(LS_WARNING) << "Playout queue full. Not logging ACM playout."; + } } void RtcEventLogImpl::LogBwePacketLossEvent(int32_t bitrate, uint8_t fraction_loss, int32_t total_packets) { - rtc::CritScope lock(&crit_); - rtclog::Event event; - event.set_timestamp_us(clock_->TimeInMicroseconds()); - event.set_type(rtclog::Event::BWE_PACKET_LOSS_EVENT); - auto bwe_event = event.mutable_bwe_packet_loss_event(); + std::unique_ptr event(new rtclog::Event()); + event->set_timestamp_us(clock_->TimeInMicroseconds()); + event->set_type(rtclog::Event::BWE_PACKET_LOSS_EVENT); + auto bwe_event = event->mutable_bwe_packet_loss_event(); bwe_event->set_bitrate(bitrate); bwe_event->set_fraction_loss(fraction_loss); bwe_event->set_total_packets(total_packets); - HandleEvent(&event); -} - -void RtcEventLogImpl::StopLoggingLocked() { - if (currently_logging_) { - currently_logging_ = false; - // Create a LogEnd event - rtclog::Event event; - event.set_timestamp_us(clock_->TimeInMicroseconds()); - event.set_type(rtclog::Event::LOG_END); - // Store the event and close the file - RTC_DCHECK(file_->Open()); - StoreToFile(&event); - file_->CloseFile(); - if (platform_file_ != rtc::kInvalidPlatformFileValue) { - rtc::ClosePlatformFile(platform_file_); - platform_file_ = rtc::kInvalidPlatformFileValue; - } - } - RTC_DCHECK(!file_->Open()); - stream_.Clear(); -} - -void RtcEventLogImpl::HandleEvent(rtclog::Event* event) { - if (currently_logging_) { - if (clock_->TimeInMicroseconds() < start_time_us_ + duration_us_) { - StoreToFile(event); - return; - } - StopLoggingLocked(); - } - AddRecentEvent(*event); -} - -void RtcEventLogImpl::StoreToFile(rtclog::Event* event) { - // Reuse the same object at every log event. - if (stream_.stream_size() < 1) { - stream_.add_stream(); - } - RTC_DCHECK_EQ(stream_.stream_size(), 1); - stream_.mutable_stream(0)->Swap(event); - // TODO(terelius): Doesn't this create a new EventStream per event? - // Is this guaranteed to work e.g. in future versions of protobuf? - std::string dump_buffer; - stream_.SerializeToString(&dump_buffer); - file_->Write(dump_buffer.data(), dump_buffer.size()); -} - -void RtcEventLogImpl::AddRecentEvent(const rtclog::Event& event) { - recent_log_events_.push_back(event); - while (recent_log_events_.front().timestamp_us() < - event.timestamp_us() - buffer_duration_us_) { - if (IsConfigEvent(recent_log_events_.front())) { - config_events_.push_back(recent_log_events_.front()); - } - recent_log_events_.pop_front(); + if (!event_queue_.Insert(&event)) { + LOG(LS_WARNING) << "BWE loss queue full. Not logging BWE update."; } } @@ -516,8 +447,12 @@ bool RtcEventLog::ParseRtcEventLog(const std::string& file_name, #endif // ENABLE_RTC_EVENT_LOG // RtcEventLog member functions. -std::unique_ptr RtcEventLog::Create() { - return std::unique_ptr(new RtcEventLogImpl()); +std::unique_ptr RtcEventLog::Create(const Clock* clock) { +#ifdef ENABLE_RTC_EVENT_LOG + return std::unique_ptr(new RtcEventLogImpl(clock)); +#else + return std::unique_ptr(new RtcEventLogNullImpl()); +#endif // ENABLE_RTC_EVENT_LOG } } // namespace webrtc diff --git a/webrtc/call/rtc_event_log.h b/webrtc/call/rtc_event_log.h index 518308bf2d..bea57b01cd 100644 --- a/webrtc/call/rtc_event_log.h +++ b/webrtc/call/rtc_event_log.h @@ -26,6 +26,7 @@ namespace rtclog { class EventStream; } // namespace rtclog +class Clock; class RtcEventLogImpl; enum class MediaType; @@ -36,30 +37,40 @@ class RtcEventLog { public: virtual ~RtcEventLog() {} - static std::unique_ptr Create(); + // Factory method to create an RtcEventLog object. + static std::unique_ptr Create(const Clock* clock); - // Sets the time that events are stored in the internal event buffer - // before the user calls StartLogging. The default is 10 000 000 us = 10 s - virtual void SetBufferDuration(int64_t buffer_duration_us) = 0; - - // Starts logging for the specified duration to the specified file. - // The logging will stop automatically after the specified duration. + // Starts logging a maximum of max_size_bytes bytes to the specified file. // If the file already exists it will be overwritten. - // If the file cannot be opened, the RtcEventLog will not start logging. - virtual void StartLogging(const std::string& file_name, int duration_ms) = 0; + // If max_size_bytes <= 0, logging will be active until StopLogging is called. + // The function has no effect and returns false if we can't start a new log + // e.g. because we are already logging or the file cannot be opened. + virtual bool StartLogging(const std::string& file_name, + int64_t max_size_bytes) = 0; - // Starts logging until either the 10 minute timer runs out or the StopLogging - // function is called. The RtcEventLog takes ownership of the supplied - // rtc::PlatformFile. - virtual bool StartLogging(rtc::PlatformFile log_file) = 0; + // Same as above. The RtcEventLog takes ownership of the file if the call + // is successful, i.e. if it returns true. + virtual bool StartLogging(rtc::PlatformFile platform_file, + int64_t max_size_bytes) = 0; + // Deprecated. Pass an explicit file size limit. + bool StartLogging(const std::string& file_name) { + return StartLogging(file_name, 10000000); + } + + // Deprecated. Pass an explicit file size limit. + bool StartLogging(rtc::PlatformFile platform_file) { + return StartLogging(platform_file, 10000000); + } + + // Stops logging to file and waits until the thread has finished. virtual void StopLogging() = 0; - // Logs configuration information for webrtc::VideoReceiveStream + // Logs configuration information for webrtc::VideoReceiveStream. virtual void LogVideoReceiveStreamConfig( const webrtc::VideoReceiveStream::Config& config) = 0; - // Logs configuration information for webrtc::VideoSendStream + // Logs configuration information for webrtc::VideoSendStream. virtual void LogVideoSendStreamConfig( const webrtc::VideoSendStream::Config& config) = 0; @@ -76,7 +87,7 @@ class RtcEventLog { const uint8_t* packet, size_t length) = 0; - // Logs an audio playout event + // Logs an audio playout event. virtual void LogAudioPlayout(uint32_t ssrc) = 0; // Logs a bitrate update from the bandwidth estimator based on packet loss. @@ -86,6 +97,11 @@ class RtcEventLog { // Reads an RtcEventLog file and returns true when reading was successful. // The result is stored in the given EventStream object. + // The order of the events in the EventStream is implementation defined. + // The current implementation writes a LOG_START event, then the old + // configurations, then the remaining events in timestamp order and finally + // a LOG_END event. However, this might change without further notice. + // TODO(terelius): Change result type to a vector? static bool ParseRtcEventLog(const std::string& file_name, rtclog::EventStream* result); }; diff --git a/webrtc/call/rtc_event_log_helper_thread.cc b/webrtc/call/rtc_event_log_helper_thread.cc new file mode 100644 index 0000000000..a9aa85144f --- /dev/null +++ b/webrtc/call/rtc_event_log_helper_thread.cc @@ -0,0 +1,285 @@ +/* + * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/call/rtc_event_log_helper_thread.h" + +#include + +#include "webrtc/base/checks.h" +#include "webrtc/system_wrappers/include/logging.h" + +#ifdef ENABLE_RTC_EVENT_LOG + +namespace webrtc { + +namespace { +const int kEventsInHistory = 10000; + +bool IsConfigEvent(const rtclog::Event& event) { + rtclog::Event_EventType event_type = event.type(); + return event_type == rtclog::Event::VIDEO_RECEIVER_CONFIG_EVENT || + event_type == rtclog::Event::VIDEO_SENDER_CONFIG_EVENT || + event_type == rtclog::Event::AUDIO_RECEIVER_CONFIG_EVENT || + event_type == rtclog::Event::AUDIO_SENDER_CONFIG_EVENT; +} +} // namespace + +// RtcEventLogImpl member functions. +RtcEventLogHelperThread::RtcEventLogHelperThread( + SwapQueue* message_queue, + SwapQueue>* event_queue, + rtc::Event* wake_up, + rtc::Event* stopped, + const Clock* const clock) + : message_queue_(message_queue), + event_queue_(event_queue), + history_(kEventsInHistory), + config_history_(), + file_(FileWrapper::Create()), + thread_(&ThreadOutputFunction, this, "RtcEventLog thread"), + max_size_bytes_(std::numeric_limits::max()), + written_bytes_(0), + start_time_(0), + stop_time_(std::numeric_limits::max()), + has_recent_event_(false), + most_recent_event_(), + output_string_(), + wake_up_(wake_up), + stopped_(stopped), + clock_(clock) { + RTC_DCHECK(message_queue_); + RTC_DCHECK(event_queue_); + RTC_DCHECK(wake_up_); + RTC_DCHECK(stopped_); + RTC_DCHECK(clock_); + thread_.Start(); +} + +RtcEventLogHelperThread::~RtcEventLogHelperThread() { + ControlMessage message; + message.message_type = ControlMessage::TERMINATE_THREAD; + message.stop_time = clock_->TimeInMicroseconds(); + while (!message_queue_->Insert(&message)) { + // We can't destroy the event log until we have stopped the thread, + // so clear the message queue and try again. Note that if we clear + // any STOP_FILE events, then the threads calling StopLogging would likely + // wait indefinitely. However, there should not be any such calls as we + // are executing the destructor. + LOG(LS_WARNING) << "Clearing message queue to terminate thread."; + message_queue_->Clear(); + } + wake_up_->Set(); // Wake up the output thread. + thread_.Stop(); // Wait for the thread to terminate. +} + +bool RtcEventLogHelperThread::AppendEventToString(rtclog::Event* event) { + rtclog::EventStream event_stream; + event_stream.add_stream(); + event_stream.mutable_stream(0)->Swap(event); + // We create a new event stream per event but because of the way protobufs + // are encoded, events can be merged by concatenating them. Therefore, + // it will look like a single stream when we read it back from file. + bool stop = true; + if (written_bytes_ + static_cast(output_string_.size()) + + event_stream.ByteSize() <= + max_size_bytes_) { + event_stream.AppendToString(&output_string_); + stop = false; + } + // Swap the event back so that we don't mix event types in the queues. + event_stream.mutable_stream(0)->Swap(event); + return stop; +} + +void RtcEventLogHelperThread::LogToMemory() { + RTC_DCHECK(!file_->Open()); + + // Process each event earlier than the current time and append it to the + // appropriate history_. + int64_t current_time = clock_->TimeInMicroseconds(); + if (!has_recent_event_) { + has_recent_event_ = event_queue_->Remove(&most_recent_event_); + } + while (has_recent_event_ && + most_recent_event_->timestamp_us() <= current_time) { + if (IsConfigEvent(*most_recent_event_)) { + config_history_.push_back(std::move(most_recent_event_)); + } else { + history_.push_back(std::move(most_recent_event_)); + } + has_recent_event_ = event_queue_->Remove(&most_recent_event_); + } +} + +void RtcEventLogHelperThread::StartLogFile() { + RTC_DCHECK(file_->Open()); + bool stop = false; + output_string_.clear(); + + // Create and serialize the LOG_START event. + rtclog::Event start_event; + start_event.set_timestamp_us(start_time_); + start_event.set_type(rtclog::Event::LOG_START); + AppendEventToString(&start_event); + + // Serialize the config information for all old streams. + for (auto& event : config_history_) { + AppendEventToString(event.get()); + } + + // Serialize the events in the event queue. + while (!history_.empty() && !stop) { + stop = AppendEventToString(history_.front().get()); + if (!stop) { + history_.pop_front(); + } + } + + // Write to file. + file_->Write(output_string_.data(), output_string_.size()); + written_bytes_ += output_string_.size(); + + // Free the allocated memory since we probably won't need this amount of + // space again. + output_string_.clear(); + output_string_.shrink_to_fit(); + + if (stop) { + RTC_DCHECK(file_->Open()); + StopLogFile(); + } +} + +void RtcEventLogHelperThread::LogToFile() { + RTC_DCHECK(file_->Open()); + output_string_.clear(); + + // Append each event older than both the current time and the stop time + // to the output_string_. + int64_t current_time = clock_->TimeInMicroseconds(); + int64_t time_limit = std::min(current_time, stop_time_); + if (!has_recent_event_) { + has_recent_event_ = event_queue_->Remove(&most_recent_event_); + } + bool stop = false; + while (!stop && has_recent_event_ && + most_recent_event_->timestamp_us() <= time_limit) { + stop = AppendEventToString(most_recent_event_.get()); + if (!stop) { + if (IsConfigEvent(*most_recent_event_)) { + config_history_.push_back(std::move(most_recent_event_)); + } + has_recent_event_ = event_queue_->Remove(&most_recent_event_); + } + } + + // Write string to file. + file_->Write(output_string_.data(), output_string_.size()); + written_bytes_ += output_string_.size(); + + if (!file_->Open()) { + LOG(LS_WARNING) << "WebRTC event log file closed by FileWrapper."; + } + + // We want to stop logging if we have reached the file size limit. We also + // want to stop logging if the remaining events are more recent than the + // time limit, or in other words if we have terminated the loop despite + // having more events in the queue. + if ((has_recent_event_ && most_recent_event_->timestamp_us() > stop_time_) || + stop) { + RTC_DCHECK(file_->Open()); + StopLogFile(); + } +} + +void RtcEventLogHelperThread::StopLogFile() { + RTC_DCHECK(file_->Open()); + output_string_.clear(); + + rtclog::Event end_event; + end_event.set_timestamp_us(stop_time_); + end_event.set_type(rtclog::Event::LOG_END); + AppendEventToString(&end_event); + + if (written_bytes_ + static_cast(output_string_.size()) <= + max_size_bytes_) { + file_->Write(output_string_.data(), output_string_.size()); + written_bytes_ += output_string_.size(); + } + + max_size_bytes_ = std::numeric_limits::max(); + written_bytes_ = 0; + start_time_ = 0; + stop_time_ = std::numeric_limits::max(); + output_string_.clear(); + file_->CloseFile(); + RTC_DCHECK(!file_->Open()); +} + +void RtcEventLogHelperThread::WriteLog() { + ControlMessage message; + + while (true) { + // Process control messages. + while (message_queue_->Remove(&message)) { + switch (message.message_type) { + case ControlMessage::START_FILE: + if (!file_->Open()) { + max_size_bytes_ = message.max_size_bytes; + start_time_ = message.start_time; + stop_time_ = message.stop_time; + file_.swap(message.file); + StartLogFile(); + } else { + // Already started. Ignore message and close file handle. + message.file->CloseFile(); + } + break; + case ControlMessage::STOP_FILE: + if (file_->Open()) { + stop_time_ = message.stop_time; + LogToFile(); // Log remaining events from message queues. + } + // LogToFile might stop on it's own so we need to recheck the state. + if (file_->Open()) { + StopLogFile(); + } + stopped_->Set(); + break; + case ControlMessage::TERMINATE_THREAD: + if (file_->Open()) { + StopLogFile(); + } + return; + } + } + + // Write events to file or memory + if (file_->Open()) { + LogToFile(); + } else { + LogToMemory(); + } + + // Accumulate a new batch of events instead of processing them one at a + // time. + wake_up_->Wait(50); + } +} + +bool RtcEventLogHelperThread::ThreadOutputFunction(void* obj) { + RtcEventLogHelperThread* helper = static_cast(obj); + helper->WriteLog(); + return false; +} + +} // namespace webrtc + +#endif // ENABLE_RTC_EVENT_LOG diff --git a/webrtc/call/rtc_event_log_helper_thread.h b/webrtc/call/rtc_event_log_helper_thread.h new file mode 100644 index 0000000000..60ed912b65 --- /dev/null +++ b/webrtc/call/rtc_event_log_helper_thread.h @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef WEBRTC_CALL_RTC_EVENT_LOG_HELPER_THREAD_H_ +#define WEBRTC_CALL_RTC_EVENT_LOG_HELPER_THREAD_H_ + +#include +#include +#include +#include +#include + +#include "webrtc/base/constructormagic.h" +#include "webrtc/base/event.h" +#include "webrtc/base/platform_thread.h" +#include "webrtc/base/swap_queue.h" +#include "webrtc/call/ringbuffer.h" +#include "webrtc/system_wrappers/include/clock.h" +#include "webrtc/system_wrappers/include/file_wrapper.h" + +#ifdef ENABLE_RTC_EVENT_LOG +// Files generated at build-time by the protobuf compiler. +#ifdef WEBRTC_ANDROID_PLATFORM_BUILD +#include "external/webrtc/webrtc/call/rtc_event_log.pb.h" +#else +#include "webrtc/call/rtc_event_log.pb.h" +#endif +#endif + +#ifdef ENABLE_RTC_EVENT_LOG + +namespace webrtc { + +class RtcEventLogHelperThread final { + public: + struct ControlMessage { + ControlMessage() + : message_type(STOP_FILE), + file(nullptr), + max_size_bytes(0), + start_time(0), + stop_time(0) {} + enum { START_FILE, STOP_FILE, TERMINATE_THREAD } message_type; + + std::unique_ptr file; // Only used with START_FILE. + int64_t max_size_bytes; // Only used with START_FILE. + int64_t start_time; // Only used with START_FILE. + int64_t stop_time; // Used with all 3 message types. + + friend void swap(ControlMessage& lhs, ControlMessage& rhs) { + using std::swap; + swap(lhs.message_type, rhs.message_type); + lhs.file.swap(rhs.file); + swap(lhs.max_size_bytes, rhs.max_size_bytes); + swap(lhs.start_time, rhs.start_time); + swap(lhs.stop_time, rhs.stop_time); + } + }; + + RtcEventLogHelperThread( + SwapQueue* message_queue, + SwapQueue>* event_queue, + rtc::Event* wake_up, + rtc::Event* file_finished, + const Clock* const clock); + ~RtcEventLogHelperThread(); + + private: + static bool ThreadOutputFunction(void* obj); + + void TerminateThread(); + bool AppendEventToString(rtclog::Event* event); + void AppendEventToHistory(const rtclog::Event& event); + void LogToMemory(); + void StartLogFile(); + void LogToFile(); + void StopLogFile(); + void WriteLog(); + + // Message queues for passing events to the logging thread. + SwapQueue* message_queue_; + SwapQueue>* event_queue_; + + // History containing the most recent events (~ 10 s). + RingBuffer> history_; + + // History containing all past configuration events. + std::vector> config_history_; + + std::unique_ptr file_; + rtc::PlatformThread thread_; + + int64_t max_size_bytes_; + int64_t written_bytes_; + int64_t start_time_; + int64_t stop_time_; + + bool has_recent_event_; + std::unique_ptr most_recent_event_; + + // Temporary space for serializing profobuf data. + std::string output_string_; + + rtc::Event* wake_up_; + rtc::Event* stopped_; + + const Clock* const clock_; + + RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RtcEventLogHelperThread); +}; + +} // namespace webrtc + +#endif // ENABLE_RTC_EVENT_LOG + +#endif // WEBRTC_CALL_RTC_EVENT_LOG_HELPER_THREAD_H_ diff --git a/webrtc/call/rtc_event_log_unittest.cc b/webrtc/call/rtc_event_log_unittest.cc index e3104591d9..a9b913d266 100644 --- a/webrtc/call/rtc_event_log_unittest.cc +++ b/webrtc/call/rtc_event_log_unittest.cc @@ -10,6 +10,7 @@ #ifdef ENABLE_RTC_EVENT_LOG +#include #include #include #include @@ -224,7 +225,7 @@ void VerifySendStreamConfig(const rtclog::Event& event, } void VerifyRtpEvent(const rtclog::Event& event, - bool incoming, + PacketDirection direction, MediaType media_type, const uint8_t* header, size_t header_size, @@ -233,7 +234,7 @@ void VerifyRtpEvent(const rtclog::Event& event, ASSERT_EQ(rtclog::Event::RTP_EVENT, event.type()); const rtclog::RtpPacket& rtp_packet = event.rtp_packet(); ASSERT_TRUE(rtp_packet.has_incoming()); - EXPECT_EQ(incoming, rtp_packet.incoming()); + EXPECT_EQ(direction == kIncomingPacket, rtp_packet.incoming()); ASSERT_TRUE(rtp_packet.has_type()); EXPECT_EQ(media_type, GetRuntimeMediaType(rtp_packet.type())); ASSERT_TRUE(rtp_packet.has_packet_length()); @@ -246,7 +247,7 @@ void VerifyRtpEvent(const rtclog::Event& event, } void VerifyRtcpEvent(const rtclog::Event& event, - bool incoming, + PacketDirection direction, MediaType media_type, const uint8_t* packet, size_t total_size) { @@ -254,7 +255,7 @@ void VerifyRtcpEvent(const rtclog::Event& event, ASSERT_EQ(rtclog::Event::RTCP_EVENT, event.type()); const rtclog::RtcpPacket& rtcp_packet = event.rtcp_packet(); ASSERT_TRUE(rtcp_packet.has_incoming()); - EXPECT_EQ(incoming, rtcp_packet.incoming()); + EXPECT_EQ(direction == kIncomingPacket, rtcp_packet.incoming()); ASSERT_TRUE(rtcp_packet.has_type()); EXPECT_EQ(media_type, GetRuntimeMediaType(rtcp_packet.type())); ASSERT_TRUE(rtcp_packet.has_packet_data()); @@ -292,6 +293,11 @@ void VerifyLogStartEvent(const rtclog::Event& event) { EXPECT_EQ(rtclog::Event::LOG_START, event.type()); } +void VerifyLogEndEvent(const rtclog::Event& event) { + ASSERT_TRUE(IsValidBasicEvent(event)); + EXPECT_EQ(rtclog::Event::LOG_END, event.type()); +} + /* * Bit number i of extension_bitvector is set to indicate the * presence of extension number i from kExtensionTypes / kExtensionNames. @@ -472,9 +478,12 @@ void LogSessionAndReadBack(size_t rtp_count, // When log_dumper goes out of scope, it causes the log file to be flushed // to disk. { - std::unique_ptr log_dumper(RtcEventLog::Create()); + SimulatedClock fake_clock(prng.Rand()); + std::unique_ptr log_dumper(RtcEventLog::Create(&fake_clock)); log_dumper->LogVideoReceiveStreamConfig(receiver_config); + fake_clock.AdvanceTimeMicroseconds(prng.Rand(1, 1000)); log_dumper->LogVideoSendStreamConfig(sender_config); + fake_clock.AdvanceTimeMicroseconds(prng.Rand(1, 1000)); size_t rtcp_index = 1; size_t playout_index = 1; size_t bwe_loss_index = 1; @@ -483,6 +492,7 @@ void LogSessionAndReadBack(size_t rtp_count, (i % 2 == 0) ? kIncomingPacket : kOutgoingPacket, (i % 3 == 0) ? MediaType::AUDIO : MediaType::VIDEO, rtp_packets[i - 1].data(), rtp_packets[i - 1].size()); + fake_clock.AdvanceTimeMicroseconds(prng.Rand(1, 1000)); if (i * rtcp_count >= rtcp_index * rtp_count) { log_dumper->LogRtcpPacket( (rtcp_index % 2 == 0) ? kIncomingPacket : kOutgoingPacket, @@ -490,21 +500,26 @@ void LogSessionAndReadBack(size_t rtp_count, rtcp_packets[rtcp_index - 1].data(), rtcp_packets[rtcp_index - 1].size()); rtcp_index++; + fake_clock.AdvanceTimeMicroseconds(prng.Rand(1, 1000)); } if (i * playout_count >= playout_index * rtp_count) { log_dumper->LogAudioPlayout(playout_ssrcs[playout_index - 1]); playout_index++; + fake_clock.AdvanceTimeMicroseconds(prng.Rand(1, 1000)); } if (i * bwe_loss_count >= bwe_loss_index * rtp_count) { log_dumper->LogBwePacketLossEvent( bwe_loss_updates[bwe_loss_index - 1].first, bwe_loss_updates[bwe_loss_index - 1].second, i); bwe_loss_index++; + fake_clock.AdvanceTimeMicroseconds(prng.Rand(1, 1000)); } if (i == rtp_count / 2) { log_dumper->StartLogging(temp_filename, 10000000); + fake_clock.AdvanceTimeMicroseconds(prng.Rand(1, 1000)); } } + log_dumper->StopLogging(); } // Read the generated file from disk. @@ -516,24 +531,66 @@ void LogSessionAndReadBack(size_t rtp_count, // what we wrote down. For RTCP we log the full packets, but for // RTP we should only log the header. const int event_count = config_count + playout_count + bwe_loss_count + - rtcp_count + rtp_count + 1; + rtcp_count + rtp_count + 2; + EXPECT_GE(1000, event_count); // The events must fit in the message queue. EXPECT_EQ(event_count, parsed_stream.stream_size()); - VerifyReceiveStreamConfig(parsed_stream.stream(0), receiver_config); - VerifySendStreamConfig(parsed_stream.stream(1), sender_config); - size_t event_index = config_count; + if (event_count != parsed_stream.stream_size()) { + // Print the expected and actual event types for easier debugging. + std::map actual_event_counts; + for (size_t i = 0; i < static_cast(parsed_stream.stream_size()); + i++) { + actual_event_counts[parsed_stream.stream(i).type()]++; + } + printf("Actual events: "); + for (auto kv : actual_event_counts) { + printf("%d_count = %zu, ", kv.first, kv.second); + } + printf("\n"); + for (size_t i = 0; i < static_cast(parsed_stream.stream_size()); + i++) { + printf("%4d ", parsed_stream.stream(i).type()); + } + printf("\n"); + printf( + "Expected events: rtp_count = %zu, rtcp_count = %zu," + "playout_count = %zu, bwe_loss_count = %zu\n", + rtp_count, rtcp_count, playout_count, bwe_loss_count); + size_t rtcp_index = 1, playout_index = 1, bwe_loss_index = 1; + printf("strt cfg cfg "); + for (size_t i = 1; i <= rtp_count; i++) { + printf(" rtp "); + if (i * rtcp_count >= rtcp_index * rtp_count) { + printf("rtcp "); + rtcp_index++; + } + if (i * playout_count >= playout_index * rtp_count) { + printf("play "); + playout_index++; + } + if (i * bwe_loss_count >= bwe_loss_index * rtp_count) { + printf("loss "); + bwe_loss_index++; + } + } + printf("\n"); + } + VerifyLogStartEvent(parsed_stream.stream(0)); + VerifyReceiveStreamConfig(parsed_stream.stream(1), receiver_config); + VerifySendStreamConfig(parsed_stream.stream(2), sender_config); + size_t event_index = config_count + 1; size_t rtcp_index = 1; size_t playout_index = 1; size_t bwe_loss_index = 1; for (size_t i = 1; i <= rtp_count; i++) { VerifyRtpEvent(parsed_stream.stream(event_index), - (i % 2 == 0), // Every second packet is incoming. + (i % 2 == 0) ? kIncomingPacket : kOutgoingPacket, (i % 3 == 0) ? MediaType::AUDIO : MediaType::VIDEO, rtp_packets[i - 1].data(), rtp_header_sizes[i - 1], rtp_packets[i - 1].size()); event_index++; if (i * rtcp_count >= rtcp_index * rtp_count) { VerifyRtcpEvent(parsed_stream.stream(event_index), - rtcp_index % 2 == 0, // Every second packet is incoming. + (rtcp_index % 2 == 0) ? kIncomingPacket : kOutgoingPacket, rtcp_index % 3 == 0 ? MediaType::AUDIO : MediaType::VIDEO, rtcp_packets[rtcp_index - 1].data(), rtcp_packets[rtcp_index - 1].size()); @@ -553,10 +610,6 @@ void LogSessionAndReadBack(size_t rtp_count, event_index++; bwe_loss_index++; } - if (i == rtp_count / 2) { - VerifyLogStartEvent(parsed_stream.stream(event_index)); - event_index++; - } } // Clean up temporary file - can be pretty slow. @@ -596,39 +649,15 @@ TEST(RtcEventLogTest, LogSessionAndReadBack) { } } -// Tests that the event queue works correctly, i.e. drops old RTP, RTCP and -// debug events, but keeps config events even if they are older than the limit. -void DropOldEvents(uint32_t extensions_bitvector, - uint32_t csrcs_count, - unsigned int random_seed) { - rtc::Buffer old_rtp_packet; - rtc::Buffer recent_rtp_packet; - rtc::Buffer old_rtcp_packet; - rtc::Buffer recent_rtcp_packet; +TEST(RtcEventLogTest, LogEventAndReadBack) { + Random prng(987654321); - VideoReceiveStream::Config receiver_config(nullptr); - VideoSendStream::Config sender_config(nullptr); - - Random prng(random_seed); - - // Create two RTP packets containing random data. + // Create one RTP and one RTCP packet containing random data. size_t packet_size = prng.Rand(1000, 1100); - old_rtp_packet.SetSize(packet_size); - GenerateRtpPacket(extensions_bitvector, csrcs_count, old_rtp_packet.data(), - packet_size, &prng); - packet_size = prng.Rand(1000, 1100); - recent_rtp_packet.SetSize(packet_size); - size_t recent_header_size = - GenerateRtpPacket(extensions_bitvector, csrcs_count, - recent_rtp_packet.data(), packet_size, &prng); - - // Create two RTCP packets containing random data. - old_rtcp_packet = GenerateRtcpPacket(&prng); - recent_rtcp_packet = GenerateRtcpPacket(&prng); - - // Create configurations for the video streams. - GenerateVideoReceiveConfig(extensions_bitvector, &receiver_config, &prng); - GenerateVideoSendConfig(extensions_bitvector, &sender_config, &prng); + rtc::Buffer rtp_packet(packet_size); + size_t header_size = + GenerateRtpPacket(0, 0, rtp_packet.data(), packet_size, &prng); + rtc::Buffer rtcp_packet = GenerateRtcpPacket(&prng); // Find the name of the current test, in order to use it as a temporary // filename. @@ -636,58 +665,44 @@ void DropOldEvents(uint32_t extensions_bitvector, const std::string temp_filename = test::OutputPath() + test_info->test_case_name() + test_info->name(); - // The log file will be flushed to disk when the log_dumper goes out of scope. - { - std::unique_ptr log_dumper(RtcEventLog::Create()); - // Reduce the time old events are stored to 50 ms. - log_dumper->SetBufferDuration(50000); - log_dumper->LogVideoReceiveStreamConfig(receiver_config); - log_dumper->LogVideoSendStreamConfig(sender_config); - log_dumper->LogRtpHeader(kOutgoingPacket, MediaType::AUDIO, - old_rtp_packet.data(), old_rtp_packet.size()); - log_dumper->LogRtcpPacket(kIncomingPacket, MediaType::AUDIO, - old_rtcp_packet.data(), - old_rtcp_packet.size()); - // Sleep 55 ms to let old events be removed from the queue. - rtc::Thread::SleepMs(55); - log_dumper->StartLogging(temp_filename, 10000000); - log_dumper->LogRtpHeader(kIncomingPacket, MediaType::VIDEO, - recent_rtp_packet.data(), - recent_rtp_packet.size()); - log_dumper->LogRtcpPacket(kOutgoingPacket, MediaType::VIDEO, - recent_rtcp_packet.data(), - recent_rtcp_packet.size()); - } + // Add RTP, start logging, add RTCP and then stop logging + SimulatedClock fake_clock(prng.Rand()); + std::unique_ptr log_dumper(RtcEventLog::Create(&fake_clock)); + + log_dumper->LogRtpHeader(kIncomingPacket, MediaType::VIDEO, rtp_packet.data(), + rtp_packet.size()); + fake_clock.AdvanceTimeMicroseconds(prng.Rand(1, 1000)); + + log_dumper->StartLogging(temp_filename, 10000000); + fake_clock.AdvanceTimeMicroseconds(prng.Rand(1, 1000)); + + log_dumper->LogRtcpPacket(kOutgoingPacket, MediaType::VIDEO, + rtcp_packet.data(), rtcp_packet.size()); + fake_clock.AdvanceTimeMicroseconds(prng.Rand(1, 1000)); + + log_dumper->StopLogging(); // Read the generated file from disk. rtclog::EventStream parsed_stream; ASSERT_TRUE(RtcEventLog::ParseRtcEventLog(temp_filename, &parsed_stream)); // Verify that what we read back from the event log is the same as - // what we wrote. Old RTP and RTCP events should have been discarded, - // but old configuration events should still be available. - EXPECT_EQ(5, parsed_stream.stream_size()); - VerifyReceiveStreamConfig(parsed_stream.stream(0), receiver_config); - VerifySendStreamConfig(parsed_stream.stream(1), sender_config); - VerifyLogStartEvent(parsed_stream.stream(2)); - VerifyRtpEvent(parsed_stream.stream(3), true, MediaType::VIDEO, - recent_rtp_packet.data(), recent_header_size, - recent_rtp_packet.size()); - VerifyRtcpEvent(parsed_stream.stream(4), false, MediaType::VIDEO, - recent_rtcp_packet.data(), recent_rtcp_packet.size()); + // what we wrote down. + EXPECT_EQ(4, parsed_stream.stream_size()); + + VerifyLogStartEvent(parsed_stream.stream(0)); + + VerifyRtpEvent(parsed_stream.stream(1), kIncomingPacket, MediaType::VIDEO, + rtp_packet.data(), header_size, rtp_packet.size()); + + VerifyRtcpEvent(parsed_stream.stream(2), kOutgoingPacket, MediaType::VIDEO, + rtcp_packet.data(), rtcp_packet.size()); + + VerifyLogEndEvent(parsed_stream.stream(3)); // Clean up temporary file - can be pretty slow. remove(temp_filename.c_str()); } - -TEST(RtcEventLogTest, DropOldEvents) { - // Enable all header extensions - uint32_t extensions = (1u << kNumExtensions) - 1; - uint32_t csrcs_count = 2; - DropOldEvents(extensions, csrcs_count, 141421356); - DropOldEvents(extensions, csrcs_count, 173205080); -} - } // namespace webrtc #endif // ENABLE_RTC_EVENT_LOG diff --git a/webrtc/voice_engine/channel_manager.cc b/webrtc/voice_engine/channel_manager.cc index eac2e50919..6071f19548 100644 --- a/webrtc/voice_engine/channel_manager.cc +++ b/webrtc/voice_engine/channel_manager.cc @@ -49,7 +49,7 @@ ChannelManager::ChannelManager(uint32_t instance_id, const Config& config) : instance_id_(instance_id), last_channel_id_(-1), config_(config), - event_log_(RtcEventLog::Create()) {} + event_log_(RtcEventLog::Create(Clock::GetRealTimeClock())) {} ChannelOwner ChannelManager::CreateChannel() { return CreateChannelInternal(config_); diff --git a/webrtc/webrtc.gyp b/webrtc/webrtc.gyp index 6f90833a28..2efb0c297e 100644 --- a/webrtc/webrtc.gyp +++ b/webrtc/webrtc.gyp @@ -141,6 +141,8 @@ 'sources': [ 'call/rtc_event_log.cc', 'call/rtc_event_log.h', + 'call/rtc_event_log_helper_thread.cc', + 'call/rtc_event_log_helper_thread.h', ], 'conditions': [ # If enable_protobuf is defined, we want to compile the protobuf diff --git a/webrtc/webrtc_tests.gypi b/webrtc/webrtc_tests.gypi index 229f7b6565..88e7d6e012 100644 --- a/webrtc/webrtc_tests.gypi +++ b/webrtc/webrtc_tests.gypi @@ -161,6 +161,7 @@ 'call/bitrate_estimator_tests.cc', 'call/call_unittest.cc', 'call/packet_injection_tests.cc', + 'call/ringbuffer_unittest.cc', 'test/common_unittest.cc', 'test/testsupport/metrics/video_metrics_unittest.cc', 'video/call_stats_unittest.cc',