rtc::Port use Socket::RegisterReceivedPacketCallback

Change Stun, Turn, TCP and UDP  ports to use Socket::RegisterReceivedPacketCallback and be notified of received packets using the rtc::ReceivedPacket class.


Change-Id: I7eba2ffdd83ae6f6181f765474aab7c57ff25507
Bug: webrtc:15368, webrtc:11943
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/327941
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Reviewed-by: Jonas Oreland <jonaso@google.com>
Cr-Commit-Position: refs/heads/main@{#41195}
This commit is contained in:
Per K 2023-11-20 12:21:34 +01:00 committed by WebRTC LUCI CQ
parent 2d86b258e0
commit c09cc4f961
12 changed files with 126 additions and 126 deletions

View File

@ -13,6 +13,7 @@
#include <math.h> #include <math.h>
#include <algorithm> #include <algorithm>
#include <cstddef>
#include <memory> #include <memory>
#include <utility> #include <utility>
#include <vector> #include <vector>
@ -31,6 +32,7 @@
#include "rtc_base/message_digest.h" #include "rtc_base/message_digest.h"
#include "rtc_base/network.h" #include "rtc_base/network.h"
#include "rtc_base/numerics/safe_minmax.h" #include "rtc_base/numerics/safe_minmax.h"
#include "rtc_base/socket_address.h"
#include "rtc_base/string_encode.h" #include "rtc_base/string_encode.h"
#include "rtc_base/string_utils.h" #include "rtc_base/string_utils.h"
#include "rtc_base/strings/string_builder.h" #include "rtc_base/strings/string_builder.h"
@ -359,10 +361,10 @@ void Port::AddOrReplaceConnection(Connection* conn) {
} }
} }
void Port::OnReadPacket(const char* data, void Port::OnReadPacket(const rtc::ReceivedPacket& packet, ProtocolType proto) {
size_t size, const char* data = reinterpret_cast<const char*>(packet.payload().data());
const rtc::SocketAddress& addr, size_t size = packet.payload().size();
ProtocolType proto) { const rtc::SocketAddress& addr = packet.source_address();
// If the user has enabled port packets, just hand this over. // If the user has enabled port packets, just hand this over.
if (enable_port_packets_) { if (enable_port_packets_) {
SignalReadPacket(this, data, size, addr); SignalReadPacket(this, data, size, addr);
@ -725,10 +727,7 @@ std::string Port::CreateStunUsername(absl::string_view remote_username) const {
} }
bool Port::HandleIncomingPacket(rtc::AsyncPacketSocket* socket, bool Port::HandleIncomingPacket(rtc::AsyncPacketSocket* socket,
const char* data, const rtc::ReceivedPacket& packet) {
size_t size,
const rtc::SocketAddress& remote_addr,
int64_t packet_time_us) {
RTC_DCHECK_NOTREACHED(); RTC_DCHECK_NOTREACHED();
return false; return false;
} }

View File

@ -43,6 +43,7 @@
#include "rtc_base/memory/always_valid_pointer.h" #include "rtc_base/memory/always_valid_pointer.h"
#include "rtc_base/net_helper.h" #include "rtc_base/net_helper.h"
#include "rtc_base/network.h" #include "rtc_base/network.h"
#include "rtc_base/network/received_packet.h"
#include "rtc_base/proxy_info.h" #include "rtc_base/proxy_info.h"
#include "rtc_base/rate_tracker.h" #include "rtc_base/rate_tracker.h"
#include "rtc_base/socket_address.h" #include "rtc_base/socket_address.h"
@ -313,10 +314,7 @@ class RTC_EXPORT Port : public PortInterface, public sigslot::has_slots<> {
// port implemented this method. // port implemented this method.
// TODO(mallinath) - Make it pure virtual. // TODO(mallinath) - Make it pure virtual.
virtual bool HandleIncomingPacket(rtc::AsyncPacketSocket* socket, virtual bool HandleIncomingPacket(rtc::AsyncPacketSocket* socket,
const char* data, const rtc::ReceivedPacket& packet);
size_t size,
const rtc::SocketAddress& remote_addr,
int64_t packet_time_us);
// Shall the port handle packet from this `remote_addr`. // Shall the port handle packet from this `remote_addr`.
// This method is overridden by TurnPort. // This method is overridden by TurnPort.
@ -422,10 +420,19 @@ class RTC_EXPORT Port : public PortInterface, public sigslot::has_slots<> {
// Called when a packet is received from an unknown address that is not // Called when a packet is received from an unknown address that is not
// currently a connection. If this is an authenticated STUN binding request, // currently a connection. If this is an authenticated STUN binding request,
// then we will signal the client. // then we will signal the client.
void OnReadPacket(const char* data, void OnReadPacket(const rtc::ReceivedPacket& packet, ProtocolType proto);
[[deprecated(
"Use OnReadPacket(const rtc::ReceivedPacket& packet, ProtocolType "
"proto)")]] void
OnReadPacket(const char* data,
size_t size, size_t size,
const rtc::SocketAddress& addr, const rtc::SocketAddress& addr,
ProtocolType proto); ProtocolType proto) {
OnReadPacket(rtc::ReceivedPacket::CreateFromLegacy(
data, size, /*packet_time_us = */ -1, addr),
proto);
}
// If the given data comprises a complete and correct STUN message then the // If the given data comprises a complete and correct STUN message then the
// return value is true, otherwise false. If the message username corresponds // return value is true, otherwise false. If the message username corresponds

View File

@ -25,6 +25,7 @@
#include "rtc_base/helpers.h" #include "rtc_base/helpers.h"
#include "rtc_base/ip_address.h" #include "rtc_base/ip_address.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/network/received_packet.h"
#include "rtc_base/strings/string_builder.h" #include "rtc_base/strings/string_builder.h"
namespace cricket { namespace cricket {
@ -222,7 +223,10 @@ bool UDPPort::Init() {
RTC_LOG(LS_WARNING) << ToString() << ": UDP socket creation failed"; RTC_LOG(LS_WARNING) << ToString() << ": UDP socket creation failed";
return false; return false;
} }
socket_->SignalReadPacket.connect(this, &UDPPort::OnReadPacket); socket_->RegisterReceivedPacketCallback(
[&](rtc::AsyncPacketSocket* socket, const rtc::ReceivedPacket& packet) {
OnReadPacket(socket, packet);
});
} }
socket_->SignalSentPacket.connect(this, &UDPPort::OnSentPacket); socket_->SignalSentPacket.connect(this, &UDPPort::OnSentPacket);
socket_->SignalReadyToSend.connect(this, &UDPPort::OnReadyToSend); socket_->SignalReadyToSend.connect(this, &UDPPort::OnReadyToSend);
@ -342,12 +346,9 @@ int UDPPort::GetError() {
} }
bool UDPPort::HandleIncomingPacket(rtc::AsyncPacketSocket* socket, bool UDPPort::HandleIncomingPacket(rtc::AsyncPacketSocket* socket,
const char* data, const rtc::ReceivedPacket& packet) {
size_t size,
const rtc::SocketAddress& remote_addr,
int64_t packet_time_us) {
// All packets given to UDP port will be consumed. // All packets given to UDP port will be consumed.
OnReadPacket(socket, data, size, remote_addr, packet_time_us); OnReadPacket(socket, packet);
return true; return true;
} }
@ -389,27 +390,26 @@ void UDPPort::PostAddAddress(bool is_final) {
} }
void UDPPort::OnReadPacket(rtc::AsyncPacketSocket* socket, void UDPPort::OnReadPacket(rtc::AsyncPacketSocket* socket,
const char* data, const rtc::ReceivedPacket& packet) {
size_t size,
const rtc::SocketAddress& remote_addr,
const int64_t& packet_time_us) {
RTC_DCHECK(socket == socket_); RTC_DCHECK(socket == socket_);
RTC_DCHECK(!remote_addr.IsUnresolvedIP()); RTC_DCHECK(!packet.source_address().IsUnresolvedIP());
// Look for a response from the STUN server. // Look for a response from the STUN server.
// Even if the response doesn't match one of our outstanding requests, we // Even if the response doesn't match one of our outstanding requests, we
// will eat it because it might be a response to a retransmitted packet, and // will eat it because it might be a response to a retransmitted packet, and
// we already cleared the request when we got the first response. // we already cleared the request when we got the first response.
if (server_addresses_.find(remote_addr) != server_addresses_.end()) { if (server_addresses_.find(packet.source_address()) !=
request_manager_.CheckResponse(data, size); server_addresses_.end()) {
request_manager_.CheckResponse(
reinterpret_cast<const char*>(packet.payload().data()),
packet.payload().size());
return; return;
} }
if (Connection* conn = GetConnection(remote_addr)) { if (Connection* conn = GetConnection(packet.source_address())) {
conn->OnReadPacket( conn->OnReadPacket(packet);
rtc::ReceivedPacket::CreateFromLegacy(data, size, packet_time_us));
} else { } else {
Port::OnReadPacket(data, size, remote_addr, PROTO_UDP); Port::OnReadPacket(packet, PROTO_UDP);
} }
} }

View File

@ -22,6 +22,7 @@
#include "p2p/base/port.h" #include "p2p/base/port.h"
#include "p2p/base/stun_request.h" #include "p2p/base/stun_request.h"
#include "rtc_base/async_packet_socket.h" #include "rtc_base/async_packet_socket.h"
#include "rtc_base/network/received_packet.h"
#include "rtc_base/system/rtc_export.h" #include "rtc_base/system/rtc_export.h"
namespace cricket { namespace cricket {
@ -97,10 +98,7 @@ class RTC_EXPORT UDPPort : public Port {
int GetError() override; int GetError() override;
bool HandleIncomingPacket(rtc::AsyncPacketSocket* socket, bool HandleIncomingPacket(rtc::AsyncPacketSocket* socket,
const char* data, const rtc::ReceivedPacket& packet) override;
size_t size,
const rtc::SocketAddress& remote_addr,
int64_t packet_time_us) override;
bool SupportsProtocol(absl::string_view protocol) const override; bool SupportsProtocol(absl::string_view protocol) const override;
ProtocolType GetProtocol() const override; ProtocolType GetProtocol() const override;
@ -158,10 +156,7 @@ class RTC_EXPORT UDPPort : public Port {
void PostAddAddress(bool is_final) override; void PostAddAddress(bool is_final) override;
void OnReadPacket(rtc::AsyncPacketSocket* socket, void OnReadPacket(rtc::AsyncPacketSocket* socket,
const char* data, const rtc::ReceivedPacket& packet);
size_t size,
const rtc::SocketAddress& remote_addr,
const int64_t& packet_time_us);
void OnSentPacket(rtc::AsyncPacketSocket* socket, void OnSentPacket(rtc::AsyncPacketSocket* socket,
const rtc::SentPacket& sent_packet) override; const rtc::SentPacket& sent_packet) override;

View File

@ -16,8 +16,10 @@
#include "p2p/base/basic_packet_socket_factory.h" #include "p2p/base/basic_packet_socket_factory.h"
#include "p2p/base/mock_dns_resolving_packet_socket_factory.h" #include "p2p/base/mock_dns_resolving_packet_socket_factory.h"
#include "p2p/base/test_stun_server.h" #include "p2p/base/test_stun_server.h"
#include "rtc_base/async_packet_socket.h"
#include "rtc_base/gunit.h" #include "rtc_base/gunit.h"
#include "rtc_base/helpers.h" #include "rtc_base/helpers.h"
#include "rtc_base/network/received_packet.h"
#include "rtc_base/socket_address.h" #include "rtc_base/socket_address.h"
#include "rtc_base/ssl_adapter.h" #include "rtc_base/ssl_adapter.h"
#include "rtc_base/virtual_socket_server.h" #include "rtc_base/virtual_socket_server.h"
@ -160,7 +162,10 @@ class StunPortTestBase : public ::testing::Test, public sigslot::has_slots<> {
rtc::SocketAddress(kLocalAddr.ipaddr(), 0), 0, 0)); rtc::SocketAddress(kLocalAddr.ipaddr(), 0), 0, 0));
} }
ASSERT_TRUE(socket_ != NULL); ASSERT_TRUE(socket_ != NULL);
socket_->SignalReadPacket.connect(this, &StunPortTestBase::OnReadPacket); socket_->RegisterReceivedPacketCallback(
[&](rtc::AsyncPacketSocket* socket, const rtc::ReceivedPacket& packet) {
OnReadPacket(socket, packet);
});
stun_port_ = cricket::UDPPort::Create( stun_port_ = cricket::UDPPort::Create(
rtc::Thread::Current(), socket_factory(), &network_, socket_.get(), rtc::Thread::Current(), socket_factory(), &network_, socket_.get(),
rtc::CreateRandomString(16), rtc::CreateRandomString(22), false, rtc::CreateRandomString(16), rtc::CreateRandomString(22), false,
@ -178,18 +183,15 @@ class StunPortTestBase : public ::testing::Test, public sigslot::has_slots<> {
void PrepareAddress() { stun_port_->PrepareAddress(); } void PrepareAddress() { stun_port_->PrepareAddress(); }
void OnReadPacket(rtc::AsyncPacketSocket* socket, void OnReadPacket(rtc::AsyncPacketSocket* socket,
const char* data, const rtc::ReceivedPacket& packet) {
size_t size, stun_port_->HandleIncomingPacket(socket, packet);
const rtc::SocketAddress& remote_addr,
const int64_t& /* packet_time_us */) {
stun_port_->HandleIncomingPacket(socket, data, size, remote_addr,
/* packet_time_us */ -1);
} }
void SendData(const char* data, size_t len) { void SendData(const char* data, size_t len) {
stun_port_->HandleIncomingPacket(socket_.get(), data, len, stun_port_->HandleIncomingPacket(socket_.get(),
rtc::SocketAddress("22.22.22.22", 0), rtc::ReceivedPacket::CreateFromLegacy(
/* packet_time_us */ -1); data, len, /* packet_time_us */ -1,
rtc::SocketAddress("22.22.22.22", 0)));
} }
void EnableMdnsObfuscation() { void EnableMdnsObfuscation() {

View File

@ -81,6 +81,7 @@
#include "rtc_base/ip_address.h" #include "rtc_base/ip_address.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/net_helper.h" #include "rtc_base/net_helper.h"
#include "rtc_base/network/received_packet.h"
#include "rtc_base/rate_tracker.h" #include "rtc_base/rate_tracker.h"
namespace cricket { namespace cricket {
@ -159,7 +160,7 @@ Connection* TCPPort::CreateConnection(const Candidate& address,
// Incoming connection; we already created a socket and connected signals, // Incoming connection; we already created a socket and connected signals,
// so we need to hand off the "read packet" responsibility to // so we need to hand off the "read packet" responsibility to
// TCPConnection. // TCPConnection.
socket->SignalReadPacket.disconnect(this); socket->DeregisterReceivedPacketCallback();
conn = new TCPConnection(NewWeakPtr(), address, socket); conn = new TCPConnection(NewWeakPtr(), address, socket);
} else { } else {
// Outgoing connection, which will create a new socket for which we still // Outgoing connection, which will create a new socket for which we still
@ -288,7 +289,10 @@ void TCPPort::OnNewConnection(rtc::AsyncListenSocket* socket,
Incoming incoming; Incoming incoming;
incoming.addr = new_socket->GetRemoteAddress(); incoming.addr = new_socket->GetRemoteAddress();
incoming.socket = new_socket; incoming.socket = new_socket;
incoming.socket->SignalReadPacket.connect(this, &TCPPort::OnReadPacket); incoming.socket->RegisterReceivedPacketCallback(
[&](rtc::AsyncPacketSocket* socket, const rtc::ReceivedPacket& packet) {
OnReadPacket(socket, packet);
});
incoming.socket->SignalReadyToSend.connect(this, &TCPPort::OnReadyToSend); incoming.socket->SignalReadyToSend.connect(this, &TCPPort::OnReadyToSend);
incoming.socket->SignalSentPacket.connect(this, &TCPPort::OnSentPacket); incoming.socket->SignalSentPacket.connect(this, &TCPPort::OnSentPacket);
@ -326,11 +330,8 @@ rtc::AsyncPacketSocket* TCPPort::GetIncoming(const rtc::SocketAddress& addr,
} }
void TCPPort::OnReadPacket(rtc::AsyncPacketSocket* socket, void TCPPort::OnReadPacket(rtc::AsyncPacketSocket* socket,
const char* data, const rtc::ReceivedPacket& packet) {
size_t size, Port::OnReadPacket(packet, PROTO_TCP);
const rtc::SocketAddress& remote_addr,
const int64_t& packet_time_us) {
Port::OnReadPacket(data, size, remote_addr, PROTO_TCP);
} }
void TCPPort::OnSentPacket(rtc::AsyncPacketSocket* socket, void TCPPort::OnSentPacket(rtc::AsyncPacketSocket* socket,
@ -559,14 +560,10 @@ void TCPConnection::MaybeReconnect() {
} }
void TCPConnection::OnReadPacket(rtc::AsyncPacketSocket* socket, void TCPConnection::OnReadPacket(rtc::AsyncPacketSocket* socket,
const char* data, const rtc::ReceivedPacket& packet) {
size_t size,
const rtc::SocketAddress& remote_addr,
const int64_t& packet_time_us) {
RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK_EQ(socket, socket_.get()); RTC_DCHECK_EQ(socket, socket_.get());
Connection::OnReadPacket( Connection::OnReadPacket(packet);
rtc::ReceivedPacket::CreateFromLegacy(data, size, packet_time_us));
} }
void TCPConnection::OnReadyToSend(rtc::AsyncPacketSocket* socket) { void TCPConnection::OnReadyToSend(rtc::AsyncPacketSocket* socket) {
@ -624,7 +621,10 @@ void TCPConnection::ConnectSocketSignals(rtc::AsyncPacketSocket* socket) {
if (outgoing_) { if (outgoing_) {
socket->SignalConnect.connect(this, &TCPConnection::OnConnect); socket->SignalConnect.connect(this, &TCPConnection::OnConnect);
} }
socket->SignalReadPacket.connect(this, &TCPConnection::OnReadPacket); socket->RegisterReceivedPacketCallback(
[&](rtc::AsyncPacketSocket* socket, const rtc::ReceivedPacket& packet) {
OnReadPacket(socket, packet);
});
socket->SignalReadyToSend.connect(this, &TCPConnection::OnReadyToSend); socket->SignalReadyToSend.connect(this, &TCPConnection::OnReadyToSend);
socket->SubscribeCloseEvent(this, [this, safety = network_safety_.flag()]( socket->SubscribeCloseEvent(this, [this, safety = network_safety_.flag()](
rtc::AsyncPacketSocket* s, int err) { rtc::AsyncPacketSocket* s, int err) {
@ -637,7 +637,7 @@ void TCPConnection::DisconnectSocketSignals(rtc::AsyncPacketSocket* socket) {
if (outgoing_) { if (outgoing_) {
socket->SignalConnect.disconnect(this); socket->SignalConnect.disconnect(this);
} }
socket->SignalReadPacket.disconnect(this); socket->DeregisterReceivedPacketCallback();
socket->SignalReadyToSend.disconnect(this); socket->SignalReadyToSend.disconnect(this);
socket->UnsubscribeCloseEvent(this); socket->UnsubscribeCloseEvent(this);
} }

View File

@ -22,6 +22,7 @@
#include "p2p/base/port.h" #include "p2p/base/port.h"
#include "rtc_base/async_packet_socket.h" #include "rtc_base/async_packet_socket.h"
#include "rtc_base/containers/flat_map.h" #include "rtc_base/containers/flat_map.h"
#include "rtc_base/network/received_packet.h"
namespace cricket { namespace cricket {
@ -101,10 +102,7 @@ class TCPPort : public Port {
// Receives packet signal from the local TCP Socket. // Receives packet signal from the local TCP Socket.
void OnReadPacket(rtc::AsyncPacketSocket* socket, void OnReadPacket(rtc::AsyncPacketSocket* socket,
const char* data, const rtc::ReceivedPacket& packet);
size_t size,
const rtc::SocketAddress& remote_addr,
const int64_t& packet_time_us);
void OnSentPacket(rtc::AsyncPacketSocket* socket, void OnSentPacket(rtc::AsyncPacketSocket* socket,
const rtc::SentPacket& sent_packet) override; const rtc::SentPacket& sent_packet) override;
@ -170,10 +168,7 @@ class TCPConnection : public Connection, public sigslot::has_slots<> {
void OnConnect(rtc::AsyncPacketSocket* socket); void OnConnect(rtc::AsyncPacketSocket* socket);
void OnClose(rtc::AsyncPacketSocket* socket, int error); void OnClose(rtc::AsyncPacketSocket* socket, int error);
void OnReadPacket(rtc::AsyncPacketSocket* socket, void OnReadPacket(rtc::AsyncPacketSocket* socket,
const char* data, const rtc::ReceivedPacket& packet);
size_t size,
const rtc::SocketAddress& remote_addr,
const int64_t& packet_time_us);
void OnReadyToSend(rtc::AsyncPacketSocket* socket); void OnReadyToSend(rtc::AsyncPacketSocket* socket);
void OnDestroyed(Connection* c); void OnDestroyed(Connection* c);

View File

@ -10,6 +10,7 @@
#include "p2p/base/turn_port.h" #include "p2p/base/turn_port.h"
#include <cstdint>
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <utility> #include <utility>
@ -29,6 +30,7 @@
#include "rtc_base/experiments/field_trial_parser.h" #include "rtc_base/experiments/field_trial_parser.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/net_helpers.h" #include "rtc_base/net_helpers.h"
#include "rtc_base/network/received_packet.h"
#include "rtc_base/socket_address.h" #include "rtc_base/socket_address.h"
#include "rtc_base/strings/string_builder.h" #include "rtc_base/strings/string_builder.h"
@ -435,7 +437,10 @@ bool TurnPort::CreateTurnClientSocket() {
if (!SharedSocket()) { if (!SharedSocket()) {
// If socket is shared, AllocationSequence will receive the packet. // If socket is shared, AllocationSequence will receive the packet.
socket_->SignalReadPacket.connect(this, &TurnPort::OnReadPacket); socket_->RegisterReceivedPacketCallback(
[&](rtc::AsyncPacketSocket* socket, const rtc::ReceivedPacket& packet) {
OnReadPacket(socket, packet);
});
} }
socket_->SignalReadyToSend.connect(this, &TurnPort::OnReadyToSend); socket_->SignalReadyToSend.connect(this, &TurnPort::OnReadyToSend);
@ -679,10 +684,7 @@ void TurnPort::SendBindingErrorResponse(StunMessage* message,
} }
bool TurnPort::HandleIncomingPacket(rtc::AsyncPacketSocket* socket, bool TurnPort::HandleIncomingPacket(rtc::AsyncPacketSocket* socket,
const char* data, const rtc::ReceivedPacket& packet) {
size_t size,
const rtc::SocketAddress& remote_addr,
int64_t packet_time_us) {
if (socket != socket_) { if (socket != socket_) {
// The packet was received on a shared socket after we've allocated a new // The packet was received on a shared socket after we've allocated a new
// socket for this TURN port. // socket for this TURN port.
@ -692,16 +694,17 @@ bool TurnPort::HandleIncomingPacket(rtc::AsyncPacketSocket* socket,
// This is to guard against a STUN response from previous server after // This is to guard against a STUN response from previous server after
// alternative server redirection. TODO(guoweis): add a unit test for this // alternative server redirection. TODO(guoweis): add a unit test for this
// race condition. // race condition.
if (remote_addr != server_address_.address) { if (packet.source_address() != server_address_.address) {
RTC_LOG(LS_WARNING) RTC_LOG(LS_WARNING)
<< ToString() << ": Discarding TURN message from unknown address: " << ToString() << ": Discarding TURN message from unknown address: "
<< remote_addr.ToSensitiveNameAndAddressString() << " server_address_: " << packet.source_address().ToSensitiveNameAndAddressString()
<< " server_address_: "
<< server_address_.address.ToSensitiveNameAndAddressString(); << server_address_.address.ToSensitiveNameAndAddressString();
return false; return false;
} }
// The message must be at least the size of a channel header. // The message must be at least the size of a channel header.
if (size < TURN_CHANNEL_HEADER_SIZE) { if (packet.payload().size() < TURN_CHANNEL_HEADER_SIZE) {
RTC_LOG(LS_WARNING) << ToString() RTC_LOG(LS_WARNING) << ToString()
<< ": Received TURN message that was too short"; << ": Received TURN message that was too short";
return false; return false;
@ -714,10 +717,15 @@ bool TurnPort::HandleIncomingPacket(rtc::AsyncPacketSocket* socket,
return false; return false;
} }
const char* data = reinterpret_cast<const char*>(packet.payload().data());
int size = packet.payload().size();
int64_t packet_time_us =
packet.arrival_time() ? packet.arrival_time()->us() : -1;
// Check the message type, to see if is a Channel Data message. // Check the message type, to see if is a Channel Data message.
// The message will either be channel data, a TURN data indication, or // The message will either be channel data, a TURN data indication, or
// a response to a previous request. // a response to a previous request.
uint16_t msg_type = rtc::GetBE16(data); uint16_t msg_type = rtc::GetBE16(packet.payload().data());
if (IsTurnChannelData(msg_type)) { if (IsTurnChannelData(msg_type)) {
HandleChannelData(msg_type, data, size, packet_time_us); HandleChannelData(msg_type, data, size, packet_time_us);
return true; return true;
@ -742,11 +750,8 @@ bool TurnPort::HandleIncomingPacket(rtc::AsyncPacketSocket* socket,
} }
void TurnPort::OnReadPacket(rtc::AsyncPacketSocket* socket, void TurnPort::OnReadPacket(rtc::AsyncPacketSocket* socket,
const char* data, const rtc::ReceivedPacket& packet) {
size_t size, HandleIncomingPacket(socket, packet);
const rtc::SocketAddress& remote_addr,
const int64_t& packet_time_us) {
HandleIncomingPacket(socket, data, size, remote_addr, packet_time_us);
} }
void TurnPort::OnSentPacket(rtc::AsyncPacketSocket* socket, void TurnPort::OnSentPacket(rtc::AsyncPacketSocket* socket,
@ -1073,11 +1078,12 @@ void TurnPort::DispatchPacket(const char* data,
const rtc::SocketAddress& remote_addr, const rtc::SocketAddress& remote_addr,
ProtocolType proto, ProtocolType proto,
int64_t packet_time_us) { int64_t packet_time_us) {
rtc::ReceivedPacket packet = rtc::ReceivedPacket::CreateFromLegacy(
data, size, packet_time_us, remote_addr);
if (Connection* conn = GetConnection(remote_addr)) { if (Connection* conn = GetConnection(remote_addr)) {
conn->OnReadPacket( conn->OnReadPacket(packet);
rtc::ReceivedPacket::CreateFromLegacy(data, size, packet_time_us));
} else { } else {
Port::OnReadPacket(data, size, remote_addr, proto); Port::OnReadPacket(packet, proto);
} }
} }

View File

@ -27,6 +27,7 @@
#include "p2p/base/port.h" #include "p2p/base/port.h"
#include "p2p/client/basic_port_allocator.h" #include "p2p/client/basic_port_allocator.h"
#include "rtc_base/async_packet_socket.h" #include "rtc_base/async_packet_socket.h"
#include "rtc_base/network/received_packet.h"
#include "rtc_base/ssl_certificate.h" #include "rtc_base/ssl_certificate.h"
namespace webrtc { namespace webrtc {
@ -144,10 +145,7 @@ class TurnPort : public Port {
int GetError() override; int GetError() override;
bool HandleIncomingPacket(rtc::AsyncPacketSocket* socket, bool HandleIncomingPacket(rtc::AsyncPacketSocket* socket,
const char* data, const rtc::ReceivedPacket& packet) override;
size_t size,
const rtc::SocketAddress& remote_addr,
int64_t packet_time_us) override;
bool CanHandleIncomingPacketsFrom( bool CanHandleIncomingPacketsFrom(
const rtc::SocketAddress& addr) const override; const rtc::SocketAddress& addr) const override;
@ -159,10 +157,7 @@ class TurnPort : public Port {
absl::string_view reason) override; absl::string_view reason) override;
virtual void OnReadPacket(rtc::AsyncPacketSocket* socket, virtual void OnReadPacket(rtc::AsyncPacketSocket* socket,
const char* data, const rtc::ReceivedPacket& packet);
size_t size,
const rtc::SocketAddress& remote_addr,
const int64_t& packet_time_us);
void OnSentPacket(rtc::AsyncPacketSocket* socket, void OnSentPacket(rtc::AsyncPacketSocket* socket,
const rtc::SentPacket& sent_packet) override; const rtc::SentPacket& sent_packet) override;

View File

@ -7,6 +7,10 @@
* in the file PATENTS. All contributing project authors may * in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree. * be found in the AUTHORS file in the root of the source tree.
*/ */
#include <cstdint>
#include "api/array_view.h"
#include "rtc_base/network/received_packet.h"
#if defined(WEBRTC_POSIX) #if defined(WEBRTC_POSIX)
#include <dirent.h> #include <dirent.h>
@ -218,12 +222,8 @@ class TurnPortTest : public ::testing::Test,
} }
void OnUdpPortComplete(Port* port) { udp_ready_ = true; } void OnUdpPortComplete(Port* port) { udp_ready_ = true; }
void OnSocketReadPacket(rtc::AsyncPacketSocket* socket, void OnSocketReadPacket(rtc::AsyncPacketSocket* socket,
const char* data, const rtc::ReceivedPacket& packet) {
size_t size, turn_port_->HandleIncomingPacket(socket, packet);
const rtc::SocketAddress& remote_addr,
const int64_t& packet_time_us) {
turn_port_->HandleIncomingPacket(socket, data, size, remote_addr,
packet_time_us);
} }
void OnTurnPortDestroyed(PortInterface* port) { turn_port_destroyed_ = true; } void OnTurnPortDestroyed(PortInterface* port) { turn_port_destroyed_ = true; }
@ -323,8 +323,11 @@ class TurnPortTest : public ::testing::Test,
socket_.reset(socket_factory()->CreateUdpSocket( socket_.reset(socket_factory()->CreateUdpSocket(
rtc::SocketAddress(kLocalAddr1.ipaddr(), 0), 0, 0)); rtc::SocketAddress(kLocalAddr1.ipaddr(), 0), 0, 0));
ASSERT_TRUE(socket_ != NULL); ASSERT_TRUE(socket_ != NULL);
socket_->SignalReadPacket.connect(this, socket_->RegisterReceivedPacketCallback(
&TurnPortTest::OnSocketReadPacket); [&](rtc::AsyncPacketSocket* socket,
const rtc::ReceivedPacket& packet) {
OnSocketReadPacket(socket, packet);
});
} }
RelayServerConfig config; RelayServerConfig config;
@ -1193,8 +1196,10 @@ TEST_F(TurnPortTest, TestTurnAllocateMismatch) {
// Verify that all packets received from the shared socket are ignored. // Verify that all packets received from the shared socket are ignored.
std::string test_packet = "Test packet"; std::string test_packet = "Test packet";
EXPECT_FALSE(turn_port_->HandleIncomingPacket( EXPECT_FALSE(turn_port_->HandleIncomingPacket(
socket_.get(), test_packet.data(), test_packet.size(), socket_.get(),
rtc::SocketAddress(kTurnUdpExtAddr.ipaddr(), 0), rtc::TimeMicros())); rtc::ReceivedPacket::CreateFromLegacy(
test_packet.data(), test_packet.size(), rtc::TimeMicros(),
rtc::SocketAddress(kTurnUdpExtAddr.ipaddr(), 0))));
} }
// Tests that a shared-socket-TurnPort creates its own socket after // Tests that a shared-socket-TurnPort creates its own socket after

View File

@ -1308,8 +1308,11 @@ void AllocationSequence::Init() {
rtc::SocketAddress(network_->GetBestIP(), 0), rtc::SocketAddress(network_->GetBestIP(), 0),
session_->allocator()->min_port(), session_->allocator()->max_port())); session_->allocator()->min_port(), session_->allocator()->max_port()));
if (udp_socket_) { if (udp_socket_) {
udp_socket_->SignalReadPacket.connect(this, udp_socket_->RegisterReceivedPacketCallback(
&AllocationSequence::OnReadPacket); [&](rtc::AsyncPacketSocket* socket,
const rtc::ReceivedPacket& packet) {
OnReadPacket(socket, packet);
});
} }
// Continuing if `udp_socket_` is NULL, as local TCP and RelayPort using TCP // Continuing if `udp_socket_` is NULL, as local TCP and RelayPort using TCP
// are next available options to setup a communication channel. // are next available options to setup a communication channel.
@ -1668,10 +1671,7 @@ void AllocationSequence::CreateTurnPort(const RelayServerConfig& config,
} }
void AllocationSequence::OnReadPacket(rtc::AsyncPacketSocket* socket, void AllocationSequence::OnReadPacket(rtc::AsyncPacketSocket* socket,
const char* data, const rtc::ReceivedPacket& packet) {
size_t size,
const rtc::SocketAddress& remote_addr,
const int64_t& packet_time_us) {
RTC_DCHECK(socket == udp_socket_.get()); RTC_DCHECK(socket == udp_socket_.get());
bool turn_port_found = false; bool turn_port_found = false;
@ -1683,9 +1683,8 @@ void AllocationSequence::OnReadPacket(rtc::AsyncPacketSocket* socket,
// the message type. The TurnPort will just ignore the message since it will // the message type. The TurnPort will just ignore the message since it will
// not find any request by transaction ID. // not find any request by transaction ID.
for (auto* port : relay_ports_) { for (auto* port : relay_ports_) {
if (port->CanHandleIncomingPacketsFrom(remote_addr)) { if (port->CanHandleIncomingPacketsFrom(packet.source_address())) {
if (port->HandleIncomingPacket(socket, data, size, remote_addr, if (port->HandleIncomingPacket(socket, packet)) {
packet_time_us)) {
return; return;
} }
turn_port_found = true; turn_port_found = true;
@ -1698,10 +1697,9 @@ void AllocationSequence::OnReadPacket(rtc::AsyncPacketSocket* socket,
// Pass the packet to the UdpPort if there is no matching TurnPort, or if // Pass the packet to the UdpPort if there is no matching TurnPort, or if
// the TURN server is also a STUN server. // the TURN server is also a STUN server.
if (!turn_port_found || if (!turn_port_found ||
stun_servers.find(remote_addr) != stun_servers.end()) { stun_servers.find(packet.source_address()) != stun_servers.end()) {
RTC_DCHECK(udp_port_->SharedSocket()); RTC_DCHECK(udp_port_->SharedSocket());
udp_port_->HandleIncomingPacket(socket, data, size, remote_addr, udp_port_->HandleIncomingPacket(socket, packet);
packet_time_us);
} }
} }
} }

View File

@ -25,6 +25,7 @@
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/memory/always_valid_pointer.h" #include "rtc_base/memory/always_valid_pointer.h"
#include "rtc_base/network.h" #include "rtc_base/network.h"
#include "rtc_base/network/received_packet.h"
#include "rtc_base/system/rtc_export.h" #include "rtc_base/system/rtc_export.h"
#include "rtc_base/thread.h" #include "rtc_base/thread.h"
#include "rtc_base/thread_annotations.h" #include "rtc_base/thread_annotations.h"
@ -330,7 +331,7 @@ class TurnPort;
// Performs the allocation of ports, in a sequenced (timed) manner, for a given // Performs the allocation of ports, in a sequenced (timed) manner, for a given
// network and IP address. // network and IP address.
// This class is thread-compatible. // This class is thread-compatible.
class AllocationSequence : public sigslot::has_slots<> { class AllocationSequence {
public: public:
enum State { enum State {
kInit, // Initial state. kInit, // Initial state.
@ -386,10 +387,7 @@ class AllocationSequence : public sigslot::has_slots<> {
void CreateRelayPorts(); void CreateRelayPorts();
void OnReadPacket(rtc::AsyncPacketSocket* socket, void OnReadPacket(rtc::AsyncPacketSocket* socket,
const char* data, const rtc::ReceivedPacket& packet);
size_t size,
const rtc::SocketAddress& remote_addr,
const int64_t& packet_time_us);
void OnPortDestroyed(PortInterface* port); void OnPortDestroyed(PortInterface* port);