Hibernate the thread if there are no events in the queue. Wake it up when an event is added to the queue.

BUG=614192

Review-Url: https://codereview.webrtc.org/2035483003
Cr-Commit-Position: refs/heads/master@{#13070}
This commit is contained in:
terelius 2016-06-08 07:20:29 -07:00 committed by Commit bot
parent 919518613f
commit bea8959687
3 changed files with 70 additions and 52 deletions

View File

@ -103,15 +103,14 @@ class RtcEventLogImpl final : public RtcEventLog {
int32_t total_packets) override;
private:
void StoreEvent(std::unique_ptr<rtclog::Event>* event);
// Message queue for passing control messages to the logging thread.
SwapQueue<RtcEventLogHelperThread::ControlMessage> message_queue_;
// Message queue for passing events to the logging thread.
SwapQueue<std::unique_ptr<rtclog::Event> > event_queue_;
rtc::Event wake_up_;
rtc::Event stopped_;
const Clock* const clock_;
RtcEventLogHelperThread helper_thread_;
@ -165,13 +164,9 @@ RtcEventLogImpl::RtcEventLogImpl(const Clock* clock)
// Allocate buffers for roughly one second of history.
: message_queue_(kControlMessagesPerSecond),
event_queue_(kEventsPerSecond),
wake_up_(false, false),
stopped_(false, false),
clock_(clock),
helper_thread_(&message_queue_,
&event_queue_,
&wake_up_,
&stopped_,
clock),
thread_checker_() {
thread_checker_.DetachFromThread();
@ -201,6 +196,7 @@ bool RtcEventLogImpl::StartLogging(const std::string& file_name,
LOG(LS_ERROR) << "Message queue full. Can't start logging.";
return false;
}
helper_thread_.SignalNewEvent();
LOG(LS_INFO) << "Starting WebRTC event log.";
return true;
}
@ -234,6 +230,7 @@ bool RtcEventLogImpl::StartLogging(rtc::PlatformFile platform_file,
LOG(LS_ERROR) << "Message queue full. Can't start logging.";
return false;
}
helper_thread_.SignalNewEvent();
LOG(LS_INFO) << "Starting WebRTC event log.";
return true;
}
@ -255,8 +252,7 @@ void RtcEventLogImpl::StopLogging() {
message_queue_.Clear();
}
LOG(LS_INFO) << "Stopping WebRTC event log.";
wake_up_.Set(); // Request the output thread to wake up.
stopped_.Wait(rtc::Event::kForever); // Wait for the log to stop.
helper_thread_.WaitForFileFinished();
}
void RtcEventLogImpl::LogVideoReceiveStreamConfig(
@ -292,9 +288,7 @@ void RtcEventLogImpl::LogVideoReceiveStreamConfig(
decoder->set_name(d.payload_name);
decoder->set_payload_type(d.payload_type);
}
if (!event_queue_.Insert(&event)) {
LOG(LS_ERROR) << "Config queue full. Not logging config event.";
}
StoreEvent(&event);
}
void RtcEventLogImpl::LogVideoSendStreamConfig(
@ -324,9 +318,7 @@ void RtcEventLogImpl::LogVideoSendStreamConfig(
rtclog::EncoderConfig* encoder = sender_config->mutable_encoder();
encoder->set_name(config.encoder_settings.payload_name);
encoder->set_payload_type(config.encoder_settings.payload_type);
if (!event_queue_.Insert(&event)) {
LOG(LS_ERROR) << "Config queue full. Not logging config event.";
}
StoreEvent(&event);
}
void RtcEventLogImpl::LogRtpHeader(PacketDirection direction,
@ -356,9 +348,7 @@ void RtcEventLogImpl::LogRtpHeader(PacketDirection direction,
rtp_event->mutable_rtp_packet()->set_type(ConvertMediaType(media_type));
rtp_event->mutable_rtp_packet()->set_packet_length(packet_length);
rtp_event->mutable_rtp_packet()->set_header(header, header_length);
if (!event_queue_.Insert(&rtp_event)) {
LOG(LS_ERROR) << "RTP queue full. Not logging RTP packet.";
}
StoreEvent(&rtp_event);
}
void RtcEventLogImpl::LogRtcpPacket(PacketDirection direction,
@ -416,9 +406,7 @@ void RtcEventLogImpl::LogRtcpPacket(PacketDirection direction,
block_begin += block_size;
}
rtcp_event->mutable_rtcp_packet()->set_packet_data(buffer, buffer_length);
if (!event_queue_.Insert(&rtcp_event)) {
LOG(LS_ERROR) << "RTCP queue full. Not logging RTCP packet.";
}
StoreEvent(&rtcp_event);
}
void RtcEventLogImpl::LogAudioPlayout(uint32_t ssrc) {
@ -427,9 +415,7 @@ void RtcEventLogImpl::LogAudioPlayout(uint32_t ssrc) {
event->set_type(rtclog::Event::AUDIO_PLAYOUT_EVENT);
auto playout_event = event->mutable_audio_playout_event();
playout_event->set_local_ssrc(ssrc);
if (!event_queue_.Insert(&event)) {
LOG(LS_ERROR) << "Playout queue full. Not logging ACM playout.";
}
StoreEvent(&event);
}
void RtcEventLogImpl::LogBwePacketLossEvent(int32_t bitrate,
@ -442,9 +428,14 @@ void RtcEventLogImpl::LogBwePacketLossEvent(int32_t bitrate,
bwe_event->set_bitrate(bitrate);
bwe_event->set_fraction_loss(fraction_loss);
bwe_event->set_total_packets(total_packets);
if (!event_queue_.Insert(&event)) {
LOG(LS_ERROR) << "BWE loss queue full. Not logging BWE update.";
StoreEvent(&event);
}
void RtcEventLogImpl::StoreEvent(std::unique_ptr<rtclog::Event>* event) {
if (!event_queue_.Insert(event)) {
LOG(LS_ERROR) << "WebRTC event log queue full. Dropping event.";
}
helper_thread_.SignalNewEvent();
}
bool RtcEventLog::ParseRtcEventLog(const std::string& file_name,

View File

@ -35,8 +35,6 @@ bool IsConfigEvent(const rtclog::Event& event) {
RtcEventLogHelperThread::RtcEventLogHelperThread(
SwapQueue<ControlMessage>* message_queue,
SwapQueue<std::unique_ptr<rtclog::Event>>* event_queue,
rtc::Event* wake_up,
rtc::Event* stopped,
const Clock* const clock)
: message_queue_(message_queue),
event_queue_(event_queue),
@ -51,13 +49,12 @@ RtcEventLogHelperThread::RtcEventLogHelperThread(
has_recent_event_(false),
most_recent_event_(),
output_string_(),
wake_up_(wake_up),
stopped_(stopped),
wake_periodically_(false, false),
wake_from_hibernation_(false, false),
file_finished_(false, false),
clock_(clock) {
RTC_DCHECK(message_queue_);
RTC_DCHECK(event_queue_);
RTC_DCHECK(wake_up_);
RTC_DCHECK(stopped_);
RTC_DCHECK(clock_);
thread_.Start();
}
@ -75,10 +72,21 @@ RtcEventLogHelperThread::~RtcEventLogHelperThread() {
LOG(LS_WARNING) << "Clearing message queue to terminate thread.";
message_queue_->Clear();
}
wake_up_->Set(); // Wake up the output thread.
wake_from_hibernation_.Set();
wake_periodically_.Set(); // Wake up the output thread.
thread_.Stop(); // Wait for the thread to terminate.
}
void RtcEventLogHelperThread::WaitForFileFinished() {
wake_from_hibernation_.Set();
wake_periodically_.Set();
file_finished_.Wait(rtc::Event::kForever);
}
void RtcEventLogHelperThread::SignalNewEvent() {
wake_from_hibernation_.Set();
}
bool RtcEventLogHelperThread::AppendEventToString(rtclog::Event* event) {
rtclog::EventStream event_stream;
event_stream.add_stream();
@ -98,8 +106,9 @@ bool RtcEventLogHelperThread::AppendEventToString(rtclog::Event* event) {
return stop;
}
void RtcEventLogHelperThread::LogToMemory() {
bool RtcEventLogHelperThread::LogToMemory() {
RTC_DCHECK(!file_->Open());
bool message_received = false;
// Process each event earlier than the current time and append it to the
// appropriate history_.
@ -115,7 +124,9 @@ void RtcEventLogHelperThread::LogToMemory() {
history_.push_back(std::move(most_recent_event_));
}
has_recent_event_ = event_queue_->Remove(&most_recent_event_);
message_received = true;
}
return message_received;
}
void RtcEventLogHelperThread::StartLogFile() {
@ -162,9 +173,10 @@ void RtcEventLogHelperThread::StartLogFile() {
}
}
void RtcEventLogHelperThread::LogToFile() {
bool RtcEventLogHelperThread::LogToFile() {
RTC_DCHECK(file_->Open());
output_string_.clear();
bool message_received = false;
// Append each event older than both the current time and the stop time
// to the output_string_.
@ -183,6 +195,7 @@ void RtcEventLogHelperThread::LogToFile() {
}
has_recent_event_ = event_queue_->Remove(&most_recent_event_);
}
message_received = true;
}
// Write string to file.
@ -190,7 +203,7 @@ void RtcEventLogHelperThread::LogToFile() {
LOG(LS_ERROR) << "FileWrapper failed to write WebRtcEventLog file.";
// The current FileWrapper implementation closes the file on error.
RTC_DCHECK(!file_->Open());
return;
return message_received;
}
written_bytes_ += output_string_.size();
@ -203,6 +216,7 @@ void RtcEventLogHelperThread::LogToFile() {
RTC_DCHECK(file_->Open());
StopLogFile();
}
return message_received;
}
void RtcEventLogHelperThread::StopLogFile() {
@ -233,10 +247,11 @@ void RtcEventLogHelperThread::StopLogFile() {
RTC_DCHECK(!file_->Open());
}
void RtcEventLogHelperThread::WriteLog() {
void RtcEventLogHelperThread::ProcessEvents() {
ControlMessage message;
while (true) {
bool message_received = false;
// Process control messages.
while (message_queue_->Remove(&message)) {
switch (message.message_type) {
@ -251,6 +266,7 @@ void RtcEventLogHelperThread::WriteLog() {
// Already started. Ignore message and close file handle.
message.file->CloseFile();
}
message_received = true;
break;
case ControlMessage::STOP_FILE:
if (file_->Open()) {
@ -261,7 +277,8 @@ void RtcEventLogHelperThread::WriteLog() {
if (file_->Open()) {
StopLogFile();
}
stopped_->Set();
file_finished_.Set();
message_received = true;
break;
case ControlMessage::TERMINATE_THREAD:
if (file_->Open()) {
@ -271,22 +288,26 @@ void RtcEventLogHelperThread::WriteLog() {
}
}
// Write events to file or memory
// Write events to file or memory.
if (file_->Open()) {
LogToFile();
message_received |= LogToFile();
} else {
LogToMemory();
message_received |= LogToMemory();
}
// Accumulate a new batch of events instead of processing them one at a
// time.
wake_up_->Wait(50);
if (message_received) {
wake_periodically_.Wait(100);
} else {
wake_from_hibernation_.Wait(rtc::Event::kForever);
}
}
}
bool RtcEventLogHelperThread::ThreadOutputFunction(void* obj) {
RtcEventLogHelperThread* helper = static_cast<RtcEventLogHelperThread*>(obj);
helper->WriteLog();
helper->ProcessEvents();
return false;
}

View File

@ -67,22 +67,27 @@ class RtcEventLogHelperThread final {
RtcEventLogHelperThread(
SwapQueue<ControlMessage>* message_queue,
SwapQueue<std::unique_ptr<rtclog::Event>>* event_queue,
rtc::Event* wake_up,
rtc::Event* file_finished,
const Clock* const clock);
~RtcEventLogHelperThread();
// This function MUST be called once a STOP_FILE message is added to the
// signalling queue. The function will make sure that the output thread
// wakes up to read the message, and it blocks until the output thread has
// finished writing to the file.
void WaitForFileFinished();
// This fuction MUST be called once an event is added to the event queue.
void SignalNewEvent();
private:
static bool ThreadOutputFunction(void* obj);
void TerminateThread();
bool AppendEventToString(rtclog::Event* event);
void AppendEventToHistory(const rtclog::Event& event);
void LogToMemory();
bool LogToMemory();
void StartLogFile();
void LogToFile();
bool LogToFile();
void StopLogFile();
void WriteLog();
void ProcessEvents();
// Message queues for passing events to the logging thread.
SwapQueue<ControlMessage>* message_queue_;
@ -108,8 +113,9 @@ class RtcEventLogHelperThread final {
// Temporary space for serializing profobuf data.
std::string output_string_;
rtc::Event* wake_up_;
rtc::Event* stopped_;
rtc::Event wake_periodically_;
rtc::Event wake_from_hibernation_;
rtc::Event file_finished_;
const Clock* const clock_;