From 0a2391f74c001357c661d4fe39671cc82175d5c4 Mon Sep 17 00:00:00 2001 From: tommi Date: Tue, 21 Mar 2017 02:31:51 -0700 Subject: [PATCH] Add thread check to ModuleProcessThread::DeRegisterModule and remove all unnecessary locking that was there due to one implementation calling from a different thread. The only thing that was holding us back was the indeterministic teardown of voe::Channel(), but it turned out that fixing it wasn't that hard :) BUG=webrtc:4508 Review-Url: https://codereview.webrtc.org/2755273004 Cr-Commit-Position: refs/heads/master@{#17315} --- .../utility/source/process_thread_impl.cc | 35 ++----- webrtc/voice_engine/channel.cc | 95 +++++++++++-------- webrtc/voice_engine/channel.h | 4 + webrtc/voice_engine/channel_manager.cc | 10 ++ 4 files changed, 73 insertions(+), 71 deletions(-) diff --git a/webrtc/modules/utility/source/process_thread_impl.cc b/webrtc/modules/utility/source/process_thread_impl.cc index 6252931544..7ea232786d 100644 --- a/webrtc/modules/utility/source/process_thread_impl.cc +++ b/webrtc/modules/utility/source/process_thread_impl.cc @@ -67,15 +67,8 @@ void ProcessThreadImpl::Start() { RTC_DCHECK(!stop_); - { - // TODO(tommi): Since DeRegisterModule is currently being called from - // different threads in some cases (ChannelOwner), we need to lock access to - // the modules_ collection even on the controller thread. - // Once we've cleaned up those places, we can remove this lock. - rtc::CritScope lock(&lock_); - for (ModuleCallback& m : modules_) - m.module->ProcessThreadAttached(this); - } + for (ModuleCallback& m : modules_) + m.module->ProcessThreadAttached(this); thread_.reset( new rtc::PlatformThread(&ProcessThreadImpl::Run, this, thread_name_)); @@ -97,13 +90,6 @@ void ProcessThreadImpl::Stop() { thread_->Stop(); stop_ = false; - // TODO(tommi): Since DeRegisterModule is currently being called from - // different threads in some cases (ChannelOwner), we need to lock access to - // the modules_ collection even on the controller thread. - // Since DeRegisterModule also checks thread_, we also need to hold the - // lock for the .reset() operation. - // Once we've cleaned up those places, we can remove this lock. - rtc::CritScope lock(&lock_); thread_.reset(); for (ModuleCallback& m : modules_) m.module->ProcessThreadAttached(nullptr); @@ -162,8 +148,7 @@ void ProcessThreadImpl::RegisterModule(Module* module, } void ProcessThreadImpl::DeRegisterModule(Module* module) { - // Allowed to be called on any thread. - // TODO(tommi): Disallow this ^^^ + RTC_DCHECK(thread_checker_.CalledOnValidThread()); RTC_DCHECK(module); { @@ -171,18 +156,10 @@ void ProcessThreadImpl::DeRegisterModule(Module* module) { modules_.remove_if([&module](const ModuleCallback& m) { return m.module == module; }); - - // TODO(tommi): we currently need to hold the lock while calling out to - // ProcessThreadAttached. This is to make sure that the thread hasn't been - // destroyed while we attach the module. Once we can make sure - // DeRegisterModule isn't being called on arbitrary threads, we can move the - // |if (thread_.get())| check and ProcessThreadAttached() call outside the - // lock scope. - - // Notify the module that it's been detached. - if (thread_.get()) - module->ProcessThreadAttached(nullptr); } + + // Notify the module that it's been detached. + module->ProcessThreadAttached(nullptr); } // static diff --git a/webrtc/voice_engine/channel.cc b/webrtc/voice_engine/channel.cc index 1d73db6d39..0f0831c8b1 100644 --- a/webrtc/voice_engine/channel.cc +++ b/webrtc/voice_engine/channel.cc @@ -21,7 +21,6 @@ #include "webrtc/base/location.h" #include "webrtc/base/logging.h" #include "webrtc/base/rate_limiter.h" -#include "webrtc/base/thread_checker.h" #include "webrtc/base/timeutils.h" #include "webrtc/config.h" #include "webrtc/logging/rtc_event_log/rtc_event_log.h" @@ -935,50 +934,12 @@ Channel::Channel(int32_t channelId, } Channel::~Channel() { - rtp_receive_statistics_->RegisterRtcpStatisticsCallback(NULL); - WEBRTC_TRACE(kTraceMemory, kTraceVoice, VoEId(_instanceId, _channelId), - "Channel::~Channel() - dtor"); - - StopSend(); - StopPlayout(); - - { - rtc::CritScope cs(&_fileCritSect); - if (input_file_player_) { - input_file_player_->RegisterModuleFileCallback(NULL); - input_file_player_->StopPlayingFile(); - } - if (output_file_player_) { - output_file_player_->RegisterModuleFileCallback(NULL); - output_file_player_->StopPlayingFile(); - } - if (output_file_recorder_) { - output_file_recorder_->RegisterModuleFileCallback(NULL); - output_file_recorder_->StopRecording(); - } - } - - // The order to safely shutdown modules in a channel is: - // 1. De-register callbacks in modules - // 2. De-register modules in process thread - // 3. Destroy modules - if (audio_coding_->RegisterTransportCallback(NULL) == -1) { - WEBRTC_TRACE(kTraceWarning, kTraceVoice, VoEId(_instanceId, _channelId), - "~Channel() failed to de-register transport callback" - " (Audio coding module)"); - } - if (audio_coding_->RegisterVADCallback(NULL) == -1) { - WEBRTC_TRACE(kTraceWarning, kTraceVoice, VoEId(_instanceId, _channelId), - "~Channel() failed to de-register VAD callback" - " (Audio coding module)"); - } - // De-register modules in process thread - _moduleProcessThreadPtr->DeRegisterModule(_rtpRtcpModule.get()); - - // End of modules shutdown + RTC_DCHECK(!channel_state_.Get().sending); + RTC_DCHECK(!channel_state_.Get().playing); } int32_t Channel::Init() { + RTC_DCHECK(construction_thread_.CalledOnValidThread()); WEBRTC_TRACE(kTraceInfo, kTraceVoice, VoEId(_instanceId, _channelId), "Channel::Init()"); @@ -1081,6 +1042,56 @@ int32_t Channel::Init() { return 0; } +void Channel::Terminate() { + RTC_DCHECK(construction_thread_.CalledOnValidThread()); + // Must be called on the same thread as Init(). + WEBRTC_TRACE(kTraceMemory, kTraceVoice, VoEId(_instanceId, _channelId), + "Channel::Terminate"); + + rtp_receive_statistics_->RegisterRtcpStatisticsCallback(NULL); + + StopSend(); + StopPlayout(); + + { + rtc::CritScope cs(&_fileCritSect); + if (input_file_player_) { + input_file_player_->RegisterModuleFileCallback(NULL); + input_file_player_->StopPlayingFile(); + } + if (output_file_player_) { + output_file_player_->RegisterModuleFileCallback(NULL); + output_file_player_->StopPlayingFile(); + } + if (output_file_recorder_) { + output_file_recorder_->RegisterModuleFileCallback(NULL); + output_file_recorder_->StopRecording(); + } + } + + // The order to safely shutdown modules in a channel is: + // 1. De-register callbacks in modules + // 2. De-register modules in process thread + // 3. Destroy modules + if (audio_coding_->RegisterTransportCallback(NULL) == -1) { + WEBRTC_TRACE(kTraceWarning, kTraceVoice, VoEId(_instanceId, _channelId), + "Terminate() failed to de-register transport callback" + " (Audio coding module)"); + } + + if (audio_coding_->RegisterVADCallback(NULL) == -1) { + WEBRTC_TRACE(kTraceWarning, kTraceVoice, VoEId(_instanceId, _channelId), + "Terminate() failed to de-register VAD callback" + " (Audio coding module)"); + } + + // De-register modules in process thread + if (_moduleProcessThreadPtr) + _moduleProcessThreadPtr->DeRegisterModule(_rtpRtcpModule.get()); + + // End of modules shutdown +} + int32_t Channel::SetEngineInformation(Statistics& engineStatistics, OutputMixer& outputMixer, ProcessThread& moduleProcessThread, diff --git a/webrtc/voice_engine/channel.h b/webrtc/voice_engine/channel.h index 60e6f6040c..68d022d4d1 100644 --- a/webrtc/voice_engine/channel.h +++ b/webrtc/voice_engine/channel.h @@ -17,6 +17,7 @@ #include "webrtc/api/call/audio_sink.h" #include "webrtc/base/criticalsection.h" #include "webrtc/base/optional.h" +#include "webrtc/base/thread_checker.h" #include "webrtc/common_audio/resampler/include/push_resampler.h" #include "webrtc/common_types.h" #include "webrtc/modules/audio_coding/acm2/codec_manager.h" @@ -150,6 +151,7 @@ class Channel uint32_t instanceId, const VoEBase::ChannelConfig& config); int32_t Init(); + void Terminate(); int32_t SetEngineInformation(Statistics& engineStatistics, OutputMixer& outputMixer, ProcessThread& moduleProcessThread, @@ -504,6 +506,8 @@ class Channel // TODO(ossu): Remove once GetAudioDecoderFactory() is no longer needed. rtc::scoped_refptr decoder_factory_; + + rtc::ThreadChecker construction_thread_; }; } // namespace voe diff --git a/webrtc/voice_engine/channel_manager.cc b/webrtc/voice_engine/channel_manager.cc index b81c8a63cc..58aef6f7ee 100644 --- a/webrtc/voice_engine/channel_manager.cc +++ b/webrtc/voice_engine/channel_manager.cc @@ -106,6 +106,12 @@ void ChannelManager::DestroyChannel(int32_t channel_id) { channels_.erase(to_delete); } } + if (reference.channel()) { + // Ensure the channel is torn down now, on this thread, since a reference + // may still be held on a different thread (e.g. in the audio capture + // thread). + reference.channel()->Terminate(); + } } void ChannelManager::DestroyAllChannels() { @@ -117,6 +123,10 @@ void ChannelManager::DestroyAllChannels() { references = channels_; channels_.clear(); } + for (auto& owner : references) { + if (owner.channel()) + owner.channel()->Terminate(); + } } size_t ChannelManager::NumOfChannels() const {