From 53cf3463c0c31039a750d49acf4755ed03043277 Mon Sep 17 00:00:00 2001 From: sprang Date: Tue, 22 Mar 2016 01:51:39 -0700 Subject: [PATCH] Fix race condition in EventTimerPosix The intended signalling from StartTimer() to Process() is that created_at_.tv_sec is set to 0, and timer_event_->Set() is then called in order to wake the process thread from timer_event_->Wait(). When this happens the process thread will return early and the run Process() again. This time it will pick up created_at_.tv_sec = 0 and run a new Wait() call with the desired end time. However if the process thread was NOT blocking on timer_event_->Wait() when timer_event_->Set() was called from StartTimer() it will mean that the first call to timer_event_->Wait() from Process(), AFTER the new time has been configured (count_ = 1), will return early. If the timer is not periodic it means that Set() will never be called, and any calls will Wait() will block until the time out. The solution is to always reset the event in timer_event_ on the first call to timerEvent_->Wait(), after a timer has started. Also some general cleanup. BUG= Review URL: https://codereview.webrtc.org/1812533002 Cr-Commit-Position: refs/heads/master@{#12082} --- .../source/event_timer_posix.cc | 81 ++++--- .../source/event_timer_posix.h | 10 +- .../source/event_timer_posix_unittest.cc | 198 ++++++++++++++++++ .../system_wrappers/system_wrappers_tests.gyp | 1 + 4 files changed, 260 insertions(+), 30 deletions(-) create mode 100644 webrtc/system_wrappers/source/event_timer_posix_unittest.cc diff --git a/webrtc/system_wrappers/source/event_timer_posix.cc b/webrtc/system_wrappers/source/event_timer_posix.cc index 990ff725fb..b46b83857f 100644 --- a/webrtc/system_wrappers/source/event_timer_posix.cc +++ b/webrtc/system_wrappers/source/event_timer_posix.cc @@ -27,16 +27,17 @@ EventTimerWrapper* EventTimerWrapper::Create() { return new EventTimerPosix(); } -const long int E6 = 1000000; -const long int E9 = 1000 * E6; +const int64_t kNanosecondsPerMillisecond = 1000000; +const int64_t kNanosecondsPerSecond = 1000000000; EventTimerPosix::EventTimerPosix() : event_set_(false), timer_thread_(nullptr), created_at_(), periodic_(false), - time_(0), - count_(0) { + time_ms_(0), + count_(0), + is_stopping_(false) { pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); @@ -70,12 +71,12 @@ bool EventTimerPosix::Set() { return true; } -EventTypeWrapper EventTimerPosix::Wait(unsigned long timeout) { +EventTypeWrapper EventTimerPosix::Wait(unsigned long timeout_ms) { int ret_val = 0; RTC_CHECK_EQ(0, pthread_mutex_lock(&mutex_)); if (!event_set_) { - if (WEBRTC_EVENT_INFINITE != timeout) { + if (WEBRTC_EVENT_INFINITE != timeout_ms) { timespec end_at; #ifndef WEBRTC_MAC clock_gettime(CLOCK_MONOTONIC, &end_at); @@ -87,12 +88,12 @@ EventTypeWrapper EventTimerPosix::Wait(unsigned long timeout) { gettimeofday(&value, &time_zone); TIMEVAL_TO_TIMESPEC(&value, &end_at); #endif - end_at.tv_sec += timeout / 1000; - end_at.tv_nsec += (timeout - (timeout / 1000) * 1000) * E6; + end_at.tv_sec += timeout_ms / 1000; + end_at.tv_nsec += (timeout_ms % 1000) * kNanosecondsPerMillisecond; - if (end_at.tv_nsec >= E9) { + if (end_at.tv_nsec >= kNanosecondsPerSecond) { end_at.tv_sec++; - end_at.tv_nsec -= E9; + end_at.tv_nsec -= kNanosecondsPerSecond; } while (ret_val == 0 && !event_set_) { #if defined(WEBRTC_ANDROID) && defined(HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC) @@ -119,9 +120,13 @@ EventTypeWrapper EventTimerPosix::Wait(unsigned long timeout) { return ret_val == 0 ? kEventSignaled : kEventTimeout; } -EventTypeWrapper EventTimerPosix::Wait(timespec* end_at) { +EventTypeWrapper EventTimerPosix::Wait(timespec* end_at, bool reset_event) { int ret_val = 0; RTC_CHECK_EQ(0, pthread_mutex_lock(&mutex_)); + if (reset_event) { + // Only wake for new events or timeouts. + event_set_ = false; + } while (ret_val == 0 && !event_set_) { #if defined(WEBRTC_ANDROID) && defined(HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC) @@ -143,7 +148,12 @@ EventTypeWrapper EventTimerPosix::Wait(timespec* end_at) { return ret_val == 0 ? kEventSignaled : kEventTimeout; } -bool EventTimerPosix::StartTimer(bool periodic, unsigned long time) { +rtc::PlatformThread* EventTimerPosix::CreateThread() { + const char* kThreadName = "WebRtc_event_timer_thread"; + return new rtc::PlatformThread(Run, this, kThreadName); +} + +bool EventTimerPosix::StartTimer(bool periodic, unsigned long time_ms) { pthread_mutex_lock(&mutex_); if (timer_thread_) { if (periodic_) { @@ -151,8 +161,8 @@ bool EventTimerPosix::StartTimer(bool periodic, unsigned long time) { pthread_mutex_unlock(&mutex_); return false; } else { - // New one shot timer - time_ = time; + // New one shot timer. + time_ms_ = time_ms; created_at_.tv_sec = 0; timer_event_->Set(); pthread_mutex_unlock(&mutex_); @@ -160,12 +170,11 @@ bool EventTimerPosix::StartTimer(bool periodic, unsigned long time) { } } - // Start the timer thread + // Start the timer thread. timer_event_.reset(new EventTimerPosix()); - const char* thread_name = "WebRtc_event_timer_thread"; - timer_thread_.reset(new rtc::PlatformThread(Run, this, thread_name)); + timer_thread_.reset(CreateThread()); periodic_ = periodic; - time_ = time; + time_ms_ = time_ms; timer_thread_->Start(); timer_thread_->SetPriority(rtc::kRealtimePriority); pthread_mutex_unlock(&mutex_); @@ -179,9 +188,13 @@ bool EventTimerPosix::Run(void* obj) { bool EventTimerPosix::Process() { pthread_mutex_lock(&mutex_); + if (is_stopping_) { + pthread_mutex_unlock(&mutex_); + return false; + } if (created_at_.tv_sec == 0) { #ifndef WEBRTC_MAC - clock_gettime(CLOCK_MONOTONIC, &created_at_); + RTC_CHECK_EQ(0, clock_gettime(CLOCK_MONOTONIC, &created_at_)); #else timeval value; struct timezone time_zone; @@ -194,17 +207,27 @@ bool EventTimerPosix::Process() { } timespec end_at; - unsigned long long time = time_ * ++count_; - end_at.tv_sec = created_at_.tv_sec + time / 1000; - end_at.tv_nsec = created_at_.tv_nsec + (time - (time / 1000) * 1000) * E6; + unsigned long long total_delta_ms = time_ms_ * ++count_; + if (!periodic_ && count_ >= 1) { + // No need to wake up often if we're not going to signal waiting threads. + total_delta_ms = + std::min(total_delta_ms, 60 * kNanosecondsPerSecond); + } - if (end_at.tv_nsec >= E9) { + end_at.tv_sec = created_at_.tv_sec + total_delta_ms / 1000; + end_at.tv_nsec = created_at_.tv_nsec + + (total_delta_ms % 1000) * kNanosecondsPerMillisecond; + + if (end_at.tv_nsec >= kNanosecondsPerSecond) { end_at.tv_sec++; - end_at.tv_nsec -= E9; + end_at.tv_nsec -= kNanosecondsPerSecond; } pthread_mutex_unlock(&mutex_); - if (timer_event_->Wait(&end_at) == kEventSignaled) + // Reset event on first call so that we don't immediately return here if this + // thread was not blocked on timer_event_->Wait when the StartTimer() call + // was made. + if (timer_event_->Wait(&end_at, count_ == 1) == kEventSignaled) return true; pthread_mutex_lock(&mutex_); @@ -216,9 +239,13 @@ bool EventTimerPosix::Process() { } bool EventTimerPosix::StopTimer() { - if (timer_event_) { + pthread_mutex_lock(&mutex_); + is_stopping_ = true; + pthread_mutex_unlock(&mutex_); + + if (timer_event_) timer_event_->Set(); - } + if (timer_thread_) { timer_thread_->Stop(); timer_thread_.reset(); diff --git a/webrtc/system_wrappers/source/event_timer_posix.h b/webrtc/system_wrappers/source/event_timer_posix.h index bbf51f72db..af3715ee8b 100644 --- a/webrtc/system_wrappers/source/event_timer_posix.h +++ b/webrtc/system_wrappers/source/event_timer_posix.h @@ -37,11 +37,14 @@ class EventTimerPosix : public EventTimerWrapper { bool StopTimer() override; private: + friend class EventTimerPosixTest; + static bool Run(void* obj); bool Process(); - EventTypeWrapper Wait(timespec* end_at); + EventTypeWrapper Wait(timespec* end_at, bool reset_state); + + virtual rtc::PlatformThread* CreateThread(); - private: pthread_cond_t cond_; pthread_mutex_t mutex_; bool event_set_; @@ -52,8 +55,9 @@ class EventTimerPosix : public EventTimerWrapper { timespec created_at_; bool periodic_; - unsigned long time_; // In ms + unsigned long time_ms_; unsigned long count_; + bool is_stopping_; }; } // namespace webrtc diff --git a/webrtc/system_wrappers/source/event_timer_posix_unittest.cc b/webrtc/system_wrappers/source/event_timer_posix_unittest.cc new file mode 100644 index 0000000000..4f4514dbcc --- /dev/null +++ b/webrtc/system_wrappers/source/event_timer_posix_unittest.cc @@ -0,0 +1,198 @@ +/* + * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/system_wrappers/source/event_timer_posix.h" + +#include "testing/gtest/include/gtest/gtest.h" +#include "webrtc/base/event.h" +#include "webrtc/base/criticalsection.h" + +namespace webrtc { + +enum class ThreadState { + kNotStarted, + kWaiting, + kRequestProcessCall, + kCallingProcess, + kProcessDone, + kContinue, + kExiting, + kDead +}; + +class EventTimerPosixTest : public testing::Test, public EventTimerPosix { + public: + EventTimerPosixTest() + : thread_state_(ThreadState::kNotStarted), + process_event_(false, true), + main_event_(false, true), + process_thread_id_(0), + process_thread_(nullptr) {} + virtual ~EventTimerPosixTest() {} + + rtc::PlatformThread* CreateThread() override { + EXPECT_TRUE(process_thread_ == nullptr); + process_thread_ = + new rtc::PlatformThread(Run, this, "EventTimerPosixTestThread"); + return process_thread_; + } + + static bool Run(void* obj) { + return static_cast(obj)->Process(); + } + + bool Process() { + bool res = ProcessInternal(); + if (!res) { + rtc::CritScope cs(&lock_); + thread_state_ = ThreadState::kDead; + main_event_.Set(); + } + return res; + } + + bool ProcessInternal() { + { + rtc::CritScope cs(&lock_); + if (thread_state_ == ThreadState::kNotStarted) { + if (!ChangeThreadState(ThreadState::kNotStarted, + ThreadState::kContinue)) { + ADD_FAILURE() << "Unable to start process thread"; + return false; + } + process_thread_id_ = rtc::CurrentThreadId(); + } + } + + if (!ChangeThreadState(ThreadState::kContinue, ThreadState::kWaiting)) + return false; + + if (!AwaitThreadState(ThreadState::kRequestProcessCall, + rtc::Event::kForever)) + return false; + + if (!ChangeThreadState(ThreadState::kRequestProcessCall, + ThreadState::kCallingProcess)) + return false; + + EventTimerPosix::Process(); + + if (!ChangeThreadState(ThreadState::kCallingProcess, + ThreadState::kProcessDone)) + return false; + + if (!AwaitThreadState(ThreadState::kContinue, rtc::Event::kForever)) + return false; + + return true; + } + + bool IsProcessThread() { + rtc::CritScope cs(&lock_); + return process_thread_id_ == rtc::CurrentThreadId(); + } + + bool ChangeThreadState(ThreadState prev_state, ThreadState new_state) { + rtc::CritScope cs(&lock_); + if (thread_state_ != prev_state) + return false; + thread_state_ = new_state; + if (IsProcessThread()) { + main_event_.Set(); + } else { + process_event_.Set(); + } + return true; + } + + bool AwaitThreadState(ThreadState state, int timeout) { + rtc::Event* event = IsProcessThread() ? &process_event_ : &main_event_; + do { + rtc::CritScope cs(&lock_); + if (state != ThreadState::kDead && thread_state_ == ThreadState::kExiting) + return false; + if (thread_state_ == state) + return true; + } while (event->Wait(timeout)); + return false; + } + + bool CallProcess(int timeout_ms) { + return AwaitThreadState(ThreadState::kWaiting, timeout_ms) && + ChangeThreadState(ThreadState::kWaiting, + ThreadState::kRequestProcessCall); + } + + bool AwaitProcessDone(int timeout_ms) { + return AwaitThreadState(ThreadState::kProcessDone, timeout_ms) && + ChangeThreadState(ThreadState::kProcessDone, ThreadState::kContinue); + } + + void TearDown() override { + if (process_thread_) { + { + rtc::CritScope cs(&lock_); + if (thread_state_ != ThreadState::kDead) { + thread_state_ = ThreadState::kExiting; + process_event_.Set(); + } + } + ASSERT_TRUE(AwaitThreadState(ThreadState::kDead, 5000)); + } + } + + ThreadState thread_state_; + rtc::CriticalSection lock_; + rtc::Event process_event_; + rtc::Event main_event_; + rtc::PlatformThreadId process_thread_id_; + rtc::PlatformThread* process_thread_; +}; + +TEST_F(EventTimerPosixTest, WaiterBlocksUntilTimeout) { + const int kTimerIntervalMs = 100; + const int kTimeoutMs = 5000; + ASSERT_TRUE(StartTimer(false, kTimerIntervalMs)); + ASSERT_TRUE(CallProcess(kTimeoutMs)); + EventTypeWrapper res = Wait(kTimeoutMs); + EXPECT_EQ(kEventSignaled, res); + ASSERT_TRUE(AwaitProcessDone(kTimeoutMs)); +} + +TEST_F(EventTimerPosixTest, WaiterWakesImmediatelyAfterTimeout) { + const int kTimerIntervalMs = 100; + const int kTimeoutMs = 5000; + ASSERT_TRUE(StartTimer(false, kTimerIntervalMs)); + ASSERT_TRUE(CallProcess(kTimeoutMs)); + ASSERT_TRUE(AwaitProcessDone(kTimeoutMs)); + EventTypeWrapper res = Wait(0); + EXPECT_EQ(kEventSignaled, res); +} + +TEST_F(EventTimerPosixTest, WaiterBlocksUntilTimeoutProcessInactiveOnStart) { + const int kTimerIntervalMs = 100; + const int kTimeoutMs = 5000; + // First call to StartTimer initializes thread. + ASSERT_TRUE(StartTimer(false, kTimerIntervalMs)); + + // Process thread currently _not_ blocking on Process() call. + ASSERT_TRUE(AwaitThreadState(ThreadState::kWaiting, kTimeoutMs)); + + // Start new one-off timer, then call Process(). + ASSERT_TRUE(StartTimer(false, kTimerIntervalMs)); + ASSERT_TRUE(CallProcess(kTimeoutMs)); + + EventTypeWrapper res = Wait(kTimeoutMs); + EXPECT_EQ(kEventSignaled, res); + + ASSERT_TRUE(AwaitProcessDone(kTimeoutMs)); +} + +} // namespace webrtc diff --git a/webrtc/system_wrappers/system_wrappers_tests.gyp b/webrtc/system_wrappers/system_wrappers_tests.gyp index a0ae14d6cf..2cf4dfd1a2 100644 --- a/webrtc/system_wrappers/system_wrappers_tests.gyp +++ b/webrtc/system_wrappers/system_wrappers_tests.gyp @@ -30,6 +30,7 @@ 'source/data_log_helpers_unittest.cc', 'source/data_log_c_helpers_unittest.c', 'source/data_log_c_helpers_unittest.h', + 'source/event_timer_posix_unittest.cc', 'source/metrics_unittest.cc', 'source/ntp_time_unittest.cc', 'source/rtp_to_ntp_unittest.cc',