From b8c254abd6fa784294277e2baa8298c3352faf78 Mon Sep 17 00:00:00 2001 From: "henrike@webrtc.org" Date: Fri, 14 Feb 2014 23:38:45 +0000 Subject: [PATCH] (Auto)update libjingle 61549749-> 61608469 git-svn-id: http://webrtc.googlecode.com/svn/trunk@5555 4adac7df-926f-26a2-2b94-8c16560cd09d --- talk/base/asyncinvoker-inl.h | 145 ++++++++++++++++--------- talk/base/asyncinvoker.cc | 88 ++++++++++----- talk/base/asyncinvoker.h | 62 ++++------- talk/base/messagehandler.h | 9 +- talk/base/thread_unittest.cc | 14 +-- talk/media/base/mediachannel.h | 14 ++- talk/media/webrtc/webrtcvoiceengine.cc | 14 +++ 7 files changed, 213 insertions(+), 133 deletions(-) diff --git a/talk/base/asyncinvoker-inl.h b/talk/base/asyncinvoker-inl.h index c4afd3a194..b6be175045 100644 --- a/talk/base/asyncinvoker-inl.h +++ b/talk/base/asyncinvoker-inl.h @@ -28,74 +28,119 @@ #ifndef TALK_BASE_ASYNCINVOKER_INL_H_ #define TALK_BASE_ASYNCINVOKER_INL_H_ +#include "talk/base/bind.h" +#include "talk/base/callback.h" #include "talk/base/criticalsection.h" #include "talk/base/messagehandler.h" +#include "talk/base/refcount.h" +#include "talk/base/scoped_ref_ptr.h" #include "talk/base/sigslot.h" #include "talk/base/thread.h" namespace talk_base { -// Helper class for AsyncInvoker. Runs a functor on a message queue or thread -// and doesn't execute the callback when finished if the calling thread ends. -template -class AsyncFunctorMessageHandler - : public FunctorMessageHandler, - public sigslot::has_slots<> { - typedef AsyncFunctorMessageHandler ThisT; +class AsyncInvoker; + +// Helper class for AsyncInvoker. Runs a task and triggers a callback +// on the calling thread if necessary. Instances are ref-counted so their +// lifetime can be independent of AsyncInvoker. +class AsyncClosure : public RefCountInterface { public: - explicit AsyncFunctorMessageHandler(const FunctorT& functor) - : FunctorMessageHandler(functor), - thread_(Thread::Current()), - shutting_down_(false) { - thread_->SignalQueueDestroyed.connect(this, &ThisT::OnThreadDestroyed); - } + virtual ~AsyncClosure() {} + // Runs the asynchronous task, and triggers a callback to the calling + // thread if needed. Should be called from the target thread. + virtual void Execute() = 0; +}; - virtual ~AsyncFunctorMessageHandler() { - CritScope cs(&running_crit_); - shutting_down_ = true; +// Simple closure that doesn't trigger a callback for the calling thread. +template +class FireAndForgetAsyncClosure : public AsyncClosure { + public: + explicit FireAndForgetAsyncClosure(const FunctorT& functor) + : functor_(functor) {} + virtual void Execute() { + functor_(); } + private: + FunctorT functor_; +}; - virtual void OnMessage(Message* msg) { - CritScope cs(&running_crit_); - if (!shutting_down_) { - FunctorMessageHandler::OnMessage(msg); +// Base class for closures that may trigger a callback for the calling thread. +// Listens for the "destroyed" signals from the calling thread and the invoker, +// and cancels the callback to the calling thread if either is destroyed. +class NotifyingAsyncClosureBase : public AsyncClosure, + public sigslot::has_slots<> { + public: + virtual ~NotifyingAsyncClosureBase() { disconnect_all(); } + + protected: + NotifyingAsyncClosureBase(AsyncInvoker* invoker, Thread* calling_thread); + void TriggerCallback(); + void SetCallback(const Callback0& callback) { + CritScope cs(&crit_); + callback_ = callback; + } + bool CallbackCanceled() const { return calling_thread_ == NULL; } + + private: + Callback0 callback_; + CriticalSection crit_; + AsyncInvoker* invoker_; + Thread* calling_thread_; + + void CancelCallback(); +}; + +// Closures that have a non-void return value and require a callback. +template +class NotifyingAsyncClosure : public NotifyingAsyncClosureBase { + public: + NotifyingAsyncClosure(AsyncInvoker* invoker, + Thread* calling_thread, + const FunctorT& functor, + void (HostT::*callback)(ReturnT), + HostT* callback_host) + : NotifyingAsyncClosureBase(invoker, calling_thread), + functor_(functor), + callback_(callback), + callback_host_(callback_host) {} + virtual void Execute() { + ReturnT result = functor_(); + if (!CallbackCanceled()) { + SetCallback(Callback0(Bind(callback_, callback_host_, result))); + TriggerCallback(); } } - // Returns the thread that initiated the async call. - Thread* thread() const { return thread_; } - - // Wraps a callback so that it won't execute if |thread_| goes away. - void WrapCallback(Callback0 cb) { - this->SetCallback( - Callback0(Bind(&ThisT::MaybeRunCallback, this, cb))); - } - private: - void OnThreadDestroyed() { - CritScope cs(&thread_crit_); - thread_ = NULL; - this->SetCallback(Callback0()); // Clear out the callback. - } - - void MaybeRunCallback(Callback0 cb) { -#ifdef _DEBUG - ASSERT(running_crit_.CurrentThreadIsOwner()); -#endif - CritScope cs(&thread_crit_); - if (thread_ && !shutting_down_) { - cb(); - } - } - FunctorT functor_; - Thread* thread_; - CriticalSection thread_crit_; - CriticalSection running_crit_; - bool shutting_down_; + void (HostT::*callback_)(ReturnT); + HostT* callback_host_; +}; + +// Closures that have a void return value and require a callback. +template +class NotifyingAsyncClosure + : public NotifyingAsyncClosureBase { + public: + NotifyingAsyncClosure(AsyncInvoker* invoker, + Thread* calling_thread, + const FunctorT& functor, + void (HostT::*callback)(), + HostT* callback_host) + : NotifyingAsyncClosureBase(invoker, calling_thread), + functor_(functor) { + SetCallback(Callback0(Bind(callback, callback_host))); + } + virtual void Execute() { + functor_(); + TriggerCallback(); + } + + private: + FunctorT functor_; }; } // namespace talk_base - #endif // TALK_BASE_ASYNCINVOKER_INL_H_ diff --git a/talk/base/asyncinvoker.cc b/talk/base/asyncinvoker.cc index 63171be401..a57eb7b962 100644 --- a/talk/base/asyncinvoker.cc +++ b/talk/base/asyncinvoker.cc @@ -29,50 +29,80 @@ namespace talk_base { -// Synchronously execute all outstanding calls we own pending -// on |thread|. Optionally filter by message id. +AsyncInvoker::AsyncInvoker() : destroying_(false) {} + +AsyncInvoker::~AsyncInvoker() { + destroying_ = true; + SignalInvokerDestroyed(); + // Messages for this need to be cleared *before* our destructor is complete. + MessageQueueManager::Clear(this); +} + +void AsyncInvoker::OnMessage(Message* msg) { + // Get the AsyncClosure shared ptr from this message's data. + ScopedRefMessageData* data = + static_cast*>(msg->pdata); + scoped_refptr closure = data->data(); + delete msg->pdata; + msg->pdata = NULL; + + // Execute the closure and trigger the return message if needed. + closure->Execute(); +} + void AsyncInvoker::Flush(Thread* thread, uint32 id /*= MQID_ANY*/) { + if (destroying_) return; + // Run this on |thread| to reduce the number of context switches. if (Thread::Current() != thread) { thread->Invoke(Bind(&AsyncInvoker::Flush, this, thread, id)); return; } - // Make a copy of handlers_, since it'll be modified by - // callbacks to RemoveHandler when each is done executing. - crit_.Enter(); - std::vector handlers(handlers_.collection()); - crit_.Leave(); MessageList removed; - for (size_t i = 0; i < handlers.size(); ++i) { - removed.clear(); - thread->Clear(handlers[i], id, &removed); - if (!removed.empty()) { - // Since each message gets its own handler with AsyncInvoker, - // we expect a maximum of one removed. - ASSERT(removed.size() == 1); - // This handler was pending on this thread, so run it now. - const Message& msg = removed.front(); - thread->Send(msg.phandler, - msg.message_id, - msg.pdata); - } + thread->Clear(this, id, &removed); + for (MessageList::iterator it = removed.begin(); it != removed.end(); ++it) { + // This message was pending on this thread, so run it now. + thread->Send(it->phandler, + it->message_id, + it->pdata); } } -void AsyncInvoker::InvokeHandler(Thread* thread, MessageHandler* handler, - uint32 id) { - { - CritScope cs(&crit_); - handlers_.PushBack(handler); +void AsyncInvoker::DoInvoke(Thread* thread, AsyncClosure* closure, + uint32 id) { + if (destroying_) { + LOG(LS_WARNING) << "Tried to invoke while destroying the invoker."; + // Since this call transwers ownership of |closure|, we clean it up here. + delete closure; + return; } - thread->Post(handler, id); + thread->Post(this, id, new ScopedRefMessageData(closure)); } -void AsyncInvoker::RemoveHandler(MessageHandler* handler) { +NotifyingAsyncClosureBase::NotifyingAsyncClosureBase(AsyncInvoker* invoker, + Thread* calling_thread) + : invoker_(invoker), calling_thread_(calling_thread) { + calling_thread->SignalQueueDestroyed.connect( + this, &NotifyingAsyncClosureBase::CancelCallback); + invoker->SignalInvokerDestroyed.connect( + this, &NotifyingAsyncClosureBase::CancelCallback); +} + +void NotifyingAsyncClosureBase::TriggerCallback() { CritScope cs(&crit_); - handlers_.Remove(handler); - delete handler; + if (!CallbackCanceled() && !callback_.empty()) { + invoker_->AsyncInvoke(calling_thread_, callback_); + } +} + +void NotifyingAsyncClosureBase::CancelCallback() { + // If the callback is triggering when this is called, block the + // destructor of the dying object here by waiting until the callback + // is done triggering. + CritScope cs(&crit_); + // calling_thread_ == NULL means do not trigger the callback. + calling_thread_ = NULL; } } // namespace talk_base diff --git a/talk/base/asyncinvoker.h b/talk/base/asyncinvoker.h index fabdbdcbd2..b7dfac9838 100644 --- a/talk/base/asyncinvoker.h +++ b/talk/base/asyncinvoker.h @@ -30,6 +30,7 @@ #include "talk/base/asyncinvoker-inl.h" #include "talk/base/bind.h" +#include "talk/base/sigslot.h" #include "talk/base/scopedptrcollection.h" #include "talk/base/thread.h" @@ -82,18 +83,20 @@ namespace talk_base { // AsyncInvoker invoker_; // int result_; // }; -class AsyncInvoker { +class AsyncInvoker : public MessageHandler { public: + AsyncInvoker(); + virtual ~AsyncInvoker(); + // Call |functor| asynchronously on |thread|, with no callback upon // completion. Returns immediately. template void AsyncInvoke(Thread* thread, const FunctorT& functor, uint32 id = 0) { - FunctorMessageHandler* handler = - new FunctorMessageHandler(functor); - handler->SetCallback(Bind(&AsyncInvoker::RemoveHandler, this, handler)); - InvokeHandler(thread, handler, id); + AsyncClosure* closure = + new RefCountedObject >(functor); + DoInvoke(thread, closure, id); } // Call |functor| asynchronously on |thread|, calling |callback| when done. @@ -103,12 +106,10 @@ class AsyncInvoker { void (HostT::*callback)(ReturnT), HostT* callback_host, uint32 id = 0) { - AsyncFunctorMessageHandler* handler = - new AsyncFunctorMessageHandler(functor); - handler->WrapCallback( - Bind(&AsyncInvoker::OnAsyncCallCompleted, - this, handler, callback, callback_host)); - InvokeHandler(thread, handler, id); + AsyncClosure* closure = + new RefCountedObject >( + this, Thread::Current(), functor, callback, callback_host); + DoInvoke(thread, closure, id); } // Call |functor| asynchronously on |thread|, calling |callback| when done. @@ -119,12 +120,10 @@ class AsyncInvoker { void (HostT::*callback)(), HostT* callback_host, uint32 id = 0) { - AsyncFunctorMessageHandler* handler = - new AsyncFunctorMessageHandler(functor); - handler->WrapCallback( - Bind(&AsyncInvoker::OnAsyncVoidCallCompleted, - this, handler, callback, callback_host)); - InvokeHandler(thread, handler, id); + AsyncClosure* closure = + new RefCountedObject >( + this, Thread::Current(), functor, callback, callback_host); + DoInvoke(thread, closure, id); } // Synchronously execute on |thread| all outstanding calls we own @@ -134,31 +133,16 @@ class AsyncInvoker { // behavior is desired, call Flush() before destroying this object. void Flush(Thread* thread, uint32 id = MQID_ANY); + // Signaled when this object is destructed. + sigslot::signal0<> SignalInvokerDestroyed; + private: - void InvokeHandler(Thread* thread, MessageHandler* handler, uint32 id); - void RemoveHandler(MessageHandler* handler); + virtual void OnMessage(Message* msg); + void DoInvoke(Thread* thread, AsyncClosure* closure, uint32 id); - template - void OnAsyncCallCompleted( - AsyncFunctorMessageHandler* handler, - void (HostT::*callback)(ReturnT), - HostT* callback_host) { - AsyncInvoke(handler->thread(), - Bind(callback, callback_host, handler->result())); - RemoveHandler(handler); - } + bool destroying_; - template - void OnAsyncVoidCallCompleted( - AsyncFunctorMessageHandler* handler, - void (HostT::*callback)(), - HostT* callback_host) { - AsyncInvoke(handler->thread(), Bind(callback, callback_host)); - RemoveHandler(handler); - } - - CriticalSection crit_; - ScopedPtrCollection handlers_; + DISALLOW_COPY_AND_ASSIGN(AsyncInvoker); }; } // namespace talk_base diff --git a/talk/base/messagehandler.h b/talk/base/messagehandler.h index 8b9e5f6d13..6494f2b256 100644 --- a/talk/base/messagehandler.h +++ b/talk/base/messagehandler.h @@ -28,7 +28,6 @@ #ifndef TALK_BASE_MESSAGEHANDLER_H_ #define TALK_BASE_MESSAGEHANDLER_H_ -#include "talk/base/callback.h" #include "talk/base/constructormagic.h" namespace talk_base { @@ -57,14 +56,12 @@ class FunctorMessageHandler : public MessageHandler { : functor_(functor) {} virtual void OnMessage(Message* msg) { result_ = functor_(); - if (!callback_.empty()) callback_(); } const ReturnT& result() const { return result_; } - void SetCallback(const Callback0& callback) { callback_ = callback; } + private: FunctorT functor_; ReturnT result_; - Callback0 callback_; }; // Specialization for ReturnT of void. @@ -75,13 +72,11 @@ class FunctorMessageHandler : public MessageHandler { : functor_(functor) {} virtual void OnMessage(Message* msg) { functor_(); - if (!callback_.empty()) callback_(); } void result() const {} - void SetCallback(const Callback0& callback) { callback_ = callback; } + private: FunctorT functor_; - Callback0 callback_; }; diff --git a/talk/base/thread_unittest.cc b/talk/base/thread_unittest.cc index 728e32158f..ddf6326a24 100644 --- a/talk/base/thread_unittest.cc +++ b/talk/base/thread_unittest.cc @@ -339,7 +339,7 @@ class AsyncInvokeTest : public testing::Test { Thread* expected_thread_; }; -TEST_F(AsyncInvokeTest, DISABLED_FireAndForget) { +TEST_F(AsyncInvokeTest, FireAndForget) { AsyncInvoker invoker; // Create and start the thread. Thread thread; @@ -350,7 +350,7 @@ TEST_F(AsyncInvokeTest, DISABLED_FireAndForget) { EXPECT_TRUE_WAIT(called, kWaitTimeout); } -TEST_F(AsyncInvokeTest, DISABLED_WithCallback) { +TEST_F(AsyncInvokeTest, WithCallback) { AsyncInvoker invoker; // Create and start the thread. Thread thread; @@ -363,7 +363,7 @@ TEST_F(AsyncInvokeTest, DISABLED_WithCallback) { EXPECT_EQ_WAIT(42, int_value_, kWaitTimeout); } -TEST_F(AsyncInvokeTest, DISABLED_CancelInvoker) { +TEST_F(AsyncInvokeTest, CancelInvoker) { // Create and start the thread. Thread thread; thread.Start(); @@ -379,7 +379,7 @@ TEST_F(AsyncInvokeTest, DISABLED_CancelInvoker) { EXPECT_EQ(0, int_value_); } -TEST_F(AsyncInvokeTest, DISABLED_CancelCallingThread) { +TEST_F(AsyncInvokeTest, CancelCallingThread) { AsyncInvoker invoker; { // Create and start the thread. Thread thread; @@ -396,7 +396,7 @@ TEST_F(AsyncInvokeTest, DISABLED_CancelCallingThread) { EXPECT_EQ(0, int_value_); } -TEST_F(AsyncInvokeTest, DISABLED_KillInvokerBeforeExecute) { +TEST_F(AsyncInvokeTest, KillInvokerBeforeExecute) { Thread thread; thread.Start(); { @@ -413,7 +413,7 @@ TEST_F(AsyncInvokeTest, DISABLED_KillInvokerBeforeExecute) { EXPECT_EQ(0, int_value_); } -TEST_F(AsyncInvokeTest, DISABLED_Flush) { +TEST_F(AsyncInvokeTest, Flush) { AsyncInvoker invoker; bool flag1 = false; bool flag2 = false; @@ -431,7 +431,7 @@ TEST_F(AsyncInvokeTest, DISABLED_Flush) { EXPECT_TRUE(flag2); } -TEST_F(AsyncInvokeTest, DISABLED_FlushWithIds) { +TEST_F(AsyncInvokeTest, FlushWithIds) { AsyncInvoker invoker; bool flag1 = false; bool flag2 = false; diff --git a/talk/media/base/mediachannel.h b/talk/media/base/mediachannel.h index 239798c2ef..1a84704c24 100644 --- a/talk/media/base/mediachannel.h +++ b/talk/media/base/mediachannel.h @@ -749,7 +749,13 @@ struct VoiceReceiverInfo : public MediaReceiverInfo { jitter_buffer_preferred_ms(0), delay_estimate_ms(0), audio_level(0), - expand_rate(0) { + expand_rate(0), + decoding_calls_to_silence_generator(0), + decoding_calls_to_neteq(0), + decoding_normal(0), + decoding_plc(0), + decoding_cng(0), + decoding_plc_cng(0) { } int ext_seqnum; @@ -760,6 +766,12 @@ struct VoiceReceiverInfo : public MediaReceiverInfo { int audio_level; // fraction of synthesized speech inserted through pre-emptive expansion float expand_rate; + int decoding_calls_to_silence_generator; + int decoding_calls_to_neteq; + int decoding_normal; + int decoding_plc; + int decoding_cng; + int decoding_plc_cng; }; struct VideoSenderInfo : public MediaSenderInfo { diff --git a/talk/media/webrtc/webrtcvoiceengine.cc b/talk/media/webrtc/webrtcvoiceengine.cc index f1030bd621..e2415cf569 100644 --- a/talk/media/webrtc/webrtcvoiceengine.cc +++ b/talk/media/webrtc/webrtcvoiceengine.cc @@ -3200,6 +3200,20 @@ bool WebRtcVoiceMediaChannel::GetStats(VoiceMediaInfo* info) { rinfo.expand_rate = static_cast(ns.currentExpandRate) / (1 << 14); } + + webrtc::AudioDecodingCallStats ds; + if (engine()->voe()->neteq() && + engine()->voe()->neteq()->GetDecodingCallStatistics( + *it, &ds) != -1) { + rinfo.decoding_calls_to_silence_generator = + ds.calls_to_silence_generator; + rinfo.decoding_calls_to_neteq = ds.calls_to_neteq; + rinfo.decoding_normal = ds.decoded_normal; + rinfo.decoding_plc = ds.decoded_plc; + rinfo.decoding_cng = ds.decoded_cng; + rinfo.decoding_plc_cng = ds.decoded_plc_cng; + } + if (engine()->voe()->sync()) { int jitter_buffer_delay_ms = 0; int playout_buffer_delay_ms = 0;