From d371a29227710b503b450acaf8431f6369162e3f Mon Sep 17 00:00:00 2001 From: "wu@webrtc.org" Date: Wed, 23 Oct 2013 23:56:09 +0000 Subject: [PATCH] Fix tsan failures for libjingle_unittest. 1) Change AsyncSocket's SignalReadEvent and SignalWriteEvent's thread mode to multi_threaded_local as they can be accessed from different threads. 2) Protect NATServer::TransEntry::whitelist. 3) Protect PhysicalSocket:error_. Detail failures can be seen from issue 2080, comment #5. TBR=fischman@webrtc.org RISK=P1 TEST=try bots and tsanv2 BUG=2080 Review URL: https://webrtc-codereview.appspot.com/2669005 git-svn-id: http://webrtc.googlecode.com/svn/trunk@5026 4adac7df-926f-26a2-2b94-8c16560cd09d --- talk/base/asyncsocket.h | 12 +++++- talk/base/natserver.cc | 19 +++++++-- talk/base/natserver.h | 20 ++++++---- talk/base/physicalsocketserver.cc | 40 +++++++++++-------- talk/base/testclient.cc | 11 ++++- .../valgrind-webrtc/tsan_v2/suppressions.txt | 3 -- 6 files changed, 69 insertions(+), 36 deletions(-) diff --git a/talk/base/asyncsocket.h b/talk/base/asyncsocket.h index 3d12984b70..97859a7527 100644 --- a/talk/base/asyncsocket.h +++ b/talk/base/asyncsocket.h @@ -44,8 +44,16 @@ class AsyncSocket : public Socket { virtual AsyncSocket* Accept(SocketAddress* paddr) = 0; - sigslot::signal1 SignalReadEvent; // ready to read - sigslot::signal1 SignalWriteEvent; // ready to write + // SignalReadEvent and SignalWriteEvent use multi_threaded_local to allow + // access concurrently from different thread. + // For example SignalReadEvent::connect will be called in AsyncUDPSocket ctor + // but at the same time the SocketDispatcher maybe signaling the read event. + // ready to read + sigslot::signal1 SignalReadEvent; + // ready to write + sigslot::signal1 SignalWriteEvent; sigslot::signal1 SignalConnectEvent; // connected sigslot::signal2 SignalCloseEvent; // closed }; diff --git a/talk/base/natserver.cc b/talk/base/natserver.cc index 483542591e..ef85f743b7 100644 --- a/talk/base/natserver.cc +++ b/talk/base/natserver.cc @@ -123,7 +123,7 @@ void NATServer::OnInternalPacket( ASSERT(iter != int_map_->end()); // Allow the destination to send packets back to the source. - iter->second->whitelist->insert(dest_addr); + iter->second->WhitelistInsert(dest_addr); // Send the packet to its intended destination. iter->second->socket->SendTo(buf + length, size - length, dest_addr, @@ -141,7 +141,7 @@ void NATServer::OnExternalPacket( ASSERT(iter != ext_map_->end()); // Allow the NAT to reject this packet. - if (Filter(iter->second, remote_addr)) { + if (ShouldFilterOut(iter->second, remote_addr)) { LOG(LS_INFO) << "Packet from " << remote_addr.ToSensitiveString() << " was filtered out by the NAT."; return; @@ -173,8 +173,9 @@ void NATServer::Translate(const SocketAddressPair& route) { socket->SignalReadPacket.connect(this, &NATServer::OnExternalPacket); } -bool NATServer::Filter(TransEntry* entry, const SocketAddress& ext_addr) { - return entry->whitelist->find(ext_addr) == entry->whitelist->end(); +bool NATServer::ShouldFilterOut(TransEntry* entry, + const SocketAddress& ext_addr) { + return entry->WhitelistContains(ext_addr); } NATServer::TransEntry::TransEntry( @@ -188,4 +189,14 @@ NATServer::TransEntry::~TransEntry() { delete socket; } +void NATServer::TransEntry::WhitelistInsert(const SocketAddress& addr) { + CritScope cs(&crit_); + whitelist->insert(addr); +} + +bool NATServer::TransEntry::WhitelistContains(const SocketAddress& ext_addr) { + CritScope cs(&crit_); + return whitelist->find(ext_addr) == whitelist->end(); +} + } // namespace talk_base diff --git a/talk/base/natserver.h b/talk/base/natserver.h index 0a6083cbbd..ed3b0b6423 100644 --- a/talk/base/natserver.h +++ b/talk/base/natserver.h @@ -2,26 +2,26 @@ * libjingle * Copyright 2004--2005, Google Inc. * - * Redistribution and use in source and binary forms, with or without + * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * - * 1. Redistributions of source code must retain the above copyright notice, + * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. - * 3. The name of the author may not be used to endorse or promote products + * 3. The name of the author may not be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO - * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, - * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR - * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ @@ -91,9 +91,13 @@ class NATServer : public sigslot::has_slots<> { TransEntry(const SocketAddressPair& r, AsyncUDPSocket* s, NAT* nat); ~TransEntry(); + void WhitelistInsert(const SocketAddress& addr); + bool WhitelistContains(const SocketAddress& ext_addr); + SocketAddressPair route; AsyncUDPSocket* socket; AddressSet* whitelist; + CriticalSection crit_; }; typedef std::map InternalMap; @@ -103,7 +107,7 @@ class NATServer : public sigslot::has_slots<> { void Translate(const SocketAddressPair& route); /* Determines whether the NAT would filter out a packet from this address. */ - bool Filter(TransEntry* entry, const SocketAddress& ext_addr); + bool ShouldFilterOut(TransEntry* entry, const SocketAddress& ext_addr); NAT* nat_; SocketFactory* internal_; diff --git a/talk/base/physicalsocketserver.cc b/talk/base/physicalsocketserver.cc index f14c3bd69c..21d6eabaa6 100644 --- a/talk/base/physicalsocketserver.cc +++ b/talk/base/physicalsocketserver.cc @@ -222,7 +222,7 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { UpdateLastError(); if (err == 0) { state_ = CS_CONNECTED; - } else if (IsBlockingError(error_)) { + } else if (IsBlockingError(GetError())) { state_ = CS_CONNECTING; enabled_events_ |= DE_CONNECT; } else { @@ -234,10 +234,12 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { } int GetError() const { + CritScope cs(&crit_); return error_; } void SetError(int error) { + CritScope cs(&crit_); error_ = error; } @@ -290,7 +292,7 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { MaybeRemapSendError(); // We have seen minidumps where this may be false. ASSERT(sent <= static_cast(cb)); - if ((sent < 0) && IsBlockingError(error_)) { + if ((sent < 0) && IsBlockingError(GetError())) { enabled_events_ |= DE_WRITE; } return sent; @@ -312,7 +314,7 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { MaybeRemapSendError(); // We have seen minidumps where this may be false. ASSERT(sent <= static_cast(length)); - if ((sent < 0) && IsBlockingError(error_)) { + if ((sent < 0) && IsBlockingError(GetError())) { enabled_events_ |= DE_WRITE; } return sent; @@ -329,16 +331,17 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { // Must turn this back on so that the select() loop will notice the close // event. enabled_events_ |= DE_READ; - error_ = EWOULDBLOCK; + SetError(EWOULDBLOCK); return SOCKET_ERROR; } UpdateLastError(); - bool success = (received >= 0) || IsBlockingError(error_); + int error = GetError(); + bool success = (received >= 0) || IsBlockingError(error); if (udp_ || success) { enabled_events_ |= DE_READ; } if (!success) { - LOG_F(LS_VERBOSE) << "Error = " << error_; + LOG_F(LS_VERBOSE) << "Error = " << error; } return received; } @@ -352,12 +355,13 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { UpdateLastError(); if ((received >= 0) && (out_addr != NULL)) SocketAddressFromSockAddrStorage(addr_storage, out_addr); - bool success = (received >= 0) || IsBlockingError(error_); + int error = GetError(); + bool success = (received >= 0) || IsBlockingError(error); if (udp_ || success) { enabled_events_ |= DE_READ; } if (!success) { - LOG_F(LS_VERBOSE) << "Error = " << error_; + LOG_F(LS_VERBOSE) << "Error = " << error; } return received; } @@ -408,7 +412,7 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { int EstimateMTU(uint16* mtu) { SocketAddress addr = GetRemoteAddress(); if (addr.IsAny()) { - error_ = ENOTCONN; + SetError(ENOTCONN); return -1; } @@ -416,7 +420,7 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { // Gets the interface MTU (TTL=1) for the interface used to reach |addr|. WinPing ping; if (!ping.IsValid()) { - error_ = EINVAL; // can't think of a better error ID + SetError(EINVAL); // can't think of a better error ID return -1; } int header_size = ICMP_HEADER_SIZE; @@ -432,7 +436,7 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { ICMP_PING_TIMEOUT_MILLIS, 1, false); if (result == WinPing::PING_FAIL) { - error_ = EINVAL; // can't think of a better error ID + SetError(EINVAL); // can't think of a better error ID return -1; } else if (result != WinPing::PING_TOO_LARGE) { *mtu = PACKET_MAXIMUMS[level]; @@ -447,7 +451,7 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { // SIOCGIFMTU would work if we knew which interface would be used, but // figuring that out is pretty complicated. For now we'll return an error // and let the caller pick a default MTU. - error_ = EINVAL; + SetError(EINVAL); return -1; #elif defined(LINUX) || defined(ANDROID) // Gets the path MTU. @@ -481,13 +485,13 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { } if (error) { - error_ = error; - SignalCloseEvent(this, error_); + SetError(error); + SignalCloseEvent(this, error); } } void UpdateLastError() { - error_ = LAST_SYSTEM_ERROR; + SetError(LAST_SYSTEM_ERROR); } void MaybeRemapSendError() { @@ -497,8 +501,8 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { // ENOBUFS - The output queue for a network interface is full. // This generally indicates that the interface has stopped sending, // but may be caused by transient congestion. - if (error_ == ENOBUFS) { - error_ = EWOULDBLOCK; + if (GetError() == ENOBUFS) { + SetError(EWOULDBLOCK); } #endif } @@ -542,6 +546,8 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { uint8 enabled_events_; bool udp_; int error_; + // Protects |error_| that is accessed from different threads. + mutable CriticalSection crit_; ConnState state_; AsyncResolver* resolver_; diff --git a/talk/base/testclient.cc b/talk/base/testclient.cc index 0ef8518311..1a12761314 100644 --- a/talk/base/testclient.cc +++ b/talk/base/testclient.cc @@ -80,13 +80,20 @@ TestClient::Packet* TestClient::NextPacket() { // the wrong thread to non-thread-safe objects. uint32 end = TimeAfter(kTimeout); - while (packets_->size() == 0 && TimeUntil(end) > 0) + while (TimeUntil(end) > 0) { + { + CritScope cs(&crit_); + if (packets_->size() != 0) { + break; + } + } Thread::Current()->ProcessMessages(1); + } // Return the first packet placed in the queue. Packet* packet = NULL; + CritScope cs(&crit_); if (packets_->size() > 0) { - CritScope cs(&crit_); packet = packets_->front(); packets_->erase(packets_->begin()); } diff --git a/tools/valgrind-webrtc/tsan_v2/suppressions.txt b/tools/valgrind-webrtc/tsan_v2/suppressions.txt index 32a41b8c11..aa28bfaff7 100644 --- a/tools/valgrind-webrtc/tsan_v2/suppressions.txt +++ b/tools/valgrind-webrtc/tsan_v2/suppressions.txt @@ -20,10 +20,7 @@ race:talk/p2p/base/stunserver_unittest.cc # libjingle_unittest # See https://code.google.com/p/webrtc/issues/detail?id=2080 -race:talk/base/asyncudpsocket.cc race:talk/base/logging.cc -race:talk/base/natserver.cc -race:talk/base/physicalsocketserver.cc race:talk/base/sharedexclusivelock_unittest.cc race:talk/base/signalthread_unittest.cc race:talk/base/thread.cc