From 0a025345fe7cc4cb5554a25c687f9b98bb15ae4a Mon Sep 17 00:00:00 2001 From: Jianhui Dai Date: Fri, 17 Mar 2023 08:20:04 +0800 Subject: [PATCH] RtcEventLog: Separate `LogToMemory` from TaskQueue to current thread MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Original implementation uses TaskQueue to async execute both `LogToMemory` and `WriteToOutput`. `LogToMemory` is invoked in high frequency, but the execution takes a very short time. It would be a bit more expensive to post on TaskQueue than execution on current thread with locking. It is because that the TaskQueue switches the thread context for execution. This CL separates `LogToMemory` from TaskQueue to current thread, in order to avoid frequent context switching; And periodically schedule `WriteToOutput` to TaskQueue, not block current thread. Link: https://webrtc-review.googlesource.com/c/src/+/283641 Bug: chromium:1288710 Change-Id: Ic78216aff16d1883b109e360a0892da3ca8f5ecb Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/295640 Reviewed-by: Björn Terelius Commit-Queue: Markus Handell Reviewed-by: Markus Handell Reviewed-by: Markus Handell Cr-Commit-Position: refs/heads/main@{#39629} --- logging/rtc_event_log/rtc_event_log_impl.cc | 225 ++++++++++++-------- logging/rtc_event_log/rtc_event_log_impl.h | 42 ++-- 2 files changed, 167 insertions(+), 100 deletions(-) diff --git a/logging/rtc_event_log/rtc_event_log_impl.cc b/logging/rtc_event_log/rtc_event_log_impl.cc index 31f1e553a0..f2b3f22d6a 100644 --- a/logging/rtc_event_log/rtc_event_log_impl.cc +++ b/logging/rtc_event_log/rtc_event_log_impl.cc @@ -17,7 +17,6 @@ #include #include "absl/strings/string_view.h" -#include "absl/types/optional.h" #include "api/task_queue/task_queue_base.h" #include "api/units/time_delta.h" #include "logging/rtc_event_log/encoder/rtc_event_log_encoder_legacy.h" @@ -55,10 +54,7 @@ RtcEventLogImpl::RtcEventLogImpl(std::unique_ptr encoder, : max_events_in_history_(max_events_in_history), max_config_events_in_history_(max_config_events_in_history), event_encoder_(std::move(encoder)), - num_config_events_written_(0), last_output_ms_(rtc::TimeMillis()), - output_scheduled_(false), - logging_state_started_(false), task_queue_( std::make_unique(task_queue_factory->CreateTaskQueue( "rtc_event_log", @@ -66,7 +62,11 @@ RtcEventLogImpl::RtcEventLogImpl(std::unique_ptr encoder, RtcEventLogImpl::~RtcEventLogImpl() { // If we're logging to the output, this will stop that. Blocking function. - if (logging_state_started_) { + mutex_.Lock(); + bool started = logging_state_started_; + mutex_.Unlock(); + + if (started) { logging_state_checker_.Detach(); StopLogging(); } @@ -80,6 +80,7 @@ RtcEventLogImpl::~RtcEventLogImpl() { bool RtcEventLogImpl::StartLogging(std::unique_ptr output, int64_t output_period_ms) { + RTC_DCHECK(output); RTC_DCHECK(output_period_ms == kImmediateOutput || output_period_ms > 0); if (!output->IsActive()) { @@ -94,17 +95,39 @@ bool RtcEventLogImpl::StartLogging(std::unique_ptr output, << timestamp_us << ", " << utc_time_us << ")."; RTC_DCHECK_RUN_ON(&logging_state_checker_); + MutexLock lock(&mutex_); logging_state_started_ = true; + immediately_output_mode_ = (output_period_ms == kImmediateOutput); + need_schedule_output_ = (output_period_ms != kImmediateOutput); + // Binding to `this` is safe because `this` outlives the `task_queue_`. task_queue_->PostTask([this, output_period_ms, timestamp_us, utc_time_us, - output = std::move(output)]() mutable { + output = std::move(output), + histories = ExtractRecentHistories()]() mutable { RTC_DCHECK_RUN_ON(task_queue_.get()); + RTC_DCHECK(output); RTC_DCHECK(output->IsActive()); output_period_ms_ = output_period_ms; event_output_ = std::move(output); - num_config_events_written_ = 0; + WriteToOutput(event_encoder_->EncodeLogStart(timestamp_us, utc_time_us)); - LogEventsFromMemoryToOutput(); + // Load all configs of previous sessions. + if (!all_config_history_.empty()) { + EventDeque& history = histories.config_history; + history.insert(history.begin(), + std::make_move_iterator(all_config_history_.begin()), + std::make_move_iterator(all_config_history_.end())); + all_config_history_.clear(); + + if (history.size() > max_config_events_in_history_) { + RTC_LOG(LS_WARNING) + << "Dropping config events: " << history.size() + << " exceeds maximum " << max_config_events_in_history_; + history.erase(history.begin(), history.begin() + history.size() - + max_config_events_in_history_); + } + } + LogEventsToOutput(std::move(histories)); }); return true; @@ -124,97 +147,111 @@ void RtcEventLogImpl::StopLogging() { void RtcEventLogImpl::StopLogging(std::function callback) { RTC_DCHECK_RUN_ON(&logging_state_checker_); + MutexLock lock(&mutex_); logging_state_started_ = false; - task_queue_->PostTask([this, callback] { - RTC_DCHECK_RUN_ON(task_queue_.get()); - if (event_output_) { - RTC_DCHECK(event_output_->IsActive()); - LogEventsFromMemoryToOutput(); - } - StopLoggingInternal(); - callback(); - }); + task_queue_->PostTask( + [this, callback, histories = ExtractRecentHistories()]() mutable { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (event_output_) { + RTC_DCHECK(event_output_->IsActive()); + LogEventsToOutput(std::move(histories)); + } + StopLoggingInternal(); + callback(); + }); +} + +RtcEventLogImpl::EventHistories RtcEventLogImpl::ExtractRecentHistories() { + EventHistories histories; + std::swap(histories, recent_); + return histories; } void RtcEventLogImpl::Log(std::unique_ptr event) { RTC_CHECK(event); + MutexLock lock(&mutex_); - // Binding to `this` is safe because `this` outlives the `task_queue_`. - task_queue_->PostTask([this, event = std::move(event)]() mutable { - RTC_DCHECK_RUN_ON(task_queue_.get()); - LogToMemory(std::move(event)); - if (event_output_) - ScheduleOutput(); - }); + LogToMemory(std::move(event)); + if (logging_state_started_) { + if (ShouldOutputImmediately()) { + // Binding to `this` is safe because `this` outlives the `task_queue_`. + task_queue_->PostTask( + [this, histories = ExtractRecentHistories()]() mutable { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (event_output_) { + RTC_DCHECK(event_output_->IsActive()); + LogEventsToOutput(std::move(histories)); + } + }); + } else if (need_schedule_output_) { + need_schedule_output_ = false; + // Binding to `this` is safe because `this` outlives the `task_queue_`. + task_queue_->PostTask([this]() mutable { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (event_output_) { + RTC_DCHECK(event_output_->IsActive()); + ScheduleOutput(); + } + }); + } + } +} + +bool RtcEventLogImpl::ShouldOutputImmediately() { + if (recent_.history.size() >= max_events_in_history_) { + // We have to emergency drain the buffer. We can't wait for the scheduled + // output task because there might be other event incoming before that. + return true; + } + + return immediately_output_mode_; } void RtcEventLogImpl::ScheduleOutput() { - RTC_DCHECK(event_output_ && event_output_->IsActive()); - if (history_.size() >= max_events_in_history_) { - // We have to emergency drain the buffer. We can't wait for the scheduled - // output task because there might be other event incoming before that. - LogEventsFromMemoryToOutput(); - return; - } - - RTC_DCHECK(output_period_ms_.has_value()); - if (*output_period_ms_ == kImmediateOutput) { - // We are already on the `task_queue_` so there is no reason to post a task - // if we want to output immediately. - LogEventsFromMemoryToOutput(); - return; - } - - if (!output_scheduled_) { - output_scheduled_ = true; - // Binding to `this` is safe because `this` outlives the `task_queue_`. - auto output_task = [this]() { - RTC_DCHECK_RUN_ON(task_queue_.get()); - if (event_output_) { - RTC_DCHECK(event_output_->IsActive()); - LogEventsFromMemoryToOutput(); - } - output_scheduled_ = false; - }; - const int64_t now_ms = rtc::TimeMillis(); - const int64_t time_since_output_ms = now_ms - last_output_ms_; - const uint32_t delay = rtc::SafeClamp( - *output_period_ms_ - time_since_output_ms, 0, *output_period_ms_); - task_queue_->PostDelayedTask(std::move(output_task), - TimeDelta::Millis(delay)); - } + RTC_DCHECK(output_period_ms_ != kImmediateOutput); + // Binding to `this` is safe because `this` outlives the `task_queue_`. + auto output_task = [this]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + // Allow scheduled output if the `event_output_` is valid. + if (event_output_) { + RTC_DCHECK(event_output_->IsActive()); + mutex_.Lock(); + RTC_DCHECK(!need_schedule_output_); + // Let the next `Log()` to schedule output. + need_schedule_output_ = true; + EventHistories histories = ExtractRecentHistories(); + mutex_.Unlock(); + LogEventsToOutput(std::move(histories)); + } + }; + const int64_t now_ms = rtc::TimeMillis(); + const int64_t time_since_output_ms = now_ms - last_output_ms_; + const int32_t delay = rtc::SafeClamp(output_period_ms_ - time_since_output_ms, + 0, output_period_ms_); + task_queue_->PostDelayedTask(std::move(output_task), + TimeDelta::Millis(delay)); } void RtcEventLogImpl::LogToMemory(std::unique_ptr event) { - std::deque>& container = - event->IsConfigEvent() ? config_history_ : history_; + EventDeque& container = + event->IsConfigEvent() ? recent_.config_history : recent_.history; const size_t container_max_size = event->IsConfigEvent() ? max_config_events_in_history_ : max_events_in_history_; - if (container.size() >= container_max_size) { - RTC_DCHECK(!event_output_); // Shouldn't lose events if we have an output. + // Shouldn't lose events if started. + if (container.size() >= container_max_size && !logging_state_started_) { container.pop_front(); } container.push_back(std::move(event)); } -void RtcEventLogImpl::LogEventsFromMemoryToOutput() { - RTC_DCHECK(event_output_ && event_output_->IsActive()); +void RtcEventLogImpl::LogEventsToOutput(EventHistories histories) { last_output_ms_ = rtc::TimeMillis(); - // Serialize all stream configurations that haven't already been written to - // this output. `num_config_events_written_` is used to track which configs we - // have already written. (Note that the config may have been written to - // previous outputs; configs are not discarded.) - std::string encoded_configs; - RTC_DCHECK_LE(num_config_events_written_, config_history_.size()); - if (num_config_events_written_ < config_history_.size()) { - const auto begin = config_history_.begin() + num_config_events_written_; - const auto end = config_history_.end(); - encoded_configs = event_encoder_->EncodeBatch(begin, end); - num_config_events_written_ = config_history_.size(); - } + // Serialize the stream configurations. + std::string encoded_configs = event_encoder_->EncodeBatch( + histories.config_history.begin(), histories.config_history.end()); // Serialize the events in the event queue. Note that the write may fail, // for example if we are writing to a file and have reached the maximum limit. @@ -223,11 +260,26 @@ void RtcEventLogImpl::LogEventsFromMemoryToOutput() { // log is started immediately after the first one becomes full, then one // cannot rely on the second log to contain everything that isn't in the first // log; one batch of events might be missing. - std::string encoded_history = - event_encoder_->EncodeBatch(history_.begin(), history_.end()); - history_.clear(); + std::string encoded_history = event_encoder_->EncodeBatch( + histories.history.begin(), histories.history.end()); WriteConfigsAndHistoryToOutput(encoded_configs, encoded_history); + + // Unlike other events, the configs are retained. If we stop/start logging + // again, these configs are used to interpret other events. + all_config_history_.insert( + all_config_history_.end(), + std::make_move_iterator(histories.config_history.begin()), + std::make_move_iterator(histories.config_history.end())); + if (all_config_history_.size() > max_config_events_in_history_) { + RTC_LOG(LS_WARNING) << "Dropping config events: " + << all_config_history_.size() << " exceeds maximum " + << max_config_events_in_history_; + all_config_history_.erase(all_config_history_.begin(), + all_config_history_.begin() + + all_config_history_.size() - + max_config_events_in_history_); + } } void RtcEventLogImpl::WriteConfigsAndHistoryToOutput( @@ -263,13 +315,14 @@ void RtcEventLogImpl::StopLoggingInternal() { } void RtcEventLogImpl::WriteToOutput(absl::string_view output_string) { - RTC_DCHECK(event_output_ && event_output_->IsActive()); - if (!event_output_->Write(output_string)) { - RTC_LOG(LS_ERROR) << "Failed to write RTC event to output."; - // The first failure closes the output. - RTC_DCHECK(!event_output_->IsActive()); - StopOutput(); // Clean-up. - return; + if (event_output_) { + RTC_DCHECK(event_output_->IsActive()); + if (!event_output_->Write(output_string)) { + RTC_LOG(LS_ERROR) << "Failed to write RTC event to output."; + // The first failure closes the output. + RTC_DCHECK(!event_output_->IsActive()); + StopOutput(); // Clean-up. + } } } diff --git a/logging/rtc_event_log/rtc_event_log_impl.h b/logging/rtc_event_log/rtc_event_log_impl.h index 6ffed0ad22..3187a7fe87 100644 --- a/logging/rtc_event_log/rtc_event_log_impl.h +++ b/logging/rtc_event_log/rtc_event_log_impl.h @@ -18,7 +18,6 @@ #include #include "absl/strings/string_view.h" -#include "absl/types/optional.h" #include "api/rtc_event_log/rtc_event.h" #include "api/rtc_event_log/rtc_event_log.h" #include "api/rtc_event_log_output.h" @@ -60,11 +59,23 @@ class RtcEventLogImpl final : public RtcEventLog { void StopLogging() override; void StopLogging(std::function callback) override; + // Records event into `recent_` on current thread, and schedules the output on + // task queue if the buffers are full or `output_period_ms_` is expired. void Log(std::unique_ptr event) override; private: - void LogToMemory(std::unique_ptr event) RTC_RUN_ON(task_queue_); - void LogEventsFromMemoryToOutput() RTC_RUN_ON(task_queue_); + using EventDeque = std::deque>; + + struct EventHistories { + EventDeque config_history; + EventDeque history; + }; + + // Helper to extract and clear `recent_`. + EventHistories ExtractRecentHistories() RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void LogToMemory(std::unique_ptr event) + RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void LogEventsToOutput(EventHistories histories) RTC_RUN_ON(task_queue_); void StopOutput() RTC_RUN_ON(task_queue_); @@ -75,6 +86,7 @@ class RtcEventLogImpl final : public RtcEventLog { void StopLoggingInternal() RTC_RUN_ON(task_queue_); + bool ShouldOutputImmediately() RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_); void ScheduleOutput() RTC_RUN_ON(task_queue_); // Max size of event history. @@ -84,29 +96,31 @@ class RtcEventLogImpl final : public RtcEventLog { const size_t max_config_events_in_history_; // History containing all past configuration events. - std::deque> config_history_ - RTC_GUARDED_BY(*task_queue_); + EventDeque all_config_history_ RTC_GUARDED_BY(task_queue_); - // History containing the most recent (non-configuration) events (~10s). - std::deque> history_ RTC_GUARDED_BY(*task_queue_); + // `config_history` containing the most recent configuration events. + // `history` containing the most recent (non-configuration) events (~10s). + EventHistories recent_ RTC_GUARDED_BY(mutex_); std::unique_ptr event_encoder_ - RTC_GUARDED_BY(*task_queue_); - std::unique_ptr event_output_ RTC_GUARDED_BY(*task_queue_); + RTC_GUARDED_BY(task_queue_); + std::unique_ptr event_output_ RTC_GUARDED_BY(task_queue_); - size_t num_config_events_written_ RTC_GUARDED_BY(*task_queue_); - absl::optional output_period_ms_ RTC_GUARDED_BY(*task_queue_); - int64_t last_output_ms_ RTC_GUARDED_BY(*task_queue_); - bool output_scheduled_ RTC_GUARDED_BY(*task_queue_); + int64_t output_period_ms_ RTC_GUARDED_BY(task_queue_); + int64_t last_output_ms_ RTC_GUARDED_BY(task_queue_); RTC_NO_UNIQUE_ADDRESS SequenceChecker logging_state_checker_; - bool logging_state_started_ RTC_GUARDED_BY(logging_state_checker_); + bool logging_state_started_ RTC_GUARDED_BY(mutex_) = false; + bool immediately_output_mode_ RTC_GUARDED_BY(mutex_) = false; + bool need_schedule_output_ RTC_GUARDED_BY(mutex_) = false; // Since we are posting tasks bound to `this`, it is critical that the event // log and its members outlive `task_queue_`. Keep the `task_queue_` // last to ensure it destructs first, or else tasks living on the queue might // access other members after they've been torn down. std::unique_ptr task_queue_; + + Mutex mutex_; }; } // namespace webrtc