From ad5037b4a81cf97c54c5b15fda8ee898f1827e58 Mon Sep 17 00:00:00 2001 From: Markus Handell Date: Fri, 7 May 2021 15:02:36 +0200 Subject: [PATCH] Reland "Refactor the PlatformThread API." This reverts commit 793bac569fdf1be16cbf24d7871d20d00bbec81b. Reason for revert: rare compilation error fixed Original change's description: > Revert "Refactor the PlatformThread API." > > This reverts commit c89fdd716c4c8af608017c76f75bf27e4c3d602e. > > Reason for revert: Causes rare compilation error on win-libfuzzer-asan trybot. > See https://ci.chromium.org/p/chromium/builders/try/win-libfuzzer-asan-rel/713745? > > Original change's description: > > Refactor the PlatformThread API. > > > > PlatformThread's API is using old style function pointers, causes > > casting, is unintuitive and forces artificial call sequences, and > > is additionally possible to misuse in release mode. > > > > Fix this by an API face lift: > > 1. The class is turned into a handle, which can be empty. > > 2. The only way of getting a non-empty PlatformThread is by calling > > SpawnJoinable or SpawnDetached, clearly conveying the semantics to the > > code reader. > > 3. Handles can be Finalized, which works differently for joinable and > > detached threads: > > a) Handles for detached threads are simply closed where applicable. > > b) Joinable threads are joined before handles are closed. > > 4. The destructor finalizes handles. No explicit call is needed. > > > > Fixed: webrtc:12727 > > Change-Id: Id00a0464edf4fc9e552b6a1fbb5d2e1280e88811 > > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/215075 > > Commit-Queue: Markus Handell > > Reviewed-by: Harald Alvestrand > > Reviewed-by: Mirko Bonadei > > Reviewed-by: Tommi > > Cr-Commit-Position: refs/heads/master@{#33923} > > # Not skipping CQ checks because original CL landed > 1 day ago. > > TBR=handellm@webrtc.org > > Bug: webrtc:12727 > Change-Id: Ic0146be8866f6dd3ad9c364fb8646650b8e07419 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/217583 > Reviewed-by: Guido Urdaneta > Reviewed-by: Markus Handell > Commit-Queue: Guido Urdaneta > Cr-Commit-Position: refs/heads/master@{#33936} # Not skipping CQ checks because this is a reland. Bug: webrtc:12727 Change-Id: Ifd6f44eac72fed84474277a1be03eb84d2f4376e Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/217881 Commit-Queue: Mirko Bonadei Reviewed-by: Mirko Bonadei Reviewed-by: Markus Handell Reviewed-by: Harald Alvestrand Cr-Commit-Position: refs/heads/master@{#33950} --- api/sequence_checker_unittest.cc | 23 +- .../acm2/audio_coding_module_unittest.cc | 118 ++++------ .../audio_device/dummy/file_audio_device.cc | 47 ++-- .../audio_device/dummy/file_audio_device.h | 10 +- .../linux/audio_device_alsa_linux.cc | 66 ++---- .../linux/audio_device_alsa_linux.h | 6 +- .../linux/audio_device_pulse_linux.cc | 55 ++--- .../linux/audio_device_pulse_linux.h | 5 +- modules/audio_device/mac/audio_device_mac.cc | 53 ++--- modules/audio_device/mac/audio_device_mac.h | 11 +- .../audio_device/win/core_audio_base_win.cc | 43 ++-- .../audio_device/win/core_audio_base_win.h | 2 +- .../audio_processing_impl_locking_unittest.cc | 78 +++---- .../audio_processing_performance_unittest.cc | 53 ++--- .../desktop_capture/screen_drawer_unittest.cc | 14 +- modules/utility/source/process_thread_impl.cc | 33 ++- modules/utility/source/process_thread_impl.h | 4 +- .../linux/video_capture_linux.cc | 25 +- .../video_capture/linux/video_capture_linux.h | 3 +- rtc_base/BUILD.gn | 7 +- rtc_base/async_resolver.cc | 12 +- rtc_base/cpu_time_unittest.cc | 17 +- .../recursive_critical_section_unittest.cc | 19 +- rtc_base/event_tracer.cc | 16 +- rtc_base/event_unittest.cc | 19 +- rtc_base/logging_unittest.cc | 13 +- rtc_base/platform_thread.cc | 221 +++++++++--------- rtc_base/platform_thread.h | 134 ++++++----- rtc_base/platform_thread_unittest.cc | 106 ++++----- rtc_base/rate_limiter_unittest.cc | 20 +- rtc_base/task_queue_libevent.cc | 46 ++-- rtc_base/task_queue_stdlib.cc | 32 +-- rtc_base/task_queue_win.cc | 57 ++--- .../stacktrace/stacktrace_unittest.cc | 42 ++-- .../video/default_video_quality_analyzer.cc | 17 +- .../video/default_video_quality_analyzer.h | 2 +- video/video_analyzer.cc | 22 +- video/video_analyzer.h | 2 +- 38 files changed, 588 insertions(+), 865 deletions(-) diff --git a/api/sequence_checker_unittest.cc b/api/sequence_checker_unittest.cc index 4029b8c9a0..21a0894a8e 100644 --- a/api/sequence_checker_unittest.cc +++ b/api/sequence_checker_unittest.cc @@ -40,21 +40,14 @@ class CompileTimeTestForGuardedBy { }; void RunOnDifferentThread(rtc::FunctionView run) { - struct Object { - static void Run(void* obj) { - auto* me = static_cast(obj); - me->run(); - me->thread_has_run_event.Set(); - } - - rtc::FunctionView run; - rtc::Event thread_has_run_event; - } object{run}; - - rtc::PlatformThread thread(&Object::Run, &object, "thread"); - thread.Start(); - EXPECT_TRUE(object.thread_has_run_event.Wait(1000)); - thread.Stop(); + rtc::Event thread_has_run_event; + rtc::PlatformThread::SpawnJoinable( + [&] { + run(); + thread_has_run_event.Set(); + }, + "thread"); + EXPECT_TRUE(thread_has_run_event.Wait(1000)); } } // namespace diff --git a/modules/audio_coding/acm2/audio_coding_module_unittest.cc b/modules/audio_coding/acm2/audio_coding_module_unittest.cc index 7a962e5ce3..5b0577745c 100644 --- a/modules/audio_coding/acm2/audio_coding_module_unittest.cc +++ b/modules/audio_coding/acm2/audio_coding_module_unittest.cc @@ -429,21 +429,6 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi { AudioCodingModuleMtTestOldApi() : AudioCodingModuleTestOldApi(), - send_thread_( - CbSendThread, - this, - "send", - rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)), - insert_packet_thread_( - CbInsertPacketThread, - this, - "insert_packet", - rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)), - pull_audio_thread_( - CbPullAudioThread, - this, - "pull_audio", - rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)), send_count_(0), insert_packet_count_(0), pull_audio_count_(0), @@ -460,17 +445,38 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi { void StartThreads() { quit_.store(false); - send_thread_.Start(); - insert_packet_thread_.Start(); - pull_audio_thread_.Start(); + + const auto attributes = + rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime); + send_thread_ = rtc::PlatformThread::SpawnJoinable( + [this] { + while (!quit_.load()) { + CbSendImpl(); + } + }, + "send", attributes); + insert_packet_thread_ = rtc::PlatformThread::SpawnJoinable( + [this] { + while (!quit_.load()) { + CbInsertPacketImpl(); + } + }, + "insert_packet", attributes); + pull_audio_thread_ = rtc::PlatformThread::SpawnJoinable( + [this] { + while (!quit_.load()) { + CbPullAudioImpl(); + } + }, + "pull_audio", attributes); } void TearDown() { AudioCodingModuleTestOldApi::TearDown(); quit_.store(true); - pull_audio_thread_.Stop(); - send_thread_.Stop(); - insert_packet_thread_.Stop(); + pull_audio_thread_.Finalize(); + send_thread_.Finalize(); + insert_packet_thread_.Finalize(); } bool RunTest() { @@ -488,14 +494,6 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi { return false; } - static void CbSendThread(void* context) { - AudioCodingModuleMtTestOldApi* fixture = - reinterpret_cast(context); - while (!fixture->quit_.load()) { - fixture->CbSendImpl(); - } - } - // The send thread doesn't have to care about the current simulated time, // since only the AcmReceiver is using the clock. void CbSendImpl() { @@ -511,14 +509,6 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi { } } - static void CbInsertPacketThread(void* context) { - AudioCodingModuleMtTestOldApi* fixture = - reinterpret_cast(context); - while (!fixture->quit_.load()) { - fixture->CbInsertPacketImpl(); - } - } - void CbInsertPacketImpl() { SleepMs(1); { @@ -533,14 +523,6 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi { InsertPacket(); } - static void CbPullAudioThread(void* context) { - AudioCodingModuleMtTestOldApi* fixture = - reinterpret_cast(context); - while (!fixture->quit_.load()) { - fixture->CbPullAudioImpl(); - } - } - void CbPullAudioImpl() { SleepMs(1); { @@ -699,16 +681,6 @@ class AcmReRegisterIsacMtTestOldApi : public AudioCodingModuleTestOldApi { AcmReRegisterIsacMtTestOldApi() : AudioCodingModuleTestOldApi(), - receive_thread_( - CbReceiveThread, - this, - "receive", - rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)), - codec_registration_thread_( - CbCodecRegistrationThread, - this, - "codec_registration", - rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)), codec_registered_(false), receive_packet_count_(0), next_insert_packet_time_ms_(0), @@ -740,28 +712,34 @@ class AcmReRegisterIsacMtTestOldApi : public AudioCodingModuleTestOldApi { void StartThreads() { quit_.store(false); - receive_thread_.Start(); - codec_registration_thread_.Start(); + const auto attributes = + rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime); + receive_thread_ = rtc::PlatformThread::SpawnJoinable( + [this] { + while (!quit_.load() && CbReceiveImpl()) { + } + }, + "receive", attributes); + codec_registration_thread_ = rtc::PlatformThread::SpawnJoinable( + [this] { + while (!quit_.load()) { + CbCodecRegistrationImpl(); + } + }, + "codec_registration", attributes); } void TearDown() override { AudioCodingModuleTestOldApi::TearDown(); quit_.store(true); - receive_thread_.Stop(); - codec_registration_thread_.Stop(); + receive_thread_.Finalize(); + codec_registration_thread_.Finalize(); } bool RunTest() { return test_complete_.Wait(10 * 60 * 1000); // 10 minutes' timeout. } - static void CbReceiveThread(void* context) { - AcmReRegisterIsacMtTestOldApi* fixture = - reinterpret_cast(context); - while (!fixture->quit_.load() && fixture->CbReceiveImpl()) { - } - } - bool CbReceiveImpl() { SleepMs(1); rtc::Buffer encoded; @@ -807,14 +785,6 @@ class AcmReRegisterIsacMtTestOldApi : public AudioCodingModuleTestOldApi { return true; } - static void CbCodecRegistrationThread(void* context) { - AcmReRegisterIsacMtTestOldApi* fixture = - reinterpret_cast(context); - while (!fixture->quit_.load()) { - fixture->CbCodecRegistrationImpl(); - } - } - void CbCodecRegistrationImpl() { SleepMs(1); if (HasFatalFailure()) { diff --git a/modules/audio_device/dummy/file_audio_device.cc b/modules/audio_device/dummy/file_audio_device.cc index 90bba05296..e345a16c44 100644 --- a/modules/audio_device/dummy/file_audio_device.cc +++ b/modules/audio_device/dummy/file_audio_device.cc @@ -216,10 +216,13 @@ int32_t FileAudioDevice::StartPlayout() { } } - _ptrThreadPlay.reset(new rtc::PlatformThread( - PlayThreadFunc, this, "webrtc_audio_module_play_thread", - rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); - _ptrThreadPlay->Start(); + _ptrThreadPlay = rtc::PlatformThread::SpawnJoinable( + [this] { + while (PlayThreadProcess()) { + } + }, + "webrtc_audio_module_play_thread", + rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime)); RTC_LOG(LS_INFO) << "Started playout capture to output file: " << _outputFilename; @@ -233,10 +236,8 @@ int32_t FileAudioDevice::StopPlayout() { } // stop playout thread first - if (_ptrThreadPlay) { - _ptrThreadPlay->Stop(); - _ptrThreadPlay.reset(); - } + if (!_ptrThreadPlay.empty()) + _ptrThreadPlay.Finalize(); MutexLock lock(&mutex_); @@ -276,11 +277,13 @@ int32_t FileAudioDevice::StartRecording() { } } - _ptrThreadRec.reset(new rtc::PlatformThread( - RecThreadFunc, this, "webrtc_audio_module_capture_thread", - rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); - - _ptrThreadRec->Start(); + _ptrThreadRec = rtc::PlatformThread::SpawnJoinable( + [this] { + while (RecThreadProcess()) { + } + }, + "webrtc_audio_module_capture_thread", + rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime)); RTC_LOG(LS_INFO) << "Started recording from input file: " << _inputFilename; @@ -293,10 +296,8 @@ int32_t FileAudioDevice::StopRecording() { _recording = false; } - if (_ptrThreadRec) { - _ptrThreadRec->Stop(); - _ptrThreadRec.reset(); - } + if (!_ptrThreadRec.empty()) + _ptrThreadRec.Finalize(); MutexLock lock(&mutex_); _recordingFramesLeft = 0; @@ -439,18 +440,6 @@ void FileAudioDevice::AttachAudioBuffer(AudioDeviceBuffer* audioBuffer) { _ptrAudioBuffer->SetPlayoutChannels(0); } -void FileAudioDevice::PlayThreadFunc(void* pThis) { - FileAudioDevice* device = static_cast(pThis); - while (device->PlayThreadProcess()) { - } -} - -void FileAudioDevice::RecThreadFunc(void* pThis) { - FileAudioDevice* device = static_cast(pThis); - while (device->RecThreadProcess()) { - } -} - bool FileAudioDevice::PlayThreadProcess() { if (!_playing) { return false; diff --git a/modules/audio_device/dummy/file_audio_device.h b/modules/audio_device/dummy/file_audio_device.h index ecb3f2f533..f4a6b76586 100644 --- a/modules/audio_device/dummy/file_audio_device.h +++ b/modules/audio_device/dummy/file_audio_device.h @@ -17,14 +17,11 @@ #include #include "modules/audio_device/audio_device_generic.h" +#include "rtc_base/platform_thread.h" #include "rtc_base/synchronization/mutex.h" #include "rtc_base/system/file_wrapper.h" #include "rtc_base/time_utils.h" -namespace rtc { -class PlatformThread; -} // namespace rtc - namespace webrtc { // This is a fake audio device which plays audio from a file as its microphone @@ -145,9 +142,8 @@ class FileAudioDevice : public AudioDeviceGeneric { size_t _recordingFramesIn10MS; size_t _playoutFramesIn10MS; - // TODO(pbos): Make plain members instead of pointers and stop resetting them. - std::unique_ptr _ptrThreadRec; - std::unique_ptr _ptrThreadPlay; + rtc::PlatformThread _ptrThreadRec; + rtc::PlatformThread _ptrThreadPlay; bool _playing; bool _recording; diff --git a/modules/audio_device/linux/audio_device_alsa_linux.cc b/modules/audio_device/linux/audio_device_alsa_linux.cc index eb3466258e..9e6bd168fc 100644 --- a/modules/audio_device/linux/audio_device_alsa_linux.cc +++ b/modules/audio_device/linux/audio_device_alsa_linux.cc @@ -178,26 +178,13 @@ int32_t AudioDeviceLinuxALSA::Terminate() { _mixerManager.Close(); // RECORDING - if (_ptrThreadRec) { - rtc::PlatformThread* tmpThread = _ptrThreadRec.release(); - mutex_.Unlock(); - - tmpThread->Stop(); - delete tmpThread; - - mutex_.Lock(); - } + mutex_.Unlock(); + _ptrThreadRec.Finalize(); // PLAYOUT - if (_ptrThreadPlay) { - rtc::PlatformThread* tmpThread = _ptrThreadPlay.release(); - mutex_.Unlock(); + _ptrThreadPlay.Finalize(); + mutex_.Lock(); - tmpThread->Stop(); - delete tmpThread; - - mutex_.Lock(); - } #if defined(WEBRTC_USE_X11) if (_XDisplay) { XCloseDisplay(_XDisplay); @@ -1040,11 +1027,13 @@ int32_t AudioDeviceLinuxALSA::StartRecording() { return -1; } // RECORDING - _ptrThreadRec.reset(new rtc::PlatformThread( - RecThreadFunc, this, "webrtc_audio_module_capture_thread", - rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); - - _ptrThreadRec->Start(); + _ptrThreadRec = rtc::PlatformThread::SpawnJoinable( + [this] { + while (RecThreadProcess()) { + } + }, + "webrtc_audio_module_capture_thread", + rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime)); errVal = LATE(snd_pcm_prepare)(_handleRecord); if (errVal < 0) { @@ -1088,10 +1077,7 @@ int32_t AudioDeviceLinuxALSA::StopRecordingLocked() { _recIsInitialized = false; _recording = false; - if (_ptrThreadRec) { - _ptrThreadRec->Stop(); - _ptrThreadRec.reset(); - } + _ptrThreadRec.Finalize(); _recordingFramesLeft = 0; if (_recordingBuffer) { @@ -1158,10 +1144,13 @@ int32_t AudioDeviceLinuxALSA::StartPlayout() { } // PLAYOUT - _ptrThreadPlay.reset(new rtc::PlatformThread( - PlayThreadFunc, this, "webrtc_audio_module_play_thread", - rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); - _ptrThreadPlay->Start(); + _ptrThreadPlay = rtc::PlatformThread::SpawnJoinable( + [this] { + while (PlayThreadProcess()) { + } + }, + "webrtc_audio_module_play_thread", + rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime)); int errVal = LATE(snd_pcm_prepare)(_handlePlayout); if (errVal < 0) { @@ -1191,10 +1180,7 @@ int32_t AudioDeviceLinuxALSA::StopPlayoutLocked() { _playing = false; // stop playout thread first - if (_ptrThreadPlay) { - _ptrThreadPlay->Stop(); - _ptrThreadPlay.reset(); - } + _ptrThreadPlay.Finalize(); _playoutFramesLeft = 0; delete[] _playoutBuffer; @@ -1469,18 +1455,6 @@ int32_t AudioDeviceLinuxALSA::ErrorRecovery(int32_t error, // Thread Methods // ============================================================================ -void AudioDeviceLinuxALSA::PlayThreadFunc(void* pThis) { - AudioDeviceLinuxALSA* device = static_cast(pThis); - while (device->PlayThreadProcess()) { - } -} - -void AudioDeviceLinuxALSA::RecThreadFunc(void* pThis) { - AudioDeviceLinuxALSA* device = static_cast(pThis); - while (device->RecThreadProcess()) { - } -} - bool AudioDeviceLinuxALSA::PlayThreadProcess() { if (!_playing) return false; diff --git a/modules/audio_device/linux/audio_device_alsa_linux.h b/modules/audio_device/linux/audio_device_alsa_linux.h index 410afcf42c..1f4a231640 100644 --- a/modules/audio_device/linux/audio_device_alsa_linux.h +++ b/modules/audio_device/linux/audio_device_alsa_linux.h @@ -155,10 +155,8 @@ class AudioDeviceLinuxALSA : public AudioDeviceGeneric { Mutex mutex_; - // TODO(pbos): Make plain members and start/stop instead of resetting these - // pointers. A thread can be reused. - std::unique_ptr _ptrThreadRec; - std::unique_ptr _ptrThreadPlay; + rtc::PlatformThread _ptrThreadRec; + rtc::PlatformThread _ptrThreadPlay; AudioMixerManagerLinuxALSA _mixerManager; diff --git a/modules/audio_device/linux/audio_device_pulse_linux.cc b/modules/audio_device/linux/audio_device_pulse_linux.cc index 942e60da53..7742420fc2 100644 --- a/modules/audio_device/linux/audio_device_pulse_linux.cc +++ b/modules/audio_device/linux/audio_device_pulse_linux.cc @@ -15,6 +15,7 @@ #include "modules/audio_device/linux/latebindingsymboltable_linux.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" +#include "rtc_base/platform_thread.h" WebRTCPulseSymbolTable* GetPulseSymbolTable() { static WebRTCPulseSymbolTable* pulse_symbol_table = @@ -158,18 +159,22 @@ AudioDeviceGeneric::InitStatus AudioDeviceLinuxPulse::Init() { #endif // RECORDING - _ptrThreadRec.reset(new rtc::PlatformThread( - RecThreadFunc, this, "webrtc_audio_module_rec_thread", - rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); - - _ptrThreadRec->Start(); + const auto attributes = + rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime); + _ptrThreadRec = rtc::PlatformThread::SpawnJoinable( + [this] { + while (RecThreadProcess()) { + } + }, + "webrtc_audio_module_rec_thread", attributes); // PLAYOUT - _ptrThreadPlay.reset(new rtc::PlatformThread( - PlayThreadFunc, this, "webrtc_audio_module_play_thread", - rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); - _ptrThreadPlay->Start(); - + _ptrThreadPlay = rtc::PlatformThread::SpawnJoinable( + [this] { + while (PlayThreadProcess()) { + } + }, + "webrtc_audio_module_play_thread", attributes); _initialized = true; return InitStatus::OK; @@ -187,22 +192,12 @@ int32_t AudioDeviceLinuxPulse::Terminate() { _mixerManager.Close(); // RECORDING - if (_ptrThreadRec) { - rtc::PlatformThread* tmpThread = _ptrThreadRec.release(); - - _timeEventRec.Set(); - tmpThread->Stop(); - delete tmpThread; - } + _timeEventRec.Set(); + _ptrThreadRec.Finalize(); // PLAYOUT - if (_ptrThreadPlay) { - rtc::PlatformThread* tmpThread = _ptrThreadPlay.release(); - - _timeEventPlay.Set(); - tmpThread->Stop(); - delete tmpThread; - } + _timeEventPlay.Set(); + _ptrThreadPlay.Finalize(); // Terminate PulseAudio if (TerminatePulseAudio() < 0) { @@ -1981,18 +1976,6 @@ int32_t AudioDeviceLinuxPulse::ProcessRecordedData(int8_t* bufferData, return 0; } -void AudioDeviceLinuxPulse::PlayThreadFunc(void* pThis) { - AudioDeviceLinuxPulse* device = static_cast(pThis); - while (device->PlayThreadProcess()) { - } -} - -void AudioDeviceLinuxPulse::RecThreadFunc(void* pThis) { - AudioDeviceLinuxPulse* device = static_cast(pThis); - while (device->RecThreadProcess()) { - } -} - bool AudioDeviceLinuxPulse::PlayThreadProcess() { if (!_timeEventPlay.Wait(1000)) { return true; diff --git a/modules/audio_device/linux/audio_device_pulse_linux.h b/modules/audio_device/linux/audio_device_pulse_linux.h index cfad6b1c15..0cf89ef011 100644 --- a/modules/audio_device/linux/audio_device_pulse_linux.h +++ b/modules/audio_device/linux/audio_device_pulse_linux.h @@ -268,9 +268,8 @@ class AudioDeviceLinuxPulse : public AudioDeviceGeneric { rtc::Event _recStartEvent; rtc::Event _playStartEvent; - // TODO(pbos): Remove unique_ptr and use directly without resetting. - std::unique_ptr _ptrThreadPlay; - std::unique_ptr _ptrThreadRec; + rtc::PlatformThread _ptrThreadPlay; + rtc::PlatformThread _ptrThreadRec; AudioMixerManagerLinuxPulse _mixerManager; diff --git a/modules/audio_device/mac/audio_device_mac.cc b/modules/audio_device/mac/audio_device_mac.cc index f143a43f00..2088b017a0 100644 --- a/modules/audio_device/mac/audio_device_mac.cc +++ b/modules/audio_device/mac/audio_device_mac.cc @@ -166,8 +166,8 @@ AudioDeviceMac::~AudioDeviceMac() { Terminate(); } - RTC_DCHECK(!capture_worker_thread_.get()); - RTC_DCHECK(!render_worker_thread_.get()); + RTC_DCHECK(capture_worker_thread_.empty()); + RTC_DCHECK(render_worker_thread_.empty()); if (_paRenderBuffer) { delete _paRenderBuffer; @@ -1308,12 +1308,14 @@ int32_t AudioDeviceMac::StartRecording() { return -1; } - RTC_DCHECK(!capture_worker_thread_.get()); - capture_worker_thread_.reset(new rtc::PlatformThread( - RunCapture, this, "CaptureWorkerThread", - rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); - RTC_DCHECK(capture_worker_thread_.get()); - capture_worker_thread_->Start(); + RTC_DCHECK(capture_worker_thread_.empty()); + capture_worker_thread_ = rtc::PlatformThread::SpawnJoinable( + [this] { + while (CaptureWorkerThread()) { + } + }, + "CaptureWorkerThread", + rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime)); OSStatus err = noErr; if (_twoDevices) { @@ -1395,10 +1397,9 @@ int32_t AudioDeviceMac::StopRecording() { // Setting this signal will allow the worker thread to be stopped. AtomicSet32(&_captureDeviceIsAlive, 0); - if (capture_worker_thread_.get()) { + if (!capture_worker_thread_.empty()) { mutex_.Unlock(); - capture_worker_thread_->Stop(); - capture_worker_thread_.reset(); + capture_worker_thread_.Finalize(); mutex_.Lock(); } @@ -1444,11 +1445,14 @@ int32_t AudioDeviceMac::StartPlayout() { return 0; } - RTC_DCHECK(!render_worker_thread_.get()); - render_worker_thread_.reset(new rtc::PlatformThread( - RunRender, this, "RenderWorkerThread", - rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); - render_worker_thread_->Start(); + RTC_DCHECK(render_worker_thread_.empty()); + render_worker_thread_ = rtc::PlatformThread::SpawnJoinable( + [this] { + while (RenderWorkerThread()) { + } + }, + "RenderWorkerThread", + rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime)); if (_twoDevices || !_recording) { OSStatus err = noErr; @@ -1506,10 +1510,9 @@ int32_t AudioDeviceMac::StopPlayout() { // Setting this signal will allow the worker thread to be stopped. AtomicSet32(&_renderDeviceIsAlive, 0); - if (render_worker_thread_.get()) { + if (!render_worker_thread_.empty()) { mutex_.Unlock(); - render_worker_thread_->Stop(); - render_worker_thread_.reset(); + render_worker_thread_.Finalize(); mutex_.Lock(); } @@ -2371,12 +2374,6 @@ OSStatus AudioDeviceMac::implInConverterProc(UInt32* numberDataPackets, return 0; } -void AudioDeviceMac::RunRender(void* ptrThis) { - AudioDeviceMac* device = static_cast(ptrThis); - while (device->RenderWorkerThread()) { - } -} - bool AudioDeviceMac::RenderWorkerThread() { PaRingBufferSize numSamples = ENGINE_PLAY_BUF_SIZE_IN_SAMPLES * _outDesiredFormat.mChannelsPerFrame; @@ -2442,12 +2439,6 @@ bool AudioDeviceMac::RenderWorkerThread() { return true; } -void AudioDeviceMac::RunCapture(void* ptrThis) { - AudioDeviceMac* device = static_cast(ptrThis); - while (device->CaptureWorkerThread()) { - } -} - bool AudioDeviceMac::CaptureWorkerThread() { OSStatus err = noErr; UInt32 noRecSamples = diff --git a/modules/audio_device/mac/audio_device_mac.h b/modules/audio_device/mac/audio_device_mac.h index 985db9da52..f9504b64b5 100644 --- a/modules/audio_device/mac/audio_device_mac.h +++ b/modules/audio_device/mac/audio_device_mac.h @@ -21,15 +21,12 @@ #include "modules/audio_device/mac/audio_mixer_manager_mac.h" #include "rtc_base/event.h" #include "rtc_base/logging.h" +#include "rtc_base/platform_thread.h" #include "rtc_base/synchronization/mutex.h" #include "rtc_base/thread_annotations.h" struct PaUtilRingBuffer; -namespace rtc { -class PlatformThread; -} // namespace rtc - namespace webrtc { const uint32_t N_REC_SAMPLES_PER_SEC = 48000; @@ -271,13 +268,11 @@ class AudioDeviceMac : public AudioDeviceGeneric { rtc::Event _stopEventRec; rtc::Event _stopEvent; - // TODO(pbos): Replace with direct members, just start/stop, no need to - // recreate the thread. // Only valid/running between calls to StartRecording and StopRecording. - std::unique_ptr capture_worker_thread_; + rtc::PlatformThread capture_worker_thread_; // Only valid/running between calls to StartPlayout and StopPlayout. - std::unique_ptr render_worker_thread_; + rtc::PlatformThread render_worker_thread_; AudioMixerManagerMac _mixerManager; diff --git a/modules/audio_device/win/core_audio_base_win.cc b/modules/audio_device/win/core_audio_base_win.cc index 59debc07a9..7d93fcb14a 100644 --- a/modules/audio_device/win/core_audio_base_win.cc +++ b/modules/audio_device/win/core_audio_base_win.cc @@ -119,11 +119,6 @@ const char* SessionDisconnectReasonToString( } } -void Run(void* obj) { - RTC_DCHECK(obj); - reinterpret_cast(obj)->ThreadRun(); -} - // Returns true if the selected audio device supports low latency, i.e, if it // is possible to initialize the engine using periods less than the default // period (10ms). @@ -553,24 +548,19 @@ bool CoreAudioBase::Start() { // Audio thread should be alive during internal restart since the restart // callback is triggered on that thread and it also makes the restart // sequence less complex. - RTC_DCHECK(audio_thread_); + RTC_DCHECK(!audio_thread_.empty()); } // Start an audio thread but only if one does not already exist (which is the // case during restart). - if (!audio_thread_) { - audio_thread_ = std::make_unique( - Run, this, IsInput() ? "wasapi_capture_thread" : "wasapi_render_thread", - rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)); - RTC_DCHECK(audio_thread_); - audio_thread_->Start(); - if (!audio_thread_->IsRunning()) { - StopThread(); - RTC_LOG(LS_ERROR) << "Failed to start audio thread"; - return false; - } - RTC_DLOG(INFO) << "Started thread with name: " << audio_thread_->name() - << " and id: " << audio_thread_->GetThreadRef(); + if (audio_thread_.empty()) { + const absl::string_view name = + IsInput() ? "wasapi_capture_thread" : "wasapi_render_thread"; + audio_thread_ = rtc::PlatformThread::SpawnJoinable( + [this] { ThreadRun(); }, name, + rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime)); + RTC_DLOG(INFO) << "Started thread with name: " << name + << " and handle: " << *audio_thread_.GetHandle(); } // Start streaming data between the endpoint buffer and the audio engine. @@ -697,14 +687,11 @@ bool CoreAudioBase::Restart() { void CoreAudioBase::StopThread() { RTC_DLOG(INFO) << __FUNCTION__; RTC_DCHECK(!IsRestarting()); - if (audio_thread_) { - if (audio_thread_->IsRunning()) { - RTC_DLOG(INFO) << "Sets stop_event..."; - SetEvent(stop_event_.Get()); - RTC_DLOG(INFO) << "PlatformThread::Stop..."; - audio_thread_->Stop(); - } - audio_thread_.reset(); + if (!audio_thread_.empty()) { + RTC_DLOG(INFO) << "Sets stop_event..."; + SetEvent(stop_event_.Get()); + RTC_DLOG(INFO) << "PlatformThread::Finalize..."; + audio_thread_.Finalize(); // Ensure that we don't quit the main thread loop immediately next // time Start() is called. @@ -717,7 +704,7 @@ bool CoreAudioBase::HandleRestartEvent() { RTC_DLOG(INFO) << __FUNCTION__ << "[" << DirectionToString(direction()) << "]"; RTC_DCHECK_RUN_ON(&thread_checker_audio_); - RTC_DCHECK(audio_thread_); + RTC_DCHECK(!audio_thread_.empty()); RTC_DCHECK(IsRestarting()); // Let each client (input and/or output) take care of its own restart // sequence since each side might need unique actions. diff --git a/modules/audio_device/win/core_audio_base_win.h b/modules/audio_device/win/core_audio_base_win.h index 2a57636640..afcc6a684d 100644 --- a/modules/audio_device/win/core_audio_base_win.h +++ b/modules/audio_device/win/core_audio_base_win.h @@ -158,7 +158,7 @@ class CoreAudioBase : public IAudioSessionEvents { // Set when restart process starts and cleared when restart stops // successfully. Accessed atomically. std::atomic is_restarting_; - std::unique_ptr audio_thread_; + rtc::PlatformThread audio_thread_; Microsoft::WRL::ComPtr audio_session_control_; void StopThread(); diff --git a/modules/audio_processing/audio_processing_impl_locking_unittest.cc b/modules/audio_processing/audio_processing_impl_locking_unittest.cc index 1f065ffe52..66c1251d4c 100644 --- a/modules/audio_processing/audio_processing_impl_locking_unittest.cc +++ b/modules/audio_processing/audio_processing_impl_locking_unittest.cc @@ -387,33 +387,6 @@ class AudioProcessingImplLockTest void SetUp() override; void TearDown() override; - // Thread callback for the render thread - static void RenderProcessorThreadFunc(void* context) { - AudioProcessingImplLockTest* impl = - reinterpret_cast(context); - while (!impl->MaybeEndTest()) { - impl->render_thread_state_.Process(); - } - } - - // Thread callback for the capture thread - static void CaptureProcessorThreadFunc(void* context) { - AudioProcessingImplLockTest* impl = - reinterpret_cast(context); - while (!impl->MaybeEndTest()) { - impl->capture_thread_state_.Process(); - } - } - - // Thread callback for the stats thread - static void StatsProcessorThreadFunc(void* context) { - AudioProcessingImplLockTest* impl = - reinterpret_cast(context); - while (!impl->MaybeEndTest()) { - impl->stats_thread_state_.Process(); - } - } - // Tests whether all the required render and capture side calls have been // done. bool TestDone() { @@ -423,9 +396,28 @@ class AudioProcessingImplLockTest // Start the threads used in the test. void StartThreads() { - render_thread_.Start(); - capture_thread_.Start(); - stats_thread_.Start(); + const auto attributes = + rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime); + render_thread_ = rtc::PlatformThread::SpawnJoinable( + [this] { + while (!MaybeEndTest()) + render_thread_state_.Process(); + }, + "render", attributes); + capture_thread_ = rtc::PlatformThread::SpawnJoinable( + [this] { + while (!MaybeEndTest()) { + capture_thread_state_.Process(); + } + }, + "capture", attributes); + + stats_thread_ = rtc::PlatformThread::SpawnJoinable( + [this] { + while (!MaybeEndTest()) + stats_thread_state_.Process(); + }, + "stats", attributes); } // Event handlers for the test. @@ -434,9 +426,6 @@ class AudioProcessingImplLockTest rtc::Event capture_call_event_; // Thread related variables. - rtc::PlatformThread render_thread_; - rtc::PlatformThread capture_thread_; - rtc::PlatformThread stats_thread_; mutable RandomGenerator rand_gen_; std::unique_ptr apm_; @@ -445,6 +434,9 @@ class AudioProcessingImplLockTest RenderProcessor render_thread_state_; CaptureProcessor capture_thread_state_; StatsProcessor stats_thread_state_; + rtc::PlatformThread render_thread_; + rtc::PlatformThread capture_thread_; + rtc::PlatformThread stats_thread_; }; // Sleeps a random time between 0 and max_sleep milliseconds. @@ -485,22 +477,7 @@ void PopulateAudioFrame(float amplitude, } AudioProcessingImplLockTest::AudioProcessingImplLockTest() - : render_thread_( - RenderProcessorThreadFunc, - this, - "render", - rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)), - capture_thread_( - CaptureProcessorThreadFunc, - this, - "capture", - rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)), - stats_thread_( - StatsProcessorThreadFunc, - this, - "stats", - rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)), - apm_(AudioProcessingBuilderForTesting().Create()), + : apm_(AudioProcessingBuilderForTesting().Create()), render_thread_state_(kMaxFrameSize, &rand_gen_, &render_call_event_, @@ -552,9 +529,6 @@ void AudioProcessingImplLockTest::SetUp() { void AudioProcessingImplLockTest::TearDown() { render_call_event_.Set(); capture_call_event_.Set(); - render_thread_.Stop(); - capture_thread_.Stop(); - stats_thread_.Stop(); } StatsProcessor::StatsProcessor(RandomGenerator* rand_gen, diff --git a/modules/audio_processing/audio_processing_performance_unittest.cc b/modules/audio_processing/audio_processing_performance_unittest.cc index 9063cf4a93..9585850296 100644 --- a/modules/audio_processing/audio_processing_performance_unittest.cc +++ b/modules/audio_processing/audio_processing_performance_unittest.cc @@ -391,17 +391,7 @@ class TimedThreadApiProcessor { class CallSimulator : public ::testing::TestWithParam { public: CallSimulator() - : render_thread_(new rtc::PlatformThread( - RenderProcessorThreadFunc, - this, - "render", - rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))), - capture_thread_(new rtc::PlatformThread( - CaptureProcessorThreadFunc, - this, - "capture", - rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))), - rand_gen_(42U), + : rand_gen_(42U), simulation_config_(static_cast(GetParam())) {} // Run the call simulation with a timeout. @@ -436,13 +426,10 @@ class CallSimulator : public ::testing::TestWithParam { static const int kMinNumFramesToProcess = 150; static const int32_t kTestTimeout = 3 * 10 * kMinNumFramesToProcess; - // ::testing::TestWithParam<> implementation. - void TearDown() override { StopThreads(); } - // Stop all running threads. void StopThreads() { - render_thread_->Stop(); - capture_thread_->Stop(); + render_thread_.Finalize(); + capture_thread_.Finalize(); } // Simulator and APM setup. @@ -533,32 +520,28 @@ class CallSimulator : public ::testing::TestWithParam { kMinNumFramesToProcess, kCaptureInputFloatLevel, num_capture_channels)); } - // Thread callback for the render thread. - static void RenderProcessorThreadFunc(void* context) { - CallSimulator* call_simulator = reinterpret_cast(context); - while (call_simulator->render_thread_state_->Process()) { - } - } - - // Thread callback for the capture thread. - static void CaptureProcessorThreadFunc(void* context) { - CallSimulator* call_simulator = reinterpret_cast(context); - while (call_simulator->capture_thread_state_->Process()) { - } - } - // Start the threads used in the test. void StartThreads() { - ASSERT_NO_FATAL_FAILURE(render_thread_->Start()); - ASSERT_NO_FATAL_FAILURE(capture_thread_->Start()); + const auto attributes = + rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime); + render_thread_ = rtc::PlatformThread::SpawnJoinable( + [this] { + while (render_thread_state_->Process()) { + } + }, + "render", attributes); + capture_thread_ = rtc::PlatformThread::SpawnJoinable( + [this] { + while (capture_thread_state_->Process()) { + } + }, + "capture", attributes); } // Event handler for the test. rtc::Event test_complete_; // Thread related variables. - std::unique_ptr render_thread_; - std::unique_ptr capture_thread_; Random rand_gen_; std::unique_ptr apm_; @@ -567,6 +550,8 @@ class CallSimulator : public ::testing::TestWithParam { LockedFlag capture_call_checker_; std::unique_ptr render_thread_state_; std::unique_ptr capture_thread_state_; + rtc::PlatformThread render_thread_; + rtc::PlatformThread capture_thread_; }; // Implements the callback functionality for the threads. diff --git a/modules/desktop_capture/screen_drawer_unittest.cc b/modules/desktop_capture/screen_drawer_unittest.cc index c38eee6991..2394260105 100644 --- a/modules/desktop_capture/screen_drawer_unittest.cc +++ b/modules/desktop_capture/screen_drawer_unittest.cc @@ -48,13 +48,12 @@ void TestScreenDrawerLock( ~Task() = default; - static void RunTask(void* me) { - Task* task = static_cast(me); - std::unique_ptr lock = task->ctor_(); + void RunTask() { + std::unique_ptr lock = ctor_(); ASSERT_TRUE(!!lock); - task->created_->store(true); + created_->store(true); // Wait for the main thread to get the signal of created_. - while (!task->ready_.load()) { + while (!ready_.load()) { SleepMs(1); } // At this point, main thread should begin to create a second lock. Though @@ -77,8 +76,8 @@ void TestScreenDrawerLock( const rtc::FunctionView()> ctor_; } task(&created, ready, ctor); - rtc::PlatformThread lock_thread(&Task::RunTask, &task, "lock_thread"); - lock_thread.Start(); + auto lock_thread = rtc::PlatformThread::SpawnJoinable( + [&task] { task.RunTask(); }, "lock_thread"); // Wait for the first lock in Task::RunTask() to be created. // TODO(zijiehe): Find a better solution to wait for the creation of the first @@ -95,7 +94,6 @@ void TestScreenDrawerLock( ASSERT_GT(kLockDurationMs, rtc::TimeMillis() - start_ms); ctor(); ASSERT_LE(kLockDurationMs, rtc::TimeMillis() - start_ms); - lock_thread.Stop(); } } // namespace diff --git a/modules/utility/source/process_thread_impl.cc b/modules/utility/source/process_thread_impl.cc index dc2a0066e9..cdc2fa1005 100644 --- a/modules/utility/source/process_thread_impl.cc +++ b/modules/utility/source/process_thread_impl.cc @@ -48,7 +48,6 @@ ProcessThreadImpl::ProcessThreadImpl(const char* thread_name) ProcessThreadImpl::~ProcessThreadImpl() { RTC_DCHECK(thread_checker_.IsCurrent()); - RTC_DCHECK(!thread_.get()); RTC_DCHECK(!stop_); while (!delayed_tasks_.empty()) { @@ -72,8 +71,8 @@ void ProcessThreadImpl::Delete() { // Doesn't need locking, because the contending thread isn't running. void ProcessThreadImpl::Start() RTC_NO_THREAD_SAFETY_ANALYSIS { RTC_DCHECK(thread_checker_.IsCurrent()); - RTC_DCHECK(!thread_.get()); - if (thread_.get()) + RTC_DCHECK(thread_.empty()); + if (!thread_.empty()) return; RTC_DCHECK(!stop_); @@ -81,14 +80,18 @@ void ProcessThreadImpl::Start() RTC_NO_THREAD_SAFETY_ANALYSIS { for (ModuleCallback& m : modules_) m.module->ProcessThreadAttached(this); - thread_.reset( - new rtc::PlatformThread(&ProcessThreadImpl::Run, this, thread_name_)); - thread_->Start(); + thread_ = rtc::PlatformThread::SpawnJoinable( + [this] { + CurrentTaskQueueSetter set_current(this); + while (Process()) { + } + }, + thread_name_); } void ProcessThreadImpl::Stop() { RTC_DCHECK(thread_checker_.IsCurrent()); - if (!thread_.get()) + if (thread_.empty()) return; { @@ -98,9 +101,7 @@ void ProcessThreadImpl::Stop() { } wake_up_.Set(); - - thread_->Stop(); - thread_.reset(); + thread_.Finalize(); StopNoLocks(); } @@ -108,7 +109,7 @@ void ProcessThreadImpl::Stop() { // No locking needed, since this is called after the contending thread is // stopped. void ProcessThreadImpl::StopNoLocks() RTC_NO_THREAD_SAFETY_ANALYSIS { - RTC_DCHECK(!thread_); + RTC_DCHECK(thread_.empty()); stop_ = false; for (ModuleCallback& m : modules_) @@ -199,7 +200,7 @@ void ProcessThreadImpl::RegisterModule(Module* module, // Now that we know the module isn't in the list, we'll call out to notify // the module that it's attached to the worker thread. We don't hold // the lock while we make this call. - if (thread_.get()) + if (!thread_.empty()) module->ProcessThreadAttached(this); { @@ -227,14 +228,6 @@ void ProcessThreadImpl::DeRegisterModule(Module* module) { module->ProcessThreadAttached(nullptr); } -// static -void ProcessThreadImpl::Run(void* obj) { - ProcessThreadImpl* impl = static_cast(obj); - CurrentTaskQueueSetter set_current(impl); - while (impl->Process()) { - } -} - bool ProcessThreadImpl::Process() { TRACE_EVENT1("webrtc", "ProcessThreadImpl", "name", thread_name_); int64_t now = rtc::TimeMillis(); diff --git a/modules/utility/source/process_thread_impl.h b/modules/utility/source/process_thread_impl.h index b83994cef8..b667bfc68a 100644 --- a/modules/utility/source/process_thread_impl.h +++ b/modules/utility/source/process_thread_impl.h @@ -45,7 +45,6 @@ class ProcessThreadImpl : public ProcessThread { void DeRegisterModule(Module* module) override; protected: - static void Run(void* obj); bool Process(); private: @@ -97,8 +96,7 @@ class ProcessThreadImpl : public ProcessThread { SequenceChecker thread_checker_; rtc::Event wake_up_; - // TODO(pbos): Remove unique_ptr and stop recreating the thread. - std::unique_ptr thread_; + rtc::PlatformThread thread_; ModuleList modules_ RTC_GUARDED_BY(mutex_); // Set to true when calling Process, to allow reentrant calls to WakeUp. diff --git a/modules/video_capture/linux/video_capture_linux.cc b/modules/video_capture/linux/video_capture_linux.cc index 49237cdf19..10f9713ec3 100644 --- a/modules/video_capture/linux/video_capture_linux.cc +++ b/modules/video_capture/linux/video_capture_linux.cc @@ -240,12 +240,15 @@ int32_t VideoCaptureModuleV4L2::StartCapture( } // start capture thread; - if (!_captureThread) { + if (_captureThread.empty()) { quit_ = false; - _captureThread.reset(new rtc::PlatformThread( - VideoCaptureModuleV4L2::CaptureThread, this, "CaptureThread", - rtc::ThreadAttributes().SetPriority(rtc::kHighPriority))); - _captureThread->Start(); + _captureThread = rtc::PlatformThread::SpawnJoinable( + [this] { + while (CaptureProcess()) { + } + }, + "CaptureThread", + rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kHigh)); } // Needed to start UVC camera - from the uvcview application @@ -261,14 +264,13 @@ int32_t VideoCaptureModuleV4L2::StartCapture( } int32_t VideoCaptureModuleV4L2::StopCapture() { - if (_captureThread) { + if (!_captureThread.empty()) { { MutexLock lock(&capture_lock_); quit_ = true; } - // Make sure the capture thread stop stop using the critsect. - _captureThread->Stop(); - _captureThread.reset(); + // Make sure the capture thread stops using the mutex. + _captureThread.Finalize(); } MutexLock lock(&capture_lock_); @@ -356,11 +358,6 @@ bool VideoCaptureModuleV4L2::CaptureStarted() { return _captureStarted; } -void VideoCaptureModuleV4L2::CaptureThread(void* obj) { - VideoCaptureModuleV4L2* capture = static_cast(obj); - while (capture->CaptureProcess()) { - } -} bool VideoCaptureModuleV4L2::CaptureProcess() { int retVal = 0; fd_set rSet; diff --git a/modules/video_capture/linux/video_capture_linux.h b/modules/video_capture/linux/video_capture_linux.h index ddb5d5ba87..fa06d72b8d 100644 --- a/modules/video_capture/linux/video_capture_linux.h +++ b/modules/video_capture/linux/video_capture_linux.h @@ -41,8 +41,7 @@ class VideoCaptureModuleV4L2 : public VideoCaptureImpl { bool AllocateVideoBuffers(); bool DeAllocateVideoBuffers(); - // TODO(pbos): Stop using unique_ptr and resetting the thread. - std::unique_ptr _captureThread; + rtc::PlatformThread _captureThread; Mutex capture_lock_; bool quit_ RTC_GUARDED_BY(capture_lock_); int32_t _deviceId; diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index 4d186c7040..501ca01541 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -245,6 +245,7 @@ rtc_library("platform_thread") { absl_deps = [ "//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/strings", + "//third_party/abseil-cpp/absl/types:optional", ] } @@ -561,7 +562,10 @@ if (is_win) { "../api/task_queue", "synchronization:mutex", ] - absl_deps = [ "//third_party/abseil-cpp/absl/strings" ] + absl_deps = [ + "//third_party/abseil-cpp/absl/strings", + "//third_party/abseil-cpp/absl/types:optional", + ] } } @@ -1413,6 +1417,7 @@ if (rtc_include_tests) { absl_deps = [ "//third_party/abseil-cpp/absl/base:core_headers", "//third_party/abseil-cpp/absl/memory", + "//third_party/abseil-cpp/absl/types:optional", ] } diff --git a/rtc_base/async_resolver.cc b/rtc_base/async_resolver.cc index 9e6a2bae1c..d482b4e681 100644 --- a/rtc_base/async_resolver.cc +++ b/rtc_base/async_resolver.cc @@ -123,7 +123,7 @@ void AsyncResolver::Start(const SocketAddress& addr) { RTC_DCHECK_RUN_ON(&sequence_checker_); RTC_DCHECK(!destroy_called_); addr_ = addr; - auto thread_function = + PlatformThread::SpawnDetached( [this, addr, caller_task_queue = webrtc::TaskQueueBase::Current(), state = state_] { std::vector addresses; @@ -146,14 +146,8 @@ void AsyncResolver::Start(const SocketAddress& addr) { } })); } - }; - PlatformThread thread(RunResolution, - new std::function(std::move(thread_function)), - "NameResolution", ThreadAttributes().SetDetached()); - thread.Start(); - // Although |thread| is detached, the PlatformThread contract mandates to call - // Stop() before destruction. The call doesn't actually stop anything. - thread.Stop(); + }, + "AsyncResolver"); } bool AsyncResolver::GetResolvedAddress(int family, SocketAddress* addr) const { diff --git a/rtc_base/cpu_time_unittest.cc b/rtc_base/cpu_time_unittest.cc index 675e86307c..94f82f4306 100644 --- a/rtc_base/cpu_time_unittest.cc +++ b/rtc_base/cpu_time_unittest.cc @@ -30,8 +30,7 @@ const int kProcessingTimeMillisecs = 500; const int kWorkingThreads = 2; // Consumes approximately kProcessingTimeMillisecs of CPU time in single thread. -void WorkingFunction(void* counter_pointer) { - int64_t* counter = reinterpret_cast(counter_pointer); +void WorkingFunction(int64_t* counter) { *counter = 0; int64_t stop_cpu_time = rtc::GetThreadCpuTimeNanos() + @@ -62,14 +61,12 @@ TEST(CpuTimeTest, MAYBE_TEST(TwoThreads)) { int64_t thread_start_time_nanos = GetThreadCpuTimeNanos(); int64_t counter1; int64_t counter2; - PlatformThread thread1(WorkingFunction, reinterpret_cast(&counter1), - "Thread1"); - PlatformThread thread2(WorkingFunction, reinterpret_cast(&counter2), - "Thread2"); - thread1.Start(); - thread2.Start(); - thread1.Stop(); - thread2.Stop(); + auto thread1 = PlatformThread::SpawnJoinable( + [&counter1] { WorkingFunction(&counter1); }, "Thread1"); + auto thread2 = PlatformThread::SpawnJoinable( + [&counter2] { WorkingFunction(&counter2); }, "Thread2"); + thread1.Finalize(); + thread2.Finalize(); EXPECT_GE(counter1, 0); EXPECT_GE(counter2, 0); diff --git a/rtc_base/deprecated/recursive_critical_section_unittest.cc b/rtc_base/deprecated/recursive_critical_section_unittest.cc index 3fb7c519c1..9256a76f58 100644 --- a/rtc_base/deprecated/recursive_critical_section_unittest.cc +++ b/rtc_base/deprecated/recursive_critical_section_unittest.cc @@ -329,33 +329,28 @@ class PerfTestData { class PerfTestThread { public: - PerfTestThread() : thread_(&ThreadFunc, this, "CsPerf") {} - void Start(PerfTestData* data, int repeats, int id) { - RTC_DCHECK(!thread_.IsRunning()); RTC_DCHECK(!data_); data_ = data; repeats_ = repeats; my_id_ = id; - thread_.Start(); + thread_ = PlatformThread::SpawnJoinable( + [this] { + for (int i = 0; i < repeats_; ++i) + data_->AddToCounter(my_id_); + }, + "CsPerf"); } void Stop() { - RTC_DCHECK(thread_.IsRunning()); RTC_DCHECK(data_); - thread_.Stop(); + thread_.Finalize(); repeats_ = 0; data_ = nullptr; my_id_ = 0; } private: - static void ThreadFunc(void* param) { - PerfTestThread* me = static_cast(param); - for (int i = 0; i < me->repeats_; ++i) - me->data_->AddToCounter(me->my_id_); - } - PlatformThread thread_; PerfTestData* data_ = nullptr; int repeats_ = 0; diff --git a/rtc_base/event_tracer.cc b/rtc_base/event_tracer.cc index 0eae375708..1a2b41ec5c 100644 --- a/rtc_base/event_tracer.cc +++ b/rtc_base/event_tracer.cc @@ -79,19 +79,12 @@ namespace rtc { namespace tracing { namespace { -static void EventTracingThreadFunc(void* params); - // Atomic-int fast path for avoiding logging when disabled. static volatile int g_event_logging_active = 0; // TODO(pbos): Log metadata for all threads, etc. class EventLogger final { public: - EventLogger() - : logging_thread_(EventTracingThreadFunc, - this, - "EventTracingThread", - ThreadAttributes().SetPriority(kLowPriority)) {} ~EventLogger() { RTC_DCHECK(thread_checker_.IsCurrent()); } void AddTraceEvent(const char* name, @@ -209,7 +202,8 @@ class EventLogger final { rtc::AtomicOps::CompareAndSwap(&g_event_logging_active, 0, 1)); // Finally start, everything should be set up now. - logging_thread_.Start(); + logging_thread_ = + PlatformThread::SpawnJoinable([this] { Log(); }, "EventTracingThread"); TRACE_EVENT_INSTANT0("webrtc", "EventLogger::Start"); } @@ -223,7 +217,7 @@ class EventLogger final { // Wake up logging thread to finish writing. shutdown_event_.Set(); // Join the logging thread. - logging_thread_.Stop(); + logging_thread_.Finalize(); } private: @@ -326,10 +320,6 @@ class EventLogger final { bool output_file_owned_ = false; }; -static void EventTracingThreadFunc(void* params) { - static_cast(params)->Log(); -} - static EventLogger* volatile g_event_logger = nullptr; static const char* const kDisabledTracePrefix = TRACE_DISABLED_BY_DEFAULT(""); const unsigned char* InternalGetCategoryEnabled(const char* name) { diff --git a/rtc_base/event_unittest.cc b/rtc_base/event_unittest.cc index 31118877cf..a634d6e426 100644 --- a/rtc_base/event_unittest.cc +++ b/rtc_base/event_unittest.cc @@ -43,22 +43,21 @@ TEST(EventTest, AutoReset) { class SignalerThread { public: - SignalerThread() : thread_(&ThreadFn, this, "EventPerf") {} void Start(Event* writer, Event* reader) { writer_ = writer; reader_ = reader; - thread_.Start(); + thread_ = PlatformThread::SpawnJoinable( + [this] { + while (!stop_event_.Wait(0)) { + writer_->Set(); + reader_->Wait(Event::kForever); + } + }, + "EventPerf"); } void Stop() { stop_event_.Set(); - thread_.Stop(); - } - static void ThreadFn(void* param) { - auto* me = static_cast(param); - while (!me->stop_event_.Wait(0)) { - me->writer_->Set(); - me->reader_->Wait(Event::kForever); - } + thread_.Finalize(); } Event stop_event_; Event* writer_; diff --git a/rtc_base/logging_unittest.cc b/rtc_base/logging_unittest.cc index 225d66d13d..dc1208f3f6 100644 --- a/rtc_base/logging_unittest.cc +++ b/rtc_base/logging_unittest.cc @@ -160,18 +160,13 @@ TEST(LogTest, MultipleStreams) { class LogThread { public: - LogThread() : thread_(&ThreadEntry, this, "LogThread") {} - ~LogThread() { thread_.Stop(); } - - void Start() { thread_.Start(); } + void Start() { + thread_ = PlatformThread::SpawnJoinable( + [] { RTC_LOG(LS_VERBOSE) << "RTC_LOG"; }, "LogThread"); + } private: - void Run() { RTC_LOG(LS_VERBOSE) << "RTC_LOG"; } - - static void ThreadEntry(void* p) { static_cast(p)->Run(); } - PlatformThread thread_; - Event event_; }; // Ensure we don't crash when adding/removing streams while threads are going. diff --git a/rtc_base/platform_thread.cc b/rtc_base/platform_thread.cc index c5f3bc3951..6d369d747e 100644 --- a/rtc_base/platform_thread.cc +++ b/rtc_base/platform_thread.cc @@ -10,32 +10,37 @@ #include "rtc_base/platform_thread.h" +#include #include #if !defined(WEBRTC_WIN) #include #endif -#include -#include -#include - -#include "absl/memory/memory.h" #include "rtc_base/checks.h" namespace rtc { - namespace { -struct ThreadStartData { - ThreadRunFunction run_function; - void* obj; - std::string thread_name; - ThreadPriority priority; -}; + +#if defined(WEBRTC_WIN) +int Win32PriorityFromThreadPriority(ThreadPriority priority) { + switch (priority) { + case ThreadPriority::kLow: + return THREAD_PRIORITY_BELOW_NORMAL; + case ThreadPriority::kNormal: + return THREAD_PRIORITY_NORMAL; + case ThreadPriority::kHigh: + return THREAD_PRIORITY_ABOVE_NORMAL; + case ThreadPriority::kRealtime: + return THREAD_PRIORITY_TIME_CRITICAL; + } +} +#endif bool SetPriority(ThreadPriority priority) { #if defined(WEBRTC_WIN) - return SetThreadPriority(GetCurrentThread(), priority) != FALSE; + return SetThreadPriority(GetCurrentThread(), + Win32PriorityFromThreadPriority(priority)) != FALSE; #elif defined(__native_client__) || defined(WEBRTC_FUCHSIA) // Setting thread priorities is not supported in NaCl or Fuchsia. return true; @@ -59,21 +64,18 @@ bool SetPriority(ThreadPriority priority) { const int top_prio = max_prio - 1; const int low_prio = min_prio + 1; switch (priority) { - case kLowPriority: + case ThreadPriority::kLow: param.sched_priority = low_prio; break; - case kNormalPriority: + case ThreadPriority::kNormal: // The -1 ensures that the kHighPriority is always greater or equal to // kNormalPriority. param.sched_priority = (low_prio + top_prio - 1) / 2; break; - case kHighPriority: + case ThreadPriority::kHigh: param.sched_priority = std::max(top_prio - 2, low_prio); break; - case kHighestPriority: - param.sched_priority = std::max(top_prio - 1, low_prio); - break; - case kRealtimePriority: + case ThreadPriority::kRealtime: param.sched_priority = top_prio; break; } @@ -81,124 +83,129 @@ bool SetPriority(ThreadPriority priority) { #endif // defined(WEBRTC_WIN) } -void RunPlatformThread(std::unique_ptr data) { - rtc::SetCurrentThreadName(data->thread_name.c_str()); - data->thread_name.clear(); - SetPriority(data->priority); - data->run_function(data->obj); -} - #if defined(WEBRTC_WIN) -DWORD WINAPI StartThread(void* param) { +DWORD WINAPI RunPlatformThread(void* param) { // The GetLastError() function only returns valid results when it is called // after a Win32 API function that returns a "failed" result. A crash dump // contains the result from GetLastError() and to make sure it does not // falsely report a Windows error we call SetLastError here. ::SetLastError(ERROR_SUCCESS); - RunPlatformThread(absl::WrapUnique(static_cast(param))); + auto function = static_cast*>(param); + (*function)(); + delete function; return 0; } #else -void* StartThread(void* param) { - RunPlatformThread(absl::WrapUnique(static_cast(param))); +void* RunPlatformThread(void* param) { + auto function = static_cast*>(param); + (*function)(); + delete function; return 0; } #endif // defined(WEBRTC_WIN) } // namespace -PlatformThread::PlatformThread(ThreadRunFunction func, - void* obj, - absl::string_view thread_name, - ThreadAttributes attributes) - : run_function_(func), - attributes_(attributes), - obj_(obj), - name_(thread_name) { - RTC_DCHECK(func); - RTC_DCHECK(!name_.empty()); - // TODO(tommi): Consider lowering the limit to 15 (limit on Linux). - RTC_DCHECK(name_.length() < 64); +PlatformThread::PlatformThread(Handle handle, bool joinable) + : handle_(handle), joinable_(joinable) {} + +PlatformThread::PlatformThread(PlatformThread&& rhs) + : handle_(rhs.handle_), joinable_(rhs.joinable_) { + rhs.handle_ = absl::nullopt; +} + +PlatformThread& PlatformThread::operator=(PlatformThread&& rhs) { + Finalize(); + handle_ = rhs.handle_; + joinable_ = rhs.joinable_; + rhs.handle_ = absl::nullopt; + return *this; } PlatformThread::~PlatformThread() { - RTC_DCHECK_RUN_ON(&thread_checker_); - RTC_DCHECK(!thread_); -#if defined(WEBRTC_WIN) - RTC_DCHECK(!thread_id_); -#endif // defined(WEBRTC_WIN) + Finalize(); } -void PlatformThread::Start() { - RTC_DCHECK_RUN_ON(&thread_checker_); - RTC_DCHECK(!thread_) << "Thread already started?"; - ThreadStartData* data = - new ThreadStartData{run_function_, obj_, name_, attributes_.priority}; +PlatformThread PlatformThread::SpawnJoinable( + std::function thread_function, + absl::string_view name, + ThreadAttributes attributes) { + return SpawnThread(std::move(thread_function), name, attributes, + /*joinable=*/true); +} + +PlatformThread PlatformThread::SpawnDetached( + std::function thread_function, + absl::string_view name, + ThreadAttributes attributes) { + return SpawnThread(std::move(thread_function), name, attributes, + /*joinable=*/false); +} + +absl::optional PlatformThread::GetHandle() const { + return handle_; +} + +#if defined(WEBRTC_WIN) +bool PlatformThread::QueueAPC(PAPCFUNC function, ULONG_PTR data) { + RTC_DCHECK(handle_.has_value()); + return handle_.has_value() ? QueueUserAPC(function, *handle_, data) != FALSE + : false; +} +#endif + +void PlatformThread::Finalize() { + if (!handle_.has_value()) + return; +#if defined(WEBRTC_WIN) + if (joinable_) + WaitForSingleObject(*handle_, INFINITE); + CloseHandle(*handle_); +#else + if (joinable_) + RTC_CHECK_EQ(0, pthread_join(*handle_, nullptr)); +#endif + handle_ = absl::nullopt; +} + +PlatformThread PlatformThread::SpawnThread( + std::function thread_function, + absl::string_view name, + ThreadAttributes attributes, + bool joinable) { + RTC_DCHECK(thread_function); + RTC_DCHECK(!name.empty()); + // TODO(tommi): Consider lowering the limit to 15 (limit on Linux). + RTC_DCHECK(name.length() < 64); + auto start_thread_function_ptr = + new std::function([thread_function = std::move(thread_function), + name = std::string(name), attributes] { + rtc::SetCurrentThreadName(name.c_str()); + SetPriority(attributes.priority); + thread_function(); + }); #if defined(WEBRTC_WIN) // See bug 2902 for background on STACK_SIZE_PARAM_IS_A_RESERVATION. // Set the reserved stack stack size to 1M, which is the default on Windows // and Linux. - thread_ = ::CreateThread(nullptr, 1024 * 1024, &StartThread, data, - STACK_SIZE_PARAM_IS_A_RESERVATION, &thread_id_); - RTC_CHECK(thread_) << "CreateThread failed"; - RTC_DCHECK(thread_id_); + DWORD thread_id = 0; + PlatformThread::Handle handle = ::CreateThread( + nullptr, 1024 * 1024, &RunPlatformThread, start_thread_function_ptr, + STACK_SIZE_PARAM_IS_A_RESERVATION, &thread_id); + RTC_CHECK(handle) << "CreateThread failed"; #else pthread_attr_t attr; pthread_attr_init(&attr); // Set the stack stack size to 1M. pthread_attr_setstacksize(&attr, 1024 * 1024); - pthread_attr_setdetachstate(&attr, attributes_.joinable - ? PTHREAD_CREATE_JOINABLE - : PTHREAD_CREATE_DETACHED); - RTC_CHECK_EQ(0, pthread_create(&thread_, &attr, &StartThread, data)); + pthread_attr_setdetachstate( + &attr, joinable ? PTHREAD_CREATE_JOINABLE : PTHREAD_CREATE_DETACHED); + PlatformThread::Handle handle; + RTC_CHECK_EQ(0, pthread_create(&handle, &attr, &RunPlatformThread, + start_thread_function_ptr)); pthread_attr_destroy(&attr); #endif // defined(WEBRTC_WIN) + return PlatformThread(handle, joinable); } -bool PlatformThread::IsRunning() const { - RTC_DCHECK_RUN_ON(&thread_checker_); -#if defined(WEBRTC_WIN) - return thread_ != nullptr; -#else - return thread_ != 0; -#endif // defined(WEBRTC_WIN) -} - -PlatformThreadRef PlatformThread::GetThreadRef() const { -#if defined(WEBRTC_WIN) - return thread_id_; -#else - return thread_; -#endif // defined(WEBRTC_WIN) -} - -void PlatformThread::Stop() { - RTC_DCHECK_RUN_ON(&thread_checker_); - if (!IsRunning()) - return; - -#if defined(WEBRTC_WIN) - if (attributes_.joinable) { - WaitForSingleObject(thread_, INFINITE); - } - CloseHandle(thread_); - thread_ = nullptr; - thread_id_ = 0; -#else - if (attributes_.joinable) { - RTC_CHECK_EQ(0, pthread_join(thread_, nullptr)); - } - thread_ = 0; -#endif // defined(WEBRTC_WIN) -} - -#if defined(WEBRTC_WIN) -bool PlatformThread::QueueAPC(PAPCFUNC function, ULONG_PTR data) { - RTC_DCHECK_RUN_ON(&thread_checker_); - RTC_DCHECK(IsRunning()); - - return QueueUserAPC(function, thread_, data) != FALSE; -} -#endif - } // namespace rtc diff --git a/rtc_base/platform_thread.h b/rtc_base/platform_thread.h index 35c0e27432..11ccfae3d0 100644 --- a/rtc_base/platform_thread.h +++ b/rtc_base/platform_thread.h @@ -11,103 +11,101 @@ #ifndef RTC_BASE_PLATFORM_THREAD_H_ #define RTC_BASE_PLATFORM_THREAD_H_ -#ifndef WEBRTC_WIN -#include -#endif +#include #include #include "absl/strings/string_view.h" -#include "api/sequence_checker.h" -#include "rtc_base/constructor_magic.h" +#include "absl/types/optional.h" #include "rtc_base/platform_thread_types.h" namespace rtc { -// Callback function that the spawned thread will enter once spawned. -typedef void (*ThreadRunFunction)(void*); - -enum ThreadPriority { -#ifdef WEBRTC_WIN - kLowPriority = THREAD_PRIORITY_BELOW_NORMAL, - kNormalPriority = THREAD_PRIORITY_NORMAL, - kHighPriority = THREAD_PRIORITY_ABOVE_NORMAL, - kHighestPriority = THREAD_PRIORITY_HIGHEST, - kRealtimePriority = THREAD_PRIORITY_TIME_CRITICAL -#else - kLowPriority = 1, - kNormalPriority = 2, - kHighPriority = 3, - kHighestPriority = 4, - kRealtimePriority = 5 -#endif +enum class ThreadPriority { + kLow = 1, + kNormal, + kHigh, + kRealtime, }; struct ThreadAttributes { - ThreadPriority priority = kNormalPriority; - bool joinable = true; - + ThreadPriority priority = ThreadPriority::kNormal; ThreadAttributes& SetPriority(ThreadPriority priority_param) { priority = priority_param; return *this; } - ThreadAttributes& SetDetached() { - joinable = false; - return *this; - } }; -// Represents a simple worker thread. The implementation must be assumed -// to be single threaded, meaning that all methods of the class, must be -// called from the same thread, including instantiation. -class PlatformThread { +// Represents a simple worker thread. +class PlatformThread final { public: - PlatformThread(ThreadRunFunction func, - void* obj, - absl::string_view thread_name, - ThreadAttributes attributes = ThreadAttributes()); + // Handle is the base platform thread handle. +#if defined(WEBRTC_WIN) + using Handle = HANDLE; +#else + using Handle = pthread_t; +#endif // defined(WEBRTC_WIN) + // This ctor creates the PlatformThread with an unset handle (returning true + // in empty()) and is provided for convenience. + // TODO(bugs.webrtc.org/12727) Look into if default and move support can be + // removed. + PlatformThread() = default; + + // Moves |rhs| into this, storing an empty state in |rhs|. + // TODO(bugs.webrtc.org/12727) Look into if default and move support can be + // removed. + PlatformThread(PlatformThread&& rhs); + + // Moves |rhs| into this, storing an empty state in |rhs|. + // TODO(bugs.webrtc.org/12727) Look into if default and move support can be + // removed. + PlatformThread& operator=(PlatformThread&& rhs); + + // For a PlatformThread that's been spawned joinable, the destructor suspends + // the calling thread until the created thread exits unless the thread has + // already exited. virtual ~PlatformThread(); - const std::string& name() const { return name_; } + // Finalizes any allocated resources. + // For a PlatformThread that's been spawned joinable, Finalize() suspends + // the calling thread until the created thread exits unless the thread has + // already exited. + // empty() returns true after completion. + void Finalize(); - // Spawns a thread and tries to set thread priority according to the priority - // from when CreateThread was called. - // Start can only be called after the constructor or after a call to Stop(). - void Start(); + // Returns true if default constructed, moved from, or Finalize()ed. + bool empty() const { return !handle_.has_value(); } - bool IsRunning() const; + // Creates a started joinable thread which will be joined when the returned + // PlatformThread destructs or Finalize() is called. + static PlatformThread SpawnJoinable( + std::function thread_function, + absl::string_view name, + ThreadAttributes attributes = ThreadAttributes()); - // Returns an identifier for the worker thread that can be used to do - // thread checks. - PlatformThreadRef GetThreadRef() const; + // Creates a started detached thread. The caller has to use external + // synchronization as nothing is provided by the PlatformThread construct. + static PlatformThread SpawnDetached( + std::function thread_function, + absl::string_view name, + ThreadAttributes attributes = ThreadAttributes()); - // Stop() prepares the PlatformThread for destruction or another call to - // Start(). For a PlatformThread that's been created with - // ThreadAttributes::joinable true (the default), Stop() suspends the calling - // thread until the created thread exits unless the thread has already exited. - // Stop() can only be called after calling Start(). - void Stop(); + // Returns the base platform thread handle of this thread. + absl::optional GetHandle() const; - protected: #if defined(WEBRTC_WIN) - // Exposed to derived classes to allow for special cases specific to Windows. + // Queue a Windows APC function that runs when the thread is alertable. bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data); #endif private: - ThreadRunFunction const run_function_ = nullptr; - const ThreadAttributes attributes_; - void* const obj_; - // TODO(pbos): Make sure call sites use string literals and update to a const - // char* instead of a std::string. - const std::string name_; - webrtc::SequenceChecker thread_checker_; -#if defined(WEBRTC_WIN) - HANDLE thread_ = nullptr; - DWORD thread_id_ = 0; -#else - pthread_t thread_ = 0; -#endif // defined(WEBRTC_WIN) - RTC_DISALLOW_COPY_AND_ASSIGN(PlatformThread); + PlatformThread(Handle handle, bool joinable); + static PlatformThread SpawnThread(std::function thread_function, + absl::string_view name, + ThreadAttributes attributes, + bool joinable); + + absl::optional handle_; + bool joinable_ = false; }; } // namespace rtc diff --git a/rtc_base/platform_thread_unittest.cc b/rtc_base/platform_thread_unittest.cc index d09772fddc..0da822cf85 100644 --- a/rtc_base/platform_thread_unittest.cc +++ b/rtc_base/platform_thread_unittest.cc @@ -10,69 +10,73 @@ #include "rtc_base/platform_thread.h" +#include "absl/types/optional.h" #include "rtc_base/event.h" #include "system_wrappers/include/sleep.h" #include "test/gmock.h" namespace rtc { -namespace { -void NullRunFunction(void* obj) {} - -// Function that sets a boolean. -void SetFlagRunFunction(void* obj) { - bool* obj_as_bool = static_cast(obj); - *obj_as_bool = true; +TEST(PlatformThreadTest, DefaultConstructedIsEmpty) { + PlatformThread thread; + EXPECT_EQ(thread.GetHandle(), absl::nullopt); + EXPECT_TRUE(thread.empty()); } -void StdFunctionRunFunction(void* obj) { - std::function* fun = static_cast*>(obj); - (*fun)(); +TEST(PlatformThreadTest, StartFinalize) { + PlatformThread thread = PlatformThread::SpawnJoinable([] {}, "1"); + EXPECT_NE(thread.GetHandle(), absl::nullopt); + EXPECT_FALSE(thread.empty()); + thread.Finalize(); + EXPECT_TRUE(thread.empty()); + thread = PlatformThread::SpawnDetached([] {}, "2"); + EXPECT_FALSE(thread.empty()); + thread.Finalize(); + EXPECT_TRUE(thread.empty()); } -} // namespace - -TEST(PlatformThreadTest, StartStop) { - PlatformThread thread(&NullRunFunction, nullptr, "PlatformThreadTest"); - EXPECT_TRUE(thread.name() == "PlatformThreadTest"); - EXPECT_TRUE(thread.GetThreadRef() == 0); - thread.Start(); - EXPECT_TRUE(thread.GetThreadRef() != 0); - thread.Stop(); - EXPECT_TRUE(thread.GetThreadRef() == 0); +TEST(PlatformThreadTest, MovesEmpty) { + PlatformThread thread1; + PlatformThread thread2 = std::move(thread1); + EXPECT_TRUE(thread1.empty()); + EXPECT_TRUE(thread2.empty()); } -TEST(PlatformThreadTest, StartStop2) { - PlatformThread thread1(&NullRunFunction, nullptr, "PlatformThreadTest1"); - PlatformThread thread2(&NullRunFunction, nullptr, "PlatformThreadTest2"); - EXPECT_TRUE(thread1.GetThreadRef() == thread2.GetThreadRef()); - thread1.Start(); - thread2.Start(); - EXPECT_TRUE(thread1.GetThreadRef() != thread2.GetThreadRef()); - thread2.Stop(); - thread1.Stop(); +TEST(PlatformThreadTest, MovesHandles) { + PlatformThread thread1 = PlatformThread::SpawnJoinable([] {}, "1"); + PlatformThread thread2 = std::move(thread1); + EXPECT_TRUE(thread1.empty()); + EXPECT_FALSE(thread2.empty()); + thread1 = PlatformThread::SpawnDetached([] {}, "2"); + thread2 = std::move(thread1); + EXPECT_TRUE(thread1.empty()); + EXPECT_FALSE(thread2.empty()); +} + +TEST(PlatformThreadTest, + TwoThreadHandlesAreDifferentWhenStartedAndEqualWhenJoined) { + PlatformThread thread1 = PlatformThread(); + PlatformThread thread2 = PlatformThread(); + EXPECT_EQ(thread1.GetHandle(), thread2.GetHandle()); + thread1 = PlatformThread::SpawnJoinable([] {}, "1"); + thread2 = PlatformThread::SpawnJoinable([] {}, "2"); + EXPECT_NE(thread1.GetHandle(), thread2.GetHandle()); + thread1.Finalize(); + EXPECT_NE(thread1.GetHandle(), thread2.GetHandle()); + thread2.Finalize(); + EXPECT_EQ(thread1.GetHandle(), thread2.GetHandle()); } TEST(PlatformThreadTest, RunFunctionIsCalled) { bool flag = false; - PlatformThread thread(&SetFlagRunFunction, &flag, "RunFunctionIsCalled"); - thread.Start(); - - // At this point, the flag may be either true or false. - thread.Stop(); - - // We expect the thread to have run at least once. + PlatformThread::SpawnJoinable([&] { flag = true; }, "T"); EXPECT_TRUE(flag); } TEST(PlatformThreadTest, JoinsThread) { // This test flakes if there are problems with the join implementation. - EXPECT_TRUE(ThreadAttributes().joinable); rtc::Event event; - std::function thread_function = [&] { event.Set(); }; - PlatformThread thread(&StdFunctionRunFunction, &thread_function, "T"); - thread.Start(); - thread.Stop(); + PlatformThread::SpawnJoinable([&] { event.Set(); }, "T"); EXPECT_TRUE(event.Wait(/*give_up_after_ms=*/0)); } @@ -83,18 +87,14 @@ TEST(PlatformThreadTest, StopsBeforeDetachedThreadExits) { rtc::Event thread_started; rtc::Event thread_continue; rtc::Event thread_exiting; - std::function thread_function = [&] { - thread_started.Set(); - thread_continue.Wait(Event::kForever); - flag = true; - thread_exiting.Set(); - }; - { - PlatformThread thread(&StdFunctionRunFunction, &thread_function, "T", - ThreadAttributes().SetDetached()); - thread.Start(); - thread.Stop(); - } + PlatformThread::SpawnDetached( + [&] { + thread_started.Set(); + thread_continue.Wait(Event::kForever); + flag = true; + thread_exiting.Set(); + }, + "T"); thread_started.Wait(Event::kForever); EXPECT_FALSE(flag); thread_continue.Set(); diff --git a/rtc_base/rate_limiter_unittest.cc b/rtc_base/rate_limiter_unittest.cc index 8ebf8aa67b..eda644b4ca 100644 --- a/rtc_base/rate_limiter_unittest.cc +++ b/rtc_base/rate_limiter_unittest.cc @@ -127,10 +127,6 @@ class ThreadTask { rtc::Event end_signal_; }; -void RunTask(void* thread_task) { - reinterpret_cast(thread_task)->Run(); -} - TEST_F(RateLimitTest, MultiThreadedUsage) { // Simple sanity test, with different threads calling the various methods. // Runs a few simple tasks, each on its own thread, but coordinated with @@ -149,8 +145,8 @@ TEST_F(RateLimitTest, MultiThreadedUsage) { EXPECT_TRUE(rate_limiter_->SetWindowSize(kWindowSizeMs / 2)); } } set_window_size_task(rate_limiter.get()); - rtc::PlatformThread thread1(RunTask, &set_window_size_task, "Thread1"); - thread1.Start(); + auto thread1 = rtc::PlatformThread::SpawnJoinable( + [&set_window_size_task] { set_window_size_task.Run(); }, "Thread1"); class SetMaxRateTask : public ThreadTask { public: @@ -160,8 +156,8 @@ TEST_F(RateLimitTest, MultiThreadedUsage) { void DoRun() override { rate_limiter_->SetMaxRate(kMaxRateBps * 2); } } set_max_rate_task(rate_limiter.get()); - rtc::PlatformThread thread2(RunTask, &set_max_rate_task, "Thread2"); - thread2.Start(); + auto thread2 = rtc::PlatformThread::SpawnJoinable( + [&set_max_rate_task] { set_max_rate_task.Run(); }, "Thread2"); class UseRateTask : public ThreadTask { public: @@ -177,8 +173,8 @@ TEST_F(RateLimitTest, MultiThreadedUsage) { SimulatedClock* const clock_; } use_rate_task(rate_limiter.get(), &clock_); - rtc::PlatformThread thread3(RunTask, &use_rate_task, "Thread3"); - thread3.Start(); + auto thread3 = rtc::PlatformThread::SpawnJoinable( + [&use_rate_task] { use_rate_task.Run(); }, "Thread3"); set_window_size_task.start_signal_.Set(); EXPECT_TRUE(set_window_size_task.end_signal_.Wait(kMaxTimeoutMs)); @@ -191,10 +187,6 @@ TEST_F(RateLimitTest, MultiThreadedUsage) { // All rate consumed. EXPECT_FALSE(rate_limiter->TryUseRate(1)); - - thread1.Stop(); - thread2.Stop(); - thread3.Stop(); } } // namespace webrtc diff --git a/rtc_base/task_queue_libevent.cc b/rtc_base/task_queue_libevent.cc index 71a9e8a3fe..909698611e 100644 --- a/rtc_base/task_queue_libevent.cc +++ b/rtc_base/task_queue_libevent.cc @@ -93,16 +93,12 @@ void EventAssign(struct event* ev, rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) { switch (priority) { case Priority::HIGH: - return rtc::kRealtimePriority; + return rtc::ThreadPriority::kRealtime; case Priority::LOW: - return rtc::kLowPriority; + return rtc::ThreadPriority::kLow; case Priority::NORMAL: - return rtc::kNormalPriority; - default: - RTC_NOTREACHED(); - break; + return rtc::ThreadPriority::kNormal; } - return rtc::kNormalPriority; } class TaskQueueLibevent final : public TaskQueueBase { @@ -120,7 +116,6 @@ class TaskQueueLibevent final : public TaskQueueBase { ~TaskQueueLibevent() override = default; - static void ThreadMain(void* context); static void OnWakeup(int socket, short flags, void* context); // NOLINT static void RunTimer(int fd, short flags, void* context); // NOLINT @@ -172,11 +167,7 @@ class TaskQueueLibevent::SetTimerTask : public QueuedTask { TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority) - : event_base_(event_base_new()), - thread_(&TaskQueueLibevent::ThreadMain, - this, - queue_name, - rtc::ThreadAttributes().SetPriority(priority)) { + : event_base_(event_base_new()) { int fds[2]; RTC_CHECK(pipe(fds) == 0); SetNonBlocking(fds[0]); @@ -187,7 +178,18 @@ TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name, EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_, EV_READ | EV_PERSIST, OnWakeup, this); event_add(&wakeup_event_, 0); - thread_.Start(); + thread_ = rtc::PlatformThread::SpawnJoinable( + [this] { + { + CurrentTaskQueueSetter set_current(this); + while (is_active_) + event_base_loop(event_base_, 0); + } + + for (TimerEvent* timer : pending_timers_) + delete timer; + }, + queue_name, rtc::ThreadAttributes().SetPriority(priority)); } void TaskQueueLibevent::Delete() { @@ -202,7 +204,7 @@ void TaskQueueLibevent::Delete() { nanosleep(&ts, nullptr); } - thread_.Stop(); + thread_.Finalize(); event_del(&wakeup_event_); @@ -255,20 +257,6 @@ void TaskQueueLibevent::PostDelayedTask(std::unique_ptr task, } } -// static -void TaskQueueLibevent::ThreadMain(void* context) { - TaskQueueLibevent* me = static_cast(context); - - { - CurrentTaskQueueSetter set_current(me); - while (me->is_active_) - event_base_loop(me->event_base_, 0); - } - - for (TimerEvent* timer : me->pending_timers_) - delete timer; -} - // static void TaskQueueLibevent::OnWakeup(int socket, short flags, // NOLINT diff --git a/rtc_base/task_queue_stdlib.cc b/rtc_base/task_queue_stdlib.cc index bd5bb97988..548f7ef69a 100644 --- a/rtc_base/task_queue_stdlib.cc +++ b/rtc_base/task_queue_stdlib.cc @@ -36,14 +36,11 @@ rtc::ThreadPriority TaskQueuePriorityToThreadPriority( TaskQueueFactory::Priority priority) { switch (priority) { case TaskQueueFactory::Priority::HIGH: - return rtc::kRealtimePriority; + return rtc::ThreadPriority::kRealtime; case TaskQueueFactory::Priority::LOW: - return rtc::kLowPriority; + return rtc::ThreadPriority::kLow; case TaskQueueFactory::Priority::NORMAL: - return rtc::kNormalPriority; - default: - RTC_NOTREACHED(); - return rtc::kNormalPriority; + return rtc::ThreadPriority::kNormal; } } @@ -78,8 +75,6 @@ class TaskQueueStdlib final : public TaskQueueBase { NextTask GetNextTask(); - static void ThreadMain(void* context); - void ProcessTasks(); void NotifyWake(); @@ -126,11 +121,13 @@ TaskQueueStdlib::TaskQueueStdlib(absl::string_view queue_name, : started_(/*manual_reset=*/false, /*initially_signaled=*/false), stopped_(/*manual_reset=*/false, /*initially_signaled=*/false), flag_notify_(/*manual_reset=*/false, /*initially_signaled=*/false), - thread_(&TaskQueueStdlib::ThreadMain, - this, - queue_name, - rtc::ThreadAttributes().SetPriority(priority)) { - thread_.Start(); + thread_(rtc::PlatformThread::SpawnJoinable( + [this] { + CurrentTaskQueueSetter set_current(this); + ProcessTasks(); + }, + queue_name, + rtc::ThreadAttributes().SetPriority(priority))) { started_.Wait(rtc::Event::kForever); } @@ -145,7 +142,7 @@ void TaskQueueStdlib::Delete() { NotifyWake(); stopped_.Wait(rtc::Event::kForever); - thread_.Stop(); + thread_.Finalize(); delete this; } @@ -222,13 +219,6 @@ TaskQueueStdlib::NextTask TaskQueueStdlib::GetNextTask() { return result; } -// static -void TaskQueueStdlib::ThreadMain(void* context) { - TaskQueueStdlib* me = static_cast(context); - CurrentTaskQueueSetter set_current(me); - me->ProcessTasks(); -} - void TaskQueueStdlib::ProcessTasks() { started_.Set(); diff --git a/rtc_base/task_queue_win.cc b/rtc_base/task_queue_win.cc index 8bfe5e5c44..d797d478f4 100644 --- a/rtc_base/task_queue_win.cc +++ b/rtc_base/task_queue_win.cc @@ -29,16 +29,18 @@ #include #include "absl/strings/string_view.h" +#include "absl/types/optional.h" #include "api/task_queue/queued_task.h" #include "api/task_queue/task_queue_base.h" #include "rtc_base/arraysize.h" #include "rtc_base/checks.h" +#include "rtc_base/constructor_magic.h" #include "rtc_base/event.h" #include "rtc_base/logging.h" #include "rtc_base/numerics/safe_conversions.h" #include "rtc_base/platform_thread.h" -#include "rtc_base/time_utils.h" #include "rtc_base/synchronization/mutex.h" +#include "rtc_base/time_utils.h" namespace webrtc { namespace { @@ -56,16 +58,12 @@ rtc::ThreadPriority TaskQueuePriorityToThreadPriority( TaskQueueFactory::Priority priority) { switch (priority) { case TaskQueueFactory::Priority::HIGH: - return rtc::kRealtimePriority; + return rtc::ThreadPriority::kRealtime; case TaskQueueFactory::Priority::LOW: - return rtc::kLowPriority; + return rtc::ThreadPriority::kLow; case TaskQueueFactory::Priority::NORMAL: - return rtc::kNormalPriority; - default: - RTC_NOTREACHED(); - break; + return rtc::ThreadPriority::kNormal; } - return rtc::kNormalPriority; } int64_t GetTick() { @@ -167,24 +165,6 @@ class TaskQueueWin : public TaskQueueBase { void RunPendingTasks(); private: - static void ThreadMain(void* context); - - class WorkerThread : public rtc::PlatformThread { - public: - WorkerThread(rtc::ThreadRunFunction func, - void* obj, - absl::string_view thread_name, - rtc::ThreadPriority priority) - : PlatformThread(func, - obj, - thread_name, - rtc::ThreadAttributes().SetPriority(priority)) {} - - bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) { - return rtc::PlatformThread::QueueAPC(apc_function, data); - } - }; - void RunThreadMain(); bool ProcessQueuedMessages(); void RunDueTasks(); @@ -207,7 +187,7 @@ class TaskQueueWin : public TaskQueueBase { greater> timer_tasks_; UINT_PTR timer_id_ = 0; - WorkerThread thread_; + rtc::PlatformThread thread_; Mutex pending_lock_; std::queue> pending_ RTC_GUARDED_BY(pending_lock_); @@ -216,10 +196,12 @@ class TaskQueueWin : public TaskQueueBase { TaskQueueWin::TaskQueueWin(absl::string_view queue_name, rtc::ThreadPriority priority) - : thread_(&TaskQueueWin::ThreadMain, this, queue_name, priority), - in_queue_(::CreateEvent(nullptr, true, false, nullptr)) { + : in_queue_(::CreateEvent(nullptr, true, false, nullptr)) { RTC_DCHECK(in_queue_); - thread_.Start(); + thread_ = rtc::PlatformThread::SpawnJoinable( + [this] { RunThreadMain(); }, queue_name, + rtc::ThreadAttributes().SetPriority(priority)); + rtc::Event event(false, false); RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread, reinterpret_cast(&event))); @@ -228,11 +210,13 @@ TaskQueueWin::TaskQueueWin(absl::string_view queue_name, void TaskQueueWin::Delete() { RTC_DCHECK(!IsCurrent()); - while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) { + RTC_CHECK(thread_.GetHandle() != absl::nullopt); + while ( + !::PostThreadMessage(GetThreadId(*thread_.GetHandle()), WM_QUIT, 0, 0)) { RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError()); Sleep(1); } - thread_.Stop(); + thread_.Finalize(); ::CloseHandle(in_queue_); delete this; } @@ -255,7 +239,9 @@ void TaskQueueWin::PostDelayedTask(std::unique_ptr task, // and WPARAM is 32bits in 32bit builds. Otherwise, we could pass the // task pointer and timestamp as LPARAM and WPARAM. auto* task_info = new DelayedTaskInfo(milliseconds, std::move(task)); - if (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, 0, + RTC_CHECK(thread_.GetHandle() != absl::nullopt); + if (!::PostThreadMessage(GetThreadId(*thread_.GetHandle()), + WM_QUEUE_DELAYED_TASK, 0, reinterpret_cast(task_info))) { delete task_info; } @@ -277,11 +263,6 @@ void TaskQueueWin::RunPendingTasks() { } } -// static -void TaskQueueWin::ThreadMain(void* context) { - static_cast(context)->RunThreadMain(); -} - void TaskQueueWin::RunThreadMain() { CurrentTaskQueueSetter set_current(this); HANDLE handles[2] = {*timer_.event_for_wait(), in_queue_}; diff --git a/sdk/android/native_unittests/stacktrace/stacktrace_unittest.cc b/sdk/android/native_unittests/stacktrace/stacktrace_unittest.cc index fcd9c9b8f1..b77d86719f 100644 --- a/sdk/android/native_unittests/stacktrace/stacktrace_unittest.cc +++ b/sdk/android/native_unittests/stacktrace/stacktrace_unittest.cc @@ -153,28 +153,24 @@ class SleepDeadlock : public DeadlockInterface { } }; -// This is the function that is exectued by the thread that will deadlock and -// have its stacktrace captured. -void ThreadFunction(void* void_params) { - ThreadParams* params = static_cast(void_params); - params->tid = gettid(); - - params->deadlock_region_start_address = GetCurrentRelativeExecutionAddress(); - params->deadlock_start_event.Set(); - params->deadlock_impl->Deadlock(); - params->deadlock_region_end_address = GetCurrentRelativeExecutionAddress(); - - params->deadlock_done_event.Set(); -} - void TestStacktrace(std::unique_ptr deadlock_impl) { // Set params that will be sent to other thread. ThreadParams params; params.deadlock_impl = deadlock_impl.get(); // Spawn thread. - rtc::PlatformThread thread(&ThreadFunction, ¶ms, "StacktraceTest"); - thread.Start(); + auto thread = rtc::PlatformThread::SpawnJoinable( + [¶ms] { + params.tid = gettid(); + params.deadlock_region_start_address = + GetCurrentRelativeExecutionAddress(); + params.deadlock_start_event.Set(); + params.deadlock_impl->Deadlock(); + params.deadlock_region_end_address = + GetCurrentRelativeExecutionAddress(); + params.deadlock_done_event.Set(); + }, + "StacktraceTest"); // Wait until the thread has entered the deadlock region, and take a very // brief nap to give it time to reach the actual deadlock. @@ -198,8 +194,6 @@ void TestStacktrace(std::unique_ptr deadlock_impl) { << rtc::ToHex(params.deadlock_region_start_address) << ", " << rtc::ToHex(params.deadlock_region_end_address) << "] not contained in: " << StackTraceToString(stack_trace); - - thread.Stop(); } class LookoutLogSink final : public rtc::LogSink { @@ -259,13 +253,9 @@ TEST(Stacktrace, TestRtcEventDeadlockDetection) { // Start a thread that waits for an event. rtc::Event ev; - rtc::PlatformThread thread( - [](void* arg) { - auto* ev = static_cast(arg); - ev->Wait(rtc::Event::kForever); - }, - &ev, "TestRtcEventDeadlockDetection"); - thread.Start(); + auto thread = rtc::PlatformThread::SpawnJoinable( + [&ev] { ev.Wait(rtc::Event::kForever); }, + "TestRtcEventDeadlockDetection"); // The message should appear after 3 sec. We'll wait up to 10 sec in an // attempt to not be flaky. @@ -273,7 +263,7 @@ TEST(Stacktrace, TestRtcEventDeadlockDetection) { // Unblock the thread and shut it down. ev.Set(); - thread.Stop(); + thread.Finalize(); rtc::LogMessage::RemoveLogToStream(&sink); } diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc index 436b3ba1d2..53fb14e606 100644 --- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc +++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc @@ -142,12 +142,9 @@ void DefaultVideoQualityAnalyzer::Start( int max_threads_count) { test_label_ = std::move(test_case_name); for (int i = 0; i < max_threads_count; i++) { - auto thread = std::make_unique( - &DefaultVideoQualityAnalyzer::ProcessComparisonsThread, this, - ("DefaultVideoQualityAnalyzerWorker-" + std::to_string(i)).data(), - rtc::ThreadAttributes().SetPriority(rtc::kNormalPriority)); - thread->Start(); - thread_pool_.push_back(std::move(thread)); + thread_pool_.push_back(rtc::PlatformThread::SpawnJoinable( + [this] { ProcessComparisons(); }, + "DefaultVideoQualityAnalyzerWorker-" + std::to_string(i))); } { MutexLock lock(&lock_); @@ -547,10 +544,6 @@ void DefaultVideoQualityAnalyzer::Stop() { } StopMeasuringCpuProcessTime(); comparison_available_event_.Set(); - for (auto& thread : thread_pool_) { - thread->Stop(); - } - // PlatformThread have to be deleted on the same thread, where it was created thread_pool_.clear(); // Perform final Metrics update. On this place analyzer is stopped and no one @@ -677,10 +670,6 @@ void DefaultVideoQualityAnalyzer::AddComparison( StopExcludingCpuThreadTime(); } -void DefaultVideoQualityAnalyzer::ProcessComparisonsThread(void* obj) { - static_cast(obj)->ProcessComparisons(); -} - void DefaultVideoQualityAnalyzer::ProcessComparisons() { while (true) { // Try to pick next comparison to perform from the queue. diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h index de9419dda9..626fa246e5 100644 --- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h +++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h @@ -560,7 +560,7 @@ class DefaultVideoQualityAnalyzer : public VideoQualityAnalyzerInterface { std::deque comparisons_ RTC_GUARDED_BY(comparison_lock_); AnalyzerStats analyzer_stats_ RTC_GUARDED_BY(comparison_lock_); - std::vector> thread_pool_; + std::vector thread_pool_; rtc::Event comparison_available_event_; Mutex cpu_measurement_lock_; diff --git a/video/video_analyzer.cc b/video/video_analyzer.cc index 6698dadf42..81dcf055b8 100644 --- a/video/video_analyzer.cc +++ b/video/video_analyzer.cc @@ -137,10 +137,12 @@ VideoAnalyzer::VideoAnalyzer(test::LayerFilteringTransport* transport, } for (uint32_t i = 0; i < num_cores; ++i) { - rtc::PlatformThread* thread = - new rtc::PlatformThread(&FrameComparisonThread, this, "Analyzer"); - thread->Start(); - comparison_thread_pool_.push_back(thread); + comparison_thread_pool_.push_back(rtc::PlatformThread::SpawnJoinable( + [this] { + while (CompareFrames()) { + } + }, + "Analyzer")); } if (!rtp_dump_name.empty()) { @@ -155,10 +157,8 @@ VideoAnalyzer::~VideoAnalyzer() { MutexLock lock(&comparison_lock_); quit_ = true; } - for (rtc::PlatformThread* thread : comparison_thread_pool_) { - thread->Stop(); - delete thread; - } + // Joins all threads. + comparison_thread_pool_.clear(); } void VideoAnalyzer::SetReceiver(PacketReceiver* receiver) { @@ -533,12 +533,6 @@ void VideoAnalyzer::PollStats() { memory_usage_.AddSample(rtc::GetProcessResidentSizeBytes()); } -void VideoAnalyzer::FrameComparisonThread(void* obj) { - VideoAnalyzer* analyzer = static_cast(obj); - while (analyzer->CompareFrames()) { - } -} - bool VideoAnalyzer::CompareFrames() { if (AllFramesRecorded()) return false; diff --git a/video/video_analyzer.h b/video/video_analyzer.h index 18bacc16fc..68861d1b5f 100644 --- a/video/video_analyzer.h +++ b/video/video_analyzer.h @@ -302,7 +302,7 @@ class VideoAnalyzer : public PacketReceiver, const double avg_ssim_threshold_; bool is_quick_test_enabled_; - std::vector comparison_thread_pool_; + std::vector comparison_thread_pool_; rtc::Event comparison_available_event_; std::deque comparisons_ RTC_GUARDED_BY(comparison_lock_); bool quit_ RTC_GUARDED_BY(comparison_lock_);