diff --git a/webrtc/base/nat_unittest.cc b/webrtc/base/nat_unittest.cc index 1d4ee413b4..36b9327417 100644 --- a/webrtc/base/nat_unittest.cc +++ b/webrtc/base/nat_unittest.cc @@ -249,6 +249,8 @@ TEST(NatTest, TestPhysicalIPv6) { } } +namespace { + class TestVirtualSocketServer : public VirtualSocketServer { public: explicit TestVirtualSocketServer(SocketServer* ss) @@ -261,6 +263,8 @@ class TestVirtualSocketServer : public VirtualSocketServer { scoped_ptr ss_; }; +} // namespace + void TestVirtualInternal(int family) { scoped_ptr int_vss(new TestVirtualSocketServer( new PhysicalSocketServer())); diff --git a/webrtc/base/virtualsocketserver.cc b/webrtc/base/virtualsocketserver.cc index 8cb431dcf2..9b0e48d1d8 100644 --- a/webrtc/base/virtualsocketserver.cc +++ b/webrtc/base/virtualsocketserver.cc @@ -606,6 +606,22 @@ void VirtualSocketServer::SetNextPortForTesting(uint16 port) { next_port_ = port; } +bool VirtualSocketServer::CloseTcpConnections( + const SocketAddress& addr_local, + const SocketAddress& addr_remote) { + VirtualSocket* socket = LookupConnection(addr_local, addr_remote); + if (!socket) { + return false; + } + // Signal the close event on the local connection first. + socket->SignalCloseEvent(socket, 0); + + // Trigger the remote connection's close event. + socket->Close(); + + return true; +} + int VirtualSocketServer::Bind(VirtualSocket* socket, const SocketAddress& addr) { ASSERT(NULL != socket); diff --git a/webrtc/base/virtualsocketserver.h b/webrtc/base/virtualsocketserver.h index 8359ec86b4..b96269c52c 100644 --- a/webrtc/base/virtualsocketserver.h +++ b/webrtc/base/virtualsocketserver.h @@ -114,6 +114,11 @@ class VirtualSocketServer : public SocketServer, public sigslot::has_slots<> { // Sets the next port number to use for testing. void SetNextPortForTesting(uint16 port); + // Close a pair of Tcp connections by addresses. Both connections will have + // its own OnClose invoked. + bool CloseTcpConnections(const SocketAddress& addr_local, + const SocketAddress& addr_remote); + protected: // Returns a new IP not used before in this network. IPAddress GetNextIP(int family); diff --git a/webrtc/p2p/base/dtlstransportchannel.cc b/webrtc/p2p/base/dtlstransportchannel.cc index ca561a0898..3344bffe27 100644 --- a/webrtc/p2p/base/dtlstransportchannel.cc +++ b/webrtc/p2p/base/dtlstransportchannel.cc @@ -398,7 +398,8 @@ void DtlsTransportChannelWrapper::OnReadableState(TransportChannel* channel) { ASSERT(rtc::Thread::Current() == worker_thread_); ASSERT(channel == channel_); LOG_J(LS_VERBOSE, this) - << "DTLSTransportChannelWrapper: channel readable state changed."; + << "DTLSTransportChannelWrapper: channel readable state changed to " + << channel_->readable(); if (dtls_state_ == STATE_NONE || dtls_state_ == STATE_OPEN) { set_readable(channel_->readable()); @@ -410,7 +411,8 @@ void DtlsTransportChannelWrapper::OnWritableState(TransportChannel* channel) { ASSERT(rtc::Thread::Current() == worker_thread_); ASSERT(channel == channel_); LOG_J(LS_VERBOSE, this) - << "DTLSTransportChannelWrapper: channel writable state changed."; + << "DTLSTransportChannelWrapper: channel writable state changed to " + << channel_->writable(); switch (dtls_state_) { case STATE_NONE: diff --git a/webrtc/p2p/base/p2ptransportchannel.cc b/webrtc/p2p/base/p2ptransportchannel.cc index 031a64ce0b..bca2c450cc 100644 --- a/webrtc/p2p/base/p2ptransportchannel.cc +++ b/webrtc/p2p/base/p2ptransportchannel.cc @@ -67,7 +67,8 @@ int CompareConnectionCandidates(cricket::Connection* a, (b->remote_candidate().generation() + b->port()->generation()); } -// Compare two connections based on their writability and static preferences. +// Compare two connections based on their connected state, writability and +// static preferences. int CompareConnections(cricket::Connection *a, cricket::Connection *b) { // Sort based on write-state. Better states have lower values. if (a->write_state() < b->write_state()) @@ -75,6 +76,38 @@ int CompareConnections(cricket::Connection *a, cricket::Connection *b) { if (a->write_state() > b->write_state()) return -1; + // WARNING: Some complexity here about TCP reconnecting. + // When a TCP connection fails because of a TCP socket disconnecting, the + // active side of the connection will attempt to reconnect for 5 seconds while + // pretending to be writable (the connection is not set to the unwritable + // state). On the passive side, the connection also remains writable even + // though it is disconnected, and a new connection is created when the active + // side connects. At that point, there are two TCP connections on the passive + // side: 1. the old, disconnected one that is pretending to be writable, and + // 2. the new, connected one that is maybe not yet writable. For purposes of + // pruning, pinging, and selecting the best connection, we want to treat the + // new connection as "better" than the old one. We could add a method called + // something like Connection::ImReallyBadEvenThoughImWritable, but that is + // equivalent to the existing Connection::connected(), which we already have. + // So, in code throughout this file, we'll check whether the connection is + // connected() or not, and if it is not, treat it as "worse" than a connected + // one, even though it's writable. In the code below, we're doing so to make + // sure we treat a new writable connection as better than an old disconnected + // connection. + + // In the case where we reconnect TCP connections, the original best + // connection is disconnected without changing to WRITE_TIMEOUT. In this case, + // the new connection, when it becomes writable, should have higher priority. + if (a->write_state() == cricket::Connection::STATE_WRITABLE && + b->write_state() == cricket::Connection::STATE_WRITABLE) { + if (a->connected() && !b->connected()) { + return 1; + } + if (!a->connected() && b->connected()) { + return -1; + } + } + // Compare the candidate information. return CompareConnectionCandidates(a, b); } @@ -992,16 +1025,20 @@ void P2PTransportChannel::SortConnections() { SwitchBestConnectionTo(top_connection); } - // We can prune any connection for which there is a writable connection on - // the same network with better or equal priority. We leave those with - // better priority just in case they become writable later (at which point, - // we would prune out the current best connection). We leave connections on - // other networks because they may not be using the same resources and they - // may represent very distinct paths over which we can switch. + // We can prune any connection for which there is a connected, writable + // connection on the same network with better or equal priority. We leave + // those with better priority just in case they become writable later (at + // which point, we would prune out the current best connection). We leave + // connections on other networks because they may not be using the same + // resources and they may represent very distinct paths over which we can + // switch. If the |primier| connection is not connected, we may be + // reconnecting a TCP connection and temporarily do not prune connections in + // this network. See the big comment in CompareConnections. std::set::iterator network; for (network = networks.begin(); network != networks.end(); ++network) { Connection* primier = GetBestConnectionOnNetwork(*network); - if (!primier || (primier->write_state() != Connection::STATE_WRITABLE)) + if (!primier || (primier->write_state() != Connection::STATE_WRITABLE) || + !primier->connected()) continue; for (uint32 i = 0; i < connections_.size(); ++i) { @@ -1162,6 +1199,8 @@ void P2PTransportChannel::OnPing() { } // Is the connection in a state for us to even consider pinging the other side? +// We consider a connection pingable even if it's not connected because that's +// how a TCP connection is kicked into reconnecting on the active side. bool P2PTransportChannel::IsPingable(Connection* conn) { const Candidate& remote = conn->remote_candidate(); // We should never get this far with an empty remote ufrag. @@ -1171,10 +1210,12 @@ bool P2PTransportChannel::IsPingable(Connection* conn) { return false; } - // An unconnected connection cannot be written to at all, so pinging is out - // of the question. - if (!conn->connected()) + // An never connected connection cannot be written to at all, so pinging is + // out of the question. However, if it has become WRITABLE, it is in the + // reconnecting state so ping is needed. + if (!conn->connected() && conn->write_state() != Connection::STATE_WRITABLE) { return false; + } if (writable()) { // If we are writable, then we only want to ping connections that could be @@ -1192,14 +1233,17 @@ bool P2PTransportChannel::IsPingable(Connection* conn) { } // Returns the next pingable connection to ping. This will be the oldest -// pingable connection unless we have a writable connection that is past the -// maximum acceptable ping delay. +// pingable connection unless we have a connected, writable connection that is +// past the maximum acceptable ping delay. When reconnecting a TCP connection, +// the best connection is disconnected, although still WRITABLE while +// reconnecting. The newly created connection should be selected as the ping +// target to become writable instead. See the big comment in CompareConnections. Connection* P2PTransportChannel::FindNextPingableConnection() { uint32 now = rtc::Time(); - if (best_connection_ && + if (best_connection_ && best_connection_->connected() && (best_connection_->write_state() == Connection::STATE_WRITABLE) && - (best_connection_->last_ping_sent() - + MAX_CURRENT_WRITABLE_DELAY <= now)) { + (best_connection_->last_ping_sent() + MAX_CURRENT_WRITABLE_DELAY <= + now)) { return best_connection_; } diff --git a/webrtc/p2p/base/port.cc b/webrtc/p2p/base/port.cc index c321f83e80..bf7d75fbad 100644 --- a/webrtc/p2p/base/port.cc +++ b/webrtc/p2p/base/port.cc @@ -95,9 +95,6 @@ const int RTT_RATIO = 3; // 3 : 1 // The delay before we begin checking if this port is useless. const int kPortTimeoutDelay = 30 * 1000; // 30 seconds - -// Used by the Connection. -const uint32 MSG_DELETE = 1; } namespace cricket { @@ -948,7 +945,8 @@ void Connection::set_connected(bool value) { bool old_value = connected_; connected_ = value; if (value != old_value) { - LOG_J(LS_VERBOSE, this) << "set_connected"; + LOG_J(LS_VERBOSE, this) << "set_connected from: " << old_value << " to " + << value; } } @@ -1178,7 +1176,6 @@ void Connection::UpdateState(uint32 now) { } void Connection::Ping(uint32 now) { - ASSERT(connected_); last_ping_sent_ = now; pings_since_last_response_.push_back(now); ConnectionRequest *req = new ConnectionRequest(this); @@ -1372,7 +1369,6 @@ void Connection::MaybeUpdatePeerReflexiveCandidate( void Connection::OnMessage(rtc::Message *pmsg) { ASSERT(pmsg->message_id == MSG_DELETE); - LOG_J(LS_INFO, this) << "Connection deleted due to read or write timeout"; SignalDestroyed(this); delete this; diff --git a/webrtc/p2p/base/port.h b/webrtc/p2p/base/port.h index 6d7d6d5437..a9d4c00aad 100644 --- a/webrtc/p2p/base/port.h +++ b/webrtc/p2p/base/port.h @@ -546,6 +546,8 @@ class Connection : public rtc::MessageHandler, void MaybeUpdatePeerReflexiveCandidate(const Candidate& new_candidate); protected: + enum { MSG_DELETE = 0, MSG_FIRST_AVAILABLE }; + // Constructs a new connection to the given remote port. Connection(Port* port, size_t index, const Candidate& candidate); @@ -553,8 +555,8 @@ class Connection : public rtc::MessageHandler, void OnSendStunPacket(const void* data, size_t size, StunRequest* req); // Callbacks from ConnectionRequest - void OnConnectionRequestResponse(ConnectionRequest* req, - StunMessage* response); + virtual void OnConnectionRequestResponse(ConnectionRequest* req, + StunMessage* response); void OnConnectionRequestErrorResponse(ConnectionRequest* req, StunMessage* response); void OnConnectionRequestTimeout(ConnectionRequest* req); diff --git a/webrtc/p2p/base/port_unittest.cc b/webrtc/p2p/base/port_unittest.cc index 26e46a6995..b4a024f4a4 100644 --- a/webrtc/p2p/base/port_unittest.cc +++ b/webrtc/p2p/base/port_unittest.cc @@ -75,6 +75,8 @@ static const int STUN_ERROR_SERVER_ERROR_AS_GICE = static const int kTiebreaker1 = 11111; static const int kTiebreaker2 = 22222; +static const char* data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"; + static Candidate GetCandidate(Port* port) { assert(port->Candidates().size() == 1); return port->Candidates()[0]; @@ -230,6 +232,7 @@ class TestChannel : public sigslot::has_slots<> { conn_->set_use_candidate_attr(remote_ice_mode == ICEMODE_FULL); conn_->SignalStateChange.connect( this, &TestChannel::OnConnectionStateChange); + conn_->SignalDestroyed.connect(this, &TestChannel::OnDestroyed); } void OnConnectionStateChange(Connection* conn) { if (conn->write_state() == Connection::STATE_WRITABLE) { @@ -242,6 +245,7 @@ class TestChannel : public sigslot::has_slots<> { Candidate c = GetCandidate(dst_); c.set_address(remote_address_); conn_ = src_->CreateConnection(c, Port::ORIGIN_MESSAGE); + conn_->SignalDestroyed.connect(this, &TestChannel::OnDestroyed); src_->SendBindingResponse(remote_request_.get(), remote_address_); remote_request_.reset(); } @@ -252,8 +256,9 @@ class TestChannel : public sigslot::has_slots<> { conn_->Ping(now); } void Stop() { - conn_->SignalDestroyed.connect(this, &TestChannel::OnDestroyed); - conn_->Destroy(); + if (conn_) { + conn_->Destroy(); + } } void OnPortComplete(Port* port) { @@ -263,6 +268,11 @@ class TestChannel : public sigslot::has_slots<> { ice_mode_ = ice_mode; } + int SendData(const char* data, size_t len) { + rtc::PacketOptions options; + return conn_->Send(data, len, options); + } + void OnUnknownAddress(PortInterface* port, const SocketAddress& addr, ProtocolType proto, IceMessage* msg, const std::string& rf, @@ -295,7 +305,12 @@ class TestChannel : public sigslot::has_slots<> { void OnDestroyed(Connection* conn) { ASSERT_EQ(conn_, conn); + LOG(INFO) << "OnDestroy connection " << conn << " deleted"; conn_ = NULL; + // When the connection is destroyed, also clear these fields so future + // connections are possible. + remote_request_.reset(); + remote_address_.Clear(); } void OnSrcPortDestroyed(PortInterface* port) { @@ -303,6 +318,8 @@ class TestChannel : public sigslot::has_slots<> { ASSERT_EQ(destroyed_src, port); } + Port* src_port() { return src_.get(); } + bool nominated() const { return nominated_; } private: @@ -333,9 +350,13 @@ class PortTest : public testing::Test, public sigslot::has_slots<> { nat_socket_factory2_(&nat_factory2_), stun_server_(TestStunServer::Create(main_, kStunAddr)), turn_server_(main_, kTurnUdpIntAddr, kTurnUdpExtAddr), - relay_server_(main_, kRelayUdpIntAddr, kRelayUdpExtAddr, - kRelayTcpIntAddr, kRelayTcpExtAddr, - kRelaySslTcpIntAddr, kRelaySslTcpExtAddr), + relay_server_(main_, + kRelayUdpIntAddr, + kRelayUdpExtAddr, + kRelayTcpIntAddr, + kRelayTcpExtAddr, + kRelaySslTcpIntAddr, + kRelaySslTcpExtAddr), username_(rtc::CreateRandomString(ICE_UFRAG_LENGTH)), password_(rtc::CreateRandomString(ICE_PWD_LENGTH)), ice_protocol_(cricket::ICEPROTO_GOOGLE), @@ -405,7 +426,6 @@ class PortTest : public testing::Test, public sigslot::has_slots<> { TestConnectivity("ssltcp", port1, RelayName(rtype, proto), port2, rtype == RELAY_GTURN, false, true, true); } - // helpers for above functions UDPPort* CreateUdpPort(const SocketAddress& addr) { return CreateUdpPort(addr, &socket_factory_); @@ -525,10 +545,117 @@ class PortTest : public testing::Test, public sigslot::has_slots<> { bool accept, bool same_addr1, bool same_addr2, bool possible); + // This connects the provided channels which have already started. |ch1| + // should have its Connection created (either through CreateConnection() or + // TCP reconnecting mechanism before entering this function. + void ConnectStartedChannels(TestChannel* ch1, TestChannel* ch2) { + ASSERT_TRUE(ch1->conn()); + EXPECT_TRUE_WAIT(ch1->conn()->connected(), kTimeout); // for TCP connect + ch1->Ping(); + WAIT(!ch2->remote_address().IsNil(), kTimeout); + + // Send a ping from dst to src. + ch2->AcceptConnection(); + ch2->Ping(); + EXPECT_EQ_WAIT(Connection::STATE_WRITABLE, ch2->conn()->write_state(), + kTimeout); + } + // This connects and disconnects the provided channels in the same sequence as // TestConnectivity with all options set to |true|. It does not delete either // channel. - void ConnectAndDisconnectChannels(TestChannel* ch1, TestChannel* ch2); + void StartConnectAndStopChannels(TestChannel* ch1, TestChannel* ch2) { + // Acquire addresses. + ch1->Start(); + ch2->Start(); + + ch1->CreateConnection(); + ConnectStartedChannels(ch1, ch2); + + // Destroy the connections. + ch1->Stop(); + ch2->Stop(); + } + + // This disconnects both end's Connection and make sure ch2 ready for new + // connection. + void DisconnectTcpTestChannels(TestChannel* ch1, TestChannel* ch2) { + ASSERT_TRUE(ss_->CloseTcpConnections( + static_cast(ch1->conn())->socket()->GetLocalAddress(), + static_cast(ch2->conn())->socket()->GetLocalAddress())); + + // Wait for both OnClose are delivered. + EXPECT_TRUE_WAIT(!ch1->conn()->connected(), kTimeout); + EXPECT_TRUE_WAIT(!ch2->conn()->connected(), kTimeout); + + // Destroy channel2 connection to get ready for new incoming TCPConnection. + ch2->conn()->Destroy(); + EXPECT_TRUE_WAIT(ch2->conn() == NULL, kTimeout); + } + + void TestTcpReconnect(bool ping_after_disconnected, + bool send_after_disconnected) { + Port* port1 = CreateTcpPort(kLocalAddr1); + Port* port2 = CreateTcpPort(kLocalAddr2); + + port1->set_component(cricket::ICE_CANDIDATE_COMPONENT_DEFAULT); + port2->set_component(cricket::ICE_CANDIDATE_COMPONENT_DEFAULT); + + // Set up channels and ensure both ports will be deleted. + TestChannel ch1(port1, port2); + TestChannel ch2(port2, port1); + EXPECT_EQ(0, ch1.complete_count()); + EXPECT_EQ(0, ch2.complete_count()); + + ch1.Start(); + ch2.Start(); + ASSERT_EQ_WAIT(1, ch1.complete_count(), kTimeout); + ASSERT_EQ_WAIT(1, ch2.complete_count(), kTimeout); + + // Initial connecting the channel, create connection on channel1. + ch1.CreateConnection(); + ConnectStartedChannels(&ch1, &ch2); + + // Shorten the timeout period. + const int kTcpReconnectTimeout = kTimeout; + static_cast(ch1.conn()) + ->set_reconnection_timeout(kTcpReconnectTimeout); + static_cast(ch2.conn()) + ->set_reconnection_timeout(kTcpReconnectTimeout); + + // Once connected, disconnect them. + DisconnectTcpTestChannels(&ch1, &ch2); + + if (send_after_disconnected || ping_after_disconnected) { + if (send_after_disconnected) { + // First SendData after disconnect should fail but will trigger + // reconnect. + EXPECT_EQ(-1, ch1.SendData(data, static_cast(strlen(data)))); + } + + if (ping_after_disconnected) { + // Ping should trigger reconnect. + ch1.Ping(); + } + + // Wait for channel's outgoing TCPConnection connected. + EXPECT_TRUE_WAIT(ch1.conn()->connected(), kTimeout); + + // Verify that we could still connect channels. + ConnectStartedChannels(&ch1, &ch2); + } else { + EXPECT_EQ(ch1.conn()->write_state(), Connection::STATE_WRITABLE); + EXPECT_TRUE_WAIT( + ch1.conn()->write_state() == Connection::STATE_WRITE_TIMEOUT, + kTcpReconnectTimeout + kTimeout); + } + + // Tear down and ensure that goes smoothly. + ch1.Stop(); + ch2.Stop(); + EXPECT_TRUE_WAIT(ch1.conn() == NULL, kTimeout); + EXPECT_TRUE_WAIT(ch2.conn() == NULL, kTimeout); + } void SetIceProtocolType(cricket::IceProtocolType protocol) { ice_protocol_ = protocol; @@ -740,29 +867,6 @@ void PortTest::TestConnectivity(const char* name1, Port* port1, EXPECT_TRUE_WAIT(ch2.conn() == NULL, kTimeout); } -void PortTest::ConnectAndDisconnectChannels(TestChannel* ch1, - TestChannel* ch2) { - // Acquire addresses. - ch1->Start(); - ch2->Start(); - - // Send a ping from src to dst. - ch1->CreateConnection(); - EXPECT_TRUE_WAIT(ch1->conn()->connected(), kTimeout); // for TCP connect - ch1->Ping(); - WAIT(!ch2->remote_address().IsNil(), kTimeout); - - // Send a ping from dst to src. - ch2->AcceptConnection(); - ch2->Ping(); - EXPECT_EQ_WAIT(Connection::STATE_WRITABLE, ch2->conn()->write_state(), - kTimeout); - - // Destroy the connections. - ch1->Stop(); - ch2->Stop(); -} - class FakePacketSocketFactory : public rtc::PacketSocketFactory { public: FakePacketSocketFactory() @@ -1039,6 +1143,18 @@ TEST_F(PortTest, TestTcpToTcp) { TestTcpToTcp(); } +TEST_F(PortTest, TestTcpReconnectOnSendPacket) { + TestTcpReconnect(false /* ping */, true /* send */); +} + +TEST_F(PortTest, TestTcpReconnectOnPing) { + TestTcpReconnect(true /* ping */, false /* send */); +} + +TEST_F(PortTest, TestTcpReconnectTimeout) { + TestTcpReconnect(false /* ping */, false /* send */); +} + /* TODO: Enable these once testrelayserver can accept external TCP. TEST_F(PortTest, TestTcpToTcpRelay) { TestTcpToRelay(PROTO_TCP); @@ -2466,7 +2582,7 @@ TEST_F(PortTest, TestControllingNoTimeout) { TestChannel ch2(port2, port1); // Simulate a connection that succeeds, and then is destroyed. - ConnectAndDisconnectChannels(&ch1, &ch2); + StartConnectAndStopChannels(&ch1, &ch2); // After the connection is destroyed, the port should not be destroyed. rtc::Thread::Current()->ProcessMessages(kTimeout); @@ -2498,7 +2614,7 @@ TEST_F(PortTest, TestControlledTimeout) { TestChannel ch2(port2, port1); // Simulate a connection that succeeds, and then is destroyed. - ConnectAndDisconnectChannels(&ch1, &ch2); + StartConnectAndStopChannels(&ch1, &ch2); // The controlled port should be destroyed after 10 milliseconds. EXPECT_TRUE_WAIT(destroyed(), kTimeout); diff --git a/webrtc/p2p/base/tcpport.cc b/webrtc/p2p/base/tcpport.cc index 89265d7b11..5c9e0425c8 100644 --- a/webrtc/p2p/base/tcpport.cc +++ b/webrtc/p2p/base/tcpport.cc @@ -8,6 +8,62 @@ * be found in the AUTHORS file in the root of the source tree. */ +/* + * This is a diagram of how TCP reconnect works for the active side. The + * passive side just waits for an incoming connection. + * + * - Connected: Indicate whether the TCP socket is connected. + * + * - Writable: Whether the stun binding is completed. Sending a data packet + * before stun binding completed will trigger IPC socket layer to shutdown + * the connection. + * + * - PendingTCP: |connection_pending_| indicates whether there is an + * outstanding TCP connection in progress. + * + * - PretendWri: Tracked by |pretending_to_be_writable_|. Marking connection as + * WRITE_TIMEOUT will cause the connection be deleted. Instead, we're + * "pretending" we're still writable for a period of time such that reconnect + * could work. + * + * Data could only be sent in state 3. Sening data during state 2 & 6 will get + * EWOULDBLOCK, 4 & 5 EPIPE. + * + * 7 -------------+ + * |Connected: N | + * Timeout |Writable: N | Timeout + * +------------------->|Connection is |<----------------+ + * | |Dead | | + * | +--------------+ | + * | ^ | + * | OnClose | | + * | +-----------------------+ | | + * | | | |Timeout | + * | v | | | + * 4 +----------+ 5 -----+--+--+ 6 -----+-----+ + * |Connected: N|Send() or |Connected: N| |Connected: Y| + * |Writable: Y|Ping() |Writable: Y|OnConnect |Writable: Y| + * |PendingTCP:N+--------> |PendingTCP:Y+---------> |PendingTCP:N| + * |PretendWri:Y| |PretendWri:Y| |PretendWri:Y| + * +-----+------+ +------------+ +---+--+-----+ + * ^ ^ | | + * | | OnClose | | + * | +----------------------------------------------+ | + * | | + * | Stun Binding Completed | + * | | + * | OnClose | + * +------------------------------------------------+ | + * | v + * 1 -----------+ 2 -----------+Stun 3 -----------+ + * |Connected: N| |Connected: Y|Binding |Connected: Y| + * |Writable: N|OnConnect |Writable: N|Completed |Writable: Y| + * |PendingTCP:Y+---------> |PendingTCP:N+--------> |PendingTCP:N| + * |PretendWri:N| |PretendWri:N| |PretendWri:N| + * +------------+ +------------+ +------------+ + * + */ + #include "webrtc/p2p/base/tcpport.h" #include "webrtc/p2p/base/common.h" @@ -134,7 +190,16 @@ int TCPPort::SendTo(const void* data, size_t size, const rtc::PacketOptions& options, bool payload) { rtc::AsyncPacketSocket * socket = NULL; - if (TCPConnection * conn = static_cast(GetConnection(addr))) { + TCPConnection* conn = static_cast(GetConnection(addr)); + + // For Connection, this is the code path used by Ping() to establish + // WRITABLE. It has to send through the socket directly as TCPConnection::Send + // checks writability. + if (conn) { + if (!conn->connected()) { + conn->MaybeReconnect(); + return SOCKET_ERROR; + } socket = conn->socket(); } else { socket = GetIncoming(addr); @@ -142,12 +207,15 @@ int TCPPort::SendTo(const void* data, size_t size, if (!socket) { LOG_J(LS_ERROR, this) << "Attempted to send to an unknown destination, " << addr.ToSensitiveString(); - return -1; // TODO: Set error_ + return SOCKET_ERROR; // TODO(tbd): Set error_ } int sent = socket->Send(data, size, options); if (sent < 0) { error_ = socket->GetError(); + // Error from this code path for a Connection (instead of from a bare + // socket) will not trigger reconnecting. In theory, this shouldn't matter + // as OnClose should always be called and set connected to false. LOG_J(LS_ERROR, this) << "TCP send of " << size << " bytes failed with error " << error_; } @@ -222,42 +290,29 @@ void TCPPort::OnAddressReady(rtc::AsyncPacketSocket* socket, ICE_TYPE_PREFERENCE_HOST_TCP, 0, true); } -TCPConnection::TCPConnection(TCPPort* port, const Candidate& candidate, +TCPConnection::TCPConnection(TCPPort* port, + const Candidate& candidate, rtc::AsyncPacketSocket* socket) - : Connection(port, 0, candidate), socket_(socket), error_(0) { - bool outgoing = (socket_ == NULL); - if (outgoing) { - // TODO: Handle failures here (unlikely since TCP). - int opts = (candidate.protocol() == SSLTCP_PROTOCOL_NAME) ? - rtc::PacketSocketFactory::OPT_SSLTCP : 0; - socket_ = port->socket_factory()->CreateClientTcpSocket( - rtc::SocketAddress(port->ip(), 0), - candidate.address(), port->proxy(), port->user_agent(), opts); - if (socket_) { - LOG_J(LS_VERBOSE, this) << "Connecting from " - << socket_->GetLocalAddress().ToSensitiveString() - << " to " - << candidate.address().ToSensitiveString(); - set_connected(false); - socket_->SignalConnect.connect(this, &TCPConnection::OnConnect); - } else { - LOG_J(LS_WARNING, this) << "Failed to create connection to " - << candidate.address().ToSensitiveString(); - } + : Connection(port, 0, candidate), + socket_(socket), + error_(0), + outgoing_(socket == NULL), + connection_pending_(false), + pretending_to_be_writable_(false), + reconnection_timeout_(cricket::CONNECTION_WRITE_CONNECT_TIMEOUT) { + if (outgoing_) { + CreateOutgoingTcpSocket(); } else { // Incoming connections should match the network address. + LOG_J(LS_VERBOSE, this) + << "socket ipaddr: " << socket_->GetLocalAddress().ToString() + << ",port() ip:" << port->ip().ToString(); ASSERT(socket_->GetLocalAddress().ipaddr() == port->ip()); - } - - if (socket_) { - socket_->SignalReadPacket.connect(this, &TCPConnection::OnReadPacket); - socket_->SignalReadyToSend.connect(this, &TCPConnection::OnReadyToSend); - socket_->SignalClose.connect(this, &TCPConnection::OnClose); + ConnectSocketSignals(socket); } } TCPConnection::~TCPConnection() { - delete socket_; } int TCPConnection::Send(const void* data, size_t size, @@ -267,7 +322,18 @@ int TCPConnection::Send(const void* data, size_t size, return SOCKET_ERROR; } - if (write_state() != STATE_WRITABLE) { + // Sending after OnClose on active side will trigger a reconnect for a + // outgoing connection. Note that the write state is still WRITABLE as we want + // to spend a few seconds attempting a reconnect before saying we're + // unwritable. + if (!connected()) { + MaybeReconnect(); + return SOCKET_ERROR; + } + + // Note that this is important to put this after the previous check to give + // the connection a chance to reconnect. + if (pretending_to_be_writable_ || write_state() != STATE_WRITABLE) { // TODO: Should STATE_WRITE_TIMEOUT return a non-blocking error? error_ = EWOULDBLOCK; return SOCKET_ERROR; @@ -287,6 +353,15 @@ int TCPConnection::GetError() { return error_; } +void TCPConnection::OnConnectionRequestResponse(ConnectionRequest* req, + StunMessage* response) { + // Once we receive a binding response, we are really writable, and not just + // pretending to be writable. + pretending_to_be_writable_ = false; + Connection::OnConnectionRequestResponse(req, response); + ASSERT(write_state() == STATE_WRITABLE); +} + void TCPConnection::OnConnect(rtc::AsyncPacketSocket* socket) { ASSERT(socket == socket_); // Do not use this connection if the socket bound to a different address than @@ -298,6 +373,7 @@ void TCPConnection::OnConnect(rtc::AsyncPacketSocket* socket) { LOG_J(LS_VERBOSE, this) << "Connection established to " << socket->GetRemoteAddress().ToSensitiveString(); set_connected(true); + connection_pending_ = false; } else { LOG_J(LS_WARNING, this) << "Dropping connection as TCP socket bound to IP " << socket_ip.ToSensitiveString() @@ -310,8 +386,48 @@ void TCPConnection::OnConnect(rtc::AsyncPacketSocket* socket) { void TCPConnection::OnClose(rtc::AsyncPacketSocket* socket, int error) { ASSERT(socket == socket_); LOG_J(LS_INFO, this) << "Connection closed with error " << error; - set_connected(false); - set_write_state(STATE_WRITE_TIMEOUT); + + // Guard against the condition where IPC socket will call OnClose for every + // packet it can't send. + if (connected()) { + set_connected(false); + pretending_to_be_writable_ = true; + + // We don't attempt reconnect right here. This is to avoid a case where the + // shutdown is intentional and reconnect is not necessary. We only reconnect + // when the connection is used to Send() or Ping(). + port()->thread()->PostDelayed(reconnection_timeout(), this, + MSG_TCPCONNECTION_DELAYED_ONCLOSE); + } +} + +void TCPConnection::OnMessage(rtc::Message* pmsg) { + switch (pmsg->message_id) { + case MSG_TCPCONNECTION_DELAYED_ONCLOSE: + // If this connection can't become connected and writable again in 5 + // seconds, it's time to tear this down. This is the case for the original + // TCP connection on passive side during a reconnect. + if (pretending_to_be_writable_) { + set_write_state(STATE_WRITE_TIMEOUT); + } + break; + default: + Connection::OnMessage(pmsg); + } +} + +void TCPConnection::MaybeReconnect() { + // Only reconnect for an outgoing TCPConnection when OnClose was signaled and + // no outstanding reconnect is pending. + if (connected() || connection_pending_ || !outgoing_) { + return; + } + + LOG_J(LS_INFO, this) << "TCP Connection with remote is closed, " + << "trying to reconnect"; + + CreateOutgoingTcpSocket(); + error_ = EPIPE; } void TCPConnection::OnReadPacket( @@ -327,4 +443,35 @@ void TCPConnection::OnReadyToSend(rtc::AsyncPacketSocket* socket) { Connection::OnReadyToSend(); } +void TCPConnection::CreateOutgoingTcpSocket() { + ASSERT(outgoing_); + // TODO(guoweis): Handle failures here (unlikely since TCP). + int opts = (remote_candidate().protocol() == SSLTCP_PROTOCOL_NAME) + ? rtc::PacketSocketFactory::OPT_SSLTCP + : 0; + socket_.reset(port()->socket_factory()->CreateClientTcpSocket( + rtc::SocketAddress(port()->ip(), 0), remote_candidate().address(), + port()->proxy(), port()->user_agent(), opts)); + if (socket_) { + LOG_J(LS_VERBOSE, this) + << "Connecting from " << socket_->GetLocalAddress().ToSensitiveString() + << " to " << remote_candidate().address().ToSensitiveString(); + set_connected(false); + connection_pending_ = true; + ConnectSocketSignals(socket_.get()); + } else { + LOG_J(LS_WARNING, this) << "Failed to create connection to " + << remote_candidate().address().ToSensitiveString(); + } +} + +void TCPConnection::ConnectSocketSignals(rtc::AsyncPacketSocket* socket) { + if (outgoing_) { + socket->SignalConnect.connect(this, &TCPConnection::OnConnect); + } + socket->SignalReadPacket.connect(this, &TCPConnection::OnReadPacket); + socket->SignalReadyToSend.connect(this, &TCPConnection::OnReadyToSend); + socket->SignalClose.connect(this, &TCPConnection::OnClose); +} + } // namespace cricket diff --git a/webrtc/p2p/base/tcpport.h b/webrtc/p2p/base/tcpport.h index b3655a8067..d86f750e9f 100644 --- a/webrtc/p2p/base/tcpport.h +++ b/webrtc/p2p/base/tcpport.h @@ -119,9 +119,35 @@ class TCPConnection : public Connection { const rtc::PacketOptions& options); virtual int GetError(); - rtc::AsyncPacketSocket* socket() { return socket_; } + rtc::AsyncPacketSocket* socket() { return socket_.get(); } + + void OnMessage(rtc::Message* pmsg); + + // Allow test cases to overwrite the default timeout period. + int reconnection_timeout() const { return reconnection_timeout_; } + void set_reconnection_timeout(int timeout_in_ms) { + reconnection_timeout_ = timeout_in_ms; + } + + protected: + enum { + MSG_TCPCONNECTION_DELAYED_ONCLOSE = Connection::MSG_FIRST_AVAILABLE, + }; + + // Set waiting_for_stun_binding_complete_ to false to allow data packets in + // addition to what Port::OnConnectionRequestResponse does. + virtual void OnConnectionRequestResponse(ConnectionRequest* req, + StunMessage* response); private: + // Helper function to handle the case when Ping or Send fails with error + // related to socket close. + void MaybeReconnect(); + + void CreateOutgoingTcpSocket(); + + void ConnectSocketSignals(rtc::AsyncPacketSocket* socket); + void OnConnect(rtc::AsyncPacketSocket* socket); void OnClose(rtc::AsyncPacketSocket* socket, int error); void OnReadPacket(rtc::AsyncPacketSocket* socket, @@ -130,8 +156,23 @@ class TCPConnection : public Connection { const rtc::PacketTime& packet_time); void OnReadyToSend(rtc::AsyncPacketSocket* socket); - rtc::AsyncPacketSocket* socket_; + rtc::scoped_ptr socket_; int error_; + bool outgoing_; + + // Guard against multiple outgoing tcp connection during a reconnect. + bool connection_pending_; + + // Guard against data packets sent when we reconnect a TCP connection. During + // reconnecting, when a new tcp connection has being made, we can't send data + // packets out until the STUN binding is completed (i.e. the write state is + // set to WRITABLE again by Connection::OnConnectionRequestResponse). IPC + // socket, when receiving data packets before that, will trigger OnError which + // will terminate the newly created connection. + bool pretending_to_be_writable_; + + // Allow test case to overwrite the default timeout period. + int reconnection_timeout_; friend class TCPPort; }; diff --git a/webrtc/p2p/base/transportchannel.cc b/webrtc/p2p/base/transportchannel.cc index 16ae27d164..a4e8cb8df5 100644 --- a/webrtc/p2p/base/transportchannel.cc +++ b/webrtc/p2p/base/transportchannel.cc @@ -9,6 +9,7 @@ */ #include +#include "webrtc/p2p/base/common.h" #include "webrtc/p2p/base/transportchannel.h" namespace cricket { @@ -32,6 +33,8 @@ void TransportChannel::set_readable(bool readable) { void TransportChannel::set_writable(bool writable) { if (writable_ != writable) { + LOG_J(LS_VERBOSE, this) << "set_writable from:" << writable_ << " to " + << writable; writable_ = writable; if (writable_) { SignalReadyToSend(this);