diff --git a/webrtc/call/rtc_event_log.cc b/webrtc/call/rtc_event_log.cc index 3c602b7721..90145735ed 100644 --- a/webrtc/call/rtc_event_log.cc +++ b/webrtc/call/rtc_event_log.cc @@ -103,15 +103,14 @@ class RtcEventLogImpl final : public RtcEventLog { int32_t total_packets) override; private: + void StoreEvent(std::unique_ptr* event); + // Message queue for passing control messages to the logging thread. SwapQueue message_queue_; // Message queue for passing events to the logging thread. SwapQueue > event_queue_; - rtc::Event wake_up_; - rtc::Event stopped_; - const Clock* const clock_; RtcEventLogHelperThread helper_thread_; @@ -165,13 +164,9 @@ RtcEventLogImpl::RtcEventLogImpl(const Clock* clock) // Allocate buffers for roughly one second of history. : message_queue_(kControlMessagesPerSecond), event_queue_(kEventsPerSecond), - wake_up_(false, false), - stopped_(false, false), clock_(clock), helper_thread_(&message_queue_, &event_queue_, - &wake_up_, - &stopped_, clock), thread_checker_() { thread_checker_.DetachFromThread(); @@ -201,6 +196,7 @@ bool RtcEventLogImpl::StartLogging(const std::string& file_name, LOG(LS_ERROR) << "Message queue full. Can't start logging."; return false; } + helper_thread_.SignalNewEvent(); LOG(LS_INFO) << "Starting WebRTC event log."; return true; } @@ -234,6 +230,7 @@ bool RtcEventLogImpl::StartLogging(rtc::PlatformFile platform_file, LOG(LS_ERROR) << "Message queue full. Can't start logging."; return false; } + helper_thread_.SignalNewEvent(); LOG(LS_INFO) << "Starting WebRTC event log."; return true; } @@ -255,8 +252,7 @@ void RtcEventLogImpl::StopLogging() { message_queue_.Clear(); } LOG(LS_INFO) << "Stopping WebRTC event log."; - wake_up_.Set(); // Request the output thread to wake up. - stopped_.Wait(rtc::Event::kForever); // Wait for the log to stop. + helper_thread_.WaitForFileFinished(); } void RtcEventLogImpl::LogVideoReceiveStreamConfig( @@ -292,9 +288,7 @@ void RtcEventLogImpl::LogVideoReceiveStreamConfig( decoder->set_name(d.payload_name); decoder->set_payload_type(d.payload_type); } - if (!event_queue_.Insert(&event)) { - LOG(LS_ERROR) << "Config queue full. Not logging config event."; - } + StoreEvent(&event); } void RtcEventLogImpl::LogVideoSendStreamConfig( @@ -324,9 +318,7 @@ void RtcEventLogImpl::LogVideoSendStreamConfig( rtclog::EncoderConfig* encoder = sender_config->mutable_encoder(); encoder->set_name(config.encoder_settings.payload_name); encoder->set_payload_type(config.encoder_settings.payload_type); - if (!event_queue_.Insert(&event)) { - LOG(LS_ERROR) << "Config queue full. Not logging config event."; - } + StoreEvent(&event); } void RtcEventLogImpl::LogRtpHeader(PacketDirection direction, @@ -356,9 +348,7 @@ void RtcEventLogImpl::LogRtpHeader(PacketDirection direction, rtp_event->mutable_rtp_packet()->set_type(ConvertMediaType(media_type)); rtp_event->mutable_rtp_packet()->set_packet_length(packet_length); rtp_event->mutable_rtp_packet()->set_header(header, header_length); - if (!event_queue_.Insert(&rtp_event)) { - LOG(LS_ERROR) << "RTP queue full. Not logging RTP packet."; - } + StoreEvent(&rtp_event); } void RtcEventLogImpl::LogRtcpPacket(PacketDirection direction, @@ -416,9 +406,7 @@ void RtcEventLogImpl::LogRtcpPacket(PacketDirection direction, block_begin += block_size; } rtcp_event->mutable_rtcp_packet()->set_packet_data(buffer, buffer_length); - if (!event_queue_.Insert(&rtcp_event)) { - LOG(LS_ERROR) << "RTCP queue full. Not logging RTCP packet."; - } + StoreEvent(&rtcp_event); } void RtcEventLogImpl::LogAudioPlayout(uint32_t ssrc) { @@ -427,9 +415,7 @@ void RtcEventLogImpl::LogAudioPlayout(uint32_t ssrc) { event->set_type(rtclog::Event::AUDIO_PLAYOUT_EVENT); auto playout_event = event->mutable_audio_playout_event(); playout_event->set_local_ssrc(ssrc); - if (!event_queue_.Insert(&event)) { - LOG(LS_ERROR) << "Playout queue full. Not logging ACM playout."; - } + StoreEvent(&event); } void RtcEventLogImpl::LogBwePacketLossEvent(int32_t bitrate, @@ -442,9 +428,14 @@ void RtcEventLogImpl::LogBwePacketLossEvent(int32_t bitrate, bwe_event->set_bitrate(bitrate); bwe_event->set_fraction_loss(fraction_loss); bwe_event->set_total_packets(total_packets); - if (!event_queue_.Insert(&event)) { - LOG(LS_ERROR) << "BWE loss queue full. Not logging BWE update."; + StoreEvent(&event); +} + +void RtcEventLogImpl::StoreEvent(std::unique_ptr* event) { + if (!event_queue_.Insert(event)) { + LOG(LS_ERROR) << "WebRTC event log queue full. Dropping event."; } + helper_thread_.SignalNewEvent(); } bool RtcEventLog::ParseRtcEventLog(const std::string& file_name, diff --git a/webrtc/call/rtc_event_log_helper_thread.cc b/webrtc/call/rtc_event_log_helper_thread.cc index 93343129a1..6479908e62 100644 --- a/webrtc/call/rtc_event_log_helper_thread.cc +++ b/webrtc/call/rtc_event_log_helper_thread.cc @@ -35,8 +35,6 @@ bool IsConfigEvent(const rtclog::Event& event) { RtcEventLogHelperThread::RtcEventLogHelperThread( SwapQueue* message_queue, SwapQueue>* event_queue, - rtc::Event* wake_up, - rtc::Event* stopped, const Clock* const clock) : message_queue_(message_queue), event_queue_(event_queue), @@ -51,13 +49,12 @@ RtcEventLogHelperThread::RtcEventLogHelperThread( has_recent_event_(false), most_recent_event_(), output_string_(), - wake_up_(wake_up), - stopped_(stopped), + wake_periodically_(false, false), + wake_from_hibernation_(false, false), + file_finished_(false, false), clock_(clock) { RTC_DCHECK(message_queue_); RTC_DCHECK(event_queue_); - RTC_DCHECK(wake_up_); - RTC_DCHECK(stopped_); RTC_DCHECK(clock_); thread_.Start(); } @@ -75,10 +72,21 @@ RtcEventLogHelperThread::~RtcEventLogHelperThread() { LOG(LS_WARNING) << "Clearing message queue to terminate thread."; message_queue_->Clear(); } - wake_up_->Set(); // Wake up the output thread. + wake_from_hibernation_.Set(); + wake_periodically_.Set(); // Wake up the output thread. thread_.Stop(); // Wait for the thread to terminate. } +void RtcEventLogHelperThread::WaitForFileFinished() { + wake_from_hibernation_.Set(); + wake_periodically_.Set(); + file_finished_.Wait(rtc::Event::kForever); +} + +void RtcEventLogHelperThread::SignalNewEvent() { + wake_from_hibernation_.Set(); +} + bool RtcEventLogHelperThread::AppendEventToString(rtclog::Event* event) { rtclog::EventStream event_stream; event_stream.add_stream(); @@ -98,8 +106,9 @@ bool RtcEventLogHelperThread::AppendEventToString(rtclog::Event* event) { return stop; } -void RtcEventLogHelperThread::LogToMemory() { +bool RtcEventLogHelperThread::LogToMemory() { RTC_DCHECK(!file_->Open()); + bool message_received = false; // Process each event earlier than the current time and append it to the // appropriate history_. @@ -115,7 +124,9 @@ void RtcEventLogHelperThread::LogToMemory() { history_.push_back(std::move(most_recent_event_)); } has_recent_event_ = event_queue_->Remove(&most_recent_event_); + message_received = true; } + return message_received; } void RtcEventLogHelperThread::StartLogFile() { @@ -162,9 +173,10 @@ void RtcEventLogHelperThread::StartLogFile() { } } -void RtcEventLogHelperThread::LogToFile() { +bool RtcEventLogHelperThread::LogToFile() { RTC_DCHECK(file_->Open()); output_string_.clear(); + bool message_received = false; // Append each event older than both the current time and the stop time // to the output_string_. @@ -183,6 +195,7 @@ void RtcEventLogHelperThread::LogToFile() { } has_recent_event_ = event_queue_->Remove(&most_recent_event_); } + message_received = true; } // Write string to file. @@ -190,7 +203,7 @@ void RtcEventLogHelperThread::LogToFile() { LOG(LS_ERROR) << "FileWrapper failed to write WebRtcEventLog file."; // The current FileWrapper implementation closes the file on error. RTC_DCHECK(!file_->Open()); - return; + return message_received; } written_bytes_ += output_string_.size(); @@ -203,6 +216,7 @@ void RtcEventLogHelperThread::LogToFile() { RTC_DCHECK(file_->Open()); StopLogFile(); } + return message_received; } void RtcEventLogHelperThread::StopLogFile() { @@ -233,10 +247,11 @@ void RtcEventLogHelperThread::StopLogFile() { RTC_DCHECK(!file_->Open()); } -void RtcEventLogHelperThread::WriteLog() { +void RtcEventLogHelperThread::ProcessEvents() { ControlMessage message; while (true) { + bool message_received = false; // Process control messages. while (message_queue_->Remove(&message)) { switch (message.message_type) { @@ -251,6 +266,7 @@ void RtcEventLogHelperThread::WriteLog() { // Already started. Ignore message and close file handle. message.file->CloseFile(); } + message_received = true; break; case ControlMessage::STOP_FILE: if (file_->Open()) { @@ -261,7 +277,8 @@ void RtcEventLogHelperThread::WriteLog() { if (file_->Open()) { StopLogFile(); } - stopped_->Set(); + file_finished_.Set(); + message_received = true; break; case ControlMessage::TERMINATE_THREAD: if (file_->Open()) { @@ -271,22 +288,26 @@ void RtcEventLogHelperThread::WriteLog() { } } - // Write events to file or memory + // Write events to file or memory. if (file_->Open()) { - LogToFile(); + message_received |= LogToFile(); } else { - LogToMemory(); + message_received |= LogToMemory(); } // Accumulate a new batch of events instead of processing them one at a // time. - wake_up_->Wait(50); + if (message_received) { + wake_periodically_.Wait(100); + } else { + wake_from_hibernation_.Wait(rtc::Event::kForever); + } } } bool RtcEventLogHelperThread::ThreadOutputFunction(void* obj) { RtcEventLogHelperThread* helper = static_cast(obj); - helper->WriteLog(); + helper->ProcessEvents(); return false; } diff --git a/webrtc/call/rtc_event_log_helper_thread.h b/webrtc/call/rtc_event_log_helper_thread.h index 60ed912b65..2d5a78586d 100644 --- a/webrtc/call/rtc_event_log_helper_thread.h +++ b/webrtc/call/rtc_event_log_helper_thread.h @@ -67,22 +67,27 @@ class RtcEventLogHelperThread final { RtcEventLogHelperThread( SwapQueue* message_queue, SwapQueue>* event_queue, - rtc::Event* wake_up, - rtc::Event* file_finished, const Clock* const clock); ~RtcEventLogHelperThread(); + // This function MUST be called once a STOP_FILE message is added to the + // signalling queue. The function will make sure that the output thread + // wakes up to read the message, and it blocks until the output thread has + // finished writing to the file. + void WaitForFileFinished(); + + // This fuction MUST be called once an event is added to the event queue. + void SignalNewEvent(); + private: static bool ThreadOutputFunction(void* obj); - void TerminateThread(); bool AppendEventToString(rtclog::Event* event); - void AppendEventToHistory(const rtclog::Event& event); - void LogToMemory(); + bool LogToMemory(); void StartLogFile(); - void LogToFile(); + bool LogToFile(); void StopLogFile(); - void WriteLog(); + void ProcessEvents(); // Message queues for passing events to the logging thread. SwapQueue* message_queue_; @@ -108,8 +113,9 @@ class RtcEventLogHelperThread final { // Temporary space for serializing profobuf data. std::string output_string_; - rtc::Event* wake_up_; - rtc::Event* stopped_; + rtc::Event wake_periodically_; + rtc::Event wake_from_hibernation_; + rtc::Event file_finished_; const Clock* const clock_;