From 462dbcfc2a308ce898e9b70f394f14cdf1adc271 Mon Sep 17 00:00:00 2001 From: "tommi@webrtc.org" Date: Tue, 17 Mar 2015 21:40:06 +0000 Subject: [PATCH] Fix bug in Transport where channel_.clear() was being called without a lock. Looks like this snuck in between misaligned braces. Also switching to C++11 for loops, reducing lock scopes in a few places and removing locks in others. BUG=4444 R=pthatcher@webrtc.org Review URL: https://webrtc-codereview.appspot.com/43769004 Cr-Commit-Position: refs/heads/master@{#8765} git-svn-id: http://webrtc.googlecode.com/svn/trunk@8765 4adac7df-926f-26a2-2b94-8c16560cd09d --- webrtc/p2p/base/transport.cc | 176 +++++++++++++++++++---------------- webrtc/p2p/base/transport.h | 1 + 2 files changed, 96 insertions(+), 81 deletions(-) diff --git a/webrtc/p2p/base/transport.cc b/webrtc/p2p/base/transport.cc index 0569cc0418..b5c6b05c4e 100644 --- a/webrtc/p2p/base/transport.cc +++ b/webrtc/p2p/base/transport.cc @@ -197,21 +197,29 @@ TransportChannelImpl* Transport::CreateChannel(int component) { TransportChannelImpl* Transport::CreateChannel_w(int component) { ASSERT(worker_thread()->IsCurrent()); - TransportChannelImpl *impl; + TransportChannelImpl* impl; + // TODO(tommi): We don't really need to grab the lock until the actual call + // to insert() below and presumably hold it throughout initialization of + // |impl| after the impl_exists check. Maybe we can factor that out to + // a separate function and not grab the lock in this function. + // Actually, we probably don't need to hold the lock while initializing + // |impl| since we can just do the insert when that's done. rtc::CritScope cs(&crit_); // Create the entry if it does not exist. bool impl_exists = false; - if (channels_.find(component) == channels_.end()) { + auto iterator = channels_.find(component); + if (iterator == channels_.end()) { impl = CreateTransportChannel(component); - channels_[component] = ChannelMapEntry(impl); + iterator = channels_.insert(std::pair( + component, ChannelMapEntry(impl))).first; } else { - impl = channels_[component].get(); + impl = iterator->second.get(); impl_exists = true; } // Increase the ref count. - channels_[component].AddRef(); + iterator->second.AddRef(); destroyed_ = false; if (impl_exists) { @@ -256,6 +264,11 @@ TransportChannelImpl* Transport::CreateChannel_w(int component) { } TransportChannelImpl* Transport::GetChannel(int component) { + // TODO(tommi,pthatcher): Since we're returning a pointer from the channels_ + // map, shouldn't we assume that we're on the worker thread? (The pointer + // will be used outside of the lock). + // And if we're on the worker thread, which is the only thread that modifies + // channels_, can we skip grabbing the lock? rtc::CritScope cs(&crit_); ChannelMap::iterator iter = channels_.find(component); return (iter != channels_.end()) ? iter->second.get() : NULL; @@ -274,18 +287,17 @@ void Transport::DestroyChannel(int component) { void Transport::DestroyChannel_w(int component) { ASSERT(worker_thread()->IsCurrent()); - TransportChannelImpl* impl = NULL; - { - rtc::CritScope cs(&crit_); - ChannelMap::iterator iter = channels_.find(component); - if (iter == channels_.end()) - return; + ChannelMap::iterator iter = channels_.find(component); + if (iter == channels_.end()) + return; - iter->second.DecRef(); - if (!iter->second.ref()) { - impl = iter->second.get(); - channels_.erase(iter); - } + TransportChannelImpl* impl = NULL; + + iter->second.DecRef(); + if (!iter->second.ref()) { + impl = iter->second.get(); + rtc::CritScope cs(&crit_); + channels_.erase(iter); } if (connect_requested_ && channels_.empty()) { @@ -309,9 +321,9 @@ void Transport::ConnectChannels_w() { ASSERT(worker_thread()->IsCurrent()); if (connect_requested_ || channels_.empty()) return; + connect_requested_ = true; - signaling_thread()->Post( - this, MSG_CANDIDATEREADY, NULL); + signaling_thread()->Post(this, MSG_CANDIDATEREADY, NULL); if (!local_description_) { // TOOD(mallinath) : TransportDescription(TD) shouldn't be generated here. @@ -343,8 +355,7 @@ void Transport::OnConnecting_s() { void Transport::DestroyAllChannels() { ASSERT(signaling_thread()->IsCurrent()); - worker_thread_->Invoke( - Bind(&Transport::DestroyAllChannels_w, this)); + worker_thread_->Invoke(Bind(&Transport::DestroyAllChannels_w, this)); worker_thread()->Clear(this); signaling_thread()->Clear(this); destroyed_ = true; @@ -352,19 +363,18 @@ void Transport::DestroyAllChannels() { void Transport::DestroyAllChannels_w() { ASSERT(worker_thread()->IsCurrent()); + std::vector impls; + for (auto& iter : channels_) { + iter.second.DecRef(); + if (!iter.second.ref()) + impls.push_back(iter.second.get()); + } + { rtc::CritScope cs(&crit_); - for (ChannelMap::iterator iter = channels_.begin(); - iter != channels_.end(); - ++iter) { - iter->second.DecRef(); - if (!iter->second.ref()) - impls.push_back(iter->second.get()); - } - } - channels_.clear(); - + channels_.clear(); + } for (size_t i = 0; i < impls.size(); ++i) DestroyTransportChannel(impls[i]); @@ -401,11 +411,8 @@ void Transport::OnSignalingReady() { void Transport::CallChannels_w(TransportChannelFunc func) { ASSERT(worker_thread()->IsCurrent()); - rtc::CritScope cs(&crit_); - for (ChannelMap::iterator iter = channels_.begin(); - iter != channels_.end(); - ++iter) { - ((iter->second.get())->*func)(); + for (const auto& iter : channels_) { + ((iter.second.get())->*func)(); } } @@ -451,10 +458,8 @@ bool Transport::GetStats_w(TransportStats* stats) { ASSERT(worker_thread()->IsCurrent()); stats->content_name = content_name(); stats->channel_stats.clear(); - for (ChannelMap::iterator iter = channels_.begin(); - iter != channels_.end(); - ++iter) { - ChannelMapEntry& entry = iter->second; + for (auto iter : channels_) { + ChannelMapEntry& entry = iter.second; TransportChannelStats substats; substats.component = entry->component(); entry->GetSrtpCipher(&substats.srtp_cipher); @@ -536,24 +541,24 @@ void Transport::OnChannelWritableState_s() { TransportState Transport::GetTransportState_s(bool read) { ASSERT(signaling_thread()->IsCurrent()); + rtc::CritScope cs(&crit_); bool any = false; bool all = !channels_.empty(); - for (ChannelMap::iterator iter = channels_.begin(); - iter != channels_.end(); - ++iter) { - bool b = (read ? iter->second->readable() : - iter->second->writable()); - any = any || b; - all = all && b; + for (const auto iter : channels_) { + bool b = (read ? iter.second->readable() : + iter.second->writable()); + any |= b; + all &= b; } + if (all) { return TRANSPORT_STATE_ALL; } else if (any) { return TRANSPORT_STATE_SOME; - } else { - return TRANSPORT_STATE_NONE; } + + return TRANSPORT_STATE_NONE; } void Transport::OnChannelRequestSignaling(TransportChannelImpl* channel) { @@ -566,15 +571,18 @@ void Transport::OnChannelRequestSignaling_s(int component) { ASSERT(signaling_thread()->IsCurrent()); LOG(LS_INFO) << "Transport: " << content_name_ << ", allocating candidates"; // Resetting ICE state for the channel. - { - rtc::CritScope cs(&crit_); - ChannelMap::iterator iter = channels_.find(component); - if (iter != channels_.end()) - iter->second.set_candidates_allocated(false); - } + worker_thread_->Invoke( + Bind(&Transport::OnChannelRequestSignaling_w, this, component)); SignalRequestSignaling(this); } +void Transport::OnChannelRequestSignaling_w(int component) { + ASSERT(worker_thread()->IsCurrent()); + ChannelMap::iterator iter = channels_.find(component); + if (iter != channels_.end()) + iter->second.set_candidates_allocated(false); +} + void Transport::OnChannelCandidateReady(TransportChannelImpl* channel, const Candidate& candidate) { ASSERT(worker_thread()->IsCurrent()); @@ -622,18 +630,18 @@ void Transport::OnChannelRouteChange_s(const TransportChannel* channel, void Transport::OnChannelCandidatesAllocationDone( TransportChannelImpl* channel) { ASSERT(worker_thread()->IsCurrent()); - rtc::CritScope cs(&crit_); ChannelMap::iterator iter = channels_.find(channel->component()); ASSERT(iter != channels_.end()); LOG(LS_INFO) << "Transport: " << content_name_ << ", component " << channel->component() << " allocation complete"; + iter->second.set_candidates_allocated(true); // If all channels belonging to this Transport got signal, then // forward this signal to upper layer. // Can this signal arrive before all transport channels are created? - for (iter = channels_.begin(); iter != channels_.end(); ++iter) { - if (!iter->second.candidates_allocated()) + for (auto& iter : channels_) { + if (!iter.second.candidates_allocated()) return; } signaling_thread_->Post(this, MSG_CANDIDATEALLOCATIONCOMPLETE); @@ -680,20 +688,19 @@ void Transport::MaybeCompleted_w() { // When there is no channel created yet, calling this function could fire an // IceConnectionCompleted event prematurely. - if (channels_.size() == 0) { + if (channels_.empty()) { return; } // A Transport's ICE process is completed if all of its channels are writable, // have finished allocating candidates, and have pruned all but one of their // connections. - ChannelMap::const_iterator iter; - for (iter = channels_.begin(); iter != channels_.end(); ++iter) { - const TransportChannelImpl* channel = iter->second.get(); + for (const auto& iter : channels_) { + const TransportChannelImpl* channel = iter.second.get(); if (!(channel->writable() && channel->GetState() == TransportChannelState::STATE_COMPLETED && channel->GetIceRole() == ICEROLE_CONTROLLING && - iter->second.candidates_allocated())) { + iter.second.candidates_allocated())) { return; } } @@ -702,21 +709,20 @@ void Transport::MaybeCompleted_w() { } void Transport::SetIceRole_w(IceRole role) { + ASSERT(worker_thread()->IsCurrent()); rtc::CritScope cs(&crit_); ice_role_ = role; - for (ChannelMap::iterator iter = channels_.begin(); - iter != channels_.end(); ++iter) { - iter->second->SetIceRole(ice_role_); + for (auto& iter : channels_) { + iter.second->SetIceRole(ice_role_); } } void Transport::SetRemoteIceMode_w(IceMode mode) { - rtc::CritScope cs(&crit_); + ASSERT(worker_thread()->IsCurrent()); remote_ice_mode_ = mode; // Shouldn't channels be created after this method executed? - for (ChannelMap::iterator iter = channels_.begin(); - iter != channels_.end(); ++iter) { - iter->second->SetRemoteIceMode(remote_ice_mode_); + for (auto& iter : channels_) { + iter.second->SetRemoteIceMode(remote_ice_mode_); } } @@ -724,14 +730,22 @@ bool Transport::SetLocalTransportDescription_w( const TransportDescription& desc, ContentAction action, std::string* error_desc) { + ASSERT(worker_thread()->IsCurrent()); bool ret = true; - rtc::CritScope cs(&crit_); if (!VerifyIceParams(desc)) { return BadTransportDescription("Invalid ice-ufrag or ice-pwd length", error_desc); } + // TODO(tommi,pthatcher): I'm not sure why we need to grab this lock at this + // point. |local_description_| seems to always be modified on the worker + // thread, so we should be able to use it here without grabbing the lock. + // However, we _might_ need it before the call to reset() below? + // Actually, if we ever give out a pointer to the local description to + // another thread, won't we run into trouble? (see local_description() in + // the header file - no thread check, so I'm not sure from where it's called). + rtc::CritScope cs(&crit_); if (local_description_ && IceCredentialsChanged(*local_description_, desc)) { IceRole new_ice_role = (action == CA_OFFER) ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED; @@ -743,9 +757,8 @@ bool Transport::SetLocalTransportDescription_w( local_description_.reset(new TransportDescription(desc)); - for (ChannelMap::iterator iter = channels_.begin(); - iter != channels_.end(); ++iter) { - ret &= ApplyLocalTransportDescription_w(iter->second.get(), error_desc); + for (auto& iter : channels_) { + ret &= ApplyLocalTransportDescription_w(iter.second.get(), error_desc); } if (!ret) return false; @@ -762,17 +775,17 @@ bool Transport::SetRemoteTransportDescription_w( ContentAction action, std::string* error_desc) { bool ret = true; - rtc::CritScope cs(&crit_); if (!VerifyIceParams(desc)) { return BadTransportDescription("Invalid ice-ufrag or ice-pwd length", error_desc); } + // TODO(tommi,pthatcher): See todo for local_description_ above. + rtc::CritScope cs(&crit_); remote_description_.reset(new TransportDescription(desc)); - for (ChannelMap::iterator iter = channels_.begin(); - iter != channels_.end(); ++iter) { - ret &= ApplyRemoteTransportDescription_w(iter->second.get(), error_desc); + for (auto& iter : channels_) { + ret &= ApplyRemoteTransportDescription_w(iter.second.get(), error_desc); } // If PRANSWER/ANSWER is set, we should decide transport protocol type. @@ -784,6 +797,7 @@ bool Transport::SetRemoteTransportDescription_w( bool Transport::ApplyLocalTransportDescription_w(TransportChannelImpl* ch, std::string* error_desc) { + ASSERT(worker_thread()->IsCurrent()); // If existing protocol_type is HYBRID, we may have not chosen the final // protocol type, so update the channel protocol type from the // local description. Otherwise, skip updating the protocol type. @@ -813,6 +827,7 @@ bool Transport::ApplyRemoteTransportDescription_w(TransportChannelImpl* ch, bool Transport::ApplyNegotiatedTransportDescription_w( TransportChannelImpl* channel, std::string* error_desc) { + ASSERT(worker_thread()->IsCurrent()); channel->SetIceProtocolType(protocol_); channel->SetRemoteIceMode(remote_ice_mode_); return true; @@ -820,6 +835,7 @@ bool Transport::ApplyNegotiatedTransportDescription_w( bool Transport::NegotiateTransportDescription_w(ContentAction local_role, std::string* error_desc) { + ASSERT(worker_thread()->IsCurrent()); // TODO(ekr@rtfm.com): This is ICE-specific stuff. Refactor into // P2PTransport. const TransportDescription* offer; @@ -871,10 +887,8 @@ bool Transport::NegotiateTransportDescription_w(ContentAction local_role, // between future SetRemote/SetLocal invocations and new channel // creation, we have the negotiation state saved until a new // negotiation happens. - for (ChannelMap::iterator iter = channels_.begin(); - iter != channels_.end(); - ++iter) { - if (!ApplyNegotiatedTransportDescription_w(iter->second.get(), error_desc)) + for (auto& iter : channels_) { + if (!ApplyNegotiatedTransportDescription_w(iter.second.get(), error_desc)) return false; } return true; diff --git a/webrtc/p2p/base/transport.h b/webrtc/p2p/base/transport.h index 32c6bb3570..016c9af7ba 100644 --- a/webrtc/p2p/base/transport.h +++ b/webrtc/p2p/base/transport.h @@ -387,6 +387,7 @@ class Transport : public rtc::MessageHandler, void OnChannelReadableState_s(); void OnChannelWritableState_s(); void OnChannelRequestSignaling_s(int component); + void OnChannelRequestSignaling_w(int component); void OnConnecting_s(); void OnChannelRouteChange_s(const TransportChannel* channel, const Candidate& remote_candidate);