From 3d77deb29c15bfb8f794ef3413837a0ec0f0c131 Mon Sep 17 00:00:00 2001 From: Honghai Zhang Date: Wed, 22 Jun 2016 15:22:22 -0700 Subject: [PATCH] Do not delete a connection in the turn port with permission error, refresh error, or binding error. Even if those error happened, the connection may still be able to receive packets for a while. If we delete the connections, all packets arriving will be dropped. BUG=webrtc:6007 R=deadbeef@webrtc.org, pthatcher@webrtc.org Review URL: https://codereview.webrtc.org/2068263003 . Cr-Commit-Position: refs/heads/master@{#13262} --- webrtc/base/virtualsocketserver.cc | 2 +- webrtc/p2p/base/p2ptransportchannel.cc | 5 + .../p2p/base/p2ptransportchannel_unittest.cc | 18 +++ webrtc/p2p/base/port.cc | 5 + webrtc/p2p/base/port.h | 4 + webrtc/p2p/base/turnport.cc | 51 +++++---- webrtc/p2p/base/turnport.h | 13 ++- webrtc/p2p/base/turnport_unittest.cc | 107 ++++++++++++------ 8 files changed, 142 insertions(+), 63 deletions(-) diff --git a/webrtc/base/virtualsocketserver.cc b/webrtc/base/virtualsocketserver.cc index 8b8d050728..d871d4c137 100644 --- a/webrtc/base/virtualsocketserver.cc +++ b/webrtc/base/virtualsocketserver.cc @@ -511,7 +511,7 @@ VirtualSocketServer::VirtualSocketServer(SocketServer* ss) server_owned_(false), msg_queue_(NULL), stop_on_idle_(false), - network_delay_(TimeMillis()), + network_delay_(0), next_ipv4_(kInitialNextIPv4), next_ipv6_(kInitialNextIPv6), next_port_(kFirstEphemeralPort), diff --git a/webrtc/p2p/base/p2ptransportchannel.cc b/webrtc/p2p/base/p2ptransportchannel.cc index 78cb7045f8..5c1f85dfa0 100644 --- a/webrtc/p2p/base/p2ptransportchannel.cc +++ b/webrtc/p2p/base/p2ptransportchannel.cc @@ -1349,6 +1349,11 @@ bool P2PTransportChannel::IsPingable(Connection* conn, int64_t now) { return false; } + // A failed connection will not be pinged. + if (conn->state() == Connection::STATE_FAILED) { + return false; + } + // An never connected connection cannot be written to at all, so pinging is // out of the question. However, if it has become WRITABLE, it is in the // reconnecting state so ping is needed. diff --git a/webrtc/p2p/base/p2ptransportchannel_unittest.cc b/webrtc/p2p/base/p2ptransportchannel_unittest.cc index e5e63a3fa0..87be379ffb 100644 --- a/webrtc/p2p/base/p2ptransportchannel_unittest.cc +++ b/webrtc/p2p/base/p2ptransportchannel_unittest.cc @@ -2278,6 +2278,24 @@ TEST_F(P2PTransportChannelPingTest, TestNoTriggeredChecksWhenWritable) { EXPECT_EQ(conn2, FindNextPingableConnectionAndPingIt(&ch)); } +TEST_F(P2PTransportChannelPingTest, TestFailedConnectionNotPingable) { + cricket::FakePortAllocator pa(rtc::Thread::Current(), nullptr); + cricket::P2PTransportChannel ch("Do not ping failed connections", 1, &pa); + PrepareChannel(&ch); + ch.Connect(); + ch.MaybeStartGathering(); + ch.AddRemoteCandidate(CreateHostCandidate("1.1.1.1", 1, 1)); + + cricket::Connection* conn1 = WaitForConnectionTo(&ch, "1.1.1.1", 1); + ASSERT_TRUE(conn1 != nullptr); + + EXPECT_EQ(conn1, ch.FindNextPingableConnection()); + conn1->Prune(); // A pruned connection may still be pingable. + EXPECT_EQ(conn1, ch.FindNextPingableConnection()); + conn1->FailAndPrune(); + EXPECT_TRUE(nullptr == ch.FindNextPingableConnection()); +} + TEST_F(P2PTransportChannelPingTest, TestSignalStateChanged) { cricket::FakePortAllocator pa(rtc::Thread::Current(), nullptr); cricket::P2PTransportChannel ch("state change", 1, &pa); diff --git a/webrtc/p2p/base/port.cc b/webrtc/p2p/base/port.cc index 6e26371dae..d4dfd91eca 100644 --- a/webrtc/p2p/base/port.cc +++ b/webrtc/p2p/base/port.cc @@ -1090,6 +1090,11 @@ void Connection::FailAndDestroy() { Destroy(); } +void Connection::FailAndPrune() { + set_state(Connection::STATE_FAILED); + Prune(); +} + void Connection::PrintPingsSinceLastResponse(std::string* s, size_t max) { std::ostringstream oss; oss << std::boolalpha; diff --git a/webrtc/p2p/base/port.h b/webrtc/p2p/base/port.h index 65932b9c9f..5d95b80a5b 100644 --- a/webrtc/p2p/base/port.h +++ b/webrtc/p2p/base/port.h @@ -523,6 +523,10 @@ class Connection : public CandidatePairInterface, // Makes the connection go away, in a failed state. void FailAndDestroy(); + // Prunes the connection and sets its state to STATE_FAILED, + // It will not be used or send pings although it can still receive packets. + void FailAndPrune(); + // Checks that the state of this connection is up-to-date. The argument is // the current time, which is compared against various timeouts. void UpdateState(int64_t now); diff --git a/webrtc/p2p/base/turnport.cc b/webrtc/p2p/base/turnport.cc index ff0c2922de..40368110e3 100644 --- a/webrtc/p2p/base/turnport.cc +++ b/webrtc/p2p/base/turnport.cc @@ -449,7 +449,7 @@ Connection* TurnPort::CreateConnection(const Candidate& address, return NULL; } - if (state_ == STATE_DISCONNECTED) { + if (state_ == STATE_DISCONNECTED || state_ == STATE_RECEIVEONLY) { return NULL; } @@ -469,10 +469,10 @@ Connection* TurnPort::CreateConnection(const Candidate& address, return NULL; } -bool TurnPort::DestroyConnection(const rtc::SocketAddress& address) { +bool TurnPort::FailAndPruneConnection(const rtc::SocketAddress& address) { Connection* conn = GetConnection(address); if (conn != nullptr) { - conn->Destroy(); + conn->FailAndPrune(); return true; } return false; @@ -561,7 +561,7 @@ bool TurnPort::HandleIncomingPacket(rtc::AsyncPacketSocket* socket, if (state_ == STATE_DISCONNECTED) { LOG_J(LS_WARNING, this) - << "Received TURN message while the Turn port is disconnected"; + << "Received TURN message while the TURN port is disconnected"; return false; } @@ -739,13 +739,22 @@ void TurnPort::OnAllocateError() { thread()->Post(RTC_FROM_HERE, this, MSG_ALLOCATE_ERROR); } -void TurnPort::OnTurnRefreshError() { - // Need to Close the port asynchronously because otherwise, the refresh +void TurnPort::OnRefreshError() { + // Need to clear the requests asynchronously because otherwise, the refresh // request may be deleted twice: once at the end of the message processing - // and the other in Close(). + // and the other in HandleRefreshError(). thread()->Post(RTC_FROM_HERE, this, MSG_REFRESH_ERROR); } +void TurnPort::HandleRefreshError() { + request_manager_.Clear(); + state_ = STATE_RECEIVEONLY; + // Fail and prune all connections; stop sending data. + for (auto kv : connections()) { + kv.second->FailAndPrune(); + } +} + void TurnPort::Close() { if (!ready()) { OnAllocateError(); @@ -768,7 +777,7 @@ void TurnPort::OnMessage(rtc::Message* message) { OnAllocateMismatch(); break; case MSG_REFRESH_ERROR: - Close(); + HandleRefreshError(); break; case MSG_TRY_ALTERNATE_SERVER: if (server_address().proto == PROTO_UDP) { @@ -1277,14 +1286,14 @@ void TurnRefreshRequest::OnErrorResponse(StunMessage* response) { << ", id=" << rtc::hex_encode(id()) << ", code=" << error_code->code() << ", rtt=" << Elapsed(); - port_->OnTurnRefreshError(); + port_->OnRefreshError(); port_->SignalTurnRefreshResult(port_, error_code->code()); } } void TurnRefreshRequest::OnTimeout() { LOG_J(LS_WARNING, port_) << "TURN refresh timeout " << rtc::hex_encode(id()); - port_->OnTurnRefreshError(); + port_->OnRefreshError(); } TurnCreatePermissionRequest::TurnCreatePermissionRequest( @@ -1491,20 +1500,18 @@ void TurnEntry::OnCreatePermissionError(StunMessage* response, int code) { SendCreatePermissionRequest(0); } } else { - port_->DestroyConnection(ext_addr_); + bool found = port_->FailAndPruneConnection(ext_addr_); + if (found) { + LOG(LS_ERROR) << "Received TURN CreatePermission error response, " + << "code=" << code << "; pruned connection."; + } // Send signal with error code. port_->SignalCreatePermissionResult(port_, ext_addr_, code); - Connection* c = port_->GetConnection(ext_addr_); - if (c) { - LOG_J(LS_ERROR, c) << "Received TURN CreatePermission error response, " - << "code=" << code << "; killing connection."; - c->FailAndDestroy(); - } } } void TurnEntry::OnCreatePermissionTimeout() { - port_->DestroyConnection(ext_addr_); + port_->FailAndPruneConnection(ext_addr_); } void TurnEntry::OnChannelBindSuccess() { @@ -1516,8 +1523,8 @@ void TurnEntry::OnChannelBindSuccess() { void TurnEntry::OnChannelBindError(StunMessage* response, int code) { // If the channel bind fails due to errors other than STATE_NONCE, - // we just destroy the connection and rely on ICE restart to re-establish - // the connection. + // we will fail and prune the connection and rely on ICE restart to + // re-establish a new connection if needed. if (code == STUN_ERROR_STALE_NONCE) { if (port_->UpdateNonce(response)) { // Send channel bind request with fresh nonce. @@ -1525,11 +1532,11 @@ void TurnEntry::OnChannelBindError(StunMessage* response, int code) { } } else { state_ = STATE_UNBOUND; - port_->DestroyConnection(ext_addr_); + port_->FailAndPruneConnection(ext_addr_); } } void TurnEntry::OnChannelBindTimeout() { state_ = STATE_UNBOUND; - port_->DestroyConnection(ext_addr_); + port_->FailAndPruneConnection(ext_addr_); } } // namespace cricket diff --git a/webrtc/p2p/base/turnport.h b/webrtc/p2p/base/turnport.h index 17d2d45181..6e528aaf69 100644 --- a/webrtc/p2p/base/turnport.h +++ b/webrtc/p2p/base/turnport.h @@ -38,7 +38,9 @@ class TurnPort : public Port { STATE_CONNECTING, // Initial state, cannot send any packets. STATE_CONNECTED, // Socket connected, ready to send stun requests. STATE_READY, // Received allocate success, can send any packets. - STATE_DISCONNECTED, // TCP connection died, cannot send any packets. + STATE_RECEIVEONLY, // Had REFRESH_REQUEST error, cannot send any packets. + STATE_DISCONNECTED, // TCP connection died, cannot send/receive any + // packets. }; static TurnPort* Create(rtc::Thread* thread, rtc::PacketSocketFactory* factory, @@ -202,7 +204,8 @@ class TurnPort : public Port { } } - void OnTurnRefreshError(); + void OnRefreshError(); + void HandleRefreshError(); bool SetAlternateServer(const rtc::SocketAddress& address); void ResolveTurnAddress(const rtc::SocketAddress& address); void OnResolveResult(rtc::AsyncResolverInterface* resolver); @@ -245,9 +248,9 @@ class TurnPort : public Port { void ScheduleEntryDestruction(TurnEntry* entry); void CancelEntryDestruction(TurnEntry* entry); - // Destroys the connection with remote address |address|. Returns true if - // a connection is found and destroyed. - bool DestroyConnection(const rtc::SocketAddress& address); + // Marks the connection with remote address |address| failed and + // pruned (a.k.a. write-timed-out). Returns true if a connection is found. + bool FailAndPruneConnection(const rtc::SocketAddress& address); ProtocolAddress server_address_; RelayCredentials credentials_; diff --git a/webrtc/p2p/base/turnport_unittest.cc b/webrtc/p2p/base/turnport_unittest.cc index 0e415a2814..d6d75aad09 100644 --- a/webrtc/p2p/base/turnport_unittest.cc +++ b/webrtc/p2p/base/turnport_unittest.cc @@ -195,7 +195,6 @@ class TurnPortTest : public testing::Test, const rtc::PacketTime& packet_time) { udp_packets_.push_back(rtc::Buffer(data, size)); } - void OnConnectionDestroyed(Connection* conn) { connection_destroyed_ = true; } void OnSocketReadPacket(rtc::AsyncPacketSocket* socket, const char* data, size_t size, const rtc::SocketAddress& remote_addr, @@ -282,9 +281,6 @@ class TurnPortTest : public testing::Test, turn_port_->SignalTurnRefreshResult.connect( this, &TurnPortTest::OnTurnRefreshResult); } - void ConnectConnectionDestroyedSignal(Connection* conn) { - conn->SignalDestroyed.connect(this, &TurnPortTest::OnConnectionDestroyed); - } void CreateUdpPort() { CreateUdpPort(kLocalAddr2); } @@ -309,10 +305,23 @@ class TurnPortTest : public testing::Test, ASSERT_TRUE_WAIT(udp_ready_, kTimeout); } - bool CheckConnectionDestroyed() { - turn_port_->FlushRequests(cricket::kAllRequests); - rtc::Thread::Current()->ProcessMessages(50); - return connection_destroyed_; + bool CheckConnectionFailedAndPruned(Connection* conn) { + return conn && !conn->active() && conn->state() == Connection::STATE_FAILED; + } + + // Checks that |turn_port_| has a nonempty set of connections and they are all + // failed and pruned. + bool CheckAllConnectionsFailedAndPruned() { + auto& connections = turn_port_->connections(); + if (connections.empty()) { + return false; + } + for (auto kv : connections) { + if (!CheckConnectionFailedAndPruned(kv.second)) { + return false; + } + } + return true; } void TestTurnAlternateServer(cricket::ProtocolType protocol_type) { @@ -524,7 +533,6 @@ class TurnPortTest : public testing::Test, bool udp_ready_; bool test_finish_; bool turn_refresh_success_ = false; - bool connection_destroyed_ = false; std::vector turn_packets_; std::vector udp_packets_; rtc::PacketOptions options; @@ -733,6 +741,7 @@ TEST_F(TurnPortTest, TestTurnTcpAllocateMismatch) { } TEST_F(TurnPortTest, TestRefreshRequestGetsErrorResponse) { + rtc::ScopedFakeClock clock; CreateTurnPort(kTurnUsername, kTurnPassword, kTurnUdpProtoAddr); PrepareTurnAndUdpPorts(); turn_port_->CreateConnection(udp_port_->Candidates()[0], @@ -745,13 +754,14 @@ TEST_F(TurnPortTest, TestRefreshRequestGetsErrorResponse) { // When this succeeds, it will schedule a new RefreshRequest with the bad // credential. turn_port_->FlushRequests(cricket::TURN_REFRESH_REQUEST); - EXPECT_TRUE_WAIT(turn_refresh_success_, kTimeout); + EXPECT_TRUE_SIMULATED_WAIT(turn_refresh_success_, kTimeout, clock); // Flush it again, it will receive a bad response. turn_port_->FlushRequests(cricket::TURN_REFRESH_REQUEST); - EXPECT_TRUE_WAIT(!turn_refresh_success_, kTimeout); - EXPECT_TRUE_WAIT(!turn_port_->connected(), kTimeout); - EXPECT_TRUE_WAIT(turn_port_->connections().empty(), kTimeout); - EXPECT_FALSE(turn_port_->HasRequests()); + EXPECT_TRUE_SIMULATED_WAIT(!turn_refresh_success_, kTimeout, clock); + EXPECT_TRUE_SIMULATED_WAIT(!turn_port_->connected(), kTimeout, clock); + EXPECT_TRUE_SIMULATED_WAIT(CheckAllConnectionsFailedAndPruned(), kTimeout, + clock); + EXPECT_TRUE_SIMULATED_WAIT(!turn_port_->HasRequests(), kTimeout, clock); } // Test that TurnPort will not handle any incoming packets once it has been @@ -796,6 +806,20 @@ TEST_F(TurnPortTest, TestCreateConnectionWhenSocketClosed) { ASSERT_TRUE(conn1 == NULL); } +// Tests that when a TCP socket is closed, the respective TURN connection will +// be destroyed. +TEST_F(TurnPortTest, TestSocketCloseWillDestroyConnection) { + turn_server_.AddInternalSocket(kTurnTcpIntAddr, cricket::PROTO_TCP); + CreateTurnPort(kTurnUsername, kTurnPassword, kTurnTcpProtoAddr); + PrepareTurnAndUdpPorts(); + Connection* conn = turn_port_->CreateConnection(udp_port_->Candidates()[0], + Port::ORIGIN_MESSAGE); + EXPECT_NE(nullptr, conn); + EXPECT_TRUE(!turn_port_->connections().empty()); + turn_port_->socket()->SignalClose(turn_port_->socket(), 1); + EXPECT_TRUE_WAIT(turn_port_->connections().empty(), kTimeout); +} + // Test try-alternate-server feature. TEST_F(TurnPortTest, TestTurnAlternateServerUDP) { TestTurnAlternateServer(cricket::PROTO_UDP); @@ -899,9 +923,8 @@ TEST_F(TurnPortTest, TestRefreshCreatePermissionRequest) { Connection* conn = turn_port_->CreateConnection(udp_port_->Candidates()[0], Port::ORIGIN_MESSAGE); - ConnectConnectionDestroyedSignal(conn); ASSERT_TRUE(conn != NULL); - ASSERT_TRUE_WAIT(turn_create_permission_success_, kTimeout); + EXPECT_TRUE_WAIT(turn_create_permission_success_, kTimeout); turn_create_permission_success_ = false; // A create-permission-request should be pending. // After the next create-permission-response is received, it will schedule @@ -909,14 +932,15 @@ TEST_F(TurnPortTest, TestRefreshCreatePermissionRequest) { cricket::RelayCredentials bad_credentials("bad_user", "bad_pwd"); turn_port_->set_credentials(bad_credentials); turn_port_->FlushRequests(cricket::kAllRequests); - ASSERT_TRUE_WAIT(turn_create_permission_success_, kTimeout); + EXPECT_TRUE_WAIT(turn_create_permission_success_, kTimeout); // Flush the requests again; the create-permission-request will fail. turn_port_->FlushRequests(cricket::kAllRequests); EXPECT_TRUE_WAIT(!turn_create_permission_success_, kTimeout); - EXPECT_TRUE_WAIT(connection_destroyed_, kTimeout); + EXPECT_TRUE_WAIT(CheckConnectionFailedAndPruned(conn), kTimeout); } TEST_F(TurnPortTest, TestChannelBindGetErrorResponse) { + rtc::ScopedFakeClock clock; CreateTurnPort(kTurnUsername, kTurnPassword, kTurnUdpProtoAddr); PrepareTurnAndUdpPorts(); Connection* conn1 = turn_port_->CreateConnection(udp_port_->Candidates()[0], @@ -924,18 +948,25 @@ TEST_F(TurnPortTest, TestChannelBindGetErrorResponse) { ASSERT_TRUE(conn1 != nullptr); Connection* conn2 = udp_port_->CreateConnection(turn_port_->Candidates()[0], Port::ORIGIN_MESSAGE); - ASSERT_TRUE(conn2 != nullptr); - ConnectConnectionDestroyedSignal(conn1); - conn1->Ping(0); - ASSERT_TRUE_WAIT(conn1->writable(), kTimeout); - std::string data = "ABC"; - conn1->Send(data.data(), data.length(), options); + ASSERT_TRUE(conn2 != nullptr); + conn1->Ping(0); + EXPECT_TRUE_SIMULATED_WAIT(conn1->writable(), kTimeout, clock); bool success = turn_port_->SetEntryChannelId(udp_port_->Candidates()[0].address(), -1); ASSERT_TRUE(success); - // Next time when the binding request is sent, it will get an ErrorResponse. - EXPECT_TRUE_WAIT(CheckConnectionDestroyed(), kTimeout); + + std::string data = "ABC"; + conn1->Send(data.data(), data.length(), options); + + EXPECT_TRUE_SIMULATED_WAIT(CheckConnectionFailedAndPruned(conn1), kTimeout, + clock); + // Verify that no packet can be sent after a bind request error. + conn2->SignalReadPacket.connect(static_cast(this), + &TurnPortTest::OnUdpReadPacket); + conn1->Send(data.data(), data.length(), options); + SIMULATED_WAIT(!udp_packets_.empty(), kTimeout, clock); + EXPECT_TRUE(udp_packets_.empty()); } // Do a TURN allocation, establish a UDP connection, and send some data. @@ -995,26 +1026,32 @@ TEST_F(TurnPortTest, TestOriginHeader) { } // Test that a CreatePermission failure will result in the connection being -// destroyed. -TEST_F(TurnPortTest, TestConnectionDestroyedOnCreatePermissionFailure) { +// pruned and failed. +TEST_F(TurnPortTest, TestConnectionFaildAndPrunedOnCreatePermissionFailure) { + rtc::ScopedFakeClock clock; + SIMULATED_WAIT(false, 101, clock); turn_server_.AddInternalSocket(kTurnTcpIntAddr, cricket::PROTO_TCP); turn_server_.server()->set_reject_private_addresses(true); CreateTurnPort(kTurnUsername, kTurnPassword, kTurnTcpProtoAddr); turn_port_->PrepareAddress(); - ASSERT_TRUE_WAIT(turn_ready_, kTimeout); + EXPECT_TRUE_SIMULATED_WAIT(turn_ready_, kTimeout, clock); CreateUdpPort(SocketAddress("10.0.0.10", 0)); udp_port_->PrepareAddress(); - ASSERT_TRUE_WAIT(udp_ready_, kTimeout); + EXPECT_TRUE_SIMULATED_WAIT(udp_ready_, kTimeout, clock); // Create a connection. TestConnectionWrapper conn(turn_port_->CreateConnection( udp_port_->Candidates()[0], Port::ORIGIN_MESSAGE)); - ASSERT_TRUE(conn.connection() != nullptr); + EXPECT_TRUE(conn.connection() != nullptr); - // Asynchronously, CreatePermission request should be sent and fail, closing - // the connection. - EXPECT_TRUE_WAIT(conn.connection() == nullptr, kTimeout); - EXPECT_FALSE(turn_create_permission_success_); + // Asynchronously, CreatePermission request should be sent and fail, which + // will make the connection pruned and failed. + EXPECT_TRUE_SIMULATED_WAIT(CheckConnectionFailedAndPruned(conn.connection()), + kTimeout, clock); + EXPECT_TRUE_SIMULATED_WAIT(!turn_create_permission_success_, kTimeout, clock); + // Check that the connection is not deleted asynchronously. + SIMULATED_WAIT(conn.connection() == nullptr, kTimeout, clock); + EXPECT_TRUE(conn.connection() != nullptr); } // Test that a TURN allocation is released when the port is closed.