diff --git a/webrtc/system_wrappers/source/event_timer_posix.cc b/webrtc/system_wrappers/source/event_timer_posix.cc index 990ff725fb..b46b83857f 100644 --- a/webrtc/system_wrappers/source/event_timer_posix.cc +++ b/webrtc/system_wrappers/source/event_timer_posix.cc @@ -27,16 +27,17 @@ EventTimerWrapper* EventTimerWrapper::Create() { return new EventTimerPosix(); } -const long int E6 = 1000000; -const long int E9 = 1000 * E6; +const int64_t kNanosecondsPerMillisecond = 1000000; +const int64_t kNanosecondsPerSecond = 1000000000; EventTimerPosix::EventTimerPosix() : event_set_(false), timer_thread_(nullptr), created_at_(), periodic_(false), - time_(0), - count_(0) { + time_ms_(0), + count_(0), + is_stopping_(false) { pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); @@ -70,12 +71,12 @@ bool EventTimerPosix::Set() { return true; } -EventTypeWrapper EventTimerPosix::Wait(unsigned long timeout) { +EventTypeWrapper EventTimerPosix::Wait(unsigned long timeout_ms) { int ret_val = 0; RTC_CHECK_EQ(0, pthread_mutex_lock(&mutex_)); if (!event_set_) { - if (WEBRTC_EVENT_INFINITE != timeout) { + if (WEBRTC_EVENT_INFINITE != timeout_ms) { timespec end_at; #ifndef WEBRTC_MAC clock_gettime(CLOCK_MONOTONIC, &end_at); @@ -87,12 +88,12 @@ EventTypeWrapper EventTimerPosix::Wait(unsigned long timeout) { gettimeofday(&value, &time_zone); TIMEVAL_TO_TIMESPEC(&value, &end_at); #endif - end_at.tv_sec += timeout / 1000; - end_at.tv_nsec += (timeout - (timeout / 1000) * 1000) * E6; + end_at.tv_sec += timeout_ms / 1000; + end_at.tv_nsec += (timeout_ms % 1000) * kNanosecondsPerMillisecond; - if (end_at.tv_nsec >= E9) { + if (end_at.tv_nsec >= kNanosecondsPerSecond) { end_at.tv_sec++; - end_at.tv_nsec -= E9; + end_at.tv_nsec -= kNanosecondsPerSecond; } while (ret_val == 0 && !event_set_) { #if defined(WEBRTC_ANDROID) && defined(HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC) @@ -119,9 +120,13 @@ EventTypeWrapper EventTimerPosix::Wait(unsigned long timeout) { return ret_val == 0 ? kEventSignaled : kEventTimeout; } -EventTypeWrapper EventTimerPosix::Wait(timespec* end_at) { +EventTypeWrapper EventTimerPosix::Wait(timespec* end_at, bool reset_event) { int ret_val = 0; RTC_CHECK_EQ(0, pthread_mutex_lock(&mutex_)); + if (reset_event) { + // Only wake for new events or timeouts. + event_set_ = false; + } while (ret_val == 0 && !event_set_) { #if defined(WEBRTC_ANDROID) && defined(HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC) @@ -143,7 +148,12 @@ EventTypeWrapper EventTimerPosix::Wait(timespec* end_at) { return ret_val == 0 ? kEventSignaled : kEventTimeout; } -bool EventTimerPosix::StartTimer(bool periodic, unsigned long time) { +rtc::PlatformThread* EventTimerPosix::CreateThread() { + const char* kThreadName = "WebRtc_event_timer_thread"; + return new rtc::PlatformThread(Run, this, kThreadName); +} + +bool EventTimerPosix::StartTimer(bool periodic, unsigned long time_ms) { pthread_mutex_lock(&mutex_); if (timer_thread_) { if (periodic_) { @@ -151,8 +161,8 @@ bool EventTimerPosix::StartTimer(bool periodic, unsigned long time) { pthread_mutex_unlock(&mutex_); return false; } else { - // New one shot timer - time_ = time; + // New one shot timer. + time_ms_ = time_ms; created_at_.tv_sec = 0; timer_event_->Set(); pthread_mutex_unlock(&mutex_); @@ -160,12 +170,11 @@ bool EventTimerPosix::StartTimer(bool periodic, unsigned long time) { } } - // Start the timer thread + // Start the timer thread. timer_event_.reset(new EventTimerPosix()); - const char* thread_name = "WebRtc_event_timer_thread"; - timer_thread_.reset(new rtc::PlatformThread(Run, this, thread_name)); + timer_thread_.reset(CreateThread()); periodic_ = periodic; - time_ = time; + time_ms_ = time_ms; timer_thread_->Start(); timer_thread_->SetPriority(rtc::kRealtimePriority); pthread_mutex_unlock(&mutex_); @@ -179,9 +188,13 @@ bool EventTimerPosix::Run(void* obj) { bool EventTimerPosix::Process() { pthread_mutex_lock(&mutex_); + if (is_stopping_) { + pthread_mutex_unlock(&mutex_); + return false; + } if (created_at_.tv_sec == 0) { #ifndef WEBRTC_MAC - clock_gettime(CLOCK_MONOTONIC, &created_at_); + RTC_CHECK_EQ(0, clock_gettime(CLOCK_MONOTONIC, &created_at_)); #else timeval value; struct timezone time_zone; @@ -194,17 +207,27 @@ bool EventTimerPosix::Process() { } timespec end_at; - unsigned long long time = time_ * ++count_; - end_at.tv_sec = created_at_.tv_sec + time / 1000; - end_at.tv_nsec = created_at_.tv_nsec + (time - (time / 1000) * 1000) * E6; + unsigned long long total_delta_ms = time_ms_ * ++count_; + if (!periodic_ && count_ >= 1) { + // No need to wake up often if we're not going to signal waiting threads. + total_delta_ms = + std::min(total_delta_ms, 60 * kNanosecondsPerSecond); + } - if (end_at.tv_nsec >= E9) { + end_at.tv_sec = created_at_.tv_sec + total_delta_ms / 1000; + end_at.tv_nsec = created_at_.tv_nsec + + (total_delta_ms % 1000) * kNanosecondsPerMillisecond; + + if (end_at.tv_nsec >= kNanosecondsPerSecond) { end_at.tv_sec++; - end_at.tv_nsec -= E9; + end_at.tv_nsec -= kNanosecondsPerSecond; } pthread_mutex_unlock(&mutex_); - if (timer_event_->Wait(&end_at) == kEventSignaled) + // Reset event on first call so that we don't immediately return here if this + // thread was not blocked on timer_event_->Wait when the StartTimer() call + // was made. + if (timer_event_->Wait(&end_at, count_ == 1) == kEventSignaled) return true; pthread_mutex_lock(&mutex_); @@ -216,9 +239,13 @@ bool EventTimerPosix::Process() { } bool EventTimerPosix::StopTimer() { - if (timer_event_) { + pthread_mutex_lock(&mutex_); + is_stopping_ = true; + pthread_mutex_unlock(&mutex_); + + if (timer_event_) timer_event_->Set(); - } + if (timer_thread_) { timer_thread_->Stop(); timer_thread_.reset(); diff --git a/webrtc/system_wrappers/source/event_timer_posix.h b/webrtc/system_wrappers/source/event_timer_posix.h index bbf51f72db..af3715ee8b 100644 --- a/webrtc/system_wrappers/source/event_timer_posix.h +++ b/webrtc/system_wrappers/source/event_timer_posix.h @@ -37,11 +37,14 @@ class EventTimerPosix : public EventTimerWrapper { bool StopTimer() override; private: + friend class EventTimerPosixTest; + static bool Run(void* obj); bool Process(); - EventTypeWrapper Wait(timespec* end_at); + EventTypeWrapper Wait(timespec* end_at, bool reset_state); + + virtual rtc::PlatformThread* CreateThread(); - private: pthread_cond_t cond_; pthread_mutex_t mutex_; bool event_set_; @@ -52,8 +55,9 @@ class EventTimerPosix : public EventTimerWrapper { timespec created_at_; bool periodic_; - unsigned long time_; // In ms + unsigned long time_ms_; unsigned long count_; + bool is_stopping_; }; } // namespace webrtc diff --git a/webrtc/system_wrappers/source/event_timer_posix_unittest.cc b/webrtc/system_wrappers/source/event_timer_posix_unittest.cc new file mode 100644 index 0000000000..4f4514dbcc --- /dev/null +++ b/webrtc/system_wrappers/source/event_timer_posix_unittest.cc @@ -0,0 +1,198 @@ +/* + * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/system_wrappers/source/event_timer_posix.h" + +#include "testing/gtest/include/gtest/gtest.h" +#include "webrtc/base/event.h" +#include "webrtc/base/criticalsection.h" + +namespace webrtc { + +enum class ThreadState { + kNotStarted, + kWaiting, + kRequestProcessCall, + kCallingProcess, + kProcessDone, + kContinue, + kExiting, + kDead +}; + +class EventTimerPosixTest : public testing::Test, public EventTimerPosix { + public: + EventTimerPosixTest() + : thread_state_(ThreadState::kNotStarted), + process_event_(false, true), + main_event_(false, true), + process_thread_id_(0), + process_thread_(nullptr) {} + virtual ~EventTimerPosixTest() {} + + rtc::PlatformThread* CreateThread() override { + EXPECT_TRUE(process_thread_ == nullptr); + process_thread_ = + new rtc::PlatformThread(Run, this, "EventTimerPosixTestThread"); + return process_thread_; + } + + static bool Run(void* obj) { + return static_cast(obj)->Process(); + } + + bool Process() { + bool res = ProcessInternal(); + if (!res) { + rtc::CritScope cs(&lock_); + thread_state_ = ThreadState::kDead; + main_event_.Set(); + } + return res; + } + + bool ProcessInternal() { + { + rtc::CritScope cs(&lock_); + if (thread_state_ == ThreadState::kNotStarted) { + if (!ChangeThreadState(ThreadState::kNotStarted, + ThreadState::kContinue)) { + ADD_FAILURE() << "Unable to start process thread"; + return false; + } + process_thread_id_ = rtc::CurrentThreadId(); + } + } + + if (!ChangeThreadState(ThreadState::kContinue, ThreadState::kWaiting)) + return false; + + if (!AwaitThreadState(ThreadState::kRequestProcessCall, + rtc::Event::kForever)) + return false; + + if (!ChangeThreadState(ThreadState::kRequestProcessCall, + ThreadState::kCallingProcess)) + return false; + + EventTimerPosix::Process(); + + if (!ChangeThreadState(ThreadState::kCallingProcess, + ThreadState::kProcessDone)) + return false; + + if (!AwaitThreadState(ThreadState::kContinue, rtc::Event::kForever)) + return false; + + return true; + } + + bool IsProcessThread() { + rtc::CritScope cs(&lock_); + return process_thread_id_ == rtc::CurrentThreadId(); + } + + bool ChangeThreadState(ThreadState prev_state, ThreadState new_state) { + rtc::CritScope cs(&lock_); + if (thread_state_ != prev_state) + return false; + thread_state_ = new_state; + if (IsProcessThread()) { + main_event_.Set(); + } else { + process_event_.Set(); + } + return true; + } + + bool AwaitThreadState(ThreadState state, int timeout) { + rtc::Event* event = IsProcessThread() ? &process_event_ : &main_event_; + do { + rtc::CritScope cs(&lock_); + if (state != ThreadState::kDead && thread_state_ == ThreadState::kExiting) + return false; + if (thread_state_ == state) + return true; + } while (event->Wait(timeout)); + return false; + } + + bool CallProcess(int timeout_ms) { + return AwaitThreadState(ThreadState::kWaiting, timeout_ms) && + ChangeThreadState(ThreadState::kWaiting, + ThreadState::kRequestProcessCall); + } + + bool AwaitProcessDone(int timeout_ms) { + return AwaitThreadState(ThreadState::kProcessDone, timeout_ms) && + ChangeThreadState(ThreadState::kProcessDone, ThreadState::kContinue); + } + + void TearDown() override { + if (process_thread_) { + { + rtc::CritScope cs(&lock_); + if (thread_state_ != ThreadState::kDead) { + thread_state_ = ThreadState::kExiting; + process_event_.Set(); + } + } + ASSERT_TRUE(AwaitThreadState(ThreadState::kDead, 5000)); + } + } + + ThreadState thread_state_; + rtc::CriticalSection lock_; + rtc::Event process_event_; + rtc::Event main_event_; + rtc::PlatformThreadId process_thread_id_; + rtc::PlatformThread* process_thread_; +}; + +TEST_F(EventTimerPosixTest, WaiterBlocksUntilTimeout) { + const int kTimerIntervalMs = 100; + const int kTimeoutMs = 5000; + ASSERT_TRUE(StartTimer(false, kTimerIntervalMs)); + ASSERT_TRUE(CallProcess(kTimeoutMs)); + EventTypeWrapper res = Wait(kTimeoutMs); + EXPECT_EQ(kEventSignaled, res); + ASSERT_TRUE(AwaitProcessDone(kTimeoutMs)); +} + +TEST_F(EventTimerPosixTest, WaiterWakesImmediatelyAfterTimeout) { + const int kTimerIntervalMs = 100; + const int kTimeoutMs = 5000; + ASSERT_TRUE(StartTimer(false, kTimerIntervalMs)); + ASSERT_TRUE(CallProcess(kTimeoutMs)); + ASSERT_TRUE(AwaitProcessDone(kTimeoutMs)); + EventTypeWrapper res = Wait(0); + EXPECT_EQ(kEventSignaled, res); +} + +TEST_F(EventTimerPosixTest, WaiterBlocksUntilTimeoutProcessInactiveOnStart) { + const int kTimerIntervalMs = 100; + const int kTimeoutMs = 5000; + // First call to StartTimer initializes thread. + ASSERT_TRUE(StartTimer(false, kTimerIntervalMs)); + + // Process thread currently _not_ blocking on Process() call. + ASSERT_TRUE(AwaitThreadState(ThreadState::kWaiting, kTimeoutMs)); + + // Start new one-off timer, then call Process(). + ASSERT_TRUE(StartTimer(false, kTimerIntervalMs)); + ASSERT_TRUE(CallProcess(kTimeoutMs)); + + EventTypeWrapper res = Wait(kTimeoutMs); + EXPECT_EQ(kEventSignaled, res); + + ASSERT_TRUE(AwaitProcessDone(kTimeoutMs)); +} + +} // namespace webrtc diff --git a/webrtc/system_wrappers/system_wrappers_tests.gyp b/webrtc/system_wrappers/system_wrappers_tests.gyp index a0ae14d6cf..2cf4dfd1a2 100644 --- a/webrtc/system_wrappers/system_wrappers_tests.gyp +++ b/webrtc/system_wrappers/system_wrappers_tests.gyp @@ -30,6 +30,7 @@ 'source/data_log_helpers_unittest.cc', 'source/data_log_c_helpers_unittest.c', 'source/data_log_c_helpers_unittest.h', + 'source/event_timer_posix_unittest.cc', 'source/metrics_unittest.cc', 'source/ntp_time_unittest.cc', 'source/rtp_to_ntp_unittest.cc',