Fix tsan failures for libjingle_unittest.

1) Change AsyncSocket's SignalReadEvent and SignalWriteEvent's thread mode to multi_threaded_local as they can be accessed from different threads.
2) Protect NATServer::TransEntry::whitelist.
3) Protect PhysicalSocket:error_.

Detail failures can be seen from issue 2080, comment #5.

TBR=fischman@webrtc.org

RISK=P1
TEST=try bots and tsanv2
BUG=2080

Review URL: https://webrtc-codereview.appspot.com/2669005

git-svn-id: http://webrtc.googlecode.com/svn/trunk@5026 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
wu@webrtc.org 2013-10-23 23:56:09 +00:00
parent d1bcf1180a
commit d371a29227
6 changed files with 69 additions and 36 deletions

View File

@ -44,8 +44,16 @@ class AsyncSocket : public Socket {
virtual AsyncSocket* Accept(SocketAddress* paddr) = 0;
sigslot::signal1<AsyncSocket*> SignalReadEvent; // ready to read
sigslot::signal1<AsyncSocket*> SignalWriteEvent; // ready to write
// SignalReadEvent and SignalWriteEvent use multi_threaded_local to allow
// access concurrently from different thread.
// For example SignalReadEvent::connect will be called in AsyncUDPSocket ctor
// but at the same time the SocketDispatcher maybe signaling the read event.
// ready to read
sigslot::signal1<AsyncSocket*,
sigslot::multi_threaded_local> SignalReadEvent;
// ready to write
sigslot::signal1<AsyncSocket*,
sigslot::multi_threaded_local> SignalWriteEvent;
sigslot::signal1<AsyncSocket*> SignalConnectEvent; // connected
sigslot::signal2<AsyncSocket*, int> SignalCloseEvent; // closed
};

View File

@ -123,7 +123,7 @@ void NATServer::OnInternalPacket(
ASSERT(iter != int_map_->end());
// Allow the destination to send packets back to the source.
iter->second->whitelist->insert(dest_addr);
iter->second->WhitelistInsert(dest_addr);
// Send the packet to its intended destination.
iter->second->socket->SendTo(buf + length, size - length, dest_addr,
@ -141,7 +141,7 @@ void NATServer::OnExternalPacket(
ASSERT(iter != ext_map_->end());
// Allow the NAT to reject this packet.
if (Filter(iter->second, remote_addr)) {
if (ShouldFilterOut(iter->second, remote_addr)) {
LOG(LS_INFO) << "Packet from " << remote_addr.ToSensitiveString()
<< " was filtered out by the NAT.";
return;
@ -173,8 +173,9 @@ void NATServer::Translate(const SocketAddressPair& route) {
socket->SignalReadPacket.connect(this, &NATServer::OnExternalPacket);
}
bool NATServer::Filter(TransEntry* entry, const SocketAddress& ext_addr) {
return entry->whitelist->find(ext_addr) == entry->whitelist->end();
bool NATServer::ShouldFilterOut(TransEntry* entry,
const SocketAddress& ext_addr) {
return entry->WhitelistContains(ext_addr);
}
NATServer::TransEntry::TransEntry(
@ -188,4 +189,14 @@ NATServer::TransEntry::~TransEntry() {
delete socket;
}
void NATServer::TransEntry::WhitelistInsert(const SocketAddress& addr) {
CritScope cs(&crit_);
whitelist->insert(addr);
}
bool NATServer::TransEntry::WhitelistContains(const SocketAddress& ext_addr) {
CritScope cs(&crit_);
return whitelist->find(ext_addr) == whitelist->end();
}
} // namespace talk_base

View File

@ -2,26 +2,26 @@
* libjingle
* Copyright 2004--2005, Google Inc.
*
* Redistribution and use in source and binary forms, with or without
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
* OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
* OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
@ -91,9 +91,13 @@ class NATServer : public sigslot::has_slots<> {
TransEntry(const SocketAddressPair& r, AsyncUDPSocket* s, NAT* nat);
~TransEntry();
void WhitelistInsert(const SocketAddress& addr);
bool WhitelistContains(const SocketAddress& ext_addr);
SocketAddressPair route;
AsyncUDPSocket* socket;
AddressSet* whitelist;
CriticalSection crit_;
};
typedef std::map<SocketAddressPair, TransEntry*, RouteCmp> InternalMap;
@ -103,7 +107,7 @@ class NATServer : public sigslot::has_slots<> {
void Translate(const SocketAddressPair& route);
/* Determines whether the NAT would filter out a packet from this address. */
bool Filter(TransEntry* entry, const SocketAddress& ext_addr);
bool ShouldFilterOut(TransEntry* entry, const SocketAddress& ext_addr);
NAT* nat_;
SocketFactory* internal_;

View File

@ -222,7 +222,7 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
UpdateLastError();
if (err == 0) {
state_ = CS_CONNECTED;
} else if (IsBlockingError(error_)) {
} else if (IsBlockingError(GetError())) {
state_ = CS_CONNECTING;
enabled_events_ |= DE_CONNECT;
} else {
@ -234,10 +234,12 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
}
int GetError() const {
CritScope cs(&crit_);
return error_;
}
void SetError(int error) {
CritScope cs(&crit_);
error_ = error;
}
@ -290,7 +292,7 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
MaybeRemapSendError();
// We have seen minidumps where this may be false.
ASSERT(sent <= static_cast<int>(cb));
if ((sent < 0) && IsBlockingError(error_)) {
if ((sent < 0) && IsBlockingError(GetError())) {
enabled_events_ |= DE_WRITE;
}
return sent;
@ -312,7 +314,7 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
MaybeRemapSendError();
// We have seen minidumps where this may be false.
ASSERT(sent <= static_cast<int>(length));
if ((sent < 0) && IsBlockingError(error_)) {
if ((sent < 0) && IsBlockingError(GetError())) {
enabled_events_ |= DE_WRITE;
}
return sent;
@ -329,16 +331,17 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
// Must turn this back on so that the select() loop will notice the close
// event.
enabled_events_ |= DE_READ;
error_ = EWOULDBLOCK;
SetError(EWOULDBLOCK);
return SOCKET_ERROR;
}
UpdateLastError();
bool success = (received >= 0) || IsBlockingError(error_);
int error = GetError();
bool success = (received >= 0) || IsBlockingError(error);
if (udp_ || success) {
enabled_events_ |= DE_READ;
}
if (!success) {
LOG_F(LS_VERBOSE) << "Error = " << error_;
LOG_F(LS_VERBOSE) << "Error = " << error;
}
return received;
}
@ -352,12 +355,13 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
UpdateLastError();
if ((received >= 0) && (out_addr != NULL))
SocketAddressFromSockAddrStorage(addr_storage, out_addr);
bool success = (received >= 0) || IsBlockingError(error_);
int error = GetError();
bool success = (received >= 0) || IsBlockingError(error);
if (udp_ || success) {
enabled_events_ |= DE_READ;
}
if (!success) {
LOG_F(LS_VERBOSE) << "Error = " << error_;
LOG_F(LS_VERBOSE) << "Error = " << error;
}
return received;
}
@ -408,7 +412,7 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
int EstimateMTU(uint16* mtu) {
SocketAddress addr = GetRemoteAddress();
if (addr.IsAny()) {
error_ = ENOTCONN;
SetError(ENOTCONN);
return -1;
}
@ -416,7 +420,7 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
// Gets the interface MTU (TTL=1) for the interface used to reach |addr|.
WinPing ping;
if (!ping.IsValid()) {
error_ = EINVAL; // can't think of a better error ID
SetError(EINVAL); // can't think of a better error ID
return -1;
}
int header_size = ICMP_HEADER_SIZE;
@ -432,7 +436,7 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
ICMP_PING_TIMEOUT_MILLIS,
1, false);
if (result == WinPing::PING_FAIL) {
error_ = EINVAL; // can't think of a better error ID
SetError(EINVAL); // can't think of a better error ID
return -1;
} else if (result != WinPing::PING_TOO_LARGE) {
*mtu = PACKET_MAXIMUMS[level];
@ -447,7 +451,7 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
// SIOCGIFMTU would work if we knew which interface would be used, but
// figuring that out is pretty complicated. For now we'll return an error
// and let the caller pick a default MTU.
error_ = EINVAL;
SetError(EINVAL);
return -1;
#elif defined(LINUX) || defined(ANDROID)
// Gets the path MTU.
@ -481,13 +485,13 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
}
if (error) {
error_ = error;
SignalCloseEvent(this, error_);
SetError(error);
SignalCloseEvent(this, error);
}
}
void UpdateLastError() {
error_ = LAST_SYSTEM_ERROR;
SetError(LAST_SYSTEM_ERROR);
}
void MaybeRemapSendError() {
@ -497,8 +501,8 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
// ENOBUFS - The output queue for a network interface is full.
// This generally indicates that the interface has stopped sending,
// but may be caused by transient congestion.
if (error_ == ENOBUFS) {
error_ = EWOULDBLOCK;
if (GetError() == ENOBUFS) {
SetError(EWOULDBLOCK);
}
#endif
}
@ -542,6 +546,8 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
uint8 enabled_events_;
bool udp_;
int error_;
// Protects |error_| that is accessed from different threads.
mutable CriticalSection crit_;
ConnState state_;
AsyncResolver* resolver_;

View File

@ -80,13 +80,20 @@ TestClient::Packet* TestClient::NextPacket() {
// the wrong thread to non-thread-safe objects.
uint32 end = TimeAfter(kTimeout);
while (packets_->size() == 0 && TimeUntil(end) > 0)
while (TimeUntil(end) > 0) {
{
CritScope cs(&crit_);
if (packets_->size() != 0) {
break;
}
}
Thread::Current()->ProcessMessages(1);
}
// Return the first packet placed in the queue.
Packet* packet = NULL;
CritScope cs(&crit_);
if (packets_->size() > 0) {
CritScope cs(&crit_);
packet = packets_->front();
packets_->erase(packets_->begin());
}

View File

@ -20,10 +20,7 @@ race:talk/p2p/base/stunserver_unittest.cc
# libjingle_unittest
# See https://code.google.com/p/webrtc/issues/detail?id=2080
race:talk/base/asyncudpsocket.cc
race:talk/base/logging.cc
race:talk/base/natserver.cc
race:talk/base/physicalsocketserver.cc
race:talk/base/sharedexclusivelock_unittest.cc
race:talk/base/signalthread_unittest.cc
race:talk/base/thread.cc