From f81af2f8fd3ec76532b3b14ecf5fad55aa379350 Mon Sep 17 00:00:00 2001 From: Per K Date: Fri, 22 Dec 2023 13:24:40 +0100 Subject: [PATCH] Introduce Socket::ReceiveBuffer and RecvFrom(ReceiveBuffer& buffer) Intention is to gradually stop using raw pointers and make it easier to introduce new meta data types. A default implementation is added that use existing int RecvFrom(void* pv,..) In this cl, async_udp_socket.cc use the new method. There should be no behaviour change. Bug: webrtc:15368 Change-Id: I8f9773a65d24ab5bbac3534dcc37ee1ed874a2c7 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/332200 Reviewed-by: Harald Alvestrand Commit-Queue: Per Kjellander Reviewed-by: Jonas Oreland Cr-Commit-Position: refs/heads/main@{#41497} --- rtc_base/BUILD.gn | 5 ++++ rtc_base/async_udp_socket.cc | 46 ++++++++++++++++++++++-------------- rtc_base/async_udp_socket.h | 7 +++--- rtc_base/socket.cc | 22 ++++++++++++++++- rtc_base/socket.h | 15 ++++++++++++ 5 files changed, 73 insertions(+), 22 deletions(-) diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index e5eaa278ea..7acff4dad1 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -1105,13 +1105,16 @@ rtc_library("socket") { "socket.h", ] deps = [ + ":buffer", ":macromagic", ":socket_address", + "../api/units:timestamp", "third_party/sigslot", ] if (is_win) { deps += [ ":win32" ] } + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } rtc_source_set("network_constants") { @@ -1342,7 +1345,9 @@ rtc_library("async_udp_socket") { ":socket_factory", ":timeutils", "../api:sequence_checker", + "../api/units:time_delta", "../system_wrappers:field_trial", + "network:received_packet", "network:sent_packet", "system:no_unique_address", ] diff --git a/rtc_base/async_udp_socket.cc b/rtc_base/async_udp_socket.cc index 358420a5de..3d258bcb26 100644 --- a/rtc_base/async_udp_socket.cc +++ b/rtc_base/async_udp_socket.cc @@ -10,9 +10,11 @@ #include "rtc_base/async_udp_socket.h" - +#include "absl/types/optional.h" +#include "api/units/time_delta.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" +#include "rtc_base/network/received_packet.h" #include "rtc_base/network/sent_packet.h" #include "rtc_base/time_utils.h" #include "system_wrappers/include/field_trial.h" @@ -109,10 +111,8 @@ void AsyncUDPSocket::OnReadEvent(Socket* socket) { RTC_DCHECK(socket_.get() == socket); RTC_DCHECK_RUN_ON(&sequence_checker_); - SocketAddress remote_addr; - int64_t timestamp = -1; - int len = socket_->RecvFrom(buf_, BUF_SIZE, &remote_addr, ×tamp); - + Socket::ReceiveBuffer receive_buffer(buffer_); + int len = socket_->RecvFrom(receive_buffer); if (len < 0) { // An error here typically means we got an ICMP error in response to our // send datagram, indicating the remote address was unreachable. @@ -123,21 +123,31 @@ void AsyncUDPSocket::OnReadEvent(Socket* socket) { << "] receive failed with error " << socket_->GetError(); return; } - if (timestamp == -1) { - // Timestamp from socket is not available. - timestamp = TimeMicros(); - } else { - if (!socket_time_offset_) { - socket_time_offset_ = - !IsScmTimeStampExperimentDisabled() ? TimeMicros() - timestamp : 0; - } - timestamp += *socket_time_offset_; + if (len == 0) { + // Spurios wakeup. + return; } - // TODO: Make sure that we got all of the packet. - // If we did not, then we should resize our buffer to be large enough. - NotifyPacketReceived( - rtc::ReceivedPacket::CreateFromLegacy(buf_, len, timestamp, remote_addr)); + if (!receive_buffer.arrival_time) { + // Timestamp from socket is not available. + receive_buffer.arrival_time = webrtc::Timestamp::Micros(rtc::TimeMicros()); + } else { + if (!socket_time_offset_) { + // Estimate timestamp offset from first packet arrival time unless + // disabled + bool estimate_time_offset = !IsScmTimeStampExperimentDisabled(); + if (estimate_time_offset) { + socket_time_offset_ = webrtc::Timestamp::Micros(rtc::TimeMicros()) - + *receive_buffer.arrival_time; + } else { + socket_time_offset_ = webrtc::TimeDelta::Micros(0); + } + } + *receive_buffer.arrival_time += *socket_time_offset_; + } + NotifyPacketReceived(ReceivedPacket(receive_buffer.payload, + receive_buffer.source_address, + receive_buffer.arrival_time)); } void AsyncUDPSocket::OnWriteEvent(Socket* socket) { diff --git a/rtc_base/async_udp_socket.h b/rtc_base/async_udp_socket.h index 4198b25c4d..af361b98ea 100644 --- a/rtc_base/async_udp_socket.h +++ b/rtc_base/async_udp_socket.h @@ -18,6 +18,7 @@ #include "absl/types/optional.h" #include "api/sequence_checker.h" +#include "api/units/time_delta.h" #include "rtc_base/async_packet_socket.h" #include "rtc_base/socket.h" #include "rtc_base/socket_address.h" @@ -68,9 +69,9 @@ class AsyncUDPSocket : public AsyncPacketSocket { RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker sequence_checker_; std::unique_ptr socket_; - static constexpr int BUF_SIZE = 64 * 1024; - char buf_[BUF_SIZE] RTC_GUARDED_BY(sequence_checker_); - absl::optional socket_time_offset_ RTC_GUARDED_BY(sequence_checker_); + rtc::Buffer buffer_ RTC_GUARDED_BY(sequence_checker_); + absl::optional socket_time_offset_ + RTC_GUARDED_BY(sequence_checker_); }; } // namespace rtc diff --git a/rtc_base/socket.cc b/rtc_base/socket.cc index bcd62ad2a4..0908c2991f 100644 --- a/rtc_base/socket.cc +++ b/rtc_base/socket.cc @@ -10,4 +10,24 @@ #include "rtc_base/socket.h" -namespace rtc {} // namespace rtc +#include + +#include "rtc_base/buffer.h" + +namespace rtc { + +int Socket::RecvFrom(ReceiveBuffer& buffer) { + static constexpr int BUF_SIZE = 64 * 1024; + int64_t timestamp = -1; + buffer.payload.EnsureCapacity(BUF_SIZE); + int len = RecvFrom(buffer.payload.data(), buffer.payload.capacity(), + &buffer.source_address, ×tamp); + buffer.payload.SetSize(len > 0 ? len : 0); + if (len > 0 && timestamp != -1) { + buffer.arrival_time = webrtc::Timestamp::Micros(timestamp); + } + + return len; +} + +} // namespace rtc diff --git a/rtc_base/socket.h b/rtc_base/socket.h index 0ed3a7fa6a..0fd0613e81 100644 --- a/rtc_base/socket.h +++ b/rtc_base/socket.h @@ -13,6 +13,8 @@ #include +#include "absl/types/optional.h" + #if defined(WEBRTC_POSIX) #include #include @@ -25,6 +27,8 @@ #include "rtc_base/win32.h" #endif +#include "api/units/timestamp.h" +#include "rtc_base/buffer.h" #include "rtc_base/socket_address.h" #include "rtc_base/third_party/sigslot/sigslot.h" @@ -80,6 +84,13 @@ inline bool IsBlockingError(int e) { // methods match those of normal UNIX sockets very closely. class Socket { public: + struct ReceiveBuffer { + ReceiveBuffer(rtc::Buffer& payload) : payload(payload) {} + + absl::optional arrival_time; + SocketAddress source_address; + rtc::Buffer& payload; + }; virtual ~Socket() {} Socket(const Socket&) = delete; @@ -103,6 +114,10 @@ class Socket { size_t cb, SocketAddress* paddr, int64_t* timestamp) = 0; + // Intended to replace RecvFrom(void* ...). + // Default implementation calls RecvFrom(void* ...) with 64Kbyte buffer. + // Returns number of bytes received or a negative value on error. + virtual int RecvFrom(ReceiveBuffer& buffer); virtual int Listen(int backlog) = 0; virtual Socket* Accept(SocketAddress* paddr) = 0; virtual int Close() = 0;