New lock implementation for mac.

According to my measurements, it's about 100x faster than the native mutex implementation in OSX.  Google "OSX mutex performance" for more info.

BUG=

Review URL: https://codereview.webrtc.org/1594723003

Cr-Commit-Position: refs/heads/master@{#11352}
This commit is contained in:
tommi 2016-01-21 23:47:25 -08:00 committed by Commit bot
parent 2bf9a5f11b
commit ed281e9c9b
3 changed files with 208 additions and 1 deletion

View File

@ -12,17 +12,26 @@
#include "webrtc/base/checks.h"
// TODO(tommi): Split this file up to per-platform implementation files.
namespace rtc {
CriticalSection::CriticalSection() {
#if defined(WEBRTC_WIN)
InitializeCriticalSection(&crit_);
#else
#if defined(WEBRTC_MAC) && !USE_NATIVE_MUTEX_ON_MAC
lock_queue_ = 0;
owning_thread_ = 0;
recursion_ = 0;
semaphore_ = dispatch_semaphore_create(0);
#else
pthread_mutexattr_t mutex_attribute;
pthread_mutexattr_init(&mutex_attribute);
pthread_mutexattr_settype(&mutex_attribute, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&mutex_, &mutex_attribute);
pthread_mutexattr_destroy(&mutex_attribute);
#endif
CS_DEBUG_CODE(thread_ = 0);
CS_DEBUG_CODE(recursion_count_ = 0);
#endif
@ -31,16 +40,60 @@ CriticalSection::CriticalSection() {
// Releases the platform-specific lock state created by the constructor.
// NOTE(review): must only run when no thread holds or is waiting on the lock;
// nothing here checks that — confirm callers guarantee it.
CriticalSection::~CriticalSection() {
#if defined(WEBRTC_WIN)
DeleteCriticalSection(&crit_);
#else
#if defined(WEBRTC_MAC) && !USE_NATIVE_MUTEX_ON_MAC
// Spin-lock flavor: only the dispatch semaphore needs explicit release;
// the other members (lock_queue_, owning_thread_, recursion_) are plain ints.
dispatch_release(semaphore_);
#else
// Native pthread flavor.
pthread_mutex_destroy(&mutex_);
#endif
#endif
}
void CriticalSection::Enter() const EXCLUSIVE_LOCK_FUNCTION() {
#if defined(WEBRTC_WIN)
EnterCriticalSection(&crit_);
#else
#if defined(WEBRTC_MAC) && !USE_NATIVE_MUTEX_ON_MAC
int spin = 3000;
pthread_t self = pthread_self();
bool have_lock = false;
do {
// Instead of calling TryEnter() in this loop, we do two interlocked
// operations, first a read-only one in order to avoid affecting the lock
// cache-line while spinning, in case another thread is using the lock.
if (owning_thread_ != self) {
if (AtomicOps::AcquireLoad(&lock_queue_) == 0) {
if (AtomicOps::CompareAndSwap(&lock_queue_, 0, 1) == 0) {
have_lock = true;
break;
}
}
} else {
AtomicOps::Increment(&lock_queue_);
have_lock = true;
break;
}
sched_yield();
} while (--spin);
if (!have_lock && AtomicOps::Increment(&lock_queue_) > 1) {
// Owning thread cannot be the current thread since TryEnter() would
// have succeeded.
RTC_DCHECK(owning_thread_ != self);
// Wait for the lock to become available.
dispatch_semaphore_wait(semaphore_, DISPATCH_TIME_FOREVER);
RTC_DCHECK(owning_thread_ == 0);
RTC_DCHECK(!recursion_);
}
owning_thread_ = self;
++recursion_;
#else
pthread_mutex_lock(&mutex_);
#endif
#if CS_DEBUG_CHECKS
if (!recursion_count_) {
RTC_DCHECK(!thread_);
@ -56,9 +109,21 @@ void CriticalSection::Enter() const EXCLUSIVE_LOCK_FUNCTION() {
bool CriticalSection::TryEnter() const EXCLUSIVE_TRYLOCK_FUNCTION(true) {
#if defined(WEBRTC_WIN)
return TryEnterCriticalSection(&crit_) != FALSE;
#else
#if defined(WEBRTC_MAC) && !USE_NATIVE_MUTEX_ON_MAC
if (owning_thread_ != pthread_self()) {
if (AtomicOps::CompareAndSwap(&lock_queue_, 0, 1) != 0)
return false;
owning_thread_ = pthread_self();
RTC_DCHECK(!recursion_);
} else {
AtomicOps::Increment(&lock_queue_);
}
++recursion_;
#else
if (pthread_mutex_trylock(&mutex_) != 0)
return false;
#endif
#if CS_DEBUG_CHECKS
if (!recursion_count_) {
RTC_DCHECK(!thread_);
@ -82,8 +147,19 @@ void CriticalSection::Leave() const UNLOCK_FUNCTION() {
if (!recursion_count_)
thread_ = 0;
#endif
#if defined(WEBRTC_MAC) && !USE_NATIVE_MUTEX_ON_MAC
RTC_DCHECK_EQ(owning_thread_, pthread_self());
RTC_DCHECK_GE(recursion_, 0);
--recursion_;
if (!recursion_)
owning_thread_ = 0;
if (AtomicOps::Decrement(&lock_queue_) > 0 && !recursion_)
dispatch_semaphore_signal(semaphore_);
#else
pthread_mutex_unlock(&mutex_);
#endif
#endif
}
bool CriticalSection::CurrentThreadIsOwner() const {
@ -135,13 +211,15 @@ bool TryCritScope::locked() const {
}
void GlobalLockPod::Lock() {
#if !defined(WEBRTC_WIN)
#if !defined(WEBRTC_WIN) && (!defined(WEBRTC_MAC) || USE_NATIVE_MUTEX_ON_MAC)
const struct timespec ts_null = {0};
#endif
while (AtomicOps::CompareAndSwap(&lock_acquired, 0, 1)) {
#if defined(WEBRTC_WIN)
::Sleep(0);
#elif defined(WEBRTC_MAC) && !USE_NATIVE_MUTEX_ON_MAC
sched_yield();
#else
nanosleep(&ts_null, nullptr);
#endif

View File

@ -29,6 +29,13 @@
#include <pthread.h>
#endif
// See notes in the 'Performance' unit test for the effects of this flag.
#define USE_NATIVE_MUTEX_ON_MAC 0
#if defined(WEBRTC_MAC) && !USE_NATIVE_MUTEX_ON_MAC
#include <dispatch/dispatch.h>
#endif
#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
#define CS_DEBUG_CHECKS 1
#endif
@ -62,7 +69,21 @@ class LOCKABLE CriticalSection {
#if defined(WEBRTC_WIN)
mutable CRITICAL_SECTION crit_;
#elif defined(WEBRTC_POSIX)
#if defined(WEBRTC_MAC) && !USE_NATIVE_MUTEX_ON_MAC
// Number of times the lock has been locked + number of threads waiting.
// TODO(tommi): We could use this number and subtract the recursion count
// to find places where we have multiple threads contending on the same lock.
mutable volatile int lock_queue_;
// |recursion_| represents the recursion count + 1 for the thread that owns
// the lock. Only modified by the thread that owns the lock.
mutable int recursion_;
// Used to signal a single waiting thread when the lock becomes available.
mutable dispatch_semaphore_t semaphore_;
// The thread that currently holds the lock. Required to handle recursion.
mutable pthread_t owning_thread_;
#else
mutable pthread_mutex_t mutex_;
#endif
CS_DEBUG_CODE(mutable pthread_t thread_);
CS_DEBUG_CODE(mutable int recursion_count_);
#endif

View File

@ -11,9 +11,12 @@
#include <set>
#include <vector>
#include "webrtc/base/arraysize.h"
#include "webrtc/base/checks.h"
#include "webrtc/base/criticalsection.h"
#include "webrtc/base/event.h"
#include "webrtc/base/gunit.h"
#include "webrtc/base/platform_thread.h"
#include "webrtc/base/scoped_ptr.h"
#include "webrtc/base/scopedptrcollection.h"
#include "webrtc/base/thread.h"
@ -320,4 +323,109 @@ TEST(CriticalSectionTest, IsLocked) {
}
#endif
// Shared state for the lock performance test: a counter protected by the
// CriticalSection under test. Signals |event_| once the counter reaches the
// expected total so the main thread knows all workers are done.
class PerfTestData {
 public:
  PerfTestData(int expected_count, Event* event)
      : cache_line_barrier_1_(),
        cache_line_barrier_2_(),
        expected_count_(expected_count),
        event_(event) {
    // Touch the padding arrays so the compiler doesn't flag them as unused.
    ++cache_line_barrier_1_[0];
    ++cache_line_barrier_2_[0];
  }

  ~PerfTestData() {}

  // Acquires the lock, adds |add| to the counter and signals the event when
  // the expected total has been reached.
  void AddToCounter(int add) {
    rtc::CritScope cs(&lock_);
    counter_ += add;
    if (counter_ == expected_count_)
      event_->Set();
  }

  int64_t total() const {
    // Assume that only one thread is running now.
    return counter_;
  }

 private:
  // Padding on both sides of the lock so that unrelated data does not share
  // its cache line — presumably to keep the measurement honest; the member
  // order below is intentional.
  uint8_t cache_line_barrier_1_[64];
  CriticalSection lock_;
  uint8_t cache_line_barrier_2_[64];
  int64_t counter_ = 0;
  const int expected_count_;
  Event* const event_;
};
// One worker for the lock performance test: repeatedly calls
// PerfTestData::AddToCounter() from its own PlatformThread.
class PerfTestThread {
 public:
  PerfTestThread() : thread_(&ThreadFunc, this, "CsPerf") {}

  // Configures the worker and starts the underlying thread. Must not be
  // called while the thread is already running.
  void Start(PerfTestData* data, int repeats, int id) {
    RTC_DCHECK(!thread_.IsRunning());
    RTC_DCHECK(!data_);
    data_ = data;
    repeats_ = repeats;
    my_id_ = id;
    thread_.Start();
  }

  // Joins the thread and resets the worker back to its unconfigured state.
  void Stop() {
    RTC_DCHECK(thread_.IsRunning());
    RTC_DCHECK(data_);
    thread_.Stop();
    my_id_ = 0;
    repeats_ = 0;
    data_ = nullptr;
  }

 private:
  // Thread entry point. Runs the whole workload in one pass and returns
  // false so the thread exits instead of being rescheduled.
  static bool ThreadFunc(void* param) {
    PerfTestThread* self = static_cast<PerfTestThread*>(param);
    for (int i = 0; i < self->repeats_; ++i)
      self->data_->AddToCounter(self->my_id_);
    return false;
  }

  PlatformThread thread_;
  PerfTestData* data_ = nullptr;
  int repeats_ = 0;
  int my_id_ = 0;
};
// Comparison of output of this test as tested on a MacBook Pro Retina, 15-inch,
// Mid 2014, 2,8 GHz Intel Core i7, 16 GB 1600 MHz DDR3,
// running OS X El Capitan, 10.11.2.
//
// Native mutex implementation:
// Approximate CPU usage:
// System: ~16%
// User mode: ~1.3%
// Idle: ~82%
// Unit test output:
// [ OK ] CriticalSectionTest.Performance (234545 ms)
//
// Special partially spin lock based implementation:
// Approximate CPU usage:
// System: ~75%
// User mode: ~16%
// Idle: ~8%
// Unit test output:
// [ OK ] CriticalSectionTest.Performance (2107 ms)
//
// The test is disabled by default to avoid unnecessarily loading the bots.
TEST(CriticalSectionTest, DISABLED_Performance) {
  // Eight workers hammer a single shared lock; the test finishes when the
  // shared counter reaches kExpectedCount and the event fires.
  PerfTestThread threads[8];
  Event event(false, false);

  static const int kThreadRepeats = 10000000;
  static const int kExpectedCount = kThreadRepeats * arraysize(threads);
  PerfTestData test_data(kExpectedCount, &event);

  for (PerfTestThread& t : threads)
    t.Start(&test_data, kThreadRepeats, 1);

  // Block until every increment has been applied, then join the workers.
  event.Wait(Event::kForever);

  for (PerfTestThread& t : threads)
    t.Stop();
}
} // namespace rtc