diff --git a/webrtc/logging/BUILD.gn b/webrtc/logging/BUILD.gn index fac7e3d9b4..6b1e549d30 100644 --- a/webrtc/logging/BUILD.gn +++ b/webrtc/logging/BUILD.gn @@ -40,6 +40,8 @@ rtc_static_library("rtc_event_log_impl") { "rtc_event_log/rtc_event_log.cc", "rtc_event_log/rtc_event_log_factory.cc", "rtc_event_log/rtc_event_log_factory.h", + "rtc_event_log/rtc_event_log_helper_thread.cc", + "rtc_event_log/rtc_event_log_helper_thread.h", ] defines = [] @@ -52,8 +54,6 @@ rtc_static_library("rtc_event_log_impl") { "../modules/rtp_rtcp", "../rtc_base:protobuf_utils", "../rtc_base:rtc_base_approved", - "../rtc_base:rtc_task_queue", - "../rtc_base:sequenced_task_checker", "../system_wrappers", ] diff --git a/webrtc/logging/rtc_event_log/rtc_event_log.cc b/webrtc/logging/rtc_event_log/rtc_event_log.cc index d1cad89414..e661e67982 100644 --- a/webrtc/logging/rtc_event_log/rtc_event_log.cc +++ b/webrtc/logging/rtc_event_log/rtc_event_log.cc @@ -10,14 +10,11 @@ #include "webrtc/logging/rtc_event_log/rtc_event_log.h" -#include -#include -#include #include -#include #include #include +#include "webrtc/logging/rtc_event_log/rtc_event_log_helper_thread.h" #include "webrtc/modules/audio_coding/audio_network_adaptor/include/audio_network_adaptor.h" #include "webrtc/modules/remote_bitrate_estimator/include/bwe_defines.h" #include "webrtc/modules/rtp_rtcp/include/rtp_rtcp_defines.h" @@ -32,67 +29,30 @@ #include "webrtc/modules/rtp_rtcp/source/rtcp_packet/rtpfb.h" #include "webrtc/modules/rtp_rtcp/source/rtcp_packet/sdes.h" #include "webrtc/modules/rtp_rtcp/source/rtcp_packet/sender_report.h" +#include "webrtc/rtc_base/atomicops.h" #include "webrtc/rtc_base/checks.h" #include "webrtc/rtc_base/constructormagic.h" #include "webrtc/rtc_base/event.h" -#include "webrtc/rtc_base/ignore_wundef.h" #include "webrtc/rtc_base/logging.h" #include "webrtc/rtc_base/protobuf_utils.h" -#include "webrtc/rtc_base/ptr_util.h" -#include "webrtc/rtc_base/sequenced_task_checker.h" -#include "webrtc/rtc_base/task_queue.h" -#include "webrtc/rtc_base/thread_annotations.h" +#include "webrtc/rtc_base/swap_queue.h" +#include "webrtc/rtc_base/thread_checker.h" #include "webrtc/rtc_base/timeutils.h" #include "webrtc/system_wrappers/include/file_wrapper.h" -#include "webrtc/typedefs.h" #ifdef ENABLE_RTC_EVENT_LOG // *.pb.h files are generated at build-time by the protobuf compiler. -RTC_PUSH_IGNORING_WUNDEF() #ifdef WEBRTC_ANDROID_PLATFORM_BUILD #include "external/webrtc/webrtc/logging/rtc_event_log/rtc_event_log.pb.h" #else #include "webrtc/logging/rtc_event_log/rtc_event_log.pb.h" #endif -RTC_POP_IGNORING_WUNDEF() #endif namespace webrtc { #ifdef ENABLE_RTC_EVENT_LOG -namespace { -const int kEventsInHistory = 10000; - -bool IsConfigEvent(const rtclog::Event& event) { - rtclog::Event_EventType event_type = event.type(); - return event_type == rtclog::Event::VIDEO_RECEIVER_CONFIG_EVENT || - event_type == rtclog::Event::VIDEO_SENDER_CONFIG_EVENT || - event_type == rtclog::Event::AUDIO_RECEIVER_CONFIG_EVENT || - event_type == rtclog::Event::AUDIO_SENDER_CONFIG_EVENT; -} - -// TODO(eladalon): This class exists because C++11 doesn't allow transferring a -// unique_ptr to a lambda (a copy constructor is required). We should get -// rid of this when we move to C++14. -template -class ResourceOwningTask final : public rtc::QueuedTask { - public: - ResourceOwningTask(std::unique_ptr resource, - std::function)> handler) - : resource_(std::move(resource)), handler_(handler) {} - - bool Run() override { - handler_(std::move(resource_)); - return true; - } - - private: - std::unique_ptr resource_; - std::function)> handler_; -}; -} // namespace - class RtcEventLogImpl final : public RtcEventLog { friend std::unique_ptr RtcEventLog::Create(); @@ -135,52 +95,24 @@ class RtcEventLogImpl final : public RtcEventLog { ProbeFailureReason failure_reason) override; private: - void StartLoggingInternal(std::unique_ptr file, - int64_t max_size_bytes); - - RtcEventLogImpl(); // Creation is done by RtcEventLog::Create. + // Private constructor to ensure that creation is done by RtcEventLog::Create. + RtcEventLogImpl(); void StoreEvent(std::unique_ptr event); void LogProbeResult(int id, rtclog::BweProbeResult::ResultType result, int bitrate_bps); - // Appends an event to the output protobuf string, returning true on success. - // Fails and returns false in case the limit on output size prevents the - // event from being added; in this case, the output string is left unchanged. - bool AppendEventToString(rtclog::Event* event, - ProtoString* output_string) RTC_WARN_UNUSED_RESULT; + static volatile int log_count_; - void LogToMemory(std::unique_ptr event); + // Message queue for passing control messages to the logging thread. + SwapQueue message_queue_; - void StartLogFile(); - void LogToFile(std::unique_ptr event); - void StopLogFile(int64_t stop_time); + // Message queue for passing events to the logging thread. + SwapQueue > event_queue_; - // Observe a limit on the number of concurrent logs, so as not to run into - // OS-imposed limits on open files and/or threads/task-queues. - // TODO(eladalon): Known issue - there's a race over |log_count_|. - static std::atomic log_count_; - - // Make sure that the event log is "managed" - created/destroyed, as well - // as started/stopped - from the same thread/task-queue. - rtc::SequencedTaskChecker owner_sequence_checker_; - - // History containing all past configuration events. - std::vector> config_history_ - ACCESS_ON(task_queue_); - - // History containing the most recent (non-configuration) events (~10s). - std::deque> history_ ACCESS_ON(task_queue_); - - std::unique_ptr file_ ACCESS_ON(task_queue_); - - size_t max_size_bytes_ ACCESS_ON(task_queue_); - size_t written_bytes_ ACCESS_ON(task_queue_); - - // Keep this last to ensure it destructs first, or else tasks living on the - // queue might access other members after they've been torn down. - rtc::TaskQueue task_queue_; + RtcEventLogHelperThread helper_thread_; + rtc::ThreadChecker thread_checker_; RTC_DISALLOW_COPY_AND_ASSIGN(RtcEventLogImpl); }; @@ -232,46 +164,66 @@ rtclog::BweProbeResult::ResultType ConvertProbeResultType( return rtclog::BweProbeResult::SUCCESS; } +// The RTP and RTCP buffers reserve space for twice the expected number of +// sent packets because they also contain received packets. +static const int kEventsPerSecond = 1000; +static const int kControlMessagesPerSecond = 10; } // namespace -std::atomic RtcEventLogImpl::log_count_(0); +volatile int RtcEventLogImpl::log_count_ = 0; +// RtcEventLogImpl member functions. RtcEventLogImpl::RtcEventLogImpl() - : file_(FileWrapper::Create()), - max_size_bytes_(std::numeric_limits::max()), - written_bytes_(0), - task_queue_("rtc_event_log") {} + // Allocate buffers for roughly one second of history. + : message_queue_(kControlMessagesPerSecond), + event_queue_(kEventsPerSecond), + helper_thread_(&message_queue_, &event_queue_), + thread_checker_() { + thread_checker_.DetachFromThread(); +} RtcEventLogImpl::~RtcEventLogImpl() { - RTC_DCHECK_CALLED_SEQUENTIALLY(&owner_sequence_checker_); - - // If we're logging to the file, this will stop that. Blocking function. - StopLogging(); - - int count = std::atomic_fetch_sub(&RtcEventLogImpl::log_count_, 1) - 1; + // The RtcEventLogHelperThread destructor closes the file + // and waits for the thread to terminate. + int count = rtc::AtomicOps::Decrement(&RtcEventLogImpl::log_count_); RTC_DCHECK_GE(count, 0); } bool RtcEventLogImpl::StartLogging(const std::string& file_name, int64_t max_size_bytes) { - RTC_DCHECK_CALLED_SEQUENTIALLY(&owner_sequence_checker_); - - auto file = rtc::WrapUnique(FileWrapper::Create()); - if (!file->OpenFile(file_name.c_str(), false)) { + RTC_DCHECK(thread_checker_.CalledOnValidThread()); + RtcEventLogHelperThread::ControlMessage message; + message.message_type = RtcEventLogHelperThread::ControlMessage::START_FILE; + message.max_size_bytes = max_size_bytes <= 0 + ? std::numeric_limits::max() + : max_size_bytes; + message.start_time = rtc::TimeMicros(); + message.stop_time = std::numeric_limits::max(); + message.file.reset(FileWrapper::Create()); + if (!message.file->OpenFile(file_name.c_str(), false)) { LOG(LS_ERROR) << "Can't open file. WebRTC event log not started."; return false; } - - StartLoggingInternal(std::move(file), max_size_bytes); - + if (!message_queue_.Insert(&message)) { + LOG(LS_ERROR) << "Message queue full. Can't start logging."; + return false; + } + helper_thread_.SignalNewEvent(); + LOG(LS_INFO) << "Starting WebRTC event log."; return true; } bool RtcEventLogImpl::StartLogging(rtc::PlatformFile platform_file, int64_t max_size_bytes) { - RTC_DCHECK_CALLED_SEQUENTIALLY(&owner_sequence_checker_); - - auto file = rtc::WrapUnique(FileWrapper::Create()); + RTC_DCHECK(thread_checker_.CalledOnValidThread()); + RtcEventLogHelperThread::ControlMessage message; + message.message_type = RtcEventLogHelperThread::ControlMessage::START_FILE; + message.max_size_bytes = max_size_bytes <= 0 + ? std::numeric_limits::max() + : max_size_bytes; + message.start_time = rtc::TimeMicros(); + message.stop_time = std::numeric_limits::max(); + message.file.reset(FileWrapper::Create()); FILE* file_handle = rtc::FdopenPlatformFileForWriting(platform_file); if (!file_handle) { LOG(LS_ERROR) << "Can't open file. WebRTC event log not started."; @@ -282,36 +234,37 @@ bool RtcEventLogImpl::StartLogging(rtc::PlatformFile platform_file, } return false; } - if (!file->OpenFromFileHandle(file_handle)) { + if (!message.file->OpenFromFileHandle(file_handle)) { LOG(LS_ERROR) << "Can't open file. WebRTC event log not started."; return false; } - - StartLoggingInternal(std::move(file), max_size_bytes); - + if (!message_queue_.Insert(&message)) { + LOG(LS_ERROR) << "Message queue full. Can't start logging."; + return false; + } + helper_thread_.SignalNewEvent(); + LOG(LS_INFO) << "Starting WebRTC event log."; return true; } void RtcEventLogImpl::StopLogging() { - RTC_DCHECK_CALLED_SEQUENTIALLY(&owner_sequence_checker_); - + RTC_DCHECK(thread_checker_.CalledOnValidThread()); + RtcEventLogHelperThread::ControlMessage message; + message.message_type = RtcEventLogHelperThread::ControlMessage::STOP_FILE; + message.stop_time = rtc::TimeMicros(); + while (!message_queue_.Insert(&message)) { + // TODO(terelius): We would like to have a blocking Insert function in the + // SwapQueue, but for the time being we will just clear any previous + // messages. + // Since StopLogging waits for the thread, it is essential that we don't + // clear any STOP_FILE messages. To ensure that there is only one call at a + // time, we require that all calls to StopLogging are made on the same + // thread. + LOG(LS_ERROR) << "Message queue full. Clearing queue to stop logging."; + message_queue_.Clear(); + } LOG(LS_INFO) << "Stopping WebRTC event log."; - - const int64_t stop_time = rtc::TimeMicros(); - - rtc::Event file_finished(true, false); - - task_queue_.PostTask([this, stop_time, &file_finished]() { - RTC_DCHECK_RUN_ON(&task_queue_); - if (file_->is_open()) { - StopLogFile(stop_time); - } - file_finished.Set(); - }); - - file_finished.Wait(rtc::Event::kForever); - - LOG(LS_INFO) << "WebRTC event log successfully stopped."; + helper_thread_.WaitForFileFinished(); } void RtcEventLogImpl::LogVideoReceiveStreamConfig( @@ -612,197 +565,12 @@ void RtcEventLogImpl::LogProbeResult(int id, StoreEvent(std::move(event)); } -void RtcEventLogImpl::StartLoggingInternal(std::unique_ptr file, - int64_t max_size_bytes) { - LOG(LS_INFO) << "Starting WebRTC event log."; - - max_size_bytes = (max_size_bytes <= 0) - ? std::numeric_limits::max() - : max_size_bytes; - auto file_handler = [this, - max_size_bytes](std::unique_ptr file) { - RTC_DCHECK_RUN_ON(&task_queue_); - if (!file_->is_open()) { - max_size_bytes_ = max_size_bytes; - file_ = std::move(file); - StartLogFile(); - } else { - // Already started. Ignore message and close file handle. - file->CloseFile(); - } - }; - task_queue_.PostTask(rtc::MakeUnique>( - std::move(file), file_handler)); -} - void RtcEventLogImpl::StoreEvent(std::unique_ptr event) { - RTC_DCHECK(event); - - auto event_handler = [this](std::unique_ptr rtclog_event) { - RTC_DCHECK_RUN_ON(&task_queue_); - if (file_->is_open()) { - LogToFile(std::move(rtclog_event)); - } else { - LogToMemory(std::move(rtclog_event)); - } - }; - - task_queue_.PostTask(rtc::MakeUnique>( - std::move(event), event_handler)); -} - -bool RtcEventLogImpl::AppendEventToString(rtclog::Event* event, - ProtoString* output_string) { - RTC_DCHECK_RUN_ON(&task_queue_); - - // Even though we're only serializing a single event during this call, what - // we intend to get is a list of events, with a tag and length preceding - // each actual event. To produce that, we serialize a list of a single event. - // If we later serialize additional events, the resulting ProtoString will - // be a proper concatenation of all those events. - - rtclog::EventStream event_stream; - event_stream.add_stream(); - - // As a tweak, we swap the new event into the event-stream, write that to - // file, then swap back. This saves on some copying. - rtclog::Event* output_event = event_stream.mutable_stream(0); - output_event->Swap(event); - - bool appended; - size_t potential_new_size = - written_bytes_ + output_string->size() + event_stream.ByteSize(); - if (potential_new_size <= max_size_bytes_) { - event_stream.AppendToString(output_string); - appended = true; - } else { - appended = false; + RTC_DCHECK(event.get() != nullptr); + if (!event_queue_.Insert(&event)) { + LOG(LS_ERROR) << "WebRTC event log queue full. Dropping event."; } - - // When the function returns, the original Event will be unchanged. - output_event->Swap(event); - - return appended; -} - -void RtcEventLogImpl::LogToMemory(std::unique_ptr event) { - RTC_DCHECK_RUN_ON(&task_queue_); - RTC_DCHECK(!file_->is_open()); - - if (IsConfigEvent(*event.get())) { - config_history_.push_back(std::move(event)); - } else { - history_.push_back(std::move(event)); - if (history_.size() > kEventsInHistory) { - history_.pop_front(); - } - } -} - -void RtcEventLogImpl::StartLogFile() { - RTC_DCHECK_RUN_ON(&task_queue_); - RTC_DCHECK(file_->is_open()); - - ProtoString output_string; - - // Create and serialize the LOG_START event. - // The timestamp used will correspond to when logging has started. The log - // may contain events earlier than the LOG_START event. (In general, the - // timestamps in the log are not monotonic.) - rtclog::Event start_event; - start_event.set_timestamp_us(rtc::TimeMicros()); - start_event.set_type(rtclog::Event::LOG_START); - bool appended = AppendEventToString(&start_event, &output_string); - - // Serialize the config information for all old streams, including streams - // which were already logged to previous files. - for (auto& event : config_history_) { - if (!appended) { - break; - } - appended = AppendEventToString(event.get(), &output_string); - } - - // Serialize the events in the event queue. - while (appended && !history_.empty()) { - appended = AppendEventToString(history_.front().get(), &output_string); - if (appended) { - // Known issue - if writing to the file fails, these events will have - // been lost. If we try to open a new file, these events will be missing - // from it. - history_.pop_front(); - } - } - - // Write to file. - if (!file_->Write(output_string.data(), output_string.size())) { - LOG(LS_ERROR) << "FileWrapper failed to write WebRtcEventLog file."; - // The current FileWrapper implementation closes the file on error. - RTC_DCHECK(!file_->is_open()); - return; - } - written_bytes_ += output_string.size(); - - if (!appended) { - RTC_DCHECK(file_->is_open()); - StopLogFile(rtc::TimeMicros()); - } -} - -void RtcEventLogImpl::LogToFile(std::unique_ptr event) { - RTC_DCHECK_RUN_ON(&task_queue_); - RTC_DCHECK(file_->is_open()); - - ProtoString output_string; - - bool appended = AppendEventToString(event.get(), &output_string); - - if (IsConfigEvent(*event.get())) { - config_history_.push_back(std::move(event)); - } - - if (!appended) { - RTC_DCHECK(file_->is_open()); - history_.push_back(std::move(event)); - StopLogFile(rtc::TimeMicros()); - return; - } - - // Write string to file. - if (file_->Write(output_string.data(), output_string.size())) { - written_bytes_ += output_string.size(); - } else { - LOG(LS_ERROR) << "FileWrapper failed to write WebRtcEventLog file."; - // The current FileWrapper implementation closes the file on error. - RTC_DCHECK(!file_->is_open()); - } -} - -void RtcEventLogImpl::StopLogFile(int64_t stop_time) { - RTC_DCHECK_RUN_ON(&task_queue_); - RTC_DCHECK(file_->is_open()); - - ProtoString output_string; - - rtclog::Event end_event; - end_event.set_timestamp_us(stop_time); - end_event.set_type(rtclog::Event::LOG_END); - bool appended = AppendEventToString(&end_event, &output_string); - - if (appended) { - if (!file_->Write(output_string.data(), output_string.size())) { - LOG(LS_ERROR) << "FileWrapper failed to write WebRtcEventLog file."; - // The current FileWrapper implementation closes the file on error. - RTC_DCHECK(!file_->is_open()); - } - written_bytes_ += output_string.size(); - } - - max_size_bytes_ = std::numeric_limits::max(); - written_bytes_ = 0; - - file_->CloseFile(); - RTC_DCHECK(!file_->is_open()); + helper_thread_.SignalNewEvent(); } bool RtcEventLog::ParseRtcEventLog(const std::string& file_name, @@ -826,18 +594,17 @@ bool RtcEventLog::ParseRtcEventLog(const std::string& file_name, // RtcEventLog member functions. std::unique_ptr RtcEventLog::Create() { #ifdef ENABLE_RTC_EVENT_LOG - // TODO(eladalon): Known issue - there's a race over |log_count_| here. constexpr int kMaxLogCount = 5; - int count = 1 + std::atomic_fetch_add(&RtcEventLogImpl::log_count_, 1); + int count = rtc::AtomicOps::Increment(&RtcEventLogImpl::log_count_); if (count > kMaxLogCount) { LOG(LS_WARNING) << "Denied creation of additional WebRTC event logs. " << count - 1 << " logs open already."; - std::atomic_fetch_sub(&RtcEventLogImpl::log_count_, 1); + rtc::AtomicOps::Decrement(&RtcEventLogImpl::log_count_); return std::unique_ptr(new RtcEventLogNullImpl()); } return std::unique_ptr(new RtcEventLogImpl()); #else - return CreateNull(); + return std::unique_ptr(new RtcEventLogNullImpl()); #endif // ENABLE_RTC_EVENT_LOG } diff --git a/webrtc/logging/rtc_event_log/rtc_event_log.h b/webrtc/logging/rtc_event_log/rtc_event_log.h index 087f5d9b77..d26dccbea9 100644 --- a/webrtc/logging/rtc_event_log/rtc_event_log.h +++ b/webrtc/logging/rtc_event_log/rtc_event_log.h @@ -98,17 +98,16 @@ class RtcEventLog { int64_t max_size_bytes) = 0; // Deprecated. Pass an explicit file size limit. - RTC_DEPRECATED bool StartLogging(const std::string& file_name) { + bool StartLogging(const std::string& file_name) { return StartLogging(file_name, 10000000); } // Deprecated. Pass an explicit file size limit. - RTC_DEPRECATED bool StartLogging(rtc::PlatformFile platform_file) { + bool StartLogging(rtc::PlatformFile platform_file) { return StartLogging(platform_file, 10000000); } - // Stops logging to file and waits until the file has been closed, after - // which it would be permissible to read and/or modify it. + // Stops logging to file and waits until the thread has finished. virtual void StopLogging() = 0; // Logs configuration information for a video receive stream. diff --git a/webrtc/logging/rtc_event_log/rtc_event_log_helper_thread.cc b/webrtc/logging/rtc_event_log/rtc_event_log_helper_thread.cc new file mode 100644 index 0000000000..a98336fa52 --- /dev/null +++ b/webrtc/logging/rtc_event_log/rtc_event_log_helper_thread.cc @@ -0,0 +1,315 @@ +/* + * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/logging/rtc_event_log/rtc_event_log_helper_thread.h" + +#include + +#include "webrtc/rtc_base/checks.h" +#include "webrtc/rtc_base/logging.h" +#include "webrtc/rtc_base/timeutils.h" + +#ifdef ENABLE_RTC_EVENT_LOG + +namespace webrtc { + +namespace { +const int kEventsInHistory = 10000; + +bool IsConfigEvent(const rtclog::Event& event) { + rtclog::Event_EventType event_type = event.type(); + return event_type == rtclog::Event::VIDEO_RECEIVER_CONFIG_EVENT || + event_type == rtclog::Event::VIDEO_SENDER_CONFIG_EVENT || + event_type == rtclog::Event::AUDIO_RECEIVER_CONFIG_EVENT || + event_type == rtclog::Event::AUDIO_SENDER_CONFIG_EVENT; +} +} // namespace + +// RtcEventLogImpl member functions. +RtcEventLogHelperThread::RtcEventLogHelperThread( + SwapQueue* message_queue, + SwapQueue>* event_queue) + : message_queue_(message_queue), + event_queue_(event_queue), + file_(FileWrapper::Create()), + thread_(&ThreadOutputFunction, this, "RtcEventLog thread"), + max_size_bytes_(std::numeric_limits::max()), + written_bytes_(0), + start_time_(0), + stop_time_(std::numeric_limits::max()), + has_recent_event_(false), + wake_periodically_(false, false), + wake_from_hibernation_(false, false), + file_finished_(false, false) { + RTC_DCHECK(message_queue_); + RTC_DCHECK(event_queue_); + thread_.Start(); +} + +RtcEventLogHelperThread::~RtcEventLogHelperThread() { + ControlMessage message; + message.message_type = ControlMessage::TERMINATE_THREAD; + message.stop_time = rtc::TimeMicros(); + while (!message_queue_->Insert(&message)) { + // We can't destroy the event log until we have stopped the thread, + // so clear the message queue and try again. Note that if we clear + // any STOP_FILE events, then the threads calling StopLogging would likely + // wait indefinitely. However, there should not be any such calls as we + // are executing the destructor. + LOG(LS_WARNING) << "Clearing message queue to terminate thread."; + message_queue_->Clear(); + } + wake_from_hibernation_.Set(); + wake_periodically_.Set(); // Wake up the output thread. + thread_.Stop(); // Wait for the thread to terminate. +} + +void RtcEventLogHelperThread::WaitForFileFinished() { + wake_from_hibernation_.Set(); + wake_periodically_.Set(); + file_finished_.Wait(rtc::Event::kForever); +} + +void RtcEventLogHelperThread::SignalNewEvent() { + wake_from_hibernation_.Set(); +} + +bool RtcEventLogHelperThread::AppendEventToString(rtclog::Event* event) { + rtclog::EventStream event_stream; + event_stream.add_stream(); + event_stream.mutable_stream(0)->Swap(event); + // We create a new event stream per event but because of the way protobufs + // are encoded, events can be merged by concatenating them. Therefore, + // it will look like a single stream when we read it back from file. + bool stop = true; + if (written_bytes_ + static_cast(output_string_.size()) + + event_stream.ByteSize() <= + max_size_bytes_) { + event_stream.AppendToString(&output_string_); + stop = false; + } + // Swap the event back so that we don't mix event types in the queues. + event_stream.mutable_stream(0)->Swap(event); + return stop; +} + +bool RtcEventLogHelperThread::LogToMemory() { + RTC_DCHECK(!file_->is_open()); + bool message_received = false; + + // Process each event earlier than the current time and append it to the + // appropriate history_. + int64_t current_time = rtc::TimeMicros(); + if (!has_recent_event_) { + has_recent_event_ = event_queue_->Remove(&most_recent_event_); + } + while (has_recent_event_ && + most_recent_event_->timestamp_us() <= current_time) { + if (IsConfigEvent(*most_recent_event_)) { + config_history_.push_back(std::move(most_recent_event_)); + } else { + history_.push_back(std::move(most_recent_event_)); + if (history_.size() > kEventsInHistory) + history_.pop_front(); + } + has_recent_event_ = event_queue_->Remove(&most_recent_event_); + message_received = true; + } + return message_received; +} + +void RtcEventLogHelperThread::StartLogFile() { + RTC_DCHECK(file_->is_open()); + bool stop = false; + output_string_.clear(); + + // Create and serialize the LOG_START event. + rtclog::Event start_event; + start_event.set_timestamp_us(start_time_); + start_event.set_type(rtclog::Event::LOG_START); + AppendEventToString(&start_event); + + // Serialize the config information for all old streams. + for (auto& event : config_history_) { + AppendEventToString(event.get()); + } + + // Serialize the events in the event queue. + while (!history_.empty() && !stop) { + stop = AppendEventToString(history_.front().get()); + if (!stop) { + history_.pop_front(); + } + } + + // Write to file. + if (!file_->Write(output_string_.data(), output_string_.size())) { + LOG(LS_ERROR) << "FileWrapper failed to write WebRtcEventLog file."; + // The current FileWrapper implementation closes the file on error. + RTC_DCHECK(!file_->is_open()); + return; + } + written_bytes_ += output_string_.size(); + + // Free the allocated memory since we probably won't need this amount of + // space again. + output_string_.clear(); + output_string_.shrink_to_fit(); + + if (stop) { + RTC_DCHECK(file_->is_open()); + StopLogFile(); + } +} + +bool RtcEventLogHelperThread::LogToFile() { + RTC_DCHECK(file_->is_open()); + output_string_.clear(); + bool message_received = false; + + // Append each event older than both the current time and the stop time + // to the output_string_. + int64_t current_time = rtc::TimeMicros(); + int64_t time_limit = std::min(current_time, stop_time_); + if (!has_recent_event_) { + has_recent_event_ = event_queue_->Remove(&most_recent_event_); + } + bool stop = false; + while (!stop && has_recent_event_ && + most_recent_event_->timestamp_us() <= time_limit) { + stop = AppendEventToString(most_recent_event_.get()); + if (!stop) { + if (IsConfigEvent(*most_recent_event_)) { + config_history_.push_back(std::move(most_recent_event_)); + } + has_recent_event_ = event_queue_->Remove(&most_recent_event_); + } + message_received = true; + } + + // Write string to file. + if (!file_->Write(output_string_.data(), output_string_.size())) { + LOG(LS_ERROR) << "FileWrapper failed to write WebRtcEventLog file."; + // The current FileWrapper implementation closes the file on error. + RTC_DCHECK(!file_->is_open()); + return message_received; + } + written_bytes_ += output_string_.size(); + + // We want to stop logging if we have reached the file size limit. We also + // want to stop logging if the remaining events are more recent than the + // time limit, or in other words if we have terminated the loop despite + // having more events in the queue. + if ((has_recent_event_ && most_recent_event_->timestamp_us() > stop_time_) || + stop) { + RTC_DCHECK(file_->is_open()); + StopLogFile(); + } + return message_received; +} + +void RtcEventLogHelperThread::StopLogFile() { + RTC_DCHECK(file_->is_open()); + output_string_.clear(); + + rtclog::Event end_event; + // This function can be called either because we have reached the stop time, + // or because we have reached the log file size limit. Therefore, use the + // current time if we have not reached the time limit. + end_event.set_timestamp_us( + std::min(stop_time_, rtc::TimeMicros())); + end_event.set_type(rtclog::Event::LOG_END); + AppendEventToString(&end_event); + + if (written_bytes_ + static_cast(output_string_.size()) <= + max_size_bytes_) { + if (!file_->Write(output_string_.data(), output_string_.size())) { + LOG(LS_ERROR) << "FileWrapper failed to write WebRtcEventLog file."; + // The current FileWrapper implementation closes the file on error. + RTC_DCHECK(!file_->is_open()); + } + written_bytes_ += output_string_.size(); + } + + max_size_bytes_ = std::numeric_limits::max(); + written_bytes_ = 0; + start_time_ = 0; + stop_time_ = std::numeric_limits::max(); + output_string_.clear(); + file_->CloseFile(); + RTC_DCHECK(!file_->is_open()); +} + +void RtcEventLogHelperThread::ProcessEvents() { + ControlMessage message; + + while (true) { + bool message_received = false; + // Process control messages. + while (message_queue_->Remove(&message)) { + switch (message.message_type) { + case ControlMessage::START_FILE: + if (!file_->is_open()) { + max_size_bytes_ = message.max_size_bytes; + start_time_ = message.start_time; + stop_time_ = message.stop_time; + file_.swap(message.file); + StartLogFile(); + } else { + // Already started. Ignore message and close file handle. + message.file->CloseFile(); + } + message_received = true; + break; + case ControlMessage::STOP_FILE: + if (file_->is_open()) { + stop_time_ = message.stop_time; + LogToFile(); // Log remaining events from message queues. + } + // LogToFile might stop on it's own so we need to recheck the state. + if (file_->is_open()) { + StopLogFile(); + } + file_finished_.Set(); + message_received = true; + break; + case ControlMessage::TERMINATE_THREAD: + if (file_->is_open()) { + StopLogFile(); + } + return; + } + } + + // Write events to file or memory. + if (file_->is_open()) { + message_received |= LogToFile(); + } else { + message_received |= LogToMemory(); + } + + // Accumulate a new batch of events instead of processing them one at a + // time. + if (message_received) { + wake_periodically_.Wait(100); + } else { + wake_from_hibernation_.Wait(rtc::Event::kForever); + } + } +} + +void RtcEventLogHelperThread::ThreadOutputFunction(void* obj) { + RtcEventLogHelperThread* helper = static_cast(obj); + helper->ProcessEvents(); +} + +} // namespace webrtc + +#endif // ENABLE_RTC_EVENT_LOG diff --git a/webrtc/logging/rtc_event_log/rtc_event_log_helper_thread.h b/webrtc/logging/rtc_event_log/rtc_event_log_helper_thread.h new file mode 100644 index 0000000000..0d9ad4dc96 --- /dev/null +++ b/webrtc/logging/rtc_event_log/rtc_event_log_helper_thread.h @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef WEBRTC_LOGGING_RTC_EVENT_LOG_RTC_EVENT_LOG_HELPER_THREAD_H_ +#define WEBRTC_LOGGING_RTC_EVENT_LOG_RTC_EVENT_LOG_HELPER_THREAD_H_ + +#include +#include +#include +#include +#include + +#include "webrtc/rtc_base/constructormagic.h" +#include "webrtc/rtc_base/event.h" +#include "webrtc/rtc_base/ignore_wundef.h" +#include "webrtc/rtc_base/platform_thread.h" +#include "webrtc/rtc_base/protobuf_utils.h" +#include "webrtc/rtc_base/swap_queue.h" +#include "webrtc/system_wrappers/include/file_wrapper.h" + +#ifdef ENABLE_RTC_EVENT_LOG +// *.ph.h files are generated at build-time by the protobuf compiler. +RTC_PUSH_IGNORING_WUNDEF() +#ifdef WEBRTC_ANDROID_PLATFORM_BUILD +#include "external/webrtc/webrtc/logging/rtc_event_log/rtc_event_log.pb.h" +#else +#include "webrtc/logging/rtc_event_log/rtc_event_log.pb.h" +#endif +RTC_POP_IGNORING_WUNDEF() +#endif + +#ifdef ENABLE_RTC_EVENT_LOG + +namespace webrtc { + +class RtcEventLogHelperThread final { + public: + struct ControlMessage { + ControlMessage() + : message_type(STOP_FILE), + file(nullptr), + max_size_bytes(0), + start_time(0), + stop_time(0) {} + enum { START_FILE, STOP_FILE, TERMINATE_THREAD } message_type; + + std::unique_ptr file; // Only used with START_FILE. + int64_t max_size_bytes; // Only used with START_FILE. + int64_t start_time; // Only used with START_FILE. + int64_t stop_time; // Used with all 3 message types. + + friend void swap(ControlMessage& lhs, ControlMessage& rhs) { + using std::swap; + swap(lhs.message_type, rhs.message_type); + lhs.file.swap(rhs.file); + swap(lhs.max_size_bytes, rhs.max_size_bytes); + swap(lhs.start_time, rhs.start_time); + swap(lhs.stop_time, rhs.stop_time); + } + }; + + RtcEventLogHelperThread( + SwapQueue* message_queue, + SwapQueue>* event_queue); + ~RtcEventLogHelperThread(); + + // This function MUST be called once a STOP_FILE message is added to the + // signalling queue. The function will make sure that the output thread + // wakes up to read the message, and it blocks until the output thread has + // finished writing to the file. + void WaitForFileFinished(); + + // This fuction MUST be called once an event is added to the event queue. + void SignalNewEvent(); + + private: + static void ThreadOutputFunction(void* obj); + + bool AppendEventToString(rtclog::Event* event); + bool LogToMemory(); + void StartLogFile(); + bool LogToFile(); + void StopLogFile(); + void ProcessEvents(); + + // Message queues for passing events to the logging thread. + SwapQueue* message_queue_; + SwapQueue>* event_queue_; + + // History containing the most recent events (~ 10 s). + std::deque> history_; + + // History containing all past configuration events. + std::vector> config_history_; + + std::unique_ptr file_; + rtc::PlatformThread thread_; + + int64_t max_size_bytes_; + int64_t written_bytes_; + int64_t start_time_; + int64_t stop_time_; + + bool has_recent_event_; + std::unique_ptr most_recent_event_; + + // Temporary space for serializing profobuf data. + ProtoString output_string_; + + rtc::Event wake_periodically_; + rtc::Event wake_from_hibernation_; + rtc::Event file_finished_; + + RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RtcEventLogHelperThread); +}; + +} // namespace webrtc + +#endif // ENABLE_RTC_EVENT_LOG + +#endif // WEBRTC_LOGGING_RTC_EVENT_LOG_RTC_EVENT_LOG_HELPER_THREAD_H_ diff --git a/webrtc/pc/peerconnection.cc b/webrtc/pc/peerconnection.cc index 99b87c0012..e22a09ece0 100644 --- a/webrtc/pc/peerconnection.cc +++ b/webrtc/pc/peerconnection.cc @@ -1337,11 +1337,11 @@ void PeerConnection::Close() { rtc::Bind(&cricket::PortAllocator::DiscardCandidatePool, port_allocator_.get())); - factory_->worker_thread()->Invoke(RTC_FROM_HERE, [this] { - call_.reset(); - // The event log must outlive call (and any other object that uses it). - event_log_.reset(); - }); + factory_->worker_thread()->Invoke(RTC_FROM_HERE, + [this] { call_.reset(); }); + + // The event log must outlive call (and any other object that uses it). + event_log_.reset(); } void PeerConnection::OnSessionStateChange(WebRtcSession* /*session*/, diff --git a/webrtc/pc/peerconnectionfactory.cc b/webrtc/pc/peerconnectionfactory.cc index 6da4669773..d4ee405f1a 100644 --- a/webrtc/pc/peerconnectionfactory.cc +++ b/webrtc/pc/peerconnectionfactory.cc @@ -262,9 +262,8 @@ PeerConnectionFactory::CreatePeerConnection( allocator.get(), options_.network_ignore_mask)); std::unique_ptr event_log = - worker_thread_->Invoke>( - RTC_FROM_HERE, - rtc::Bind(&PeerConnectionFactory::CreateRtcEventLog_w, this)); + event_log_factory_ ? event_log_factory_->CreateRtcEventLog() + : rtc::MakeUnique(); std::unique_ptr call = worker_thread_->Invoke>( RTC_FROM_HERE, @@ -332,11 +331,6 @@ rtc::Thread* PeerConnectionFactory::network_thread() { return network_thread_; } -std::unique_ptr PeerConnectionFactory::CreateRtcEventLog_w() { - return event_log_factory_ ? event_log_factory_->CreateRtcEventLog() - : rtc::MakeUnique(); -} - std::unique_ptr PeerConnectionFactory::CreateCall_w( RtcEventLog* event_log) { const int kMinBandwidthBps = 30000; diff --git a/webrtc/pc/peerconnectionfactory.h b/webrtc/pc/peerconnectionfactory.h index 82440e7275..ebd3016934 100644 --- a/webrtc/pc/peerconnectionfactory.h +++ b/webrtc/pc/peerconnectionfactory.h @@ -121,7 +121,6 @@ class PeerConnectionFactory : public PeerConnectionFactoryInterface { virtual ~PeerConnectionFactory(); private: - std::unique_ptr CreateRtcEventLog_w(); std::unique_ptr CreateCall_w(RtcEventLog* event_log); bool wraps_current_thread_;