Reland "Refactor the PlatformThread API."

This reverts commit 793bac569fdf1be16cbf24d7871d20d00bbec81b.

Reason for revert: rare compilation error fixed

Original change's description:
> Revert "Refactor the PlatformThread API."
>
> This reverts commit c89fdd716c4c8af608017c76f75bf27e4c3d602e.
>
> Reason for revert: Causes rare compilation error on win-libfuzzer-asan trybot.
> See https://ci.chromium.org/p/chromium/builders/try/win-libfuzzer-asan-rel/713745?
>
> Original change's description:
> > Refactor the PlatformThread API.
> >
> > PlatformThread's API is using old style function pointers, causes
> > casting, is unintuitive and forces artificial call sequences, and
> > is additionally possible to misuse in release mode.
> >
> > Fix this by an API face lift:
> > 1. The class is turned into a handle, which can be empty.
> > 2. The only way of getting a non-empty PlatformThread is by calling
> > SpawnJoinable or SpawnDetached, clearly conveying the semantics to the
> > code reader.
> > 3. Handles can be Finalized, which works differently for joinable and
> > detached threads:
> >   a) Handles for detached threads are simply closed where applicable.
> >   b) Joinable threads are joined before handles are closed.
> > 4. The destructor finalizes handles. No explicit call is needed.
> >
> > Fixed: webrtc:12727
> > Change-Id: Id00a0464edf4fc9e552b6a1fbb5d2e1280e88811
> > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/215075
> > Commit-Queue: Markus Handell <handellm@webrtc.org>
> > Reviewed-by: Harald Alvestrand <hta@webrtc.org>
> > Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
> > Reviewed-by: Tommi <tommi@webrtc.org>
> > Cr-Commit-Position: refs/heads/master@{#33923}
>
> # Not skipping CQ checks because original CL landed > 1 day ago.
>
> TBR=handellm@webrtc.org
>
> Bug: webrtc:12727
> Change-Id: Ic0146be8866f6dd3ad9c364fb8646650b8e07419
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/217583
> Reviewed-by: Guido Urdaneta <guidou@webrtc.org>
> Reviewed-by: Markus Handell <handellm@webrtc.org>
> Commit-Queue: Guido Urdaneta <guidou@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#33936}

# Not skipping CQ checks because this is a reland.

Bug: webrtc:12727
Change-Id: Ifd6f44eac72fed84474277a1be03eb84d2f4376e
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/217881
Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Markus Handell <handellm@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33950}
This commit is contained in:
Markus Handell 2021-05-07 15:02:36 +02:00 committed by WebRTC LUCI CQ
parent f95536dd5a
commit ad5037b4a8
38 changed files with 588 additions and 865 deletions

View File

@ -40,21 +40,14 @@ class CompileTimeTestForGuardedBy {
}; };
void RunOnDifferentThread(rtc::FunctionView<void()> run) { void RunOnDifferentThread(rtc::FunctionView<void()> run) {
struct Object { rtc::Event thread_has_run_event;
static void Run(void* obj) { rtc::PlatformThread::SpawnJoinable(
auto* me = static_cast<Object*>(obj); [&] {
me->run(); run();
me->thread_has_run_event.Set(); thread_has_run_event.Set();
} },
"thread");
rtc::FunctionView<void()> run; EXPECT_TRUE(thread_has_run_event.Wait(1000));
rtc::Event thread_has_run_event;
} object{run};
rtc::PlatformThread thread(&Object::Run, &object, "thread");
thread.Start();
EXPECT_TRUE(object.thread_has_run_event.Wait(1000));
thread.Stop();
} }
} // namespace } // namespace

View File

@ -429,21 +429,6 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi {
AudioCodingModuleMtTestOldApi() AudioCodingModuleMtTestOldApi()
: AudioCodingModuleTestOldApi(), : AudioCodingModuleTestOldApi(),
send_thread_(
CbSendThread,
this,
"send",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
insert_packet_thread_(
CbInsertPacketThread,
this,
"insert_packet",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
pull_audio_thread_(
CbPullAudioThread,
this,
"pull_audio",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
send_count_(0), send_count_(0),
insert_packet_count_(0), insert_packet_count_(0),
pull_audio_count_(0), pull_audio_count_(0),
@ -460,17 +445,38 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi {
void StartThreads() { void StartThreads() {
quit_.store(false); quit_.store(false);
send_thread_.Start();
insert_packet_thread_.Start(); const auto attributes =
pull_audio_thread_.Start(); rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime);
send_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!quit_.load()) {
CbSendImpl();
}
},
"send", attributes);
insert_packet_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!quit_.load()) {
CbInsertPacketImpl();
}
},
"insert_packet", attributes);
pull_audio_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!quit_.load()) {
CbPullAudioImpl();
}
},
"pull_audio", attributes);
} }
void TearDown() { void TearDown() {
AudioCodingModuleTestOldApi::TearDown(); AudioCodingModuleTestOldApi::TearDown();
quit_.store(true); quit_.store(true);
pull_audio_thread_.Stop(); pull_audio_thread_.Finalize();
send_thread_.Stop(); send_thread_.Finalize();
insert_packet_thread_.Stop(); insert_packet_thread_.Finalize();
} }
bool RunTest() { bool RunTest() {
@ -488,14 +494,6 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi {
return false; return false;
} }
static void CbSendThread(void* context) {
AudioCodingModuleMtTestOldApi* fixture =
reinterpret_cast<AudioCodingModuleMtTestOldApi*>(context);
while (!fixture->quit_.load()) {
fixture->CbSendImpl();
}
}
// The send thread doesn't have to care about the current simulated time, // The send thread doesn't have to care about the current simulated time,
// since only the AcmReceiver is using the clock. // since only the AcmReceiver is using the clock.
void CbSendImpl() { void CbSendImpl() {
@ -511,14 +509,6 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi {
} }
} }
static void CbInsertPacketThread(void* context) {
AudioCodingModuleMtTestOldApi* fixture =
reinterpret_cast<AudioCodingModuleMtTestOldApi*>(context);
while (!fixture->quit_.load()) {
fixture->CbInsertPacketImpl();
}
}
void CbInsertPacketImpl() { void CbInsertPacketImpl() {
SleepMs(1); SleepMs(1);
{ {
@ -533,14 +523,6 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi {
InsertPacket(); InsertPacket();
} }
static void CbPullAudioThread(void* context) {
AudioCodingModuleMtTestOldApi* fixture =
reinterpret_cast<AudioCodingModuleMtTestOldApi*>(context);
while (!fixture->quit_.load()) {
fixture->CbPullAudioImpl();
}
}
void CbPullAudioImpl() { void CbPullAudioImpl() {
SleepMs(1); SleepMs(1);
{ {
@ -699,16 +681,6 @@ class AcmReRegisterIsacMtTestOldApi : public AudioCodingModuleTestOldApi {
AcmReRegisterIsacMtTestOldApi() AcmReRegisterIsacMtTestOldApi()
: AudioCodingModuleTestOldApi(), : AudioCodingModuleTestOldApi(),
receive_thread_(
CbReceiveThread,
this,
"receive",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
codec_registration_thread_(
CbCodecRegistrationThread,
this,
"codec_registration",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
codec_registered_(false), codec_registered_(false),
receive_packet_count_(0), receive_packet_count_(0),
next_insert_packet_time_ms_(0), next_insert_packet_time_ms_(0),
@ -740,28 +712,34 @@ class AcmReRegisterIsacMtTestOldApi : public AudioCodingModuleTestOldApi {
void StartThreads() { void StartThreads() {
quit_.store(false); quit_.store(false);
receive_thread_.Start(); const auto attributes =
codec_registration_thread_.Start(); rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime);
receive_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!quit_.load() && CbReceiveImpl()) {
}
},
"receive", attributes);
codec_registration_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!quit_.load()) {
CbCodecRegistrationImpl();
}
},
"codec_registration", attributes);
} }
void TearDown() override { void TearDown() override {
AudioCodingModuleTestOldApi::TearDown(); AudioCodingModuleTestOldApi::TearDown();
quit_.store(true); quit_.store(true);
receive_thread_.Stop(); receive_thread_.Finalize();
codec_registration_thread_.Stop(); codec_registration_thread_.Finalize();
} }
bool RunTest() { bool RunTest() {
return test_complete_.Wait(10 * 60 * 1000); // 10 minutes' timeout. return test_complete_.Wait(10 * 60 * 1000); // 10 minutes' timeout.
} }
static void CbReceiveThread(void* context) {
AcmReRegisterIsacMtTestOldApi* fixture =
reinterpret_cast<AcmReRegisterIsacMtTestOldApi*>(context);
while (!fixture->quit_.load() && fixture->CbReceiveImpl()) {
}
}
bool CbReceiveImpl() { bool CbReceiveImpl() {
SleepMs(1); SleepMs(1);
rtc::Buffer encoded; rtc::Buffer encoded;
@ -807,14 +785,6 @@ class AcmReRegisterIsacMtTestOldApi : public AudioCodingModuleTestOldApi {
return true; return true;
} }
static void CbCodecRegistrationThread(void* context) {
AcmReRegisterIsacMtTestOldApi* fixture =
reinterpret_cast<AcmReRegisterIsacMtTestOldApi*>(context);
while (!fixture->quit_.load()) {
fixture->CbCodecRegistrationImpl();
}
}
void CbCodecRegistrationImpl() { void CbCodecRegistrationImpl() {
SleepMs(1); SleepMs(1);
if (HasFatalFailure()) { if (HasFatalFailure()) {

View File

@ -216,10 +216,13 @@ int32_t FileAudioDevice::StartPlayout() {
} }
} }
_ptrThreadPlay.reset(new rtc::PlatformThread( _ptrThreadPlay = rtc::PlatformThread::SpawnJoinable(
PlayThreadFunc, this, "webrtc_audio_module_play_thread", [this] {
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); while (PlayThreadProcess()) {
_ptrThreadPlay->Start(); }
},
"webrtc_audio_module_play_thread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime));
RTC_LOG(LS_INFO) << "Started playout capture to output file: " RTC_LOG(LS_INFO) << "Started playout capture to output file: "
<< _outputFilename; << _outputFilename;
@ -233,10 +236,8 @@ int32_t FileAudioDevice::StopPlayout() {
} }
// stop playout thread first // stop playout thread first
if (_ptrThreadPlay) { if (!_ptrThreadPlay.empty())
_ptrThreadPlay->Stop(); _ptrThreadPlay.Finalize();
_ptrThreadPlay.reset();
}
MutexLock lock(&mutex_); MutexLock lock(&mutex_);
@ -276,11 +277,13 @@ int32_t FileAudioDevice::StartRecording() {
} }
} }
_ptrThreadRec.reset(new rtc::PlatformThread( _ptrThreadRec = rtc::PlatformThread::SpawnJoinable(
RecThreadFunc, this, "webrtc_audio_module_capture_thread", [this] {
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); while (RecThreadProcess()) {
}
_ptrThreadRec->Start(); },
"webrtc_audio_module_capture_thread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime));
RTC_LOG(LS_INFO) << "Started recording from input file: " << _inputFilename; RTC_LOG(LS_INFO) << "Started recording from input file: " << _inputFilename;
@ -293,10 +296,8 @@ int32_t FileAudioDevice::StopRecording() {
_recording = false; _recording = false;
} }
if (_ptrThreadRec) { if (!_ptrThreadRec.empty())
_ptrThreadRec->Stop(); _ptrThreadRec.Finalize();
_ptrThreadRec.reset();
}
MutexLock lock(&mutex_); MutexLock lock(&mutex_);
_recordingFramesLeft = 0; _recordingFramesLeft = 0;
@ -439,18 +440,6 @@ void FileAudioDevice::AttachAudioBuffer(AudioDeviceBuffer* audioBuffer) {
_ptrAudioBuffer->SetPlayoutChannels(0); _ptrAudioBuffer->SetPlayoutChannels(0);
} }
void FileAudioDevice::PlayThreadFunc(void* pThis) {
FileAudioDevice* device = static_cast<FileAudioDevice*>(pThis);
while (device->PlayThreadProcess()) {
}
}
void FileAudioDevice::RecThreadFunc(void* pThis) {
FileAudioDevice* device = static_cast<FileAudioDevice*>(pThis);
while (device->RecThreadProcess()) {
}
}
bool FileAudioDevice::PlayThreadProcess() { bool FileAudioDevice::PlayThreadProcess() {
if (!_playing) { if (!_playing) {
return false; return false;

View File

@ -17,14 +17,11 @@
#include <string> #include <string>
#include "modules/audio_device/audio_device_generic.h" #include "modules/audio_device/audio_device_generic.h"
#include "rtc_base/platform_thread.h"
#include "rtc_base/synchronization/mutex.h" #include "rtc_base/synchronization/mutex.h"
#include "rtc_base/system/file_wrapper.h" #include "rtc_base/system/file_wrapper.h"
#include "rtc_base/time_utils.h" #include "rtc_base/time_utils.h"
namespace rtc {
class PlatformThread;
} // namespace rtc
namespace webrtc { namespace webrtc {
// This is a fake audio device which plays audio from a file as its microphone // This is a fake audio device which plays audio from a file as its microphone
@ -145,9 +142,8 @@ class FileAudioDevice : public AudioDeviceGeneric {
size_t _recordingFramesIn10MS; size_t _recordingFramesIn10MS;
size_t _playoutFramesIn10MS; size_t _playoutFramesIn10MS;
// TODO(pbos): Make plain members instead of pointers and stop resetting them. rtc::PlatformThread _ptrThreadRec;
std::unique_ptr<rtc::PlatformThread> _ptrThreadRec; rtc::PlatformThread _ptrThreadPlay;
std::unique_ptr<rtc::PlatformThread> _ptrThreadPlay;
bool _playing; bool _playing;
bool _recording; bool _recording;

View File

@ -178,26 +178,13 @@ int32_t AudioDeviceLinuxALSA::Terminate() {
_mixerManager.Close(); _mixerManager.Close();
// RECORDING // RECORDING
if (_ptrThreadRec) { mutex_.Unlock();
rtc::PlatformThread* tmpThread = _ptrThreadRec.release(); _ptrThreadRec.Finalize();
mutex_.Unlock();
tmpThread->Stop();
delete tmpThread;
mutex_.Lock();
}
// PLAYOUT // PLAYOUT
if (_ptrThreadPlay) { _ptrThreadPlay.Finalize();
rtc::PlatformThread* tmpThread = _ptrThreadPlay.release(); mutex_.Lock();
mutex_.Unlock();
tmpThread->Stop();
delete tmpThread;
mutex_.Lock();
}
#if defined(WEBRTC_USE_X11) #if defined(WEBRTC_USE_X11)
if (_XDisplay) { if (_XDisplay) {
XCloseDisplay(_XDisplay); XCloseDisplay(_XDisplay);
@ -1040,11 +1027,13 @@ int32_t AudioDeviceLinuxALSA::StartRecording() {
return -1; return -1;
} }
// RECORDING // RECORDING
_ptrThreadRec.reset(new rtc::PlatformThread( _ptrThreadRec = rtc::PlatformThread::SpawnJoinable(
RecThreadFunc, this, "webrtc_audio_module_capture_thread", [this] {
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); while (RecThreadProcess()) {
}
_ptrThreadRec->Start(); },
"webrtc_audio_module_capture_thread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime));
errVal = LATE(snd_pcm_prepare)(_handleRecord); errVal = LATE(snd_pcm_prepare)(_handleRecord);
if (errVal < 0) { if (errVal < 0) {
@ -1088,10 +1077,7 @@ int32_t AudioDeviceLinuxALSA::StopRecordingLocked() {
_recIsInitialized = false; _recIsInitialized = false;
_recording = false; _recording = false;
if (_ptrThreadRec) { _ptrThreadRec.Finalize();
_ptrThreadRec->Stop();
_ptrThreadRec.reset();
}
_recordingFramesLeft = 0; _recordingFramesLeft = 0;
if (_recordingBuffer) { if (_recordingBuffer) {
@ -1158,10 +1144,13 @@ int32_t AudioDeviceLinuxALSA::StartPlayout() {
} }
// PLAYOUT // PLAYOUT
_ptrThreadPlay.reset(new rtc::PlatformThread( _ptrThreadPlay = rtc::PlatformThread::SpawnJoinable(
PlayThreadFunc, this, "webrtc_audio_module_play_thread", [this] {
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); while (PlayThreadProcess()) {
_ptrThreadPlay->Start(); }
},
"webrtc_audio_module_play_thread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime));
int errVal = LATE(snd_pcm_prepare)(_handlePlayout); int errVal = LATE(snd_pcm_prepare)(_handlePlayout);
if (errVal < 0) { if (errVal < 0) {
@ -1191,10 +1180,7 @@ int32_t AudioDeviceLinuxALSA::StopPlayoutLocked() {
_playing = false; _playing = false;
// stop playout thread first // stop playout thread first
if (_ptrThreadPlay) { _ptrThreadPlay.Finalize();
_ptrThreadPlay->Stop();
_ptrThreadPlay.reset();
}
_playoutFramesLeft = 0; _playoutFramesLeft = 0;
delete[] _playoutBuffer; delete[] _playoutBuffer;
@ -1469,18 +1455,6 @@ int32_t AudioDeviceLinuxALSA::ErrorRecovery(int32_t error,
// Thread Methods // Thread Methods
// ============================================================================ // ============================================================================
void AudioDeviceLinuxALSA::PlayThreadFunc(void* pThis) {
AudioDeviceLinuxALSA* device = static_cast<AudioDeviceLinuxALSA*>(pThis);
while (device->PlayThreadProcess()) {
}
}
void AudioDeviceLinuxALSA::RecThreadFunc(void* pThis) {
AudioDeviceLinuxALSA* device = static_cast<AudioDeviceLinuxALSA*>(pThis);
while (device->RecThreadProcess()) {
}
}
bool AudioDeviceLinuxALSA::PlayThreadProcess() { bool AudioDeviceLinuxALSA::PlayThreadProcess() {
if (!_playing) if (!_playing)
return false; return false;

View File

@ -155,10 +155,8 @@ class AudioDeviceLinuxALSA : public AudioDeviceGeneric {
Mutex mutex_; Mutex mutex_;
// TODO(pbos): Make plain members and start/stop instead of resetting these rtc::PlatformThread _ptrThreadRec;
// pointers. A thread can be reused. rtc::PlatformThread _ptrThreadPlay;
std::unique_ptr<rtc::PlatformThread> _ptrThreadRec;
std::unique_ptr<rtc::PlatformThread> _ptrThreadPlay;
AudioMixerManagerLinuxALSA _mixerManager; AudioMixerManagerLinuxALSA _mixerManager;

View File

@ -15,6 +15,7 @@
#include "modules/audio_device/linux/latebindingsymboltable_linux.h" #include "modules/audio_device/linux/latebindingsymboltable_linux.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/platform_thread.h"
WebRTCPulseSymbolTable* GetPulseSymbolTable() { WebRTCPulseSymbolTable* GetPulseSymbolTable() {
static WebRTCPulseSymbolTable* pulse_symbol_table = static WebRTCPulseSymbolTable* pulse_symbol_table =
@ -158,18 +159,22 @@ AudioDeviceGeneric::InitStatus AudioDeviceLinuxPulse::Init() {
#endif #endif
// RECORDING // RECORDING
_ptrThreadRec.reset(new rtc::PlatformThread( const auto attributes =
RecThreadFunc, this, "webrtc_audio_module_rec_thread", rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime);
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); _ptrThreadRec = rtc::PlatformThread::SpawnJoinable(
[this] {
_ptrThreadRec->Start(); while (RecThreadProcess()) {
}
},
"webrtc_audio_module_rec_thread", attributes);
// PLAYOUT // PLAYOUT
_ptrThreadPlay.reset(new rtc::PlatformThread( _ptrThreadPlay = rtc::PlatformThread::SpawnJoinable(
PlayThreadFunc, this, "webrtc_audio_module_play_thread", [this] {
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); while (PlayThreadProcess()) {
_ptrThreadPlay->Start(); }
},
"webrtc_audio_module_play_thread", attributes);
_initialized = true; _initialized = true;
return InitStatus::OK; return InitStatus::OK;
@ -187,22 +192,12 @@ int32_t AudioDeviceLinuxPulse::Terminate() {
_mixerManager.Close(); _mixerManager.Close();
// RECORDING // RECORDING
if (_ptrThreadRec) { _timeEventRec.Set();
rtc::PlatformThread* tmpThread = _ptrThreadRec.release(); _ptrThreadRec.Finalize();
_timeEventRec.Set();
tmpThread->Stop();
delete tmpThread;
}
// PLAYOUT // PLAYOUT
if (_ptrThreadPlay) { _timeEventPlay.Set();
rtc::PlatformThread* tmpThread = _ptrThreadPlay.release(); _ptrThreadPlay.Finalize();
_timeEventPlay.Set();
tmpThread->Stop();
delete tmpThread;
}
// Terminate PulseAudio // Terminate PulseAudio
if (TerminatePulseAudio() < 0) { if (TerminatePulseAudio() < 0) {
@ -1981,18 +1976,6 @@ int32_t AudioDeviceLinuxPulse::ProcessRecordedData(int8_t* bufferData,
return 0; return 0;
} }
void AudioDeviceLinuxPulse::PlayThreadFunc(void* pThis) {
AudioDeviceLinuxPulse* device = static_cast<AudioDeviceLinuxPulse*>(pThis);
while (device->PlayThreadProcess()) {
}
}
void AudioDeviceLinuxPulse::RecThreadFunc(void* pThis) {
AudioDeviceLinuxPulse* device = static_cast<AudioDeviceLinuxPulse*>(pThis);
while (device->RecThreadProcess()) {
}
}
bool AudioDeviceLinuxPulse::PlayThreadProcess() { bool AudioDeviceLinuxPulse::PlayThreadProcess() {
if (!_timeEventPlay.Wait(1000)) { if (!_timeEventPlay.Wait(1000)) {
return true; return true;

View File

@ -268,9 +268,8 @@ class AudioDeviceLinuxPulse : public AudioDeviceGeneric {
rtc::Event _recStartEvent; rtc::Event _recStartEvent;
rtc::Event _playStartEvent; rtc::Event _playStartEvent;
// TODO(pbos): Remove unique_ptr and use directly without resetting. rtc::PlatformThread _ptrThreadPlay;
std::unique_ptr<rtc::PlatformThread> _ptrThreadPlay; rtc::PlatformThread _ptrThreadRec;
std::unique_ptr<rtc::PlatformThread> _ptrThreadRec;
AudioMixerManagerLinuxPulse _mixerManager; AudioMixerManagerLinuxPulse _mixerManager;

View File

@ -166,8 +166,8 @@ AudioDeviceMac::~AudioDeviceMac() {
Terminate(); Terminate();
} }
RTC_DCHECK(!capture_worker_thread_.get()); RTC_DCHECK(capture_worker_thread_.empty());
RTC_DCHECK(!render_worker_thread_.get()); RTC_DCHECK(render_worker_thread_.empty());
if (_paRenderBuffer) { if (_paRenderBuffer) {
delete _paRenderBuffer; delete _paRenderBuffer;
@ -1308,12 +1308,14 @@ int32_t AudioDeviceMac::StartRecording() {
return -1; return -1;
} }
RTC_DCHECK(!capture_worker_thread_.get()); RTC_DCHECK(capture_worker_thread_.empty());
capture_worker_thread_.reset(new rtc::PlatformThread( capture_worker_thread_ = rtc::PlatformThread::SpawnJoinable(
RunCapture, this, "CaptureWorkerThread", [this] {
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); while (CaptureWorkerThread()) {
RTC_DCHECK(capture_worker_thread_.get()); }
capture_worker_thread_->Start(); },
"CaptureWorkerThread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime));
OSStatus err = noErr; OSStatus err = noErr;
if (_twoDevices) { if (_twoDevices) {
@ -1395,10 +1397,9 @@ int32_t AudioDeviceMac::StopRecording() {
// Setting this signal will allow the worker thread to be stopped. // Setting this signal will allow the worker thread to be stopped.
AtomicSet32(&_captureDeviceIsAlive, 0); AtomicSet32(&_captureDeviceIsAlive, 0);
if (capture_worker_thread_.get()) { if (!capture_worker_thread_.empty()) {
mutex_.Unlock(); mutex_.Unlock();
capture_worker_thread_->Stop(); capture_worker_thread_.Finalize();
capture_worker_thread_.reset();
mutex_.Lock(); mutex_.Lock();
} }
@ -1444,11 +1445,14 @@ int32_t AudioDeviceMac::StartPlayout() {
return 0; return 0;
} }
RTC_DCHECK(!render_worker_thread_.get()); RTC_DCHECK(render_worker_thread_.empty());
render_worker_thread_.reset(new rtc::PlatformThread( render_worker_thread_ = rtc::PlatformThread::SpawnJoinable(
RunRender, this, "RenderWorkerThread", [this] {
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); while (RenderWorkerThread()) {
render_worker_thread_->Start(); }
},
"RenderWorkerThread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime));
if (_twoDevices || !_recording) { if (_twoDevices || !_recording) {
OSStatus err = noErr; OSStatus err = noErr;
@ -1506,10 +1510,9 @@ int32_t AudioDeviceMac::StopPlayout() {
// Setting this signal will allow the worker thread to be stopped. // Setting this signal will allow the worker thread to be stopped.
AtomicSet32(&_renderDeviceIsAlive, 0); AtomicSet32(&_renderDeviceIsAlive, 0);
if (render_worker_thread_.get()) { if (!render_worker_thread_.empty()) {
mutex_.Unlock(); mutex_.Unlock();
render_worker_thread_->Stop(); render_worker_thread_.Finalize();
render_worker_thread_.reset();
mutex_.Lock(); mutex_.Lock();
} }
@ -2371,12 +2374,6 @@ OSStatus AudioDeviceMac::implInConverterProc(UInt32* numberDataPackets,
return 0; return 0;
} }
void AudioDeviceMac::RunRender(void* ptrThis) {
AudioDeviceMac* device = static_cast<AudioDeviceMac*>(ptrThis);
while (device->RenderWorkerThread()) {
}
}
bool AudioDeviceMac::RenderWorkerThread() { bool AudioDeviceMac::RenderWorkerThread() {
PaRingBufferSize numSamples = PaRingBufferSize numSamples =
ENGINE_PLAY_BUF_SIZE_IN_SAMPLES * _outDesiredFormat.mChannelsPerFrame; ENGINE_PLAY_BUF_SIZE_IN_SAMPLES * _outDesiredFormat.mChannelsPerFrame;
@ -2442,12 +2439,6 @@ bool AudioDeviceMac::RenderWorkerThread() {
return true; return true;
} }
void AudioDeviceMac::RunCapture(void* ptrThis) {
AudioDeviceMac* device = static_cast<AudioDeviceMac*>(ptrThis);
while (device->CaptureWorkerThread()) {
}
}
bool AudioDeviceMac::CaptureWorkerThread() { bool AudioDeviceMac::CaptureWorkerThread() {
OSStatus err = noErr; OSStatus err = noErr;
UInt32 noRecSamples = UInt32 noRecSamples =

View File

@ -21,15 +21,12 @@
#include "modules/audio_device/mac/audio_mixer_manager_mac.h" #include "modules/audio_device/mac/audio_mixer_manager_mac.h"
#include "rtc_base/event.h" #include "rtc_base/event.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/platform_thread.h"
#include "rtc_base/synchronization/mutex.h" #include "rtc_base/synchronization/mutex.h"
#include "rtc_base/thread_annotations.h" #include "rtc_base/thread_annotations.h"
struct PaUtilRingBuffer; struct PaUtilRingBuffer;
namespace rtc {
class PlatformThread;
} // namespace rtc
namespace webrtc { namespace webrtc {
const uint32_t N_REC_SAMPLES_PER_SEC = 48000; const uint32_t N_REC_SAMPLES_PER_SEC = 48000;
@ -271,13 +268,11 @@ class AudioDeviceMac : public AudioDeviceGeneric {
rtc::Event _stopEventRec; rtc::Event _stopEventRec;
rtc::Event _stopEvent; rtc::Event _stopEvent;
// TODO(pbos): Replace with direct members, just start/stop, no need to
// recreate the thread.
// Only valid/running between calls to StartRecording and StopRecording. // Only valid/running between calls to StartRecording and StopRecording.
std::unique_ptr<rtc::PlatformThread> capture_worker_thread_; rtc::PlatformThread capture_worker_thread_;
// Only valid/running between calls to StartPlayout and StopPlayout. // Only valid/running between calls to StartPlayout and StopPlayout.
std::unique_ptr<rtc::PlatformThread> render_worker_thread_; rtc::PlatformThread render_worker_thread_;
AudioMixerManagerMac _mixerManager; AudioMixerManagerMac _mixerManager;

View File

@ -119,11 +119,6 @@ const char* SessionDisconnectReasonToString(
} }
} }
void Run(void* obj) {
RTC_DCHECK(obj);
reinterpret_cast<CoreAudioBase*>(obj)->ThreadRun();
}
// Returns true if the selected audio device supports low latency, i.e, if it // Returns true if the selected audio device supports low latency, i.e, if it
// is possible to initialize the engine using periods less than the default // is possible to initialize the engine using periods less than the default
// period (10ms). // period (10ms).
@ -553,24 +548,19 @@ bool CoreAudioBase::Start() {
// Audio thread should be alive during internal restart since the restart // Audio thread should be alive during internal restart since the restart
// callback is triggered on that thread and it also makes the restart // callback is triggered on that thread and it also makes the restart
// sequence less complex. // sequence less complex.
RTC_DCHECK(audio_thread_); RTC_DCHECK(!audio_thread_.empty());
} }
// Start an audio thread but only if one does not already exist (which is the // Start an audio thread but only if one does not already exist (which is the
// case during restart). // case during restart).
if (!audio_thread_) { if (audio_thread_.empty()) {
audio_thread_ = std::make_unique<rtc::PlatformThread>( const absl::string_view name =
Run, this, IsInput() ? "wasapi_capture_thread" : "wasapi_render_thread", IsInput() ? "wasapi_capture_thread" : "wasapi_render_thread";
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)); audio_thread_ = rtc::PlatformThread::SpawnJoinable(
RTC_DCHECK(audio_thread_); [this] { ThreadRun(); }, name,
audio_thread_->Start(); rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime));
if (!audio_thread_->IsRunning()) { RTC_DLOG(INFO) << "Started thread with name: " << name
StopThread(); << " and handle: " << *audio_thread_.GetHandle();
RTC_LOG(LS_ERROR) << "Failed to start audio thread";
return false;
}
RTC_DLOG(INFO) << "Started thread with name: " << audio_thread_->name()
<< " and id: " << audio_thread_->GetThreadRef();
} }
// Start streaming data between the endpoint buffer and the audio engine. // Start streaming data between the endpoint buffer and the audio engine.
@ -697,14 +687,11 @@ bool CoreAudioBase::Restart() {
void CoreAudioBase::StopThread() { void CoreAudioBase::StopThread() {
RTC_DLOG(INFO) << __FUNCTION__; RTC_DLOG(INFO) << __FUNCTION__;
RTC_DCHECK(!IsRestarting()); RTC_DCHECK(!IsRestarting());
if (audio_thread_) { if (!audio_thread_.empty()) {
if (audio_thread_->IsRunning()) { RTC_DLOG(INFO) << "Sets stop_event...";
RTC_DLOG(INFO) << "Sets stop_event..."; SetEvent(stop_event_.Get());
SetEvent(stop_event_.Get()); RTC_DLOG(INFO) << "PlatformThread::Finalize...";
RTC_DLOG(INFO) << "PlatformThread::Stop..."; audio_thread_.Finalize();
audio_thread_->Stop();
}
audio_thread_.reset();
// Ensure that we don't quit the main thread loop immediately next // Ensure that we don't quit the main thread loop immediately next
// time Start() is called. // time Start() is called.
@ -717,7 +704,7 @@ bool CoreAudioBase::HandleRestartEvent() {
RTC_DLOG(INFO) << __FUNCTION__ << "[" << DirectionToString(direction()) RTC_DLOG(INFO) << __FUNCTION__ << "[" << DirectionToString(direction())
<< "]"; << "]";
RTC_DCHECK_RUN_ON(&thread_checker_audio_); RTC_DCHECK_RUN_ON(&thread_checker_audio_);
RTC_DCHECK(audio_thread_); RTC_DCHECK(!audio_thread_.empty());
RTC_DCHECK(IsRestarting()); RTC_DCHECK(IsRestarting());
// Let each client (input and/or output) take care of its own restart // Let each client (input and/or output) take care of its own restart
// sequence since each side might need unique actions. // sequence since each side might need unique actions.

View File

@ -158,7 +158,7 @@ class CoreAudioBase : public IAudioSessionEvents {
// Set when restart process starts and cleared when restart stops // Set when restart process starts and cleared when restart stops
// successfully. Accessed atomically. // successfully. Accessed atomically.
std::atomic<bool> is_restarting_; std::atomic<bool> is_restarting_;
std::unique_ptr<rtc::PlatformThread> audio_thread_; rtc::PlatformThread audio_thread_;
Microsoft::WRL::ComPtr<IAudioSessionControl> audio_session_control_; Microsoft::WRL::ComPtr<IAudioSessionControl> audio_session_control_;
void StopThread(); void StopThread();

View File

@ -387,33 +387,6 @@ class AudioProcessingImplLockTest
void SetUp() override; void SetUp() override;
void TearDown() override; void TearDown() override;
// Thread callback for the render thread
static void RenderProcessorThreadFunc(void* context) {
AudioProcessingImplLockTest* impl =
reinterpret_cast<AudioProcessingImplLockTest*>(context);
while (!impl->MaybeEndTest()) {
impl->render_thread_state_.Process();
}
}
// Thread callback for the capture thread
static void CaptureProcessorThreadFunc(void* context) {
AudioProcessingImplLockTest* impl =
reinterpret_cast<AudioProcessingImplLockTest*>(context);
while (!impl->MaybeEndTest()) {
impl->capture_thread_state_.Process();
}
}
// Thread callback for the stats thread
static void StatsProcessorThreadFunc(void* context) {
AudioProcessingImplLockTest* impl =
reinterpret_cast<AudioProcessingImplLockTest*>(context);
while (!impl->MaybeEndTest()) {
impl->stats_thread_state_.Process();
}
}
// Tests whether all the required render and capture side calls have been // Tests whether all the required render and capture side calls have been
// done. // done.
bool TestDone() { bool TestDone() {
@ -423,9 +396,28 @@ class AudioProcessingImplLockTest
// Start the threads used in the test. // Start the threads used in the test.
void StartThreads() { void StartThreads() {
render_thread_.Start(); const auto attributes =
capture_thread_.Start(); rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime);
stats_thread_.Start(); render_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!MaybeEndTest())
render_thread_state_.Process();
},
"render", attributes);
capture_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!MaybeEndTest()) {
capture_thread_state_.Process();
}
},
"capture", attributes);
stats_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!MaybeEndTest())
stats_thread_state_.Process();
},
"stats", attributes);
} }
// Event handlers for the test. // Event handlers for the test.
@ -434,9 +426,6 @@ class AudioProcessingImplLockTest
rtc::Event capture_call_event_; rtc::Event capture_call_event_;
// Thread related variables. // Thread related variables.
rtc::PlatformThread render_thread_;
rtc::PlatformThread capture_thread_;
rtc::PlatformThread stats_thread_;
mutable RandomGenerator rand_gen_; mutable RandomGenerator rand_gen_;
std::unique_ptr<AudioProcessing> apm_; std::unique_ptr<AudioProcessing> apm_;
@ -445,6 +434,9 @@ class AudioProcessingImplLockTest
RenderProcessor render_thread_state_; RenderProcessor render_thread_state_;
CaptureProcessor capture_thread_state_; CaptureProcessor capture_thread_state_;
StatsProcessor stats_thread_state_; StatsProcessor stats_thread_state_;
rtc::PlatformThread render_thread_;
rtc::PlatformThread capture_thread_;
rtc::PlatformThread stats_thread_;
}; };
// Sleeps a random time between 0 and max_sleep milliseconds. // Sleeps a random time between 0 and max_sleep milliseconds.
@ -485,22 +477,7 @@ void PopulateAudioFrame(float amplitude,
} }
AudioProcessingImplLockTest::AudioProcessingImplLockTest() AudioProcessingImplLockTest::AudioProcessingImplLockTest()
: render_thread_( : apm_(AudioProcessingBuilderForTesting().Create()),
RenderProcessorThreadFunc,
this,
"render",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
capture_thread_(
CaptureProcessorThreadFunc,
this,
"capture",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
stats_thread_(
StatsProcessorThreadFunc,
this,
"stats",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
apm_(AudioProcessingBuilderForTesting().Create()),
render_thread_state_(kMaxFrameSize, render_thread_state_(kMaxFrameSize,
&rand_gen_, &rand_gen_,
&render_call_event_, &render_call_event_,
@ -552,9 +529,6 @@ void AudioProcessingImplLockTest::SetUp() {
void AudioProcessingImplLockTest::TearDown() { void AudioProcessingImplLockTest::TearDown() {
render_call_event_.Set(); render_call_event_.Set();
capture_call_event_.Set(); capture_call_event_.Set();
render_thread_.Stop();
capture_thread_.Stop();
stats_thread_.Stop();
} }
StatsProcessor::StatsProcessor(RandomGenerator* rand_gen, StatsProcessor::StatsProcessor(RandomGenerator* rand_gen,

View File

@ -391,17 +391,7 @@ class TimedThreadApiProcessor {
class CallSimulator : public ::testing::TestWithParam<SimulationConfig> { class CallSimulator : public ::testing::TestWithParam<SimulationConfig> {
public: public:
CallSimulator() CallSimulator()
: render_thread_(new rtc::PlatformThread( : rand_gen_(42U),
RenderProcessorThreadFunc,
this,
"render",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))),
capture_thread_(new rtc::PlatformThread(
CaptureProcessorThreadFunc,
this,
"capture",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))),
rand_gen_(42U),
simulation_config_(static_cast<SimulationConfig>(GetParam())) {} simulation_config_(static_cast<SimulationConfig>(GetParam())) {}
// Run the call simulation with a timeout. // Run the call simulation with a timeout.
@ -436,13 +426,10 @@ class CallSimulator : public ::testing::TestWithParam<SimulationConfig> {
static const int kMinNumFramesToProcess = 150; static const int kMinNumFramesToProcess = 150;
static const int32_t kTestTimeout = 3 * 10 * kMinNumFramesToProcess; static const int32_t kTestTimeout = 3 * 10 * kMinNumFramesToProcess;
// ::testing::TestWithParam<> implementation.
void TearDown() override { StopThreads(); }
// Stop all running threads. // Stop all running threads.
void StopThreads() { void StopThreads() {
render_thread_->Stop(); render_thread_.Finalize();
capture_thread_->Stop(); capture_thread_.Finalize();
} }
// Simulator and APM setup. // Simulator and APM setup.
@ -533,32 +520,28 @@ class CallSimulator : public ::testing::TestWithParam<SimulationConfig> {
kMinNumFramesToProcess, kCaptureInputFloatLevel, num_capture_channels)); kMinNumFramesToProcess, kCaptureInputFloatLevel, num_capture_channels));
} }
// Thread callback for the render thread.
static void RenderProcessorThreadFunc(void* context) {
CallSimulator* call_simulator = reinterpret_cast<CallSimulator*>(context);
while (call_simulator->render_thread_state_->Process()) {
}
}
// Thread callback for the capture thread.
static void CaptureProcessorThreadFunc(void* context) {
CallSimulator* call_simulator = reinterpret_cast<CallSimulator*>(context);
while (call_simulator->capture_thread_state_->Process()) {
}
}
// Start the threads used in the test. // Start the threads used in the test.
void StartThreads() { void StartThreads() {
ASSERT_NO_FATAL_FAILURE(render_thread_->Start()); const auto attributes =
ASSERT_NO_FATAL_FAILURE(capture_thread_->Start()); rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime);
render_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (render_thread_state_->Process()) {
}
},
"render", attributes);
capture_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (capture_thread_state_->Process()) {
}
},
"capture", attributes);
} }
// Event handler for the test. // Event handler for the test.
rtc::Event test_complete_; rtc::Event test_complete_;
// Thread related variables. // Thread related variables.
std::unique_ptr<rtc::PlatformThread> render_thread_;
std::unique_ptr<rtc::PlatformThread> capture_thread_;
Random rand_gen_; Random rand_gen_;
std::unique_ptr<AudioProcessing> apm_; std::unique_ptr<AudioProcessing> apm_;
@ -567,6 +550,8 @@ class CallSimulator : public ::testing::TestWithParam<SimulationConfig> {
LockedFlag capture_call_checker_; LockedFlag capture_call_checker_;
std::unique_ptr<TimedThreadApiProcessor> render_thread_state_; std::unique_ptr<TimedThreadApiProcessor> render_thread_state_;
std::unique_ptr<TimedThreadApiProcessor> capture_thread_state_; std::unique_ptr<TimedThreadApiProcessor> capture_thread_state_;
rtc::PlatformThread render_thread_;
rtc::PlatformThread capture_thread_;
}; };
// Implements the callback functionality for the threads. // Implements the callback functionality for the threads.

View File

@ -48,13 +48,12 @@ void TestScreenDrawerLock(
~Task() = default; ~Task() = default;
static void RunTask(void* me) { void RunTask() {
Task* task = static_cast<Task*>(me); std::unique_ptr<ScreenDrawerLock> lock = ctor_();
std::unique_ptr<ScreenDrawerLock> lock = task->ctor_();
ASSERT_TRUE(!!lock); ASSERT_TRUE(!!lock);
task->created_->store(true); created_->store(true);
// Wait for the main thread to get the signal of created_. // Wait for the main thread to get the signal of created_.
while (!task->ready_.load()) { while (!ready_.load()) {
SleepMs(1); SleepMs(1);
} }
// At this point, main thread should begin to create a second lock. Though // At this point, main thread should begin to create a second lock. Though
@ -77,8 +76,8 @@ void TestScreenDrawerLock(
const rtc::FunctionView<std::unique_ptr<ScreenDrawerLock>()> ctor_; const rtc::FunctionView<std::unique_ptr<ScreenDrawerLock>()> ctor_;
} task(&created, ready, ctor); } task(&created, ready, ctor);
rtc::PlatformThread lock_thread(&Task::RunTask, &task, "lock_thread"); auto lock_thread = rtc::PlatformThread::SpawnJoinable(
lock_thread.Start(); [&task] { task.RunTask(); }, "lock_thread");
// Wait for the first lock in Task::RunTask() to be created. // Wait for the first lock in Task::RunTask() to be created.
// TODO(zijiehe): Find a better solution to wait for the creation of the first // TODO(zijiehe): Find a better solution to wait for the creation of the first
@ -95,7 +94,6 @@ void TestScreenDrawerLock(
ASSERT_GT(kLockDurationMs, rtc::TimeMillis() - start_ms); ASSERT_GT(kLockDurationMs, rtc::TimeMillis() - start_ms);
ctor(); ctor();
ASSERT_LE(kLockDurationMs, rtc::TimeMillis() - start_ms); ASSERT_LE(kLockDurationMs, rtc::TimeMillis() - start_ms);
lock_thread.Stop();
} }
} // namespace } // namespace

View File

@ -48,7 +48,6 @@ ProcessThreadImpl::ProcessThreadImpl(const char* thread_name)
ProcessThreadImpl::~ProcessThreadImpl() { ProcessThreadImpl::~ProcessThreadImpl() {
RTC_DCHECK(thread_checker_.IsCurrent()); RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK(!thread_.get());
RTC_DCHECK(!stop_); RTC_DCHECK(!stop_);
while (!delayed_tasks_.empty()) { while (!delayed_tasks_.empty()) {
@ -72,8 +71,8 @@ void ProcessThreadImpl::Delete() {
// Doesn't need locking, because the contending thread isn't running. // Doesn't need locking, because the contending thread isn't running.
void ProcessThreadImpl::Start() RTC_NO_THREAD_SAFETY_ANALYSIS { void ProcessThreadImpl::Start() RTC_NO_THREAD_SAFETY_ANALYSIS {
RTC_DCHECK(thread_checker_.IsCurrent()); RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK(!thread_.get()); RTC_DCHECK(thread_.empty());
if (thread_.get()) if (!thread_.empty())
return; return;
RTC_DCHECK(!stop_); RTC_DCHECK(!stop_);
@ -81,14 +80,18 @@ void ProcessThreadImpl::Start() RTC_NO_THREAD_SAFETY_ANALYSIS {
for (ModuleCallback& m : modules_) for (ModuleCallback& m : modules_)
m.module->ProcessThreadAttached(this); m.module->ProcessThreadAttached(this);
thread_.reset( thread_ = rtc::PlatformThread::SpawnJoinable(
new rtc::PlatformThread(&ProcessThreadImpl::Run, this, thread_name_)); [this] {
thread_->Start(); CurrentTaskQueueSetter set_current(this);
while (Process()) {
}
},
thread_name_);
} }
void ProcessThreadImpl::Stop() { void ProcessThreadImpl::Stop() {
RTC_DCHECK(thread_checker_.IsCurrent()); RTC_DCHECK(thread_checker_.IsCurrent());
if (!thread_.get()) if (thread_.empty())
return; return;
{ {
@ -98,9 +101,7 @@ void ProcessThreadImpl::Stop() {
} }
wake_up_.Set(); wake_up_.Set();
thread_.Finalize();
thread_->Stop();
thread_.reset();
StopNoLocks(); StopNoLocks();
} }
@ -108,7 +109,7 @@ void ProcessThreadImpl::Stop() {
// No locking needed, since this is called after the contending thread is // No locking needed, since this is called after the contending thread is
// stopped. // stopped.
void ProcessThreadImpl::StopNoLocks() RTC_NO_THREAD_SAFETY_ANALYSIS { void ProcessThreadImpl::StopNoLocks() RTC_NO_THREAD_SAFETY_ANALYSIS {
RTC_DCHECK(!thread_); RTC_DCHECK(thread_.empty());
stop_ = false; stop_ = false;
for (ModuleCallback& m : modules_) for (ModuleCallback& m : modules_)
@ -199,7 +200,7 @@ void ProcessThreadImpl::RegisterModule(Module* module,
// Now that we know the module isn't in the list, we'll call out to notify // Now that we know the module isn't in the list, we'll call out to notify
// the module that it's attached to the worker thread. We don't hold // the module that it's attached to the worker thread. We don't hold
// the lock while we make this call. // the lock while we make this call.
if (thread_.get()) if (!thread_.empty())
module->ProcessThreadAttached(this); module->ProcessThreadAttached(this);
{ {
@ -227,14 +228,6 @@ void ProcessThreadImpl::DeRegisterModule(Module* module) {
module->ProcessThreadAttached(nullptr); module->ProcessThreadAttached(nullptr);
} }
// static
void ProcessThreadImpl::Run(void* obj) {
ProcessThreadImpl* impl = static_cast<ProcessThreadImpl*>(obj);
CurrentTaskQueueSetter set_current(impl);
while (impl->Process()) {
}
}
bool ProcessThreadImpl::Process() { bool ProcessThreadImpl::Process() {
TRACE_EVENT1("webrtc", "ProcessThreadImpl", "name", thread_name_); TRACE_EVENT1("webrtc", "ProcessThreadImpl", "name", thread_name_);
int64_t now = rtc::TimeMillis(); int64_t now = rtc::TimeMillis();

View File

@ -45,7 +45,6 @@ class ProcessThreadImpl : public ProcessThread {
void DeRegisterModule(Module* module) override; void DeRegisterModule(Module* module) override;
protected: protected:
static void Run(void* obj);
bool Process(); bool Process();
private: private:
@ -97,8 +96,7 @@ class ProcessThreadImpl : public ProcessThread {
SequenceChecker thread_checker_; SequenceChecker thread_checker_;
rtc::Event wake_up_; rtc::Event wake_up_;
// TODO(pbos): Remove unique_ptr and stop recreating the thread. rtc::PlatformThread thread_;
std::unique_ptr<rtc::PlatformThread> thread_;
ModuleList modules_ RTC_GUARDED_BY(mutex_); ModuleList modules_ RTC_GUARDED_BY(mutex_);
// Set to true when calling Process, to allow reentrant calls to WakeUp. // Set to true when calling Process, to allow reentrant calls to WakeUp.

View File

@ -240,12 +240,15 @@ int32_t VideoCaptureModuleV4L2::StartCapture(
} }
// start capture thread; // start capture thread;
if (!_captureThread) { if (_captureThread.empty()) {
quit_ = false; quit_ = false;
_captureThread.reset(new rtc::PlatformThread( _captureThread = rtc::PlatformThread::SpawnJoinable(
VideoCaptureModuleV4L2::CaptureThread, this, "CaptureThread", [this] {
rtc::ThreadAttributes().SetPriority(rtc::kHighPriority))); while (CaptureProcess()) {
_captureThread->Start(); }
},
"CaptureThread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kHigh));
} }
// Needed to start UVC camera - from the uvcview application // Needed to start UVC camera - from the uvcview application
@ -261,14 +264,13 @@ int32_t VideoCaptureModuleV4L2::StartCapture(
} }
int32_t VideoCaptureModuleV4L2::StopCapture() { int32_t VideoCaptureModuleV4L2::StopCapture() {
if (_captureThread) { if (!_captureThread.empty()) {
{ {
MutexLock lock(&capture_lock_); MutexLock lock(&capture_lock_);
quit_ = true; quit_ = true;
} }
// Make sure the capture thread stop stop using the critsect. // Make sure the capture thread stops using the mutex.
_captureThread->Stop(); _captureThread.Finalize();
_captureThread.reset();
} }
MutexLock lock(&capture_lock_); MutexLock lock(&capture_lock_);
@ -356,11 +358,6 @@ bool VideoCaptureModuleV4L2::CaptureStarted() {
return _captureStarted; return _captureStarted;
} }
void VideoCaptureModuleV4L2::CaptureThread(void* obj) {
VideoCaptureModuleV4L2* capture = static_cast<VideoCaptureModuleV4L2*>(obj);
while (capture->CaptureProcess()) {
}
}
bool VideoCaptureModuleV4L2::CaptureProcess() { bool VideoCaptureModuleV4L2::CaptureProcess() {
int retVal = 0; int retVal = 0;
fd_set rSet; fd_set rSet;

View File

@ -41,8 +41,7 @@ class VideoCaptureModuleV4L2 : public VideoCaptureImpl {
bool AllocateVideoBuffers(); bool AllocateVideoBuffers();
bool DeAllocateVideoBuffers(); bool DeAllocateVideoBuffers();
// TODO(pbos): Stop using unique_ptr and resetting the thread. rtc::PlatformThread _captureThread;
std::unique_ptr<rtc::PlatformThread> _captureThread;
Mutex capture_lock_; Mutex capture_lock_;
bool quit_ RTC_GUARDED_BY(capture_lock_); bool quit_ RTC_GUARDED_BY(capture_lock_);
int32_t _deviceId; int32_t _deviceId;

View File

@ -245,6 +245,7 @@ rtc_library("platform_thread") {
absl_deps = [ absl_deps = [
"//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/strings", "//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",
] ]
} }
@ -561,7 +562,10 @@ if (is_win) {
"../api/task_queue", "../api/task_queue",
"synchronization:mutex", "synchronization:mutex",
] ]
absl_deps = [ "//third_party/abseil-cpp/absl/strings" ] absl_deps = [
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",
]
} }
} }
@ -1413,6 +1417,7 @@ if (rtc_include_tests) {
absl_deps = [ absl_deps = [
"//third_party/abseil-cpp/absl/base:core_headers", "//third_party/abseil-cpp/absl/base:core_headers",
"//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/types:optional",
] ]
} }

View File

@ -123,7 +123,7 @@ void AsyncResolver::Start(const SocketAddress& addr) {
RTC_DCHECK_RUN_ON(&sequence_checker_); RTC_DCHECK_RUN_ON(&sequence_checker_);
RTC_DCHECK(!destroy_called_); RTC_DCHECK(!destroy_called_);
addr_ = addr; addr_ = addr;
auto thread_function = PlatformThread::SpawnDetached(
[this, addr, caller_task_queue = webrtc::TaskQueueBase::Current(), [this, addr, caller_task_queue = webrtc::TaskQueueBase::Current(),
state = state_] { state = state_] {
std::vector<IPAddress> addresses; std::vector<IPAddress> addresses;
@ -146,14 +146,8 @@ void AsyncResolver::Start(const SocketAddress& addr) {
} }
})); }));
} }
}; },
PlatformThread thread(RunResolution, "AsyncResolver");
new std::function<void()>(std::move(thread_function)),
"NameResolution", ThreadAttributes().SetDetached());
thread.Start();
// Although |thread| is detached, the PlatformThread contract mandates to call
// Stop() before destruction. The call doesn't actually stop anything.
thread.Stop();
} }
bool AsyncResolver::GetResolvedAddress(int family, SocketAddress* addr) const { bool AsyncResolver::GetResolvedAddress(int family, SocketAddress* addr) const {

View File

@ -30,8 +30,7 @@ const int kProcessingTimeMillisecs = 500;
const int kWorkingThreads = 2; const int kWorkingThreads = 2;
// Consumes approximately kProcessingTimeMillisecs of CPU time in single thread. // Consumes approximately kProcessingTimeMillisecs of CPU time in single thread.
void WorkingFunction(void* counter_pointer) { void WorkingFunction(int64_t* counter) {
int64_t* counter = reinterpret_cast<int64_t*>(counter_pointer);
*counter = 0; *counter = 0;
int64_t stop_cpu_time = int64_t stop_cpu_time =
rtc::GetThreadCpuTimeNanos() + rtc::GetThreadCpuTimeNanos() +
@ -62,14 +61,12 @@ TEST(CpuTimeTest, MAYBE_TEST(TwoThreads)) {
int64_t thread_start_time_nanos = GetThreadCpuTimeNanos(); int64_t thread_start_time_nanos = GetThreadCpuTimeNanos();
int64_t counter1; int64_t counter1;
int64_t counter2; int64_t counter2;
PlatformThread thread1(WorkingFunction, reinterpret_cast<void*>(&counter1), auto thread1 = PlatformThread::SpawnJoinable(
"Thread1"); [&counter1] { WorkingFunction(&counter1); }, "Thread1");
PlatformThread thread2(WorkingFunction, reinterpret_cast<void*>(&counter2), auto thread2 = PlatformThread::SpawnJoinable(
"Thread2"); [&counter2] { WorkingFunction(&counter2); }, "Thread2");
thread1.Start(); thread1.Finalize();
thread2.Start(); thread2.Finalize();
thread1.Stop();
thread2.Stop();
EXPECT_GE(counter1, 0); EXPECT_GE(counter1, 0);
EXPECT_GE(counter2, 0); EXPECT_GE(counter2, 0);

View File

@ -329,33 +329,28 @@ class PerfTestData {
class PerfTestThread { class PerfTestThread {
public: public:
PerfTestThread() : thread_(&ThreadFunc, this, "CsPerf") {}
void Start(PerfTestData* data, int repeats, int id) { void Start(PerfTestData* data, int repeats, int id) {
RTC_DCHECK(!thread_.IsRunning());
RTC_DCHECK(!data_); RTC_DCHECK(!data_);
data_ = data; data_ = data;
repeats_ = repeats; repeats_ = repeats;
my_id_ = id; my_id_ = id;
thread_.Start(); thread_ = PlatformThread::SpawnJoinable(
[this] {
for (int i = 0; i < repeats_; ++i)
data_->AddToCounter(my_id_);
},
"CsPerf");
} }
void Stop() { void Stop() {
RTC_DCHECK(thread_.IsRunning());
RTC_DCHECK(data_); RTC_DCHECK(data_);
thread_.Stop(); thread_.Finalize();
repeats_ = 0; repeats_ = 0;
data_ = nullptr; data_ = nullptr;
my_id_ = 0; my_id_ = 0;
} }
private: private:
static void ThreadFunc(void* param) {
PerfTestThread* me = static_cast<PerfTestThread*>(param);
for (int i = 0; i < me->repeats_; ++i)
me->data_->AddToCounter(me->my_id_);
}
PlatformThread thread_; PlatformThread thread_;
PerfTestData* data_ = nullptr; PerfTestData* data_ = nullptr;
int repeats_ = 0; int repeats_ = 0;

View File

@ -79,19 +79,12 @@ namespace rtc {
namespace tracing { namespace tracing {
namespace { namespace {
static void EventTracingThreadFunc(void* params);
// Atomic-int fast path for avoiding logging when disabled. // Atomic-int fast path for avoiding logging when disabled.
static volatile int g_event_logging_active = 0; static volatile int g_event_logging_active = 0;
// TODO(pbos): Log metadata for all threads, etc. // TODO(pbos): Log metadata for all threads, etc.
class EventLogger final { class EventLogger final {
public: public:
EventLogger()
: logging_thread_(EventTracingThreadFunc,
this,
"EventTracingThread",
ThreadAttributes().SetPriority(kLowPriority)) {}
~EventLogger() { RTC_DCHECK(thread_checker_.IsCurrent()); } ~EventLogger() { RTC_DCHECK(thread_checker_.IsCurrent()); }
void AddTraceEvent(const char* name, void AddTraceEvent(const char* name,
@ -209,7 +202,8 @@ class EventLogger final {
rtc::AtomicOps::CompareAndSwap(&g_event_logging_active, 0, 1)); rtc::AtomicOps::CompareAndSwap(&g_event_logging_active, 0, 1));
// Finally start, everything should be set up now. // Finally start, everything should be set up now.
logging_thread_.Start(); logging_thread_ =
PlatformThread::SpawnJoinable([this] { Log(); }, "EventTracingThread");
TRACE_EVENT_INSTANT0("webrtc", "EventLogger::Start"); TRACE_EVENT_INSTANT0("webrtc", "EventLogger::Start");
} }
@ -223,7 +217,7 @@ class EventLogger final {
// Wake up logging thread to finish writing. // Wake up logging thread to finish writing.
shutdown_event_.Set(); shutdown_event_.Set();
// Join the logging thread. // Join the logging thread.
logging_thread_.Stop(); logging_thread_.Finalize();
} }
private: private:
@ -326,10 +320,6 @@ class EventLogger final {
bool output_file_owned_ = false; bool output_file_owned_ = false;
}; };
static void EventTracingThreadFunc(void* params) {
static_cast<EventLogger*>(params)->Log();
}
static EventLogger* volatile g_event_logger = nullptr; static EventLogger* volatile g_event_logger = nullptr;
static const char* const kDisabledTracePrefix = TRACE_DISABLED_BY_DEFAULT(""); static const char* const kDisabledTracePrefix = TRACE_DISABLED_BY_DEFAULT("");
const unsigned char* InternalGetCategoryEnabled(const char* name) { const unsigned char* InternalGetCategoryEnabled(const char* name) {

View File

@ -43,22 +43,21 @@ TEST(EventTest, AutoReset) {
class SignalerThread { class SignalerThread {
public: public:
SignalerThread() : thread_(&ThreadFn, this, "EventPerf") {}
void Start(Event* writer, Event* reader) { void Start(Event* writer, Event* reader) {
writer_ = writer; writer_ = writer;
reader_ = reader; reader_ = reader;
thread_.Start(); thread_ = PlatformThread::SpawnJoinable(
[this] {
while (!stop_event_.Wait(0)) {
writer_->Set();
reader_->Wait(Event::kForever);
}
},
"EventPerf");
} }
void Stop() { void Stop() {
stop_event_.Set(); stop_event_.Set();
thread_.Stop(); thread_.Finalize();
}
static void ThreadFn(void* param) {
auto* me = static_cast<SignalerThread*>(param);
while (!me->stop_event_.Wait(0)) {
me->writer_->Set();
me->reader_->Wait(Event::kForever);
}
} }
Event stop_event_; Event stop_event_;
Event* writer_; Event* writer_;

View File

@ -160,18 +160,13 @@ TEST(LogTest, MultipleStreams) {
class LogThread { class LogThread {
public: public:
LogThread() : thread_(&ThreadEntry, this, "LogThread") {} void Start() {
~LogThread() { thread_.Stop(); } thread_ = PlatformThread::SpawnJoinable(
[] { RTC_LOG(LS_VERBOSE) << "RTC_LOG"; }, "LogThread");
void Start() { thread_.Start(); } }
private: private:
void Run() { RTC_LOG(LS_VERBOSE) << "RTC_LOG"; }
static void ThreadEntry(void* p) { static_cast<LogThread*>(p)->Run(); }
PlatformThread thread_; PlatformThread thread_;
Event event_;
}; };
// Ensure we don't crash when adding/removing streams while threads are going. // Ensure we don't crash when adding/removing streams while threads are going.

View File

@ -10,32 +10,37 @@
#include "rtc_base/platform_thread.h" #include "rtc_base/platform_thread.h"
#include <algorithm>
#include <memory> #include <memory>
#if !defined(WEBRTC_WIN) #if !defined(WEBRTC_WIN)
#include <sched.h> #include <sched.h>
#endif #endif
#include <stdint.h>
#include <time.h>
#include <algorithm>
#include "absl/memory/memory.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
namespace rtc { namespace rtc {
namespace { namespace {
struct ThreadStartData {
ThreadRunFunction run_function; #if defined(WEBRTC_WIN)
void* obj; int Win32PriorityFromThreadPriority(ThreadPriority priority) {
std::string thread_name; switch (priority) {
ThreadPriority priority; case ThreadPriority::kLow:
}; return THREAD_PRIORITY_BELOW_NORMAL;
case ThreadPriority::kNormal:
return THREAD_PRIORITY_NORMAL;
case ThreadPriority::kHigh:
return THREAD_PRIORITY_ABOVE_NORMAL;
case ThreadPriority::kRealtime:
return THREAD_PRIORITY_TIME_CRITICAL;
}
}
#endif
bool SetPriority(ThreadPriority priority) { bool SetPriority(ThreadPriority priority) {
#if defined(WEBRTC_WIN) #if defined(WEBRTC_WIN)
return SetThreadPriority(GetCurrentThread(), priority) != FALSE; return SetThreadPriority(GetCurrentThread(),
Win32PriorityFromThreadPriority(priority)) != FALSE;
#elif defined(__native_client__) || defined(WEBRTC_FUCHSIA) #elif defined(__native_client__) || defined(WEBRTC_FUCHSIA)
// Setting thread priorities is not supported in NaCl or Fuchsia. // Setting thread priorities is not supported in NaCl or Fuchsia.
return true; return true;
@ -59,21 +64,18 @@ bool SetPriority(ThreadPriority priority) {
const int top_prio = max_prio - 1; const int top_prio = max_prio - 1;
const int low_prio = min_prio + 1; const int low_prio = min_prio + 1;
switch (priority) { switch (priority) {
case kLowPriority: case ThreadPriority::kLow:
param.sched_priority = low_prio; param.sched_priority = low_prio;
break; break;
case kNormalPriority: case ThreadPriority::kNormal:
// The -1 ensures that the kHighPriority is always greater or equal to // The -1 ensures that the kHighPriority is always greater or equal to
// kNormalPriority. // kNormalPriority.
param.sched_priority = (low_prio + top_prio - 1) / 2; param.sched_priority = (low_prio + top_prio - 1) / 2;
break; break;
case kHighPriority: case ThreadPriority::kHigh:
param.sched_priority = std::max(top_prio - 2, low_prio); param.sched_priority = std::max(top_prio - 2, low_prio);
break; break;
case kHighestPriority: case ThreadPriority::kRealtime:
param.sched_priority = std::max(top_prio - 1, low_prio);
break;
case kRealtimePriority:
param.sched_priority = top_prio; param.sched_priority = top_prio;
break; break;
} }
@ -81,124 +83,129 @@ bool SetPriority(ThreadPriority priority) {
#endif // defined(WEBRTC_WIN) #endif // defined(WEBRTC_WIN)
} }
void RunPlatformThread(std::unique_ptr<ThreadStartData> data) {
rtc::SetCurrentThreadName(data->thread_name.c_str());
data->thread_name.clear();
SetPriority(data->priority);
data->run_function(data->obj);
}
#if defined(WEBRTC_WIN) #if defined(WEBRTC_WIN)
DWORD WINAPI StartThread(void* param) { DWORD WINAPI RunPlatformThread(void* param) {
// The GetLastError() function only returns valid results when it is called // The GetLastError() function only returns valid results when it is called
// after a Win32 API function that returns a "failed" result. A crash dump // after a Win32 API function that returns a "failed" result. A crash dump
// contains the result from GetLastError() and to make sure it does not // contains the result from GetLastError() and to make sure it does not
// falsely report a Windows error we call SetLastError here. // falsely report a Windows error we call SetLastError here.
::SetLastError(ERROR_SUCCESS); ::SetLastError(ERROR_SUCCESS);
RunPlatformThread(absl::WrapUnique(static_cast<ThreadStartData*>(param))); auto function = static_cast<std::function<void()>*>(param);
(*function)();
delete function;
return 0; return 0;
} }
#else #else
void* StartThread(void* param) { void* RunPlatformThread(void* param) {
RunPlatformThread(absl::WrapUnique(static_cast<ThreadStartData*>(param))); auto function = static_cast<std::function<void()>*>(param);
(*function)();
delete function;
return 0; return 0;
} }
#endif // defined(WEBRTC_WIN) #endif // defined(WEBRTC_WIN)
} // namespace } // namespace
PlatformThread::PlatformThread(ThreadRunFunction func, PlatformThread::PlatformThread(Handle handle, bool joinable)
void* obj, : handle_(handle), joinable_(joinable) {}
absl::string_view thread_name,
ThreadAttributes attributes) PlatformThread::PlatformThread(PlatformThread&& rhs)
: run_function_(func), : handle_(rhs.handle_), joinable_(rhs.joinable_) {
attributes_(attributes), rhs.handle_ = absl::nullopt;
obj_(obj), }
name_(thread_name) {
RTC_DCHECK(func); PlatformThread& PlatformThread::operator=(PlatformThread&& rhs) {
RTC_DCHECK(!name_.empty()); Finalize();
// TODO(tommi): Consider lowering the limit to 15 (limit on Linux). handle_ = rhs.handle_;
RTC_DCHECK(name_.length() < 64); joinable_ = rhs.joinable_;
rhs.handle_ = absl::nullopt;
return *this;
} }
PlatformThread::~PlatformThread() { PlatformThread::~PlatformThread() {
RTC_DCHECK_RUN_ON(&thread_checker_); Finalize();
RTC_DCHECK(!thread_);
#if defined(WEBRTC_WIN)
RTC_DCHECK(!thread_id_);
#endif // defined(WEBRTC_WIN)
} }
void PlatformThread::Start() { PlatformThread PlatformThread::SpawnJoinable(
RTC_DCHECK_RUN_ON(&thread_checker_); std::function<void()> thread_function,
RTC_DCHECK(!thread_) << "Thread already started?"; absl::string_view name,
ThreadStartData* data = ThreadAttributes attributes) {
new ThreadStartData{run_function_, obj_, name_, attributes_.priority}; return SpawnThread(std::move(thread_function), name, attributes,
/*joinable=*/true);
}
PlatformThread PlatformThread::SpawnDetached(
std::function<void()> thread_function,
absl::string_view name,
ThreadAttributes attributes) {
return SpawnThread(std::move(thread_function), name, attributes,
/*joinable=*/false);
}
absl::optional<PlatformThread::Handle> PlatformThread::GetHandle() const {
return handle_;
}
#if defined(WEBRTC_WIN)
bool PlatformThread::QueueAPC(PAPCFUNC function, ULONG_PTR data) {
RTC_DCHECK(handle_.has_value());
return handle_.has_value() ? QueueUserAPC(function, *handle_, data) != FALSE
: false;
}
#endif
void PlatformThread::Finalize() {
if (!handle_.has_value())
return;
#if defined(WEBRTC_WIN)
if (joinable_)
WaitForSingleObject(*handle_, INFINITE);
CloseHandle(*handle_);
#else
if (joinable_)
RTC_CHECK_EQ(0, pthread_join(*handle_, nullptr));
#endif
handle_ = absl::nullopt;
}
PlatformThread PlatformThread::SpawnThread(
std::function<void()> thread_function,
absl::string_view name,
ThreadAttributes attributes,
bool joinable) {
RTC_DCHECK(thread_function);
RTC_DCHECK(!name.empty());
// TODO(tommi): Consider lowering the limit to 15 (limit on Linux).
RTC_DCHECK(name.length() < 64);
auto start_thread_function_ptr =
new std::function<void()>([thread_function = std::move(thread_function),
name = std::string(name), attributes] {
rtc::SetCurrentThreadName(name.c_str());
SetPriority(attributes.priority);
thread_function();
});
#if defined(WEBRTC_WIN) #if defined(WEBRTC_WIN)
// See bug 2902 for background on STACK_SIZE_PARAM_IS_A_RESERVATION. // See bug 2902 for background on STACK_SIZE_PARAM_IS_A_RESERVATION.
// Set the reserved stack stack size to 1M, which is the default on Windows // Set the reserved stack stack size to 1M, which is the default on Windows
// and Linux. // and Linux.
thread_ = ::CreateThread(nullptr, 1024 * 1024, &StartThread, data, DWORD thread_id = 0;
STACK_SIZE_PARAM_IS_A_RESERVATION, &thread_id_); PlatformThread::Handle handle = ::CreateThread(
RTC_CHECK(thread_) << "CreateThread failed"; nullptr, 1024 * 1024, &RunPlatformThread, start_thread_function_ptr,
RTC_DCHECK(thread_id_); STACK_SIZE_PARAM_IS_A_RESERVATION, &thread_id);
RTC_CHECK(handle) << "CreateThread failed";
#else #else
pthread_attr_t attr; pthread_attr_t attr;
pthread_attr_init(&attr); pthread_attr_init(&attr);
// Set the stack stack size to 1M. // Set the stack stack size to 1M.
pthread_attr_setstacksize(&attr, 1024 * 1024); pthread_attr_setstacksize(&attr, 1024 * 1024);
pthread_attr_setdetachstate(&attr, attributes_.joinable pthread_attr_setdetachstate(
? PTHREAD_CREATE_JOINABLE &attr, joinable ? PTHREAD_CREATE_JOINABLE : PTHREAD_CREATE_DETACHED);
: PTHREAD_CREATE_DETACHED); PlatformThread::Handle handle;
RTC_CHECK_EQ(0, pthread_create(&thread_, &attr, &StartThread, data)); RTC_CHECK_EQ(0, pthread_create(&handle, &attr, &RunPlatformThread,
start_thread_function_ptr));
pthread_attr_destroy(&attr); pthread_attr_destroy(&attr);
#endif // defined(WEBRTC_WIN) #endif // defined(WEBRTC_WIN)
return PlatformThread(handle, joinable);
} }
bool PlatformThread::IsRunning() const {
RTC_DCHECK_RUN_ON(&thread_checker_);
#if defined(WEBRTC_WIN)
return thread_ != nullptr;
#else
return thread_ != 0;
#endif // defined(WEBRTC_WIN)
}
PlatformThreadRef PlatformThread::GetThreadRef() const {
#if defined(WEBRTC_WIN)
return thread_id_;
#else
return thread_;
#endif // defined(WEBRTC_WIN)
}
void PlatformThread::Stop() {
RTC_DCHECK_RUN_ON(&thread_checker_);
if (!IsRunning())
return;
#if defined(WEBRTC_WIN)
if (attributes_.joinable) {
WaitForSingleObject(thread_, INFINITE);
}
CloseHandle(thread_);
thread_ = nullptr;
thread_id_ = 0;
#else
if (attributes_.joinable) {
RTC_CHECK_EQ(0, pthread_join(thread_, nullptr));
}
thread_ = 0;
#endif // defined(WEBRTC_WIN)
}
#if defined(WEBRTC_WIN)
bool PlatformThread::QueueAPC(PAPCFUNC function, ULONG_PTR data) {
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK(IsRunning());
return QueueUserAPC(function, thread_, data) != FALSE;
}
#endif
} // namespace rtc } // namespace rtc

View File

@ -11,103 +11,101 @@
#ifndef RTC_BASE_PLATFORM_THREAD_H_ #ifndef RTC_BASE_PLATFORM_THREAD_H_
#define RTC_BASE_PLATFORM_THREAD_H_ #define RTC_BASE_PLATFORM_THREAD_H_
#ifndef WEBRTC_WIN #include <functional>
#include <pthread.h>
#endif
#include <string> #include <string>
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "api/sequence_checker.h" #include "absl/types/optional.h"
#include "rtc_base/constructor_magic.h"
#include "rtc_base/platform_thread_types.h" #include "rtc_base/platform_thread_types.h"
namespace rtc { namespace rtc {
// Callback function that the spawned thread will enter once spawned. enum class ThreadPriority {
typedef void (*ThreadRunFunction)(void*); kLow = 1,
kNormal,
enum ThreadPriority { kHigh,
#ifdef WEBRTC_WIN kRealtime,
kLowPriority = THREAD_PRIORITY_BELOW_NORMAL,
kNormalPriority = THREAD_PRIORITY_NORMAL,
kHighPriority = THREAD_PRIORITY_ABOVE_NORMAL,
kHighestPriority = THREAD_PRIORITY_HIGHEST,
kRealtimePriority = THREAD_PRIORITY_TIME_CRITICAL
#else
kLowPriority = 1,
kNormalPriority = 2,
kHighPriority = 3,
kHighestPriority = 4,
kRealtimePriority = 5
#endif
}; };
struct ThreadAttributes { struct ThreadAttributes {
ThreadPriority priority = kNormalPriority; ThreadPriority priority = ThreadPriority::kNormal;
bool joinable = true;
ThreadAttributes& SetPriority(ThreadPriority priority_param) { ThreadAttributes& SetPriority(ThreadPriority priority_param) {
priority = priority_param; priority = priority_param;
return *this; return *this;
} }
ThreadAttributes& SetDetached() {
joinable = false;
return *this;
}
}; };
// Represents a simple worker thread. The implementation must be assumed // Represents a simple worker thread.
// to be single threaded, meaning that all methods of the class, must be class PlatformThread final {
// called from the same thread, including instantiation.
class PlatformThread {
public: public:
PlatformThread(ThreadRunFunction func, // Handle is the base platform thread handle.
void* obj, #if defined(WEBRTC_WIN)
absl::string_view thread_name, using Handle = HANDLE;
ThreadAttributes attributes = ThreadAttributes()); #else
using Handle = pthread_t;
#endif // defined(WEBRTC_WIN)
// This ctor creates the PlatformThread with an unset handle (returning true
// in empty()) and is provided for convenience.
// TODO(bugs.webrtc.org/12727) Look into if default and move support can be
// removed.
PlatformThread() = default;
// Moves |rhs| into this, storing an empty state in |rhs|.
// TODO(bugs.webrtc.org/12727) Look into if default and move support can be
// removed.
PlatformThread(PlatformThread&& rhs);
// Moves |rhs| into this, storing an empty state in |rhs|.
// TODO(bugs.webrtc.org/12727) Look into if default and move support can be
// removed.
PlatformThread& operator=(PlatformThread&& rhs);
// For a PlatformThread that's been spawned joinable, the destructor suspends
// the calling thread until the created thread exits unless the thread has
// already exited.
virtual ~PlatformThread(); virtual ~PlatformThread();
const std::string& name() const { return name_; } // Finalizes any allocated resources.
// For a PlatformThread that's been spawned joinable, Finalize() suspends
// the calling thread until the created thread exits unless the thread has
// already exited.
// empty() returns true after completion.
void Finalize();
// Spawns a thread and tries to set thread priority according to the priority // Returns true if default constructed, moved from, or Finalize()ed.
// from when CreateThread was called. bool empty() const { return !handle_.has_value(); }
// Start can only be called after the constructor or after a call to Stop().
void Start();
bool IsRunning() const; // Creates a started joinable thread which will be joined when the returned
// PlatformThread destructs or Finalize() is called.
static PlatformThread SpawnJoinable(
std::function<void()> thread_function,
absl::string_view name,
ThreadAttributes attributes = ThreadAttributes());
// Returns an identifier for the worker thread that can be used to do // Creates a started detached thread. The caller has to use external
// thread checks. // synchronization as nothing is provided by the PlatformThread construct.
PlatformThreadRef GetThreadRef() const; static PlatformThread SpawnDetached(
std::function<void()> thread_function,
absl::string_view name,
ThreadAttributes attributes = ThreadAttributes());
// Stop() prepares the PlatformThread for destruction or another call to // Returns the base platform thread handle of this thread.
// Start(). For a PlatformThread that's been created with absl::optional<Handle> GetHandle() const;
// ThreadAttributes::joinable true (the default), Stop() suspends the calling
// thread until the created thread exits unless the thread has already exited.
// Stop() can only be called after calling Start().
void Stop();
protected:
#if defined(WEBRTC_WIN) #if defined(WEBRTC_WIN)
// Exposed to derived classes to allow for special cases specific to Windows. // Queue a Windows APC function that runs when the thread is alertable.
bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data); bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data);
#endif #endif
private: private:
ThreadRunFunction const run_function_ = nullptr; PlatformThread(Handle handle, bool joinable);
const ThreadAttributes attributes_; static PlatformThread SpawnThread(std::function<void()> thread_function,
void* const obj_; absl::string_view name,
// TODO(pbos): Make sure call sites use string literals and update to a const ThreadAttributes attributes,
// char* instead of a std::string. bool joinable);
const std::string name_;
webrtc::SequenceChecker thread_checker_; absl::optional<Handle> handle_;
#if defined(WEBRTC_WIN) bool joinable_ = false;
HANDLE thread_ = nullptr;
DWORD thread_id_ = 0;
#else
pthread_t thread_ = 0;
#endif // defined(WEBRTC_WIN)
RTC_DISALLOW_COPY_AND_ASSIGN(PlatformThread);
}; };
} // namespace rtc } // namespace rtc

View File

@ -10,69 +10,73 @@
#include "rtc_base/platform_thread.h" #include "rtc_base/platform_thread.h"
#include "absl/types/optional.h"
#include "rtc_base/event.h" #include "rtc_base/event.h"
#include "system_wrappers/include/sleep.h" #include "system_wrappers/include/sleep.h"
#include "test/gmock.h" #include "test/gmock.h"
namespace rtc { namespace rtc {
namespace {
void NullRunFunction(void* obj) {} TEST(PlatformThreadTest, DefaultConstructedIsEmpty) {
PlatformThread thread;
// Function that sets a boolean. EXPECT_EQ(thread.GetHandle(), absl::nullopt);
void SetFlagRunFunction(void* obj) { EXPECT_TRUE(thread.empty());
bool* obj_as_bool = static_cast<bool*>(obj);
*obj_as_bool = true;
} }
void StdFunctionRunFunction(void* obj) { TEST(PlatformThreadTest, StartFinalize) {
std::function<void()>* fun = static_cast<std::function<void()>*>(obj); PlatformThread thread = PlatformThread::SpawnJoinable([] {}, "1");
(*fun)(); EXPECT_NE(thread.GetHandle(), absl::nullopt);
EXPECT_FALSE(thread.empty());
thread.Finalize();
EXPECT_TRUE(thread.empty());
thread = PlatformThread::SpawnDetached([] {}, "2");
EXPECT_FALSE(thread.empty());
thread.Finalize();
EXPECT_TRUE(thread.empty());
} }
} // namespace TEST(PlatformThreadTest, MovesEmpty) {
PlatformThread thread1;
TEST(PlatformThreadTest, StartStop) { PlatformThread thread2 = std::move(thread1);
PlatformThread thread(&NullRunFunction, nullptr, "PlatformThreadTest"); EXPECT_TRUE(thread1.empty());
EXPECT_TRUE(thread.name() == "PlatformThreadTest"); EXPECT_TRUE(thread2.empty());
EXPECT_TRUE(thread.GetThreadRef() == 0);
thread.Start();
EXPECT_TRUE(thread.GetThreadRef() != 0);
thread.Stop();
EXPECT_TRUE(thread.GetThreadRef() == 0);
} }
TEST(PlatformThreadTest, StartStop2) { TEST(PlatformThreadTest, MovesHandles) {
PlatformThread thread1(&NullRunFunction, nullptr, "PlatformThreadTest1"); PlatformThread thread1 = PlatformThread::SpawnJoinable([] {}, "1");
PlatformThread thread2(&NullRunFunction, nullptr, "PlatformThreadTest2"); PlatformThread thread2 = std::move(thread1);
EXPECT_TRUE(thread1.GetThreadRef() == thread2.GetThreadRef()); EXPECT_TRUE(thread1.empty());
thread1.Start(); EXPECT_FALSE(thread2.empty());
thread2.Start(); thread1 = PlatformThread::SpawnDetached([] {}, "2");
EXPECT_TRUE(thread1.GetThreadRef() != thread2.GetThreadRef()); thread2 = std::move(thread1);
thread2.Stop(); EXPECT_TRUE(thread1.empty());
thread1.Stop(); EXPECT_FALSE(thread2.empty());
}
TEST(PlatformThreadTest,
TwoThreadHandlesAreDifferentWhenStartedAndEqualWhenJoined) {
PlatformThread thread1 = PlatformThread();
PlatformThread thread2 = PlatformThread();
EXPECT_EQ(thread1.GetHandle(), thread2.GetHandle());
thread1 = PlatformThread::SpawnJoinable([] {}, "1");
thread2 = PlatformThread::SpawnJoinable([] {}, "2");
EXPECT_NE(thread1.GetHandle(), thread2.GetHandle());
thread1.Finalize();
EXPECT_NE(thread1.GetHandle(), thread2.GetHandle());
thread2.Finalize();
EXPECT_EQ(thread1.GetHandle(), thread2.GetHandle());
} }
TEST(PlatformThreadTest, RunFunctionIsCalled) { TEST(PlatformThreadTest, RunFunctionIsCalled) {
bool flag = false; bool flag = false;
PlatformThread thread(&SetFlagRunFunction, &flag, "RunFunctionIsCalled"); PlatformThread::SpawnJoinable([&] { flag = true; }, "T");
thread.Start();
// At this point, the flag may be either true or false.
thread.Stop();
// We expect the thread to have run at least once.
EXPECT_TRUE(flag); EXPECT_TRUE(flag);
} }
TEST(PlatformThreadTest, JoinsThread) { TEST(PlatformThreadTest, JoinsThread) {
// This test flakes if there are problems with the join implementation. // This test flakes if there are problems with the join implementation.
EXPECT_TRUE(ThreadAttributes().joinable);
rtc::Event event; rtc::Event event;
std::function<void()> thread_function = [&] { event.Set(); }; PlatformThread::SpawnJoinable([&] { event.Set(); }, "T");
PlatformThread thread(&StdFunctionRunFunction, &thread_function, "T");
thread.Start();
thread.Stop();
EXPECT_TRUE(event.Wait(/*give_up_after_ms=*/0)); EXPECT_TRUE(event.Wait(/*give_up_after_ms=*/0));
} }
@ -83,18 +87,14 @@ TEST(PlatformThreadTest, StopsBeforeDetachedThreadExits) {
rtc::Event thread_started; rtc::Event thread_started;
rtc::Event thread_continue; rtc::Event thread_continue;
rtc::Event thread_exiting; rtc::Event thread_exiting;
std::function<void()> thread_function = [&] { PlatformThread::SpawnDetached(
thread_started.Set(); [&] {
thread_continue.Wait(Event::kForever); thread_started.Set();
flag = true; thread_continue.Wait(Event::kForever);
thread_exiting.Set(); flag = true;
}; thread_exiting.Set();
{ },
PlatformThread thread(&StdFunctionRunFunction, &thread_function, "T", "T");
ThreadAttributes().SetDetached());
thread.Start();
thread.Stop();
}
thread_started.Wait(Event::kForever); thread_started.Wait(Event::kForever);
EXPECT_FALSE(flag); EXPECT_FALSE(flag);
thread_continue.Set(); thread_continue.Set();

View File

@ -127,10 +127,6 @@ class ThreadTask {
rtc::Event end_signal_; rtc::Event end_signal_;
}; };
void RunTask(void* thread_task) {
reinterpret_cast<ThreadTask*>(thread_task)->Run();
}
TEST_F(RateLimitTest, MultiThreadedUsage) { TEST_F(RateLimitTest, MultiThreadedUsage) {
// Simple sanity test, with different threads calling the various methods. // Simple sanity test, with different threads calling the various methods.
// Runs a few simple tasks, each on its own thread, but coordinated with // Runs a few simple tasks, each on its own thread, but coordinated with
@ -149,8 +145,8 @@ TEST_F(RateLimitTest, MultiThreadedUsage) {
EXPECT_TRUE(rate_limiter_->SetWindowSize(kWindowSizeMs / 2)); EXPECT_TRUE(rate_limiter_->SetWindowSize(kWindowSizeMs / 2));
} }
} set_window_size_task(rate_limiter.get()); } set_window_size_task(rate_limiter.get());
rtc::PlatformThread thread1(RunTask, &set_window_size_task, "Thread1"); auto thread1 = rtc::PlatformThread::SpawnJoinable(
thread1.Start(); [&set_window_size_task] { set_window_size_task.Run(); }, "Thread1");
class SetMaxRateTask : public ThreadTask { class SetMaxRateTask : public ThreadTask {
public: public:
@ -160,8 +156,8 @@ TEST_F(RateLimitTest, MultiThreadedUsage) {
void DoRun() override { rate_limiter_->SetMaxRate(kMaxRateBps * 2); } void DoRun() override { rate_limiter_->SetMaxRate(kMaxRateBps * 2); }
} set_max_rate_task(rate_limiter.get()); } set_max_rate_task(rate_limiter.get());
rtc::PlatformThread thread2(RunTask, &set_max_rate_task, "Thread2"); auto thread2 = rtc::PlatformThread::SpawnJoinable(
thread2.Start(); [&set_max_rate_task] { set_max_rate_task.Run(); }, "Thread2");
class UseRateTask : public ThreadTask { class UseRateTask : public ThreadTask {
public: public:
@ -177,8 +173,8 @@ TEST_F(RateLimitTest, MultiThreadedUsage) {
SimulatedClock* const clock_; SimulatedClock* const clock_;
} use_rate_task(rate_limiter.get(), &clock_); } use_rate_task(rate_limiter.get(), &clock_);
rtc::PlatformThread thread3(RunTask, &use_rate_task, "Thread3"); auto thread3 = rtc::PlatformThread::SpawnJoinable(
thread3.Start(); [&use_rate_task] { use_rate_task.Run(); }, "Thread3");
set_window_size_task.start_signal_.Set(); set_window_size_task.start_signal_.Set();
EXPECT_TRUE(set_window_size_task.end_signal_.Wait(kMaxTimeoutMs)); EXPECT_TRUE(set_window_size_task.end_signal_.Wait(kMaxTimeoutMs));
@ -191,10 +187,6 @@ TEST_F(RateLimitTest, MultiThreadedUsage) {
// All rate consumed. // All rate consumed.
EXPECT_FALSE(rate_limiter->TryUseRate(1)); EXPECT_FALSE(rate_limiter->TryUseRate(1));
thread1.Stop();
thread2.Stop();
thread3.Stop();
} }
} // namespace webrtc } // namespace webrtc

View File

@ -93,16 +93,12 @@ void EventAssign(struct event* ev,
rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) { rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
switch (priority) { switch (priority) {
case Priority::HIGH: case Priority::HIGH:
return rtc::kRealtimePriority; return rtc::ThreadPriority::kRealtime;
case Priority::LOW: case Priority::LOW:
return rtc::kLowPriority; return rtc::ThreadPriority::kLow;
case Priority::NORMAL: case Priority::NORMAL:
return rtc::kNormalPriority; return rtc::ThreadPriority::kNormal;
default:
RTC_NOTREACHED();
break;
} }
return rtc::kNormalPriority;
} }
class TaskQueueLibevent final : public TaskQueueBase { class TaskQueueLibevent final : public TaskQueueBase {
@ -120,7 +116,6 @@ class TaskQueueLibevent final : public TaskQueueBase {
~TaskQueueLibevent() override = default; ~TaskQueueLibevent() override = default;
static void ThreadMain(void* context);
static void OnWakeup(int socket, short flags, void* context); // NOLINT static void OnWakeup(int socket, short flags, void* context); // NOLINT
static void RunTimer(int fd, short flags, void* context); // NOLINT static void RunTimer(int fd, short flags, void* context); // NOLINT
@ -172,11 +167,7 @@ class TaskQueueLibevent::SetTimerTask : public QueuedTask {
TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name, TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
rtc::ThreadPriority priority) rtc::ThreadPriority priority)
: event_base_(event_base_new()), : event_base_(event_base_new()) {
thread_(&TaskQueueLibevent::ThreadMain,
this,
queue_name,
rtc::ThreadAttributes().SetPriority(priority)) {
int fds[2]; int fds[2];
RTC_CHECK(pipe(fds) == 0); RTC_CHECK(pipe(fds) == 0);
SetNonBlocking(fds[0]); SetNonBlocking(fds[0]);
@ -187,7 +178,18 @@ TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_, EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_,
EV_READ | EV_PERSIST, OnWakeup, this); EV_READ | EV_PERSIST, OnWakeup, this);
event_add(&wakeup_event_, 0); event_add(&wakeup_event_, 0);
thread_.Start(); thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
{
CurrentTaskQueueSetter set_current(this);
while (is_active_)
event_base_loop(event_base_, 0);
}
for (TimerEvent* timer : pending_timers_)
delete timer;
},
queue_name, rtc::ThreadAttributes().SetPriority(priority));
} }
void TaskQueueLibevent::Delete() { void TaskQueueLibevent::Delete() {
@ -202,7 +204,7 @@ void TaskQueueLibevent::Delete() {
nanosleep(&ts, nullptr); nanosleep(&ts, nullptr);
} }
thread_.Stop(); thread_.Finalize();
event_del(&wakeup_event_); event_del(&wakeup_event_);
@ -255,20 +257,6 @@ void TaskQueueLibevent::PostDelayedTask(std::unique_ptr<QueuedTask> task,
} }
} }
// static
void TaskQueueLibevent::ThreadMain(void* context) {
TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
{
CurrentTaskQueueSetter set_current(me);
while (me->is_active_)
event_base_loop(me->event_base_, 0);
}
for (TimerEvent* timer : me->pending_timers_)
delete timer;
}
// static // static
void TaskQueueLibevent::OnWakeup(int socket, void TaskQueueLibevent::OnWakeup(int socket,
short flags, // NOLINT short flags, // NOLINT

View File

@ -36,14 +36,11 @@ rtc::ThreadPriority TaskQueuePriorityToThreadPriority(
TaskQueueFactory::Priority priority) { TaskQueueFactory::Priority priority) {
switch (priority) { switch (priority) {
case TaskQueueFactory::Priority::HIGH: case TaskQueueFactory::Priority::HIGH:
return rtc::kRealtimePriority; return rtc::ThreadPriority::kRealtime;
case TaskQueueFactory::Priority::LOW: case TaskQueueFactory::Priority::LOW:
return rtc::kLowPriority; return rtc::ThreadPriority::kLow;
case TaskQueueFactory::Priority::NORMAL: case TaskQueueFactory::Priority::NORMAL:
return rtc::kNormalPriority; return rtc::ThreadPriority::kNormal;
default:
RTC_NOTREACHED();
return rtc::kNormalPriority;
} }
} }
@ -78,8 +75,6 @@ class TaskQueueStdlib final : public TaskQueueBase {
NextTask GetNextTask(); NextTask GetNextTask();
static void ThreadMain(void* context);
void ProcessTasks(); void ProcessTasks();
void NotifyWake(); void NotifyWake();
@ -126,11 +121,13 @@ TaskQueueStdlib::TaskQueueStdlib(absl::string_view queue_name,
: started_(/*manual_reset=*/false, /*initially_signaled=*/false), : started_(/*manual_reset=*/false, /*initially_signaled=*/false),
stopped_(/*manual_reset=*/false, /*initially_signaled=*/false), stopped_(/*manual_reset=*/false, /*initially_signaled=*/false),
flag_notify_(/*manual_reset=*/false, /*initially_signaled=*/false), flag_notify_(/*manual_reset=*/false, /*initially_signaled=*/false),
thread_(&TaskQueueStdlib::ThreadMain, thread_(rtc::PlatformThread::SpawnJoinable(
this, [this] {
queue_name, CurrentTaskQueueSetter set_current(this);
rtc::ThreadAttributes().SetPriority(priority)) { ProcessTasks();
thread_.Start(); },
queue_name,
rtc::ThreadAttributes().SetPriority(priority))) {
started_.Wait(rtc::Event::kForever); started_.Wait(rtc::Event::kForever);
} }
@ -145,7 +142,7 @@ void TaskQueueStdlib::Delete() {
NotifyWake(); NotifyWake();
stopped_.Wait(rtc::Event::kForever); stopped_.Wait(rtc::Event::kForever);
thread_.Stop(); thread_.Finalize();
delete this; delete this;
} }
@ -222,13 +219,6 @@ TaskQueueStdlib::NextTask TaskQueueStdlib::GetNextTask() {
return result; return result;
} }
// static
void TaskQueueStdlib::ThreadMain(void* context) {
TaskQueueStdlib* me = static_cast<TaskQueueStdlib*>(context);
CurrentTaskQueueSetter set_current(me);
me->ProcessTasks();
}
void TaskQueueStdlib::ProcessTasks() { void TaskQueueStdlib::ProcessTasks() {
started_.Set(); started_.Set();

View File

@ -29,16 +29,18 @@
#include <utility> #include <utility>
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "api/task_queue/queued_task.h" #include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h" #include "api/task_queue/task_queue_base.h"
#include "rtc_base/arraysize.h" #include "rtc_base/arraysize.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/constructor_magic.h"
#include "rtc_base/event.h" #include "rtc_base/event.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/numerics/safe_conversions.h" #include "rtc_base/numerics/safe_conversions.h"
#include "rtc_base/platform_thread.h" #include "rtc_base/platform_thread.h"
#include "rtc_base/time_utils.h"
#include "rtc_base/synchronization/mutex.h" #include "rtc_base/synchronization/mutex.h"
#include "rtc_base/time_utils.h"
namespace webrtc { namespace webrtc {
namespace { namespace {
@ -56,16 +58,12 @@ rtc::ThreadPriority TaskQueuePriorityToThreadPriority(
TaskQueueFactory::Priority priority) { TaskQueueFactory::Priority priority) {
switch (priority) { switch (priority) {
case TaskQueueFactory::Priority::HIGH: case TaskQueueFactory::Priority::HIGH:
return rtc::kRealtimePriority; return rtc::ThreadPriority::kRealtime;
case TaskQueueFactory::Priority::LOW: case TaskQueueFactory::Priority::LOW:
return rtc::kLowPriority; return rtc::ThreadPriority::kLow;
case TaskQueueFactory::Priority::NORMAL: case TaskQueueFactory::Priority::NORMAL:
return rtc::kNormalPriority; return rtc::ThreadPriority::kNormal;
default:
RTC_NOTREACHED();
break;
} }
return rtc::kNormalPriority;
} }
int64_t GetTick() { int64_t GetTick() {
@ -167,24 +165,6 @@ class TaskQueueWin : public TaskQueueBase {
void RunPendingTasks(); void RunPendingTasks();
private: private:
static void ThreadMain(void* context);
class WorkerThread : public rtc::PlatformThread {
public:
WorkerThread(rtc::ThreadRunFunction func,
void* obj,
absl::string_view thread_name,
rtc::ThreadPriority priority)
: PlatformThread(func,
obj,
thread_name,
rtc::ThreadAttributes().SetPriority(priority)) {}
bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) {
return rtc::PlatformThread::QueueAPC(apc_function, data);
}
};
void RunThreadMain(); void RunThreadMain();
bool ProcessQueuedMessages(); bool ProcessQueuedMessages();
void RunDueTasks(); void RunDueTasks();
@ -207,7 +187,7 @@ class TaskQueueWin : public TaskQueueBase {
greater<DelayedTaskInfo>> greater<DelayedTaskInfo>>
timer_tasks_; timer_tasks_;
UINT_PTR timer_id_ = 0; UINT_PTR timer_id_ = 0;
WorkerThread thread_; rtc::PlatformThread thread_;
Mutex pending_lock_; Mutex pending_lock_;
std::queue<std::unique_ptr<QueuedTask>> pending_ std::queue<std::unique_ptr<QueuedTask>> pending_
RTC_GUARDED_BY(pending_lock_); RTC_GUARDED_BY(pending_lock_);
@ -216,10 +196,12 @@ class TaskQueueWin : public TaskQueueBase {
TaskQueueWin::TaskQueueWin(absl::string_view queue_name, TaskQueueWin::TaskQueueWin(absl::string_view queue_name,
rtc::ThreadPriority priority) rtc::ThreadPriority priority)
: thread_(&TaskQueueWin::ThreadMain, this, queue_name, priority), : in_queue_(::CreateEvent(nullptr, true, false, nullptr)) {
in_queue_(::CreateEvent(nullptr, true, false, nullptr)) {
RTC_DCHECK(in_queue_); RTC_DCHECK(in_queue_);
thread_.Start(); thread_ = rtc::PlatformThread::SpawnJoinable(
[this] { RunThreadMain(); }, queue_name,
rtc::ThreadAttributes().SetPriority(priority));
rtc::Event event(false, false); rtc::Event event(false, false);
RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread, RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread,
reinterpret_cast<ULONG_PTR>(&event))); reinterpret_cast<ULONG_PTR>(&event)));
@ -228,11 +210,13 @@ TaskQueueWin::TaskQueueWin(absl::string_view queue_name,
void TaskQueueWin::Delete() { void TaskQueueWin::Delete() {
RTC_DCHECK(!IsCurrent()); RTC_DCHECK(!IsCurrent());
while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) { RTC_CHECK(thread_.GetHandle() != absl::nullopt);
while (
!::PostThreadMessage(GetThreadId(*thread_.GetHandle()), WM_QUIT, 0, 0)) {
RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError()); RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError());
Sleep(1); Sleep(1);
} }
thread_.Stop(); thread_.Finalize();
::CloseHandle(in_queue_); ::CloseHandle(in_queue_);
delete this; delete this;
} }
@ -255,7 +239,9 @@ void TaskQueueWin::PostDelayedTask(std::unique_ptr<QueuedTask> task,
// and WPARAM is 32bits in 32bit builds. Otherwise, we could pass the // and WPARAM is 32bits in 32bit builds. Otherwise, we could pass the
// task pointer and timestamp as LPARAM and WPARAM. // task pointer and timestamp as LPARAM and WPARAM.
auto* task_info = new DelayedTaskInfo(milliseconds, std::move(task)); auto* task_info = new DelayedTaskInfo(milliseconds, std::move(task));
if (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, 0, RTC_CHECK(thread_.GetHandle() != absl::nullopt);
if (!::PostThreadMessage(GetThreadId(*thread_.GetHandle()),
WM_QUEUE_DELAYED_TASK, 0,
reinterpret_cast<LPARAM>(task_info))) { reinterpret_cast<LPARAM>(task_info))) {
delete task_info; delete task_info;
} }
@ -277,11 +263,6 @@ void TaskQueueWin::RunPendingTasks() {
} }
} }
// static
void TaskQueueWin::ThreadMain(void* context) {
static_cast<TaskQueueWin*>(context)->RunThreadMain();
}
void TaskQueueWin::RunThreadMain() { void TaskQueueWin::RunThreadMain() {
CurrentTaskQueueSetter set_current(this); CurrentTaskQueueSetter set_current(this);
HANDLE handles[2] = {*timer_.event_for_wait(), in_queue_}; HANDLE handles[2] = {*timer_.event_for_wait(), in_queue_};

View File

@ -153,28 +153,24 @@ class SleepDeadlock : public DeadlockInterface {
} }
}; };
// This is the function that is exectued by the thread that will deadlock and
// have its stacktrace captured.
void ThreadFunction(void* void_params) {
ThreadParams* params = static_cast<ThreadParams*>(void_params);
params->tid = gettid();
params->deadlock_region_start_address = GetCurrentRelativeExecutionAddress();
params->deadlock_start_event.Set();
params->deadlock_impl->Deadlock();
params->deadlock_region_end_address = GetCurrentRelativeExecutionAddress();
params->deadlock_done_event.Set();
}
void TestStacktrace(std::unique_ptr<DeadlockInterface> deadlock_impl) { void TestStacktrace(std::unique_ptr<DeadlockInterface> deadlock_impl) {
// Set params that will be sent to other thread. // Set params that will be sent to other thread.
ThreadParams params; ThreadParams params;
params.deadlock_impl = deadlock_impl.get(); params.deadlock_impl = deadlock_impl.get();
// Spawn thread. // Spawn thread.
rtc::PlatformThread thread(&ThreadFunction, &params, "StacktraceTest"); auto thread = rtc::PlatformThread::SpawnJoinable(
thread.Start(); [&params] {
params.tid = gettid();
params.deadlock_region_start_address =
GetCurrentRelativeExecutionAddress();
params.deadlock_start_event.Set();
params.deadlock_impl->Deadlock();
params.deadlock_region_end_address =
GetCurrentRelativeExecutionAddress();
params.deadlock_done_event.Set();
},
"StacktraceTest");
// Wait until the thread has entered the deadlock region, and take a very // Wait until the thread has entered the deadlock region, and take a very
// brief nap to give it time to reach the actual deadlock. // brief nap to give it time to reach the actual deadlock.
@ -198,8 +194,6 @@ void TestStacktrace(std::unique_ptr<DeadlockInterface> deadlock_impl) {
<< rtc::ToHex(params.deadlock_region_start_address) << ", " << rtc::ToHex(params.deadlock_region_start_address) << ", "
<< rtc::ToHex(params.deadlock_region_end_address) << rtc::ToHex(params.deadlock_region_end_address)
<< "] not contained in: " << StackTraceToString(stack_trace); << "] not contained in: " << StackTraceToString(stack_trace);
thread.Stop();
} }
class LookoutLogSink final : public rtc::LogSink { class LookoutLogSink final : public rtc::LogSink {
@ -259,13 +253,9 @@ TEST(Stacktrace, TestRtcEventDeadlockDetection) {
// Start a thread that waits for an event. // Start a thread that waits for an event.
rtc::Event ev; rtc::Event ev;
rtc::PlatformThread thread( auto thread = rtc::PlatformThread::SpawnJoinable(
[](void* arg) { [&ev] { ev.Wait(rtc::Event::kForever); },
auto* ev = static_cast<rtc::Event*>(arg); "TestRtcEventDeadlockDetection");
ev->Wait(rtc::Event::kForever);
},
&ev, "TestRtcEventDeadlockDetection");
thread.Start();
// The message should appear after 3 sec. We'll wait up to 10 sec in an // The message should appear after 3 sec. We'll wait up to 10 sec in an
// attempt to not be flaky. // attempt to not be flaky.
@ -273,7 +263,7 @@ TEST(Stacktrace, TestRtcEventDeadlockDetection) {
// Unblock the thread and shut it down. // Unblock the thread and shut it down.
ev.Set(); ev.Set();
thread.Stop(); thread.Finalize();
rtc::LogMessage::RemoveLogToStream(&sink); rtc::LogMessage::RemoveLogToStream(&sink);
} }

View File

@ -142,12 +142,9 @@ void DefaultVideoQualityAnalyzer::Start(
int max_threads_count) { int max_threads_count) {
test_label_ = std::move(test_case_name); test_label_ = std::move(test_case_name);
for (int i = 0; i < max_threads_count; i++) { for (int i = 0; i < max_threads_count; i++) {
auto thread = std::make_unique<rtc::PlatformThread>( thread_pool_.push_back(rtc::PlatformThread::SpawnJoinable(
&DefaultVideoQualityAnalyzer::ProcessComparisonsThread, this, [this] { ProcessComparisons(); },
("DefaultVideoQualityAnalyzerWorker-" + std::to_string(i)).data(), "DefaultVideoQualityAnalyzerWorker-" + std::to_string(i)));
rtc::ThreadAttributes().SetPriority(rtc::kNormalPriority));
thread->Start();
thread_pool_.push_back(std::move(thread));
} }
{ {
MutexLock lock(&lock_); MutexLock lock(&lock_);
@ -547,10 +544,6 @@ void DefaultVideoQualityAnalyzer::Stop() {
} }
StopMeasuringCpuProcessTime(); StopMeasuringCpuProcessTime();
comparison_available_event_.Set(); comparison_available_event_.Set();
for (auto& thread : thread_pool_) {
thread->Stop();
}
// PlatformThread have to be deleted on the same thread, where it was created
thread_pool_.clear(); thread_pool_.clear();
// Perform final Metrics update. On this place analyzer is stopped and no one // Perform final Metrics update. On this place analyzer is stopped and no one
@ -677,10 +670,6 @@ void DefaultVideoQualityAnalyzer::AddComparison(
StopExcludingCpuThreadTime(); StopExcludingCpuThreadTime();
} }
void DefaultVideoQualityAnalyzer::ProcessComparisonsThread(void* obj) {
static_cast<DefaultVideoQualityAnalyzer*>(obj)->ProcessComparisons();
}
void DefaultVideoQualityAnalyzer::ProcessComparisons() { void DefaultVideoQualityAnalyzer::ProcessComparisons() {
while (true) { while (true) {
// Try to pick next comparison to perform from the queue. // Try to pick next comparison to perform from the queue.

View File

@ -560,7 +560,7 @@ class DefaultVideoQualityAnalyzer : public VideoQualityAnalyzerInterface {
std::deque<FrameComparison> comparisons_ RTC_GUARDED_BY(comparison_lock_); std::deque<FrameComparison> comparisons_ RTC_GUARDED_BY(comparison_lock_);
AnalyzerStats analyzer_stats_ RTC_GUARDED_BY(comparison_lock_); AnalyzerStats analyzer_stats_ RTC_GUARDED_BY(comparison_lock_);
std::vector<std::unique_ptr<rtc::PlatformThread>> thread_pool_; std::vector<rtc::PlatformThread> thread_pool_;
rtc::Event comparison_available_event_; rtc::Event comparison_available_event_;
Mutex cpu_measurement_lock_; Mutex cpu_measurement_lock_;

View File

@ -137,10 +137,12 @@ VideoAnalyzer::VideoAnalyzer(test::LayerFilteringTransport* transport,
} }
for (uint32_t i = 0; i < num_cores; ++i) { for (uint32_t i = 0; i < num_cores; ++i) {
rtc::PlatformThread* thread = comparison_thread_pool_.push_back(rtc::PlatformThread::SpawnJoinable(
new rtc::PlatformThread(&FrameComparisonThread, this, "Analyzer"); [this] {
thread->Start(); while (CompareFrames()) {
comparison_thread_pool_.push_back(thread); }
},
"Analyzer"));
} }
if (!rtp_dump_name.empty()) { if (!rtp_dump_name.empty()) {
@ -155,10 +157,8 @@ VideoAnalyzer::~VideoAnalyzer() {
MutexLock lock(&comparison_lock_); MutexLock lock(&comparison_lock_);
quit_ = true; quit_ = true;
} }
for (rtc::PlatformThread* thread : comparison_thread_pool_) { // Joins all threads.
thread->Stop(); comparison_thread_pool_.clear();
delete thread;
}
} }
void VideoAnalyzer::SetReceiver(PacketReceiver* receiver) { void VideoAnalyzer::SetReceiver(PacketReceiver* receiver) {
@ -533,12 +533,6 @@ void VideoAnalyzer::PollStats() {
memory_usage_.AddSample(rtc::GetProcessResidentSizeBytes()); memory_usage_.AddSample(rtc::GetProcessResidentSizeBytes());
} }
void VideoAnalyzer::FrameComparisonThread(void* obj) {
VideoAnalyzer* analyzer = static_cast<VideoAnalyzer*>(obj);
while (analyzer->CompareFrames()) {
}
}
bool VideoAnalyzer::CompareFrames() { bool VideoAnalyzer::CompareFrames() {
if (AllFramesRecorded()) if (AllFramesRecorded())
return false; return false;

View File

@ -302,7 +302,7 @@ class VideoAnalyzer : public PacketReceiver,
const double avg_ssim_threshold_; const double avg_ssim_threshold_;
bool is_quick_test_enabled_; bool is_quick_test_enabled_;
std::vector<rtc::PlatformThread*> comparison_thread_pool_; std::vector<rtc::PlatformThread> comparison_thread_pool_;
rtc::Event comparison_available_event_; rtc::Event comparison_available_event_;
std::deque<FrameComparison> comparisons_ RTC_GUARDED_BY(comparison_lock_); std::deque<FrameComparison> comparisons_ RTC_GUARDED_BY(comparison_lock_);
bool quit_ RTC_GUARDED_BY(comparison_lock_); bool quit_ RTC_GUARDED_BY(comparison_lock_);