diff --git a/talk/app/webrtc/statscollector.cc b/talk/app/webrtc/statscollector.cc index 76ac76d7d2..632744568e 100644 --- a/talk/app/webrtc/statscollector.cc +++ b/talk/app/webrtc/statscollector.cc @@ -603,9 +603,9 @@ StatsReport* StatsCollector::AddConnectionInfoReport( report->set_timestamp(stats_gathering_started_); const BoolForAdd bools[] = { - {StatsReport::kStatsValueNameActiveConnection, info.best_connection}, - {StatsReport::kStatsValueNameReceiving, info.receiving}, - {StatsReport::kStatsValueNameWritable, info.writable}, + { StatsReport::kStatsValueNameActiveConnection, info.best_connection }, + { StatsReport::kStatsValueNameReadable, info.readable }, + { StatsReport::kStatsValueNameWritable, info.writable }, }; for (const auto& b : bools) report->AddBoolean(b.name, b.value); diff --git a/talk/app/webrtc/statstypes.cc b/talk/app/webrtc/statstypes.cc index 51ec7fd8a3..56d705ec1f 100644 --- a/talk/app/webrtc/statstypes.cc +++ b/talk/app/webrtc/statstypes.cc @@ -556,7 +556,7 @@ const char* StatsReport::Value::display_name() const { return "googPlisSent"; case kStatsValueNamePreferredJitterBufferMs: return "googPreferredJitterBufferMs"; - case kStatsValueNameReceiving: + case kStatsValueNameReadable: return "googReadable"; case kStatsValueNameRemoteAddress: return "googRemoteAddress"; diff --git a/talk/app/webrtc/statstypes.h b/talk/app/webrtc/statstypes.h index 33b2fa7410..5d5d7174c3 100644 --- a/talk/app/webrtc/statstypes.h +++ b/talk/app/webrtc/statstypes.h @@ -125,7 +125,7 @@ class StatsReport { kStatsValueNamePacketsReceived, kStatsValueNamePacketsSent, kStatsValueNameProtocol, - kStatsValueNameReceiving, + kStatsValueNameReadable, kStatsValueNameSelectedCandidatePairId, kStatsValueNameSsrc, kStatsValueNameState, diff --git a/webrtc/p2p/base/dtlstransportchannel.cc b/webrtc/p2p/base/dtlstransportchannel.cc index dcebdee53e..3474237269 100644 --- a/webrtc/p2p/base/dtlstransportchannel.cc +++ b/webrtc/p2p/base/dtlstransportchannel.cc @@ -97,6 +97,8 @@ DtlsTransportChannelWrapper::DtlsTransportChannelWrapper( dtls_state_(STATE_NONE), ssl_role_(rtc::SSL_CLIENT), ssl_max_version_(rtc::SSL_PROTOCOL_DTLS_10) { + channel_->SignalReadableState.connect(this, + &DtlsTransportChannelWrapper::OnReadableState); channel_->SignalWritableState.connect(this, &DtlsTransportChannelWrapper::OnWritableState); channel_->SignalReadPacket.connect(this, @@ -390,12 +392,25 @@ int DtlsTransportChannelWrapper::SendPacket( // (1) If we're not doing DTLS-SRTP, then the state is just the // state of the underlying impl() // (2) If we're doing DTLS-SRTP: -// - Prior to the DTLS handshake, the state is neither receiving nor +// - Prior to the DTLS handshake, the state is neither readable or // writable // - When the impl goes writable for the first time we // start the DTLS handshake // - Once the DTLS handshake completes, the state is that of the // impl again +void DtlsTransportChannelWrapper::OnReadableState(TransportChannel* channel) { + ASSERT(rtc::Thread::Current() == worker_thread_); + ASSERT(channel == channel_); + LOG_J(LS_VERBOSE, this) + << "DTLSTransportChannelWrapper: channel readable state changed to " + << channel_->readable(); + + if (dtls_state_ == STATE_NONE || dtls_state_ == STATE_OPEN) { + set_readable(channel_->readable()); + // Note: SignalReadableState fired by set_readable. + } +} + void DtlsTransportChannelWrapper::OnWritableState(TransportChannel* channel) { ASSERT(rtc::Thread::Current() == worker_thread_); ASSERT(channel == channel_); @@ -530,6 +545,8 @@ void DtlsTransportChannelWrapper::OnDtlsEvent(rtc::StreamInterface* dtls, // The check for OPEN shouldn't be necessary but let's make // sure we don't accidentally frob the state if it's closed. dtls_state_ = STATE_OPEN; + + set_readable(true); set_writable(true); } } @@ -547,6 +564,8 @@ void DtlsTransportChannelWrapper::OnDtlsEvent(rtc::StreamInterface* dtls, } else { LOG_J(LS_INFO, this) << "DTLS channel error, code=" << err; } + + set_readable(false); set_writable(false); dtls_state_ = STATE_CLOSED; } diff --git a/webrtc/p2p/base/p2ptransportchannel.cc b/webrtc/p2p/base/p2ptransportchannel.cc index ee9906053c..094a8dcc8f 100644 --- a/webrtc/p2p/base/p2ptransportchannel.cc +++ b/webrtc/p2p/base/p2ptransportchannel.cc @@ -221,7 +221,6 @@ void P2PTransportChannel::AddAllocatorSession(PortAllocatorSession* session) { void P2PTransportChannel::AddConnection(Connection* connection) { connections_.push_back(connection); connection->set_remote_ice_mode(remote_ice_mode_); - connection->set_receiving_timeout(receiving_timeout_); connection->SignalReadPacket.connect( this, &P2PTransportChannel::OnReadPacket); connection->SignalReadyToSend.connect( @@ -341,10 +340,6 @@ void P2PTransportChannel::SetReceivingTimeout(int receiving_timeout_ms) { receiving_timeout_ = receiving_timeout_ms; check_receiving_delay_ = std::max(MIN_CHECK_RECEIVING_DELAY, receiving_timeout_ / 10); - - for (Connection* connection : connections_) { - connection->set_receiving_timeout(receiving_timeout_); - } LOG(LS_VERBOSE) << "Set ICE receiving timeout to " << receiving_timeout_ << " milliseconds"; } @@ -405,7 +400,7 @@ void P2PTransportChannel::OnPortReady(PortAllocatorSession *session, std::vector::iterator iter; for (iter = remote_candidates_.begin(); iter != remote_candidates_.end(); ++iter) { - CreateConnection(port, *iter, iter->origin_port()); + CreateConnection(port, *iter, iter->origin_port(), false); } SortConnections(); @@ -621,7 +616,7 @@ void P2PTransportChannel::OnCandidate(const Candidate& candidate) { } // Create connections to this remote candidate. - CreateConnections(candidate, NULL); + CreateConnections(candidate, NULL, false); // Resort the connections list, which may have new elements. SortConnections(); @@ -631,7 +626,8 @@ void P2PTransportChannel::OnCandidate(const Candidate& candidate) { // remote candidate. The return value is true if we created a connection from // the origin port. bool P2PTransportChannel::CreateConnections(const Candidate& remote_candidate, - PortInterface* origin_port) { + PortInterface* origin_port, + bool readable) { ASSERT(worker_thread_ == rtc::Thread::Current()); Candidate new_remote_candidate(remote_candidate); @@ -669,7 +665,7 @@ bool P2PTransportChannel::CreateConnections(const Candidate& remote_candidate, bool created = false; std::vector::reverse_iterator it; for (it = ports_.rbegin(); it != ports_.rend(); ++it) { - if (CreateConnection(*it, new_remote_candidate, origin_port)) { + if (CreateConnection(*it, new_remote_candidate, origin_port, readable)) { if (*it == origin_port) created = true; } @@ -677,7 +673,8 @@ bool P2PTransportChannel::CreateConnections(const Candidate& remote_candidate, if ((origin_port != NULL) && std::find(ports_.begin(), ports_.end(), origin_port) == ports_.end()) { - if (CreateConnection(origin_port, new_remote_candidate, origin_port)) + if (CreateConnection( + origin_port, new_remote_candidate, origin_port, readable)) created = true; } @@ -691,7 +688,8 @@ bool P2PTransportChannel::CreateConnections(const Candidate& remote_candidate, // And then listen to connection object for changes. bool P2PTransportChannel::CreateConnection(PortInterface* port, const Candidate& remote_candidate, - PortInterface* origin_port) { + PortInterface* origin_port, + bool readable) { // Look for an existing connection with this remote address. If one is not // found, then we can create a new connection for this address. Connection* connection = port->GetConnection(remote_candidate.address()); @@ -726,6 +724,11 @@ bool P2PTransportChannel::CreateConnection(PortInterface* port, << connections_.size() << " total)"; } + // If we are readable, it is because we are creating this in response to a + // ping from the other side. This will cause the state to become readable. + if (readable) + connection->ReceivedPing(); + return true; } @@ -850,7 +853,8 @@ bool P2PTransportChannel::GetStats(ConnectionInfos *infos) { Connection *connection = *it; ConnectionInfo info; info.best_connection = (best_connection_ == connection); - info.receiving = connection->receiving(); + info.readable = + (connection->read_state() == Connection::STATE_READABLE); info.writable = (connection->write_state() == Connection::STATE_WRITABLE); info.timeout = @@ -1025,7 +1029,8 @@ void P2PTransportChannel::SwitchBestConnectionTo(Connection* conn) { LOG_J(LS_INFO, this) << "New best connection: " << best_connection_->ToString(); SignalRouteChange(this, best_connection_->remote_candidate()); - set_receiving(best_connection_->receiving()); + // When it just switched to a best connection, set receiving to true. + set_receiving(true); } else { LOG_J(LS_INFO, this) << "No best connection"; } @@ -1041,8 +1046,14 @@ void P2PTransportChannel::UpdateChannelState() { if (writable != this->writable()) LOG(LS_ERROR) << "UpdateChannelState: writable state mismatch"; - // TODO(honghaiz): The channel receiving state is set in OnCheckReceiving. - // Will revisit in a subsequent code change. + bool readable = false; + for (uint32 i = 0; i < connections_.size(); ++i) { + if (connections_[i]->read_state() == Connection::STATE_READABLE) { + readable = true; + break; + } + } + set_readable(readable); } // We checked the status of our connections and we had at least one that @@ -1133,7 +1144,10 @@ void P2PTransportChannel::OnPing() { } void P2PTransportChannel::OnCheckReceiving() { - if (best_connection_) { + // Check receiving only if the best connection has received data packets + // because we want to detect not receiving any packets only after the media + // have started flowing. + if (best_connection_ && best_connection_->recv_total_bytes() > 0) { bool receiving = rtc::Time() <= best_connection_->last_received() + receiving_timeout_; set_receiving(receiving); @@ -1157,13 +1171,23 @@ bool P2PTransportChannel::IsPingable(Connection* conn) { // An never connected connection cannot be written to at all, so pinging is // out of the question. However, if it has become WRITABLE, it is in the // reconnecting state so ping is needed. - if (!conn->connected() && !conn->writable()) { + if (!conn->connected() && conn->write_state() != Connection::STATE_WRITABLE) { return false; } - // If the channel is not writable, ping all candidates. Otherwise, we only - // want to ping connections that have not timed out on writing. - return !writable() || conn->write_state() != Connection::STATE_WRITE_TIMEOUT; + if (writable()) { + // If we are writable, then we only want to ping connections that could be + // better than this one, i.e., the ones that were not pruned. + return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT); + } else { + // If we are not writable, then we need to try everything that might work. + // This includes both connections that do not have write timeout as well as + // ones that do not have read timeout. A connection could be readable but + // be in write-timeout if we pruned it before. Since the other side is + // still pinging it, it very well might still work. + return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT) || + (conn->read_state() != Connection::STATE_READ_TIMEOUT); + } } // Returns the next pingable connection to ping. This will be the oldest diff --git a/webrtc/p2p/base/p2ptransportchannel.h b/webrtc/p2p/base/p2ptransportchannel.h index a8f16666c9..a00c17e237 100644 --- a/webrtc/p2p/base/p2ptransportchannel.h +++ b/webrtc/p2p/base/p2ptransportchannel.h @@ -176,11 +176,10 @@ class P2PTransportChannel : public TransportChannelImpl, void HandleAllTimedOut(); Connection* GetBestConnectionOnNetwork(rtc::Network* network) const; - bool CreateConnections(const Candidate& remote_candidate, - PortInterface* origin_port); - bool CreateConnection(PortInterface* port, - const Candidate& remote_candidate, - PortInterface* origin_port); + bool CreateConnections(const Candidate &remote_candidate, + PortInterface* origin_port, bool readable); + bool CreateConnection(PortInterface* port, const Candidate& remote_candidate, + PortInterface* origin_port, bool readable); bool FindConnection(cricket::Connection* connection) const; uint32 GetRemoteCandidateGeneration(const Candidate& candidate); diff --git a/webrtc/p2p/base/p2ptransportchannel_unittest.cc b/webrtc/p2p/base/p2ptransportchannel_unittest.cc index d0277f4814..3bb3be5ae8 100644 --- a/webrtc/p2p/base/p2ptransportchannel_unittest.cc +++ b/webrtc/p2p/base/p2ptransportchannel_unittest.cc @@ -492,9 +492,9 @@ class P2PTransportChannelTestBase : public testing::Test, CreateChannels(1); EXPECT_TRUE_WAIT_MARGIN(ep1_ch1() != NULL && ep2_ch1() != NULL && - ep1_ch1()->receiving() && + ep1_ch1()->readable() && ep1_ch1()->writable() && - ep2_ch1()->receiving() && + ep2_ch1()->readable() && ep2_ch1()->writable(), expected.connect_wait, 1000); @@ -561,7 +561,7 @@ class P2PTransportChannelTestBase : public testing::Test, } } - // This test waits for the transport to become receiving and writable on both + // This test waits for the transport to become readable and writable on both // end points. Once they are, the end points set new local ice credentials to // restart the ice gathering. Finally it waits for the transport to select a // new connection using the newly generated ice candidates. @@ -569,8 +569,8 @@ class P2PTransportChannelTestBase : public testing::Test, void TestHandleIceUfragPasswordChanged() { ep1_ch1()->SetRemoteIceCredentials(kIceUfrag[1], kIcePwd[1]); ep2_ch1()->SetRemoteIceCredentials(kIceUfrag[0], kIcePwd[0]); - EXPECT_TRUE_WAIT_MARGIN(ep1_ch1()->receiving() && ep1_ch1()->writable() && - ep2_ch1()->receiving() && ep2_ch1()->writable(), + EXPECT_TRUE_WAIT_MARGIN(ep1_ch1()->readable() && ep1_ch1()->writable() && + ep2_ch1()->readable() && ep2_ch1()->writable(), 1000, 1000); const cricket::Candidate* old_local_candidate1 = LocalCandidate(ep1_ch1()); @@ -614,9 +614,9 @@ class P2PTransportChannelTestBase : public testing::Test, EXPECT_TRUE_WAIT(GetRoleConflict(0), 1000); EXPECT_FALSE(GetRoleConflict(1)); - EXPECT_TRUE_WAIT(ep1_ch1()->receiving() && + EXPECT_TRUE_WAIT(ep1_ch1()->readable() && ep1_ch1()->writable() && - ep2_ch1()->receiving() && + ep2_ch1()->readable() && ep2_ch1()->writable(), 1000); @@ -1103,8 +1103,8 @@ TEST_F(P2PTransportChannelTest, GetStats) { kDefaultPortAllocatorFlags, kDefaultPortAllocatorFlags); CreateChannels(1); - EXPECT_TRUE_WAIT_MARGIN(ep1_ch1()->receiving() && ep1_ch1()->writable() && - ep2_ch1()->receiving() && ep2_ch1()->writable(), + EXPECT_TRUE_WAIT_MARGIN(ep1_ch1()->readable() && ep1_ch1()->writable() && + ep2_ch1()->readable() && ep2_ch1()->writable(), 1000, 1000); TestSendRecv(1); cricket::ConnectionInfos infos; @@ -1112,7 +1112,7 @@ TEST_F(P2PTransportChannelTest, GetStats) { ASSERT_EQ(1U, infos.size()); EXPECT_TRUE(infos[0].new_connection); EXPECT_TRUE(infos[0].best_connection); - EXPECT_TRUE(infos[0].receiving); + EXPECT_TRUE(infos[0].readable); EXPECT_TRUE(infos[0].writable); EXPECT_FALSE(infos[0].timeout); EXPECT_EQ(10U, infos[0].sent_total_packets); @@ -1241,9 +1241,9 @@ TEST_F(P2PTransportChannelTest, IncomingOnlyBlocked) { // Pump for 1 second and verify that the channels are not connected. rtc::Thread::Current()->ProcessMessages(1000); - EXPECT_FALSE(ep1_ch1()->receiving()); + EXPECT_FALSE(ep1_ch1()->readable()); EXPECT_FALSE(ep1_ch1()->writable()); - EXPECT_FALSE(ep2_ch1()->receiving()); + EXPECT_FALSE(ep2_ch1()->readable()); EXPECT_FALSE(ep2_ch1()->writable()); DestroyChannels(); @@ -1261,8 +1261,8 @@ TEST_F(P2PTransportChannelTest, IncomingOnlyOpen) { ep1_ch1()->set_incoming_only(true); EXPECT_TRUE_WAIT_MARGIN(ep1_ch1() != NULL && ep2_ch1() != NULL && - ep1_ch1()->receiving() && ep1_ch1()->writable() && - ep2_ch1()->receiving() && ep2_ch1()->writable(), + ep1_ch1()->readable() && ep1_ch1()->writable() && + ep2_ch1()->readable() && ep2_ch1()->writable(), 1000, 1000); DestroyChannels(); @@ -1287,8 +1287,8 @@ TEST_F(P2PTransportChannelTest, TestTcpConnectionsFromActiveToPassive) { CreateChannels(1); - EXPECT_TRUE_WAIT(ep1_ch1()->receiving() && ep1_ch1()->writable() && - ep2_ch1()->receiving() && ep2_ch1()->writable(), + EXPECT_TRUE_WAIT(ep1_ch1()->readable() && ep1_ch1()->writable() && + ep2_ch1()->readable() && ep2_ch1()->writable(), 1000); EXPECT_TRUE( ep1_ch1()->best_connection() && ep2_ch1()->best_connection() && @@ -1343,9 +1343,9 @@ TEST_F(P2PTransportChannelTest, TestIceConfigWillPassDownToPort) { EXPECT_EQ(kTiebreaker1, ports_before[i]->IceTiebreaker()); } - EXPECT_TRUE_WAIT(ep1_ch1()->receiving() && + EXPECT_TRUE_WAIT(ep1_ch1()->readable() && ep1_ch1()->writable() && - ep2_ch1()->receiving() && + ep2_ch1()->readable() && ep2_ch1()->writable(), 1000); @@ -1400,8 +1400,8 @@ TEST_F(P2PTransportChannelTest, TestIPv6Connections) { CreateChannels(1); - EXPECT_TRUE_WAIT(ep1_ch1()->receiving() && ep1_ch1()->writable() && - ep2_ch1()->receiving() && ep2_ch1()->writable(), + EXPECT_TRUE_WAIT(ep1_ch1()->readable() && ep1_ch1()->writable() && + ep2_ch1()->readable() && ep2_ch1()->writable(), 1000); EXPECT_TRUE( ep1_ch1()->best_connection() && ep2_ch1()->best_connection() && @@ -1426,8 +1426,10 @@ TEST_F(P2PTransportChannelTest, TestForceTurn) { CreateChannels(1); - EXPECT_TRUE_WAIT(ep1_ch1()->receiving() && ep1_ch1()->writable() && - ep2_ch1()->receiving() && ep2_ch1()->writable(), + EXPECT_TRUE_WAIT(ep1_ch1()->readable() && + ep1_ch1()->writable() && + ep2_ch1()->readable() && + ep2_ch1()->writable(), 2000); EXPECT_TRUE(ep1_ch1()->best_connection() && @@ -1506,8 +1508,8 @@ TEST_F(P2PTransportChannelMultihomedTest, TestFailover) { // Create channels and let them go writable, as usual. CreateChannels(1); - EXPECT_TRUE_WAIT(ep1_ch1()->receiving() && ep1_ch1()->writable() && - ep2_ch1()->receiving() && ep2_ch1()->writable(), + EXPECT_TRUE_WAIT(ep1_ch1()->readable() && ep1_ch1()->writable() && + ep2_ch1()->readable() && ep2_ch1()->writable(), 1000); EXPECT_TRUE( ep1_ch1()->best_connection() && ep2_ch1()->best_connection() && @@ -1550,8 +1552,8 @@ TEST_F(P2PTransportChannelMultihomedTest, TestDrain) { // Create channels and let them go writable, as usual. CreateChannels(1); - EXPECT_TRUE_WAIT(ep1_ch1()->receiving() && ep1_ch1()->writable() && - ep2_ch1()->receiving() && ep2_ch1()->writable(), + EXPECT_TRUE_WAIT(ep1_ch1()->readable() && ep1_ch1()->writable() && + ep2_ch1()->readable() && ep2_ch1()->writable(), 1000); EXPECT_TRUE( ep1_ch1()->best_connection() && ep2_ch1()->best_connection() && @@ -1704,14 +1706,14 @@ TEST_F(P2PTransportChannelPingTest, ConnectionResurrection) { uint32 remote_priority = conn1->remote_candidate().priority(); // Create a higher priority candidate and make the connection - // receiving/writable. This will prune conn1. + // readable/writable. This will prune conn1. ch.OnCandidate(CreateCandidate("2.2.2.2", 2, 2)); cricket::Connection* conn2 = WaitForConnectionTo(&ch, "2.2.2.2", 2); ASSERT_TRUE(conn2 != nullptr); conn2->ReceivedPing(); conn2->ReceivedPingResponse(); - // Wait for conn1 to be destroyed. + // Wait for conn1 being destroyed. EXPECT_TRUE_WAIT(GetConnectionTo(&ch, "1.1.1.1", 1) == nullptr, 3000); cricket::Port* port = GetPort(&ch); @@ -1903,7 +1905,7 @@ TEST_F(P2PTransportChannelPingTest, TestSelectConnectionBasedOnMediaReceived) { ch.OnCandidate(CreateCandidate("2.2.2.2", 2, 1)); cricket::Connection* conn2 = WaitForConnectionTo(&ch, "2.2.2.2", 2); ASSERT_TRUE(conn2 != nullptr); - conn2->ReceivedPing(); // Start receiving. + conn2->ReceivedPing(); // Become readable. // Do not switch because it is not writable. conn2->OnReadPacket("ABC", 3, rtc::CreatePacketTime(0)); EXPECT_EQ(conn1, ch.best_connection()); diff --git a/webrtc/p2p/base/port.cc b/webrtc/p2p/base/port.cc index d6bc27ba02..da66928db2 100644 --- a/webrtc/p2p/base/port.cc +++ b/webrtc/p2p/base/port.cc @@ -782,8 +782,8 @@ Connection::Connection(Port* port, : port_(port), local_candidate_index_(index), remote_candidate_(remote_candidate), + read_state_(STATE_READ_INIT), write_state_(STATE_WRITE_INIT), - receiving_(false), connected_(true), pruned_(false), use_candidate_attr_(false), @@ -800,8 +800,7 @@ Connection::Connection(Port* port, sent_packets_discarded_(0), sent_packets_total_(0), reported_(false), - state_(STATE_WAITING), - receiving_timeout_(WEAK_CONNECTION_RECEIVE_TIMEOUT) { + state_(STATE_WAITING) { // All of our connections start in WAITING state. // TODO(mallinath) - Start connections from STATE_FROZEN. // Wire up to send stun packets @@ -842,6 +841,16 @@ uint64 Connection::priority() const { return priority; } +void Connection::set_read_state(ReadState value) { + ReadState old_value = read_state_; + read_state_ = value; + if (value != old_value) { + LOG_J(LS_VERBOSE, this) << "set_read_state"; + SignalStateChange(this); + CheckTimeout(); + } +} + void Connection::set_write_state(WriteState value) { WriteState old_value = write_state_; write_state_ = value; @@ -853,15 +862,6 @@ void Connection::set_write_state(WriteState value) { } } -void Connection::set_receiving(bool value) { - if (value != receiving_) { - LOG_J(LS_VERBOSE, this) << "set_receiving to " << value; - receiving_ = value; - SignalStateChange(this); - CheckTimeout(); - } -} - void Connection::set_state(State state) { State old_state = state_; state_ = state; @@ -902,17 +902,27 @@ void Connection::OnReadPacket( const rtc::SocketAddress& addr(remote_candidate_.address()); if (!port_->GetStunMessage(data, size, addr, msg.accept(), &remote_ufrag)) { // The packet did not parse as a valid STUN message - // This is a data packet, pass it along. - set_receiving(true); - last_data_received_ = rtc::Time(); - recv_rate_tracker_.AddSamples(size); - SignalReadPacket(this, data, size, packet_time); - // If timed out sending writability checks, start up again - if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT)) { - LOG(LS_WARNING) << "Received a data packet on a timed-out Connection. " - << "Resetting state to STATE_WRITE_INIT."; - set_write_state(STATE_WRITE_INIT); + // If this connection is readable, then pass along the packet. + if (read_state_ == STATE_READABLE) { + // readable means data from this address is acceptable + // Send it on! + last_data_received_ = rtc::Time(); + recv_rate_tracker_.AddSamples(size); + SignalReadPacket(this, data, size, packet_time); + + // If timed out sending writability checks, start up again + if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT)) { + LOG(LS_WARNING) << "Received a data packet on a timed-out Connection. " + << "Resetting state to STATE_WRITE_INIT."; + set_write_state(STATE_WRITE_INIT); + } + } else { + // Not readable means the remote address hasn't sent a valid + // binding request yet. + + LOG_J(LS_WARNING, this) + << "Received non-STUN packet from an unreadable connection."; } } else if (!msg) { // The packet was STUN, but failed a check and was handled internally. @@ -982,7 +992,12 @@ void Connection::OnReadPacket( // Otherwise we can mark connection to read timeout. No response will be // sent in this scenario. case STUN_BINDING_INDICATION: - ReceivedPing(); + if (read_state_ == STATE_READABLE) { + ReceivedPing(); + } else { + LOG_J(LS_WARNING, this) << "Received STUN binding indication " + << "from an unreadable connection."; + } break; default: @@ -1009,7 +1024,8 @@ void Connection::Prune() { void Connection::Destroy() { LOG_J(LS_VERBOSE, this) << "Connection destroyed"; - port_->thread()->Post(this, MSG_DELETE); + set_read_state(STATE_READ_TIMEOUT); + set_write_state(STATE_WRITE_TIMEOUT); } void Connection::PrintPingsSinceLastResponse(std::string* s, size_t max) { @@ -1073,6 +1089,7 @@ void Connection::UpdateState(uint32 now) { << " rtt=" << rtt; set_write_state(STATE_WRITE_UNRELIABLE); } + if ((write_state_ == STATE_WRITE_UNRELIABLE || write_state_ == STATE_WRITE_INIT) && TooLongWithoutResponse(pings_since_last_response_, @@ -1084,11 +1101,6 @@ void Connection::UpdateState(uint32 now) { << ", rtt=" << rtt; set_write_state(STATE_WRITE_TIMEOUT); } - - // Check the receiving state. - uint32 last_recv_time = last_received(); - bool receiving = now <= last_recv_time + receiving_timeout_; - set_receiving(receiving); } void Connection::Ping(uint32 now) { @@ -1102,8 +1114,8 @@ void Connection::Ping(uint32 now) { } void Connection::ReceivedPing() { - set_receiving(true); last_ping_received_ = rtc::Time(); + set_read_state(STATE_READABLE); } void Connection::ReceivedPingResponse() { @@ -1112,7 +1124,6 @@ void Connection::ReceivedPingResponse() { // So if we're not already, become writable. We may be bringing a pruned // connection back to life, but if we don't really want it, we can always // prune it again. - set_receiving(true); set_write_state(STATE_WRITABLE); set_state(STATE_SUCCEEDED); pings_since_last_response_.clear(); @@ -1130,9 +1141,10 @@ std::string Connection::ToString() const { '-', // not connected (false) 'C', // connected (true) }; - const char RECEIVE_STATE_ABBREV[2] = { - '-', // not receiving (false) - 'R', // receiving (true) + const char READ_STATE_ABBREV[3] = { + '-', // STATE_READ_INIT + 'R', // STATE_READABLE + 'x', // STATE_READ_TIMEOUT }; const char WRITE_STATE_ABBREV[4] = { 'W', // STATE_WRITABLE @@ -1160,7 +1172,7 @@ std::string Connection::ToString() const { << ":" << remote.type() << ":" << remote.protocol() << ":" << remote.address().ToSensitiveString() << "|" << CONNECT_STATE_ABBREV[connected()] - << RECEIVE_STATE_ABBREV[receiving()] + << READ_STATE_ABBREV[read_state()] << WRITE_STATE_ABBREV[write_state()] << ICESTATE[state()] << "|" << priority() << "|"; @@ -1185,6 +1197,11 @@ void Connection::OnConnectionRequestResponse(ConnectionRequest* request, uint32 rtt = request->Elapsed(); ReceivedPingResponse(); + if (remote_ice_mode_ == ICEMODE_LITE) { + // A ice-lite end point never initiates ping requests. This will allow + // us to move to STATE_READABLE without an incoming ping request. + set_read_state(STATE_READABLE); + } if (LOG_CHECK_LEVEL_V(sev)) { bool use_candidate = ( @@ -1252,8 +1269,14 @@ void Connection::OnConnectionRequestSent(ConnectionRequest* request) { } void Connection::CheckTimeout() { - // If write has timed out and it is not receiving, remove the connection. - if (!receiving_ && write_state_ == STATE_WRITE_TIMEOUT) { + // If both read and write have timed out or read has never initialized, then + // this connection can contribute no more to p2p socket unless at some later + // date readability were to come back. However, we gave readability a long + // time to timeout, so at this point, it seems fair to get rid of this + // connection. + if ((read_state_ == STATE_READ_TIMEOUT || + read_state_ == STATE_READ_INIT) && + write_state_ == STATE_WRITE_TIMEOUT) { port_->thread()->Post(this, MSG_DELETE); } } diff --git a/webrtc/p2p/base/port.h b/webrtc/p2p/base/port.h index 8e7a2592d9..fbda9cea01 100644 --- a/webrtc/p2p/base/port.h +++ b/webrtc/p2p/base/port.h @@ -50,12 +50,8 @@ extern const char TCPTYPE_ACTIVE_STR[]; extern const char TCPTYPE_PASSIVE_STR[]; extern const char TCPTYPE_SIMOPEN_STR[]; -// If a connection does not receive anything for this long, it is considered -// dead. -const uint32 DEAD_CONNECTION_RECEIVE_TIMEOUT = 30 * 1000; // 30 seconds. - -// The timeout duration when a connection does not receive anything. -const uint32 WEAK_CONNECTION_RECEIVE_TIMEOUT = 2500; // 2.5 seconds +// The length of time we wait before timing out readability on a connection. +const uint32 CONNECTION_READ_TIMEOUT = 30 * 1000; // 30 seconds // The length of time we wait before timing out writability on a connection. const uint32 CONNECTION_WRITE_TIMEOUT = 15 * 1000; // 15 seconds @@ -421,6 +417,15 @@ class Connection : public rtc::MessageHandler, // Returns the pair priority. uint64 priority() const; + enum ReadState { + STATE_READ_INIT = 0, // we have yet to receive a ping + STATE_READABLE = 1, // we have received pings recently + STATE_READ_TIMEOUT = 2, // we haven't received pings in a while + }; + + ReadState read_state() const { return read_state_; } + bool readable() const { return read_state_ == STATE_READABLE; } + enum WriteState { STATE_WRITABLE = 0, // we have received ping responses recently STATE_WRITE_UNRELIABLE = 1, // we have had a few ping failures @@ -430,7 +435,6 @@ class Connection : public rtc::MessageHandler, WriteState write_state() const { return write_state_; } bool writable() const { return write_state_ == STATE_WRITABLE; } - bool receiving() const { return receiving_; } // Determines whether the connection has finished connecting. This can only // be false for TCP connections. @@ -462,8 +466,8 @@ class Connection : public rtc::MessageHandler, // Error if Send() returns < 0 virtual int GetError() = 0; - sigslot::signal4 - SignalReadPacket; + sigslot::signal4 SignalReadPacket; sigslot::signal1 SignalReadyToSend; @@ -491,10 +495,6 @@ class Connection : public rtc::MessageHandler, remote_ice_mode_ = mode; } - void set_receiving_timeout(uint32 receiving_timeout_ms) { - receiving_timeout_ = receiving_timeout_ms; - } - // Makes the connection go away. void Destroy(); @@ -565,8 +565,8 @@ class Connection : public rtc::MessageHandler, void OnConnectionRequestSent(ConnectionRequest* req); // Changes the state and signals if necessary. + void set_read_state(ReadState value); void set_write_state(WriteState value); - void set_receiving(bool value); void set_state(State state); void set_connected(bool value); @@ -578,8 +578,8 @@ class Connection : public rtc::MessageHandler, Port* port_; size_t local_candidate_index_; Candidate remote_candidate_; + ReadState read_state_; WriteState write_state_; - bool receiving_; bool connected_; bool pruned_; // By default |use_candidate_attr_| flag will be true, @@ -611,8 +611,6 @@ class Connection : public rtc::MessageHandler, bool reported_; State state_; - // Time duration to switch from receiving to not receiving. - uint32 receiving_timeout_; friend class Port; friend class ConnectionRequest; diff --git a/webrtc/p2p/base/port_unittest.cc b/webrtc/p2p/base/port_unittest.cc index c4fa5f8331..453b77c902 100644 --- a/webrtc/p2p/base/port_unittest.cc +++ b/webrtc/p2p/base/port_unittest.cc @@ -821,7 +821,7 @@ void PortTest::TestConnectivity(const char* name1, Port* port1, if (same_addr1 && same_addr2) { // The new ping got back to the source. - EXPECT_TRUE(ch1.conn()->receiving()); + EXPECT_EQ(Connection::STATE_READABLE, ch1.conn()->read_state()); EXPECT_EQ(Connection::STATE_WRITABLE, ch2.conn()->write_state()); // First connection may not be writable if the first ping did not get @@ -841,7 +841,7 @@ void PortTest::TestConnectivity(const char* name1, Port* port1, // able to get a ping from it. This gives us the real source address. ch1.Ping(); EXPECT_TRUE_WAIT(!ch2.remote_address().IsNil(), kTimeout); - EXPECT_FALSE(ch2.conn()->receiving()); + EXPECT_EQ(Connection::STATE_READ_INIT, ch2.conn()->read_state()); EXPECT_TRUE(ch1.remote_address().IsNil()); // Pick up the actual address and establish the connection. @@ -854,7 +854,7 @@ void PortTest::TestConnectivity(const char* name1, Port* port1, // The new ping came in, but from an unexpected address. This will happen // when the destination NAT is symmetric. EXPECT_FALSE(ch1.remote_address().IsNil()); - EXPECT_FALSE(ch1.conn()->receiving()); + EXPECT_EQ(Connection::STATE_READ_INIT, ch1.conn()->read_state()); // Update our address and complete the connection. ch1.AcceptConnection(GetCandidate(port2)); @@ -876,14 +876,14 @@ void PortTest::TestConnectivity(const char* name1, Port* port1, ASSERT_TRUE(ch1.conn() != NULL); ASSERT_TRUE(ch2.conn() != NULL); if (possible) { - EXPECT_TRUE(ch1.conn()->receiving()); + EXPECT_EQ(Connection::STATE_READABLE, ch1.conn()->read_state()); EXPECT_EQ(Connection::STATE_WRITABLE, ch1.conn()->write_state()); - EXPECT_TRUE(ch2.conn()->receiving()); + EXPECT_EQ(Connection::STATE_READABLE, ch2.conn()->read_state()); EXPECT_EQ(Connection::STATE_WRITABLE, ch2.conn()->write_state()); } else { - EXPECT_FALSE(ch1.conn()->receiving()); + EXPECT_NE(Connection::STATE_READABLE, ch1.conn()->read_state()); EXPECT_NE(Connection::STATE_WRITABLE, ch1.conn()->write_state()); - EXPECT_FALSE(ch2.conn()->receiving()); + EXPECT_NE(Connection::STATE_READABLE, ch2.conn()->read_state()); EXPECT_NE(Connection::STATE_WRITABLE, ch2.conn()->write_state()); } @@ -1273,7 +1273,7 @@ TEST_F(PortTest, TestLoopbackCal) { // response. lport->Reset(); lport->AddCandidateAddress(kLocalAddr2); - // Creating a different connection as |conn| is receiving. + // Creating a different connection as |conn| is in STATE_READABLE. Connection* conn1 = lport->CreateConnection(lport->Candidates()[1], Port::ORIGIN_MESSAGE); conn1->Ping(0); diff --git a/webrtc/p2p/base/transport.cc b/webrtc/p2p/base/transport.cc index d626ad3d65..3322e3898a 100644 --- a/webrtc/p2p/base/transport.cc +++ b/webrtc/p2p/base/transport.cc @@ -25,6 +25,7 @@ using rtc::Bind; enum { MSG_ONSIGNALINGREADY = 1, MSG_ONREMOTECANDIDATE, + MSG_READSTATE, MSG_WRITESTATE, MSG_REQUESTSIGNALING, MSG_CANDIDATEREADY, @@ -237,6 +238,7 @@ TransportChannelImpl* Transport::CreateChannel_w(int component) { if (local_description_ && remote_description_) ApplyNegotiatedTransportDescription_w(impl, NULL); + impl->SignalReadableState.connect(this, &Transport::OnChannelReadableState); impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState); impl->SignalReceivingState.connect(this, &Transport::OnChannelReceivingState); impl->SignalRequestSignaling.connect( @@ -491,6 +493,20 @@ void Transport::OnRemoteCandidate_w(const Candidate& candidate) { } } +void Transport::OnChannelReadableState(TransportChannel* channel) { + ASSERT(worker_thread()->IsCurrent()); + signaling_thread()->Post(this, MSG_READSTATE, NULL); +} + +void Transport::OnChannelReadableState_s() { + ASSERT(signaling_thread()->IsCurrent()); + TransportState readable = GetTransportState_s(TRANSPORT_READABLE_STATE); + if (readable_ != readable) { + readable_ = readable; + SignalReadableState(this); + } +} + void Transport::OnChannelWritableState(TransportChannel* channel) { ASSERT(worker_thread()->IsCurrent()); signaling_thread()->Post(this, MSG_WRITESTATE, NULL); @@ -531,6 +547,9 @@ TransportState Transport::GetTransportState_s(TransportStateType state_type) { for (const auto iter : channels_) { bool b = false; switch (state_type) { + case TRANSPORT_READABLE_STATE: + b = iter.second->readable(); + break; case TRANSPORT_WRITABLE_STATE: b = iter.second->writable(); break; @@ -851,6 +870,9 @@ void Transport::OnMessage(rtc::Message* msg) { case MSG_CONNECTING: OnConnecting_s(); break; + case MSG_READSTATE: + OnChannelReadableState_s(); + break; case MSG_WRITESTATE: OnChannelWritableState_s(); break; diff --git a/webrtc/p2p/base/transport.h b/webrtc/p2p/base/transport.h index 72a089501e..5539655953 100644 --- a/webrtc/p2p/base/transport.h +++ b/webrtc/p2p/base/transport.h @@ -54,7 +54,7 @@ class TransportChannelImpl; typedef std::vector Candidates; -// For "writable" and "receiving", we need to differentiate between +// For "writable", "readable", and "receiving", we need to differentiate between // none, all, and some. enum TransportState { TRANSPORT_STATE_NONE = 0, @@ -63,9 +63,10 @@ enum TransportState { }; // When checking transport state, we need to differentiate between -// "writable" or "receiving" check. +// "readable", "writable", or "receiving" check. enum TransportStateType { - TRANSPORT_WRITABLE_STATE = 0, + TRANSPORT_READABLE_STATE = 0, + TRANSPORT_WRITABLE_STATE, TRANSPORT_RECEIVING_STATE }; @@ -75,7 +76,7 @@ struct ConnectionInfo { ConnectionInfo() : best_connection(false), writable(false), - receiving(false), + readable(false), timeout(false), new_connection(false), rtt(0), @@ -89,7 +90,7 @@ struct ConnectionInfo { bool best_connection; // Is this the best connection we have? bool writable; // Has this connection received a STUN response? - bool receiving; // Has this connection received anything? + bool readable; // Has this connection received a STUN request? bool timeout; // Has this connection timed out? bool new_connection; // Is this a newly created connection? size_t rtt; // The STUN RTT for this connection. @@ -155,16 +156,25 @@ class Transport : public rtc::MessageHandler, // Returns the port allocator object for this transport. PortAllocator* port_allocator() { return allocator_; } - // Returns the states of this manager. These bits are the ORs + // Returns the readable and states of this manager. These bits are the ORs // of the corresponding bits on the managed channels. Each time one of these // states changes, a signal is raised. - // TODO(honghaiz): Replace uses of writable() with any_channels_writable(). + // TODO: Replace uses of readable() and writable() with + // any_channels_readable() and any_channels_writable(). + bool readable() const { return any_channels_readable(); } bool writable() const { return any_channels_writable(); } bool was_writable() const { return was_writable_; } + bool any_channels_readable() const { + return (readable_ == TRANSPORT_STATE_SOME || + readable_ == TRANSPORT_STATE_ALL); + } bool any_channels_writable() const { return (writable_ == TRANSPORT_STATE_SOME || writable_ == TRANSPORT_STATE_ALL); } + bool all_channels_readable() const { + return (readable_ == TRANSPORT_STATE_ALL); + } bool all_channels_writable() const { return (writable_ == TRANSPORT_STATE_ALL); } @@ -173,6 +183,7 @@ class Transport : public rtc::MessageHandler, receiving_ == TRANSPORT_STATE_ALL); } + sigslot::signal1 SignalReadableState; sigslot::signal1 SignalWritableState; sigslot::signal1 SignalReceivingState; sigslot::signal1 SignalCompleted; @@ -363,7 +374,8 @@ class Transport : public rtc::MessageHandler, // Candidate component => ChannelMapEntry typedef std::map ChannelMap; - // Called when the write state of a channel changes. + // Called when the state of a channel changes. + void OnChannelReadableState(TransportChannel* channel); void OnChannelWritableState(TransportChannel* channel); // Called when the receiving state of a channel changes. @@ -397,6 +409,7 @@ class Transport : public rtc::MessageHandler, void ResetChannels_w(); void DestroyAllChannels_w(); void OnRemoteCandidate_w(const Candidate& candidate); + void OnChannelReadableState_s(); void OnChannelWritableState_s(); void OnChannelReceivingState_s(); void OnChannelRequestSignaling_s(); diff --git a/webrtc/p2p/base/transportchannel.cc b/webrtc/p2p/base/transportchannel.cc index 5d5a7c98fe..5fb0eb472c 100644 --- a/webrtc/p2p/base/transportchannel.cc +++ b/webrtc/p2p/base/transportchannel.cc @@ -15,14 +15,22 @@ namespace cricket { std::string TransportChannel::ToString() const { - const char RECEIVING_ABBREV[2] = { '_', 'R' }; + const char READABLE_ABBREV[2] = { '_', 'R' }; const char WRITABLE_ABBREV[2] = { '_', 'W' }; std::stringstream ss; - ss << "Channel[" << content_name_ << "|" << component_ << "|" - << RECEIVING_ABBREV[receiving_] << WRITABLE_ABBREV[writable_] << "]"; + ss << "Channel[" << content_name_ + << "|" << component_ + << "|" << READABLE_ABBREV[readable_] << WRITABLE_ABBREV[writable_] << "]"; return ss.str(); } +void TransportChannel::set_readable(bool readable) { + if (readable_ != readable) { + readable_ = readable; + SignalReadableState(this); + } +} + void TransportChannel::set_receiving(bool receiving) { if (receiving_ == receiving) { return; diff --git a/webrtc/p2p/base/transportchannel.h b/webrtc/p2p/base/transportchannel.h index 60d1ed0b35..f492e4e000 100644 --- a/webrtc/p2p/base/transportchannel.h +++ b/webrtc/p2p/base/transportchannel.h @@ -46,8 +46,7 @@ class TransportChannel : public sigslot::has_slots<> { explicit TransportChannel(const std::string& content_name, int component) : content_name_(content_name), component_(component), - writable_(false), - receiving_(false) {} + readable_(false), writable_(false), receiving_(false) {} virtual ~TransportChannel() {} // TODO(guoweis) - Make this pure virtual once all subclasses of @@ -63,10 +62,13 @@ class TransportChannel : public sigslot::has_slots<> { const std::string& content_name() const { return content_name_; } int component() const { return component_; } - // Returns the states of this channel. Each time one of these states changes, - // a signal is raised. These states are aggregated by the TransportManager. + // Returns the readable and states of this channel. Each time one of these + // states changes, a signal is raised. These states are aggregated by the + // TransportManager. + bool readable() const { return readable_; } bool writable() const { return writable_; } bool receiving() const { return receiving_; } + sigslot::signal1 SignalReadableState; sigslot::signal1 SignalWritableState; // Emitted when the TransportChannel's ability to send has changed. sigslot::signal1 SignalReadyToSend; @@ -137,6 +139,8 @@ class TransportChannel : public sigslot::has_slots<> { std::string ToString() const; protected: + // Sets the readable state, signaling if necessary. + void set_readable(bool readable); // Sets the writable state, signaling if necessary. void set_writable(bool writable); @@ -149,6 +153,7 @@ class TransportChannel : public sigslot::has_slots<> { // Used mostly for debugging. std::string content_name_; int component_; + bool readable_; bool writable_; bool receiving_; diff --git a/webrtc/p2p/base/transportchannelproxy.cc b/webrtc/p2p/base/transportchannelproxy.cc index 74d1e1db76..f7946dd96f 100644 --- a/webrtc/p2p/base/transportchannelproxy.cc +++ b/webrtc/p2p/base/transportchannelproxy.cc @@ -55,10 +55,10 @@ void TransportChannelProxy::SetImplementation(TransportChannelImpl* impl) { impl_ = impl; if (impl_) { + impl_->SignalReadableState.connect( + this, &TransportChannelProxy::OnReadableState); impl_->SignalWritableState.connect( this, &TransportChannelProxy::OnWritableState); - impl_->SignalReceivingState.connect( - this, &TransportChannelProxy::OnReceivingState); impl_->SignalReadPacket.connect( this, &TransportChannelProxy::OnReadPacket); impl_->SignalReadyToSend.connect( @@ -229,18 +229,18 @@ IceRole TransportChannelProxy::GetIceRole() const { return impl_->GetIceRole(); } +void TransportChannelProxy::OnReadableState(TransportChannel* channel) { + ASSERT(rtc::Thread::Current() == worker_thread_); + ASSERT(channel == impl_); + set_readable(impl_->readable()); + // Note: SignalReadableState fired by set_readable. +} + void TransportChannelProxy::OnWritableState(TransportChannel* channel) { ASSERT(rtc::Thread::Current() == worker_thread_); ASSERT(channel == impl_); set_writable(impl_->writable()); - // Note: SignalWritableState fired by set_writable. -} - -void TransportChannelProxy::OnReceivingState(TransportChannel* channel) { - ASSERT(rtc::Thread::Current() == worker_thread_); - ASSERT(channel == impl_); - set_receiving(impl_->receiving()); - // Note: SignalReceivingState fired by set_receiving. + // Note: SignalWritableState fired by set_readable. } void TransportChannelProxy::OnReadPacket( @@ -267,9 +267,9 @@ void TransportChannelProxy::OnRouteChange(TransportChannel* channel, void TransportChannelProxy::OnMessage(rtc::Message* msg) { ASSERT(rtc::Thread::Current() == worker_thread_); if (msg->message_id == MSG_UPDATESTATE) { - // If impl_ is already receiving or writable, push up those signals. - set_writable(impl_ ? impl_->writable() : false); - set_receiving(impl_ ? impl_->receiving() : false); + // If impl_ is already readable or writable, push up those signals. + set_readable(impl_ ? impl_->readable() : false); + set_writable(impl_ ? impl_->writable() : false); } } diff --git a/webrtc/p2p/base/transportchannelproxy.h b/webrtc/p2p/base/transportchannelproxy.h index 80ee20aabb..f10f5076eb 100644 --- a/webrtc/p2p/base/transportchannelproxy.h +++ b/webrtc/p2p/base/transportchannelproxy.h @@ -72,7 +72,7 @@ class TransportChannelProxy : public TransportChannel, private: // Catch signals from the implementation channel. These just forward to the // client (after updating our state to match). - void OnReceivingState(TransportChannel* channel); + void OnReadableState(TransportChannel* channel); void OnWritableState(TransportChannel* channel); void OnReadPacket(TransportChannel* channel, const char* data, size_t size, const rtc::PacketTime& packet_time, int flags); diff --git a/webrtc/p2p/base/turnport_unittest.cc b/webrtc/p2p/base/turnport_unittest.cc index 724485ddde..a90ea032b7 100644 --- a/webrtc/p2p/base/turnport_unittest.cc +++ b/webrtc/p2p/base/turnport_unittest.cc @@ -364,7 +364,7 @@ class TurnPortTest : public testing::Test, conn1->Ping(0); WAIT(!turn_unknown_address_, kTimeout); EXPECT_FALSE(turn_unknown_address_); - EXPECT_FALSE(conn1->receiving()); + EXPECT_EQ(Connection::STATE_READ_INIT, conn1->read_state()); EXPECT_EQ(Connection::STATE_WRITE_INIT, conn1->write_state()); // Send ping from TURN to UDP. @@ -375,14 +375,14 @@ class TurnPortTest : public testing::Test, conn2->Ping(0); EXPECT_EQ_WAIT(Connection::STATE_WRITABLE, conn2->write_state(), kTimeout); - EXPECT_TRUE(conn1->receiving()); - EXPECT_TRUE(conn2->receiving()); + EXPECT_EQ(Connection::STATE_READABLE, conn1->read_state()); + EXPECT_EQ(Connection::STATE_READ_INIT, conn2->read_state()); EXPECT_EQ(Connection::STATE_WRITE_INIT, conn1->write_state()); // Send another ping from UDP to TURN. conn1->Ping(0); EXPECT_EQ_WAIT(Connection::STATE_WRITABLE, conn1->write_state(), kTimeout); - EXPECT_TRUE(conn2->receiving()); + EXPECT_EQ(Connection::STATE_READABLE, conn2->read_state()); } void TestTurnSendData() {