Ensure TCPPort is notified of sent packets after reconnect
This fix an issue where the socket does not notify the port of sent packets after that the TCPConnection has opened a new socket. TcpConnection will open a new socket if the TCP connection has been closed. Bug: webrtc:361124449 b/359989715 Change-Id: Id33c5fc5292ee7d1c2d1cad6c373e2d4355d4fe1 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/360260 Reviewed-by: Jonas Oreland <jonaso@webrtc.org> Commit-Queue: Per Kjellander <perkj@webrtc.org> Cr-Commit-Position: refs/heads/main@{#42865}
This commit is contained in:
parent
6bed21c811
commit
6db0db5cfd
@ -151,13 +151,8 @@ Connection* TCPPort::CreateConnection(const Candidate& address,
|
|||||||
socket->DeregisterReceivedPacketCallback();
|
socket->DeregisterReceivedPacketCallback();
|
||||||
conn = new TCPConnection(NewWeakPtr(), address, socket);
|
conn = new TCPConnection(NewWeakPtr(), address, socket);
|
||||||
} else {
|
} else {
|
||||||
// Outgoing connection, which will create a new socket for which we still
|
// Outgoing connection, which will create a new socket.
|
||||||
// need to connect SignalReadyToSend and SignalSentPacket.
|
|
||||||
conn = new TCPConnection(NewWeakPtr(), address);
|
conn = new TCPConnection(NewWeakPtr(), address);
|
||||||
if (conn->socket()) {
|
|
||||||
conn->socket()->SignalReadyToSend.connect(this, &TCPPort::OnReadyToSend);
|
|
||||||
conn->socket()->SignalSentPacket.connect(this, &TCPPort::OnSentPacket);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
AddOrReplaceConnection(conn);
|
AddOrReplaceConnection(conn);
|
||||||
return conn;
|
return conn;
|
||||||
@ -415,6 +410,14 @@ int TCPConnection::GetError() {
|
|||||||
return error_;
|
return error_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void TCPConnection::OnSentPacket(rtc::AsyncPacketSocket* socket,
|
||||||
|
const rtc::SentPacket& sent_packet) {
|
||||||
|
RTC_DCHECK_RUN_ON(network_thread());
|
||||||
|
if (port()) {
|
||||||
|
port()->SignalSentPacket(sent_packet);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void TCPConnection::OnConnectionRequestResponse(StunRequest* req,
|
void TCPConnection::OnConnectionRequestResponse(StunRequest* req,
|
||||||
StunMessage* response) {
|
StunMessage* response) {
|
||||||
// Process the STUN response before we inform upper layer ready to send.
|
// Process the STUN response before we inform upper layer ready to send.
|
||||||
@ -606,14 +609,20 @@ void TCPConnection::CreateOutgoingTcpSocket() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void TCPConnection::ConnectSocketSignals(rtc::AsyncPacketSocket* socket) {
|
void TCPConnection::ConnectSocketSignals(rtc::AsyncPacketSocket* socket) {
|
||||||
|
// Incoming connections register SignalSentPacket and SignalReadyToSend
|
||||||
|
// directly on the port in TCPPort::OnNewConnection.
|
||||||
if (outgoing_) {
|
if (outgoing_) {
|
||||||
socket->SignalConnect.connect(this, &TCPConnection::OnConnect);
|
socket->SignalConnect.connect(this, &TCPConnection::OnConnect);
|
||||||
|
socket->SignalSentPacket.connect(this, &TCPConnection::OnSentPacket);
|
||||||
|
socket->SignalReadyToSend.connect(this, &TCPConnection::OnReadyToSend);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// For incoming connections, this re-register ReceivedPacketCallback to the
|
||||||
|
// connection instead of the port.
|
||||||
socket->RegisterReceivedPacketCallback(
|
socket->RegisterReceivedPacketCallback(
|
||||||
[&](rtc::AsyncPacketSocket* socket, const rtc::ReceivedPacket& packet) {
|
[&](rtc::AsyncPacketSocket* socket, const rtc::ReceivedPacket& packet) {
|
||||||
OnReadPacket(socket, packet);
|
OnReadPacket(socket, packet);
|
||||||
});
|
});
|
||||||
socket->SignalReadyToSend.connect(this, &TCPConnection::OnReadyToSend);
|
|
||||||
socket->SubscribeCloseEvent(this, [this, safety = network_safety_.flag()](
|
socket->SubscribeCloseEvent(this, [this, safety = network_safety_.flag()](
|
||||||
rtc::AsyncPacketSocket* s, int err) {
|
rtc::AsyncPacketSocket* s, int err) {
|
||||||
if (safety->alive())
|
if (safety->alive())
|
||||||
@ -623,10 +632,12 @@ void TCPConnection::ConnectSocketSignals(rtc::AsyncPacketSocket* socket) {
|
|||||||
|
|
||||||
void TCPConnection::DisconnectSocketSignals(rtc::AsyncPacketSocket* socket) {
|
void TCPConnection::DisconnectSocketSignals(rtc::AsyncPacketSocket* socket) {
|
||||||
if (outgoing_) {
|
if (outgoing_) {
|
||||||
|
// Incoming connections do not register these signals in TCPConnection.
|
||||||
socket->SignalConnect.disconnect(this);
|
socket->SignalConnect.disconnect(this);
|
||||||
|
socket->SignalReadyToSend.disconnect(this);
|
||||||
|
socket->SignalSentPacket.disconnect(this);
|
||||||
}
|
}
|
||||||
socket->DeregisterReceivedPacketCallback();
|
socket->DeregisterReceivedPacketCallback();
|
||||||
socket->SignalReadyToSend.disconnect(this);
|
|
||||||
socket->UnsubscribeCloseEvent(this);
|
socket->UnsubscribeCloseEvent(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -174,6 +174,8 @@ class TCPConnection : public Connection, public sigslot::has_slots<> {
|
|||||||
|
|
||||||
void OnConnect(rtc::AsyncPacketSocket* socket);
|
void OnConnect(rtc::AsyncPacketSocket* socket);
|
||||||
void OnClose(rtc::AsyncPacketSocket* socket, int error);
|
void OnClose(rtc::AsyncPacketSocket* socket, int error);
|
||||||
|
void OnSentPacket(rtc::AsyncPacketSocket* socket,
|
||||||
|
const rtc::SentPacket& sent_packet);
|
||||||
void OnReadPacket(rtc::AsyncPacketSocket* socket,
|
void OnReadPacket(rtc::AsyncPacketSocket* socket,
|
||||||
const rtc::ReceivedPacket& packet);
|
const rtc::ReceivedPacket& packet);
|
||||||
void OnReadyToSend(rtc::AsyncPacketSocket* socket);
|
void OnReadyToSend(rtc::AsyncPacketSocket* socket);
|
||||||
|
|||||||
@ -20,6 +20,7 @@
|
|||||||
#include "rtc_base/crypto_random.h"
|
#include "rtc_base/crypto_random.h"
|
||||||
#include "rtc_base/gunit.h"
|
#include "rtc_base/gunit.h"
|
||||||
#include "rtc_base/ip_address.h"
|
#include "rtc_base/ip_address.h"
|
||||||
|
#include "rtc_base/socket_address.h"
|
||||||
#include "rtc_base/third_party/sigslot/sigslot.h"
|
#include "rtc_base/third_party/sigslot/sigslot.h"
|
||||||
#include "rtc_base/thread.h"
|
#include "rtc_base/thread.h"
|
||||||
#include "rtc_base/time_utils.h"
|
#include "rtc_base/time_utils.h"
|
||||||
@ -82,7 +83,9 @@ class TCPPortTest : public ::testing::Test, public sigslot::has_slots<> {
|
|||||||
return &networks_.back();
|
return &networks_.back();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<TCPPort> CreateTCPPort(const SocketAddress& addr) {
|
std::unique_ptr<TCPPort> CreateTCPPort(const SocketAddress& addr,
|
||||||
|
bool allow_listen = true,
|
||||||
|
int port_number = 0) {
|
||||||
auto port = std::unique_ptr<TCPPort>(
|
auto port = std::unique_ptr<TCPPort>(
|
||||||
TCPPort::Create({.network_thread = &main_,
|
TCPPort::Create({.network_thread = &main_,
|
||||||
.socket_factory = &socket_factory_,
|
.socket_factory = &socket_factory_,
|
||||||
@ -90,7 +93,7 @@ class TCPPortTest : public ::testing::Test, public sigslot::has_slots<> {
|
|||||||
.ice_username_fragment = username_,
|
.ice_username_fragment = username_,
|
||||||
.ice_password = password_,
|
.ice_password = password_,
|
||||||
.field_trials = &field_trials_},
|
.field_trials = &field_trials_},
|
||||||
0, 0, true));
|
port_number, port_number, allow_listen));
|
||||||
port->SetIceTiebreaker(kTiebreakerDefault);
|
port->SetIceTiebreaker(kTiebreakerDefault);
|
||||||
return port;
|
return port;
|
||||||
}
|
}
|
||||||
@ -267,3 +270,93 @@ TEST_F(TCPPortTest, SignalSentPacket) {
|
|||||||
EXPECT_EQ_WAIT(10, client_counter.sent_packets(), kTimeout);
|
EXPECT_EQ_WAIT(10, client_counter.sent_packets(), kTimeout);
|
||||||
EXPECT_EQ_WAIT(10, server_counter.sent_packets(), kTimeout);
|
EXPECT_EQ_WAIT(10, server_counter.sent_packets(), kTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test that SignalSentPacket is fired when a packet is successfully sent, even
|
||||||
|
// after a remote server has been restarted.
|
||||||
|
TEST_F(TCPPortTest, SignalSentPacketAfterReconnect) {
|
||||||
|
std::unique_ptr<TCPPort> client(
|
||||||
|
CreateTCPPort(kLocalAddr, /*allow_listen=*/false));
|
||||||
|
constexpr int kServerPort = 123;
|
||||||
|
std::unique_ptr<TCPPort> server(
|
||||||
|
CreateTCPPort(kRemoteAddr, /*allow_listen=*/true, kServerPort));
|
||||||
|
client->SetIceRole(cricket::ICEROLE_CONTROLLING);
|
||||||
|
server->SetIceRole(cricket::ICEROLE_CONTROLLED);
|
||||||
|
client->PrepareAddress();
|
||||||
|
server->PrepareAddress();
|
||||||
|
|
||||||
|
Connection* client_conn =
|
||||||
|
client->CreateConnection(server->Candidates()[0], Port::ORIGIN_MESSAGE);
|
||||||
|
ASSERT_NE(nullptr, client_conn);
|
||||||
|
ASSERT_TRUE_WAIT(client_conn->connected(), kTimeout);
|
||||||
|
|
||||||
|
// Need to get the port of the actual outgoing socket.
|
||||||
|
cricket::Candidate client_candidate = client->Candidates()[0];
|
||||||
|
client_candidate.set_address(static_cast<cricket::TCPConnection*>(client_conn)
|
||||||
|
->socket()
|
||||||
|
->GetLocalAddress());
|
||||||
|
client_candidate.set_tcptype("");
|
||||||
|
Connection* server_conn =
|
||||||
|
server->CreateConnection(client_candidate, Port::ORIGIN_THIS_PORT);
|
||||||
|
ASSERT_TRUE_WAIT(server_conn->connected(), kTimeout);
|
||||||
|
EXPECT_FALSE(client_conn->writable());
|
||||||
|
client_conn->Ping(rtc::TimeMillis());
|
||||||
|
ASSERT_TRUE_WAIT(client_conn->writable(), kTimeout);
|
||||||
|
|
||||||
|
SentPacketCounter client_counter(client.get());
|
||||||
|
static const char kData[] = "hello";
|
||||||
|
int result = client_conn->Send(&kData, sizeof(kData), rtc::PacketOptions());
|
||||||
|
EXPECT_EQ(result, 6);
|
||||||
|
|
||||||
|
// Deleting the server port should break the current connection.
|
||||||
|
server = nullptr;
|
||||||
|
server_conn = nullptr;
|
||||||
|
ASSERT_TRUE_WAIT(!client_conn->connected(), kTimeout);
|
||||||
|
|
||||||
|
// Recreate the server port with the same port number.
|
||||||
|
server = CreateTCPPort(kRemoteAddr, /*allow_listen=*/true, kServerPort);
|
||||||
|
server->SetIceRole(cricket::ICEROLE_CONTROLLED);
|
||||||
|
server->PrepareAddress();
|
||||||
|
|
||||||
|
// Sending a packet from the client will trigger a reconnect attempt but the
|
||||||
|
// packet will be discarded.
|
||||||
|
result = client_conn->Send(&kData, sizeof(kData), rtc::PacketOptions());
|
||||||
|
EXPECT_EQ(result, SOCKET_ERROR);
|
||||||
|
ASSERT_TRUE_WAIT(client_conn->connected(), kTimeout);
|
||||||
|
// For unknown reasons, connection is still supposed to be writable....
|
||||||
|
EXPECT_TRUE(client_conn->writable());
|
||||||
|
for (int i = 0; i < 10; ++i) {
|
||||||
|
// All sent packets still fail to send.
|
||||||
|
EXPECT_EQ(client_conn->Send(&kData, sizeof(kData), rtc::PacketOptions()),
|
||||||
|
SOCKET_ERROR);
|
||||||
|
}
|
||||||
|
// And are not reported as sent.
|
||||||
|
EXPECT_EQ_WAIT(client_counter.sent_packets(), 1, kTimeout);
|
||||||
|
|
||||||
|
// Create the server connection again so server can reply to STUN pings.
|
||||||
|
// Client outgoing socket port will have changed since the client create a new
|
||||||
|
// socket when it reconnect.
|
||||||
|
client_candidate = client->Candidates()[0];
|
||||||
|
client_candidate.set_address(static_cast<cricket::TCPConnection*>(client_conn)
|
||||||
|
->socket()
|
||||||
|
->GetLocalAddress());
|
||||||
|
client_candidate.set_tcptype("");
|
||||||
|
server_conn =
|
||||||
|
server->CreateConnection(client_candidate, Port::ORIGIN_THIS_PORT);
|
||||||
|
ASSERT_TRUE_WAIT(server_conn->connected(), kTimeout);
|
||||||
|
EXPECT_EQ_WAIT(client_counter.sent_packets(), 1, kTimeout);
|
||||||
|
|
||||||
|
// Send Stun Binding request.
|
||||||
|
client_conn->Ping(rtc::TimeMillis());
|
||||||
|
// The Stun Binding request is reported as sent.
|
||||||
|
EXPECT_EQ_WAIT(client_counter.sent_packets(), 2, kTimeout);
|
||||||
|
// Wait a bit for the Stun response to be received.
|
||||||
|
rtc::Thread::Current()->ProcessMessages(100);
|
||||||
|
|
||||||
|
// After the Stun Ping response has been received, packets can be sent again
|
||||||
|
// and SignalSentPacket should be invoked.
|
||||||
|
for (int i = 0; i < 5; ++i) {
|
||||||
|
EXPECT_EQ(client_conn->Send(&kData, sizeof(kData), rtc::PacketOptions()),
|
||||||
|
6);
|
||||||
|
}
|
||||||
|
EXPECT_EQ_WAIT(client_counter.sent_packets(), 2 + 5, kTimeout);
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user