From aae2784c1fab9d1510393dec15d76caa574e2da8 Mon Sep 17 00:00:00 2001 From: honghaiz Date: Mon, 10 Oct 2016 16:00:41 -0700 Subject: [PATCH] Prune connections based on network name. Previously we prune connections on the same network pointer. So if an IPv6 and an IPv4 network are on the same network interface, IPv4 connection won't be pruned even if an IPv6 connection with higher priority becomes writable. With this change, as long as one connection becomes writable, all connections having lower priority with the same network name will be pruned. Also simplify the implementation. BUG=webrtc:6512 Review-Url: https://codereview.webrtc.org/2395243005 Cr-Commit-Position: refs/heads/master@{#14593} --- webrtc/p2p/base/fakeportallocator.h | 60 ++++++++++----- webrtc/p2p/base/p2ptransportchannel.cc | 73 +++++++------------ webrtc/p2p/base/p2ptransportchannel.h | 1 - .../p2p/base/p2ptransportchannel_unittest.cc | 49 ++++++++++--- 4 files changed, 107 insertions(+), 76 deletions(-) diff --git a/webrtc/p2p/base/fakeportallocator.h b/webrtc/p2p/base/fakeportallocator.h index a1dbbf872b..a4d6fcfa52 100644 --- a/webrtc/p2p/base/fakeportallocator.h +++ b/webrtc/p2p/base/fakeportallocator.h @@ -94,7 +94,8 @@ class FakePortAllocatorSession : public PortAllocatorSession { const std::string& content_name, int component, const std::string& ice_ufrag, - const std::string& ice_pwd) + const std::string& ice_pwd, + bool ipv6_enabled) : PortAllocatorSession(content_name, component, ice_ufrag, @@ -110,10 +111,10 @@ class FakePortAllocatorSession : public PortAllocatorSession { "unittest", rtc::IPAddress(in6addr_loopback), 64), - port_(), port_config_count_(0), stun_servers_(allocator->stun_servers()), - turn_servers_(allocator->turn_servers()) { + turn_servers_(allocator->turn_servers()), + ipv6_enabled_(ipv6_enabled) { ipv4_network_.AddIP(rtc::IPAddress(INADDR_LOOPBACK)); ipv6_network_.AddIP(rtc::IPAddress(in6addr_loopback)); } @@ -122,18 +123,20 @@ class FakePortAllocatorSession : public PortAllocatorSession { candidate_filter_ = filter; } + Port* CreatePort(rtc::Network* network) { + Port* port = TestUDPPort::Create(network_thread_, factory_, network, + network->GetBestIP(), 0, 0, username(), + password(), std::string(), false); + AddPort(port); + return port; + } + void StartGettingPorts() override { - if (!port_) { - rtc::Network& network = - (rtc::HasIPv6Enabled() && (flags() & PORTALLOCATOR_ENABLE_IPV6)) - ? ipv6_network_ - : ipv4_network_; - port_.reset(TestUDPPort::Create(network_thread_, factory_, &network, - network.GetBestIP(), 0, 0, username(), - password(), std::string(), false)); - port_->SignalDestroyed.connect( - this, &FakePortAllocatorSession::OnPortDestroyed); - AddPort(port_.get()); + if (!ipv4_port_) { + ipv4_port_.reset(CreatePort(&ipv4_network_)); + } + if (!ipv6_port_ && ipv6_enabled_ && (flags() & PORTALLOCATOR_ENABLE_IPV6)) { + ipv6_port_.reset(CreatePort(&ipv6_network_)); } ++port_config_count_; running_ = true; @@ -149,7 +152,14 @@ class FakePortAllocatorSession : public PortAllocatorSession { std::vector ReadyCandidates() const override { return candidates_; } - void PruneAllPorts() override { port_->Prune(); } + void PruneAllPorts() override { + if (ipv4_port_) { + ipv4_port_->Prune(); + } + if (ipv6_port_) { + ipv6_port_->Prune(); + } + } bool CandidatesAllocationDone() const override { return allocation_done_; } int port_config_count() { return port_config_count_; } @@ -179,6 +189,8 @@ class FakePortAllocatorSession : public PortAllocatorSession { port->set_generation(generation()); port->SignalPortComplete.connect(this, &FakePortAllocatorSession::OnPortComplete); + port->SignalDestroyed.connect(this, + &FakePortAllocatorSession::OnPortDestroyed); port->PrepareAddress(); ready_ports_.push_back(port); SignalPortReady(this, port); @@ -194,14 +206,19 @@ class FakePortAllocatorSession : public PortAllocatorSession { } void OnPortDestroyed(cricket::PortInterface* port) { // Don't want to double-delete port if it deletes itself. - port_.release(); + if (port == ipv4_port_.get()) { + ipv4_port_.release(); + } else if (port == ipv6_port_.get()) { + ipv6_port_.release(); + } } rtc::Thread* network_thread_; rtc::PacketSocketFactory* factory_; rtc::Network ipv4_network_; rtc::Network ipv6_network_; - std::unique_ptr port_; + std::unique_ptr ipv4_port_; + std::unique_ptr ipv6_port_; int port_config_count_; std::vector candidates_; std::vector ready_ports_; @@ -210,6 +227,7 @@ class FakePortAllocatorSession : public PortAllocatorSession { std::vector turn_servers_; uint32_t candidate_filter_ = CF_ALL; int transport_info_update_count_ = 0; + bool ipv6_enabled_; bool running_ = false; }; @@ -222,6 +240,7 @@ class FakePortAllocator : public cricket::PortAllocator { owned_factory_.reset(new rtc::BasicPacketSocketFactory(network_thread_)); factory_ = owned_factory_.get(); } + ipv6_enabled_ = rtc::HasIPv6Enabled(); } void Initialize() override { @@ -232,6 +251,10 @@ class FakePortAllocator : public cricket::PortAllocator { void SetNetworkIgnoreMask(int network_ignore_mask) override {} + // Sometimes we can ignore the value returned by rtc::HasIpv6Enabled because + // we are using the virtual socket server. + void set_ipv6_enabled(bool ipv6_enabled) { ipv6_enabled_ = ipv6_enabled; } + cricket::PortAllocatorSession* CreateSessionInternal( const std::string& content_name, int component, @@ -239,7 +262,7 @@ class FakePortAllocator : public cricket::PortAllocator { const std::string& ice_pwd) override { return new FakePortAllocatorSession(this, network_thread_, factory_, content_name, component, ice_ufrag, - ice_pwd); + ice_pwd, ipv6_enabled_); } bool initialized() const { return initialized_; } @@ -249,6 +272,7 @@ class FakePortAllocator : public cricket::PortAllocator { rtc::PacketSocketFactory* factory_; std::unique_ptr owned_factory_; bool initialized_ = false; + bool ipv6_enabled_ = false; }; } // namespace cricket diff --git a/webrtc/p2p/base/p2ptransportchannel.cc b/webrtc/p2p/base/p2ptransportchannel.cc index 6bcf0d7d2b..0458baf3ef 100644 --- a/webrtc/p2p/base/p2ptransportchannel.cc +++ b/webrtc/p2p/base/p2ptransportchannel.cc @@ -1306,34 +1306,35 @@ void P2PTransportChannel::SortConnectionsAndUpdateState() { } void P2PTransportChannel::PruneConnections() { - // We can prune any connection for which there is a connected, writable - // connection on the same network with better or equal priority. We leave - // those with better priority just in case they become writable later (at - // which point, we would prune out the current selected connection). We leave - // connections on other networks because they may not be using the same - // resources and they may represent very distinct paths over which we can - // switch. If the |premier| connection is not connected, we may be - // reconnecting a TCP connection and temporarily do not prune connections in - // this network. See the big comment in CompareConnectionStates. + // We can prune any connection for which there is a connected, writable, + // and receiving connection with the same network name with better or equal + // priority. We leave those with better priority just in case they become + // writable later (at which point, we would prune out the current selected + // connection). We leave connections on other networks because they may not + // be using the same resources and they may represent very distinct paths + // over which we can switch. If the |premier| connection is not connected, + // we may be reconnecting a TCP connection and temporarily do not prune + // connections in this network. See the big comment in + // CompareConnectionStates. - // Get a list of the networks that we are using. - std::set networks; - for (const Connection* conn : connections_) { - networks.insert(conn->port()->Network()); + std::map premier_connection_by_network_name; + if (selected_connection_) { + // |selected_connection_| is always a premier connection. + const std::string& network_name = + selected_connection_->port()->Network()->name(); + premier_connection_by_network_name[network_name] = selected_connection_; } - for (rtc::Network* network : networks) { - Connection* premier = GetBestConnectionOnNetwork(network); - // Do not prune connections if the current selected connection is weak on - // this network. Otherwise, it may delete connections prematurely. - if (!premier || premier->weak()) { - continue; - } - - for (Connection* conn : connections_) { - if ((conn != premier) && (conn->port()->Network() == network) && - (CompareConnectionCandidates(premier, conn) >= 0)) { - conn->Prune(); - } + for (Connection* conn : connections_) { + const std::string& network_name = conn->port()->Network()->name(); + Connection* premier = premier_connection_by_network_name[network_name]; + // Since the connections are sorted, the first one with a given network name + // is the premier connection for the network name. + // |premier| might be equal to |conn| if this is the selected connection. + if (premier == nullptr) { + premier_connection_by_network_name[network_name] = conn; + } else if (premier != conn && !premier->weak() && + CompareConnectionCandidates(premier, conn) >= 0) { + conn->Prune(); } } } @@ -1471,26 +1472,6 @@ bool P2PTransportChannel::ReadyToSend(Connection* connection) const { PresumedWritable(connection)); } -// If we have a selected connection, return it, otherwise return top one in the -// list (later we will mark it best). -Connection* P2PTransportChannel::GetBestConnectionOnNetwork( - rtc::Network* network) const { - // If the selected connection is on this network, then it wins. - if (selected_connection_ && - (selected_connection_->port()->Network() == network)) { - return selected_connection_; - } - - // Otherwise, we return the top-most in sorted order. - for (size_t i = 0; i < connections_.size(); ++i) { - if (connections_[i]->port()->Network() == network) { - return connections_[i]; - } - } - - return NULL; -} - // Handle any queued up requests void P2PTransportChannel::OnMessage(rtc::Message *pmsg) { switch (pmsg->message_id) { diff --git a/webrtc/p2p/base/p2ptransportchannel.h b/webrtc/p2p/base/p2ptransportchannel.h index 672abb7423..3962f4d19d 100644 --- a/webrtc/p2p/base/p2ptransportchannel.h +++ b/webrtc/p2p/base/p2ptransportchannel.h @@ -248,7 +248,6 @@ class P2PTransportChannel : public TransportChannelImpl, void MaybeStopPortAllocatorSessions(); TransportChannelState ComputeState() const; - Connection* GetBestConnectionOnNetwork(rtc::Network* network) const; bool CreateConnections(const Candidate& remote_candidate, PortInterface* origin_port); bool CreateConnection(PortInterface* port, diff --git a/webrtc/p2p/base/p2ptransportchannel_unittest.cc b/webrtc/p2p/base/p2ptransportchannel_unittest.cc index 7e65b4baba..76da261b12 100644 --- a/webrtc/p2p/base/p2ptransportchannel_unittest.cc +++ b/webrtc/p2p/base/p2ptransportchannel_unittest.cc @@ -2813,7 +2813,7 @@ class P2PTransportChannelPingTest : public testing::Test, return GetConnectionTo(ch, ip, port_num); } - Port* GetPort(P2PTransportChannel* ch) { + Port* GetFirstPort(P2PTransportChannel* ch) { if (ch->ports().empty()) { return nullptr; } @@ -2830,11 +2830,13 @@ class P2PTransportChannelPingTest : public testing::Test, Connection* GetConnectionTo(P2PTransportChannel* ch, const std::string& ip, int port_num) { - Port* port = GetPort(ch); - if (!port) { - return nullptr; + for (PortInterface* port : ch->ports()) { + Connection* conn = port->GetConnection(rtc::SocketAddress(ip, port_num)); + if (conn != nullptr) { + return conn; + } } - return port->GetConnection(rtc::SocketAddress(ip, port_num)); + return nullptr; } Connection* FindNextPingableConnectionAndPingIt(P2PTransportChannel* ch) { @@ -3091,7 +3093,7 @@ TEST_F(P2PTransportChannelPingTest, PingingStartedAsSoonAsPossible) { uint32_t prflx_priority = ICE_TYPE_PREFERENCE_PRFLX << 24; request.AddAttribute( new StunUInt32Attribute(STUN_ATTR_PRIORITY, prflx_priority)); - Port* port = GetPort(&ch); + Port* port = GetFirstPort(&ch); ASSERT_NE(nullptr, port); port->SignalUnknownAddress(port, rtc::SocketAddress("1.1.1.1", 1), PROTO_UDP, &request, kIceUfrag[1], false); @@ -3260,7 +3262,7 @@ TEST_F(P2PTransportChannelPingTest, ConnectionResurrection) { new StunUInt32Attribute(STUN_ATTR_PRIORITY, prflx_priority)); EXPECT_NE(prflx_priority, remote_priority); - Port* port = GetPort(&ch); + Port* port = GetFirstPort(&ch); // conn1 should be resurrected with original priority. port->SignalUnknownAddress(port, rtc::SocketAddress("1.1.1.1", 1), PROTO_UDP, &request, kIceUfrag[1], false); @@ -3399,7 +3401,7 @@ TEST_F(P2PTransportChannelPingTest, TestSelectConnectionFromUnknownAddress) { uint32_t prflx_priority = ICE_TYPE_PREFERENCE_PRFLX << 24; request.AddAttribute( new StunUInt32Attribute(STUN_ATTR_PRIORITY, prflx_priority)); - TestUDPPort* port = static_cast(GetPort(&ch)); + TestUDPPort* port = static_cast(GetFirstPort(&ch)); port->SignalUnknownAddress(port, rtc::SocketAddress("1.1.1.1", 1), PROTO_UDP, &request, kIceUfrag[1], false); Connection* conn1 = WaitForConnectionTo(&ch, "1.1.1.1", 1); @@ -3497,7 +3499,7 @@ TEST_F(P2PTransportChannelPingTest, TestSelectConnectionBasedOnMediaReceived) { request.AddAttribute( new StunUInt32Attribute(STUN_ATTR_PRIORITY, prflx_priority)); request.AddAttribute(new StunByteStringAttribute(STUN_ATTR_USE_CANDIDATE)); - Port* port = GetPort(&ch); + Port* port = GetFirstPort(&ch); port->SignalUnknownAddress(port, rtc::SocketAddress("3.3.3.3", 3), PROTO_UDP, &request, kIceUfrag[1], false); Connection* conn3 = WaitForConnectionTo(&ch, "3.3.3.3", 3); @@ -3872,6 +3874,31 @@ TEST_F(P2PTransportChannelPingTest, TestConnectionPrunedAgain) { EXPECT_EQ(TransportChannelState::STATE_COMPLETED, ch.GetState()); } +TEST_F(P2PTransportChannelPingTest, TestPruneConnectionsByNetworkName) { + std::string ipv4_addr("1.1.1.1"); + std::string ipv6_addr("2400:1:2:3:4:5:6:7"); + FakePortAllocator pa(rtc::Thread::Current(), nullptr); + pa.set_ipv6_enabled(true); + pa.set_flags(PORTALLOCATOR_ENABLE_IPV6); + P2PTransportChannel ch("test channel", 1, &pa); + PrepareChannel(&ch); + ch.MaybeStartGathering(); + ch.AddRemoteCandidate(CreateUdpCandidate(LOCAL_PORT_TYPE, ipv4_addr, 1, 100)); + ch.AddRemoteCandidate(CreateUdpCandidate(LOCAL_PORT_TYPE, ipv6_addr, 1, 100)); + Connection* conn1 = WaitForConnectionTo(&ch, ipv4_addr, 1); + ASSERT_TRUE(conn1 != nullptr); + Connection* conn2 = WaitForConnectionTo(&ch, ipv6_addr, 1); + ASSERT_TRUE(conn2 != nullptr); + conn1->ReceivedPingResponse(LOW_RTT, "id"); + EXPECT_EQ_WAIT(conn1, ch.selected_connection(), kDefaultTimeout); + conn2->ReceivedPingResponse(LOW_RTT, "id"); + // IPv6 connection has higher priority. + EXPECT_EQ_WAIT(conn2, ch.selected_connection(), kDefaultTimeout); + // Since conn1 and conn2 are on networks with the same network name, + // conn1 will be pruned when conn2 becomes writable and receiving. + EXPECT_FALSE(conn1->writable()); +} + // Test that if all connections in a channel has timed out on writing, they // will all be deleted. We use Prune to simulate write_time_out. TEST_F(P2PTransportChannelPingTest, TestDeleteConnectionsIfAllWriteTimedout) { @@ -4006,12 +4033,12 @@ TEST_F(P2PTransportChannelPingTest, TestPortDestroyedAfterTimeoutAndPruned) { } EXPECT_EQ(nullptr, GetConnectionTo(&ch, "1.1.1.1", 1)); // Port will not be removed because it is not pruned yet. - PortInterface* port = GetPort(&ch); + PortInterface* port = GetFirstPort(&ch); ASSERT_NE(nullptr, port); // If the session prunes all ports, the port will be destroyed. ch.allocator_session()->PruneAllPorts(); - EXPECT_EQ_SIMULATED_WAIT(nullptr, GetPort(&ch), 1, fake_clock); + EXPECT_EQ_SIMULATED_WAIT(nullptr, GetFirstPort(&ch), 1, fake_clock); EXPECT_EQ_SIMULATED_WAIT(nullptr, GetPrunedPort(&ch), 1, fake_clock); }