Add experiment to use ::recvmsg to receive packets on posix systems
Using ::recvmsg ensure packet timestamp can then be read directly when reading the buffer instead of a separate system call and should also work on Ios/Mac. The same experiment field trial flag will be "WebRTC-SCM-Timestamp/enabled/" and is also planned to be used for fixing webrtc:14066 Bug: webrtc:5773, webrtc:14066 Change-Id: I8a3749e87c686aa18fcee947472c1b602a0f63c8 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/279280 Reviewed-by: Evan Shrubsole <eshr@webrtc.org> Commit-Queue: Per Kjellander <perkj@webrtc.org> Reviewed-by: Jonas Oreland <jonaso@webrtc.org> Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org> Cr-Commit-Position: refs/heads/main@{#38585}
This commit is contained in:
parent
119fb1910a
commit
fdcfefa708
@ -961,6 +961,7 @@ rtc_library("threading") {
|
||||
"../api/task_queue",
|
||||
"../api/task_queue:pending_task_safety_flag",
|
||||
"../api/units:time_delta",
|
||||
"../system_wrappers:field_trial",
|
||||
"synchronization:mutex",
|
||||
"system:no_unique_address",
|
||||
"system:rtc_export",
|
||||
@ -1498,6 +1499,7 @@ if (rtc_include_tests) {
|
||||
":timeutils",
|
||||
"../api/units:time_delta",
|
||||
"../system_wrappers",
|
||||
"../test:field_trial",
|
||||
"../test:fileutils",
|
||||
"../test:test_main",
|
||||
"../test:test_support",
|
||||
|
||||
@ -52,6 +52,7 @@
|
||||
#include "rtc_base/null_socket_server.h"
|
||||
#include "rtc_base/synchronization/mutex.h"
|
||||
#include "rtc_base/time_utils.h"
|
||||
#include "system_wrappers/include/field_trial.h"
|
||||
|
||||
#if defined(WEBRTC_LINUX)
|
||||
#include <linux/sockios.h>
|
||||
@ -118,6 +119,12 @@ class ScopedSetTrue {
|
||||
private:
|
||||
bool* value_;
|
||||
};
|
||||
|
||||
// Returns true if the the client is in the experiment to get timestamps
|
||||
// from the socket implementation.
|
||||
bool IsScmTimeStampExperimentEnabled() {
|
||||
return webrtc::field_trial::IsEnabled("WebRTC-SCM-Timestamp");
|
||||
}
|
||||
} // namespace
|
||||
|
||||
namespace rtc {
|
||||
@ -127,7 +134,8 @@ PhysicalSocket::PhysicalSocket(PhysicalSocketServer* ss, SOCKET s)
|
||||
s_(s),
|
||||
error_(0),
|
||||
state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED),
|
||||
resolver_(nullptr) {
|
||||
resolver_(nullptr),
|
||||
read_scm_timestamp_experiment_(IsScmTimeStampExperimentEnabled()) {
|
||||
if (s_ != INVALID_SOCKET) {
|
||||
SetEnabledEvents(DE_READ | DE_WRITE);
|
||||
|
||||
@ -395,7 +403,7 @@ int PhysicalSocket::SendTo(const void* buffer,
|
||||
|
||||
int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) {
|
||||
int received =
|
||||
::recv(s_, static_cast<char*>(buffer), static_cast<int>(length), 0);
|
||||
DoReadFromSocket(buffer, length, /*out_addr*/ nullptr, timestamp);
|
||||
if ((received == 0) && (length != 0)) {
|
||||
// Note: on graceful shutdown, recv can return 0. In this case, we
|
||||
// pretend it is blocking, and then signal close, so that simplifying
|
||||
@ -407,9 +415,7 @@ int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) {
|
||||
SetError(EWOULDBLOCK);
|
||||
return SOCKET_ERROR;
|
||||
}
|
||||
if (timestamp) {
|
||||
*timestamp = GetSocketRecvTimestamp(s_);
|
||||
}
|
||||
|
||||
UpdateLastError();
|
||||
int error = GetError();
|
||||
bool success = (received >= 0) || IsBlockingError(error);
|
||||
@ -426,17 +432,8 @@ int PhysicalSocket::RecvFrom(void* buffer,
|
||||
size_t length,
|
||||
SocketAddress* out_addr,
|
||||
int64_t* timestamp) {
|
||||
sockaddr_storage addr_storage;
|
||||
socklen_t addr_len = sizeof(addr_storage);
|
||||
sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
|
||||
int received = ::recvfrom(s_, static_cast<char*>(buffer),
|
||||
static_cast<int>(length), 0, addr, &addr_len);
|
||||
if (timestamp) {
|
||||
*timestamp = GetSocketRecvTimestamp(s_);
|
||||
}
|
||||
int received = DoReadFromSocket(buffer, length, out_addr, timestamp);
|
||||
UpdateLastError();
|
||||
if ((received >= 0) && (out_addr != nullptr))
|
||||
SocketAddressFromSockAddrStorage(addr_storage, out_addr);
|
||||
int error = GetError();
|
||||
bool success = (received >= 0) || IsBlockingError(error);
|
||||
if (udp_ || success) {
|
||||
@ -448,6 +445,84 @@ int PhysicalSocket::RecvFrom(void* buffer,
|
||||
return received;
|
||||
}
|
||||
|
||||
int PhysicalSocket::DoReadFromSocket(void* buffer,
|
||||
size_t length,
|
||||
SocketAddress* out_addr,
|
||||
int64_t* timestamp) {
|
||||
sockaddr_storage addr_storage;
|
||||
socklen_t addr_len = sizeof(addr_storage);
|
||||
sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
|
||||
|
||||
#if defined(WEBRTC_POSIX)
|
||||
int received = 0;
|
||||
if (read_scm_timestamp_experiment_) {
|
||||
iovec iov = {.iov_base = buffer, .iov_len = length};
|
||||
msghdr msg = {.msg_iov = &iov, .msg_iovlen = 1};
|
||||
if (out_addr) {
|
||||
out_addr->Clear();
|
||||
msg.msg_name = addr;
|
||||
msg.msg_namelen = addr_len;
|
||||
}
|
||||
char control[CMSG_SPACE(sizeof(struct timeval))] = {};
|
||||
if (timestamp) {
|
||||
*timestamp = -1;
|
||||
msg.msg_control = &control;
|
||||
msg.msg_controllen = sizeof(control);
|
||||
}
|
||||
received = ::recvmsg(s_, &msg, 0);
|
||||
if (received <= 0) {
|
||||
// An error occured or shut down.
|
||||
return received;
|
||||
}
|
||||
if (timestamp) {
|
||||
struct cmsghdr* cmsg;
|
||||
for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
|
||||
if (cmsg->cmsg_level != SOL_SOCKET)
|
||||
continue;
|
||||
if (cmsg->cmsg_type == SCM_TIMESTAMP) {
|
||||
timeval* ts = reinterpret_cast<timeval*>(CMSG_DATA(cmsg));
|
||||
*timestamp =
|
||||
rtc::kNumMicrosecsPerSec * static_cast<int64_t>(ts->tv_sec) +
|
||||
static_cast<int64_t>(ts->tv_usec);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (out_addr) {
|
||||
SocketAddressFromSockAddrStorage(addr_storage, out_addr);
|
||||
}
|
||||
} else { // !read_scm_timestamp_experiment_
|
||||
if (out_addr) {
|
||||
received = ::recvfrom(s_, static_cast<char*>(buffer),
|
||||
static_cast<int>(length), 0, addr, &addr_len);
|
||||
SocketAddressFromSockAddrStorage(addr_storage, out_addr);
|
||||
} else {
|
||||
received =
|
||||
::recv(s_, static_cast<char*>(buffer), static_cast<int>(length), 0);
|
||||
}
|
||||
if (timestamp) {
|
||||
*timestamp = GetSocketRecvTimestamp(s_);
|
||||
}
|
||||
}
|
||||
return received;
|
||||
|
||||
#else
|
||||
int received = 0;
|
||||
if (out_addr) {
|
||||
received = ::recvfrom(s_, static_cast<char*>(buffer),
|
||||
static_cast<int>(length), 0, addr, &addr_len);
|
||||
SocketAddressFromSockAddrStorage(addr_storage, out_addr);
|
||||
} else {
|
||||
received =
|
||||
::recv(s_, static_cast<char*>(buffer), static_cast<int>(length), 0);
|
||||
}
|
||||
if (timestamp) {
|
||||
*timestamp = -1;
|
||||
}
|
||||
return received;
|
||||
#endif
|
||||
}
|
||||
|
||||
int PhysicalSocket::Listen(int backlog) {
|
||||
int err = ::listen(s_, backlog);
|
||||
UpdateLastError();
|
||||
@ -643,7 +718,16 @@ bool SocketDispatcher::Initialize() {
|
||||
ioctlsocket(s_, FIONBIO, &argp);
|
||||
#elif defined(WEBRTC_POSIX)
|
||||
fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK);
|
||||
if (IsScmTimeStampExperimentEnabled()) {
|
||||
int value = 1;
|
||||
// Attempt to get receive packet timestamp from the socket.
|
||||
if (::setsockopt(s_, SOL_SOCKET, SO_TIMESTAMP, &value, sizeof(value)) !=
|
||||
0) {
|
||||
RTC_DLOG(LS_ERROR) << "::setsockopt failed. errno: " << LAST_SYSTEM_ERROR;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
#if defined(WEBRTC_IOS)
|
||||
// iOS may kill sockets when the app is moved to the background
|
||||
// (specifically, if the app doesn't use the "voip" UIBackgroundMode). When
|
||||
@ -651,7 +735,9 @@ bool SocketDispatcher::Initialize() {
|
||||
// default will terminate the process, which we don't want. By specifying
|
||||
// this socket option, SIGPIPE will be disabled for the socket.
|
||||
int value = 1;
|
||||
::setsockopt(s_, SOL_SOCKET, SO_NOSIGPIPE, &value, sizeof(value));
|
||||
if (::setsockopt(s_, SOL_SOCKET, SO_NOSIGPIPE, &value, sizeof(value)) != 0) {
|
||||
RTC_DLOG(LS_ERROR) << "::setsockopt failed. errno: " << LAST_SYSTEM_ERROR;
|
||||
}
|
||||
#endif
|
||||
ss_->Add(this);
|
||||
return true;
|
||||
|
||||
@ -190,6 +190,11 @@ class PhysicalSocket : public Socket, public sigslot::has_slots<> {
|
||||
const struct sockaddr* dest_addr,
|
||||
socklen_t addrlen);
|
||||
|
||||
int DoReadFromSocket(void* buffer,
|
||||
size_t length,
|
||||
SocketAddress* out_addr,
|
||||
int64_t* timestamp);
|
||||
|
||||
void OnResolveResult(AsyncResolverInterface* resolver);
|
||||
|
||||
void UpdateLastError();
|
||||
@ -216,6 +221,7 @@ class PhysicalSocket : public Socket, public sigslot::has_slots<> {
|
||||
#endif
|
||||
|
||||
private:
|
||||
const bool read_scm_timestamp_experiment_;
|
||||
uint8_t enabled_events_ = 0;
|
||||
};
|
||||
|
||||
|
||||
@ -23,6 +23,7 @@
|
||||
#include "rtc_base/socket_unittest.h"
|
||||
#include "rtc_base/test_utils.h"
|
||||
#include "rtc_base/thread.h"
|
||||
#include "test/field_trial.h"
|
||||
#include "test/gtest.h"
|
||||
|
||||
namespace rtc {
|
||||
@ -460,8 +461,9 @@ TEST_F(PhysicalSocketTest, TestGetSetOptionsIPv6) {
|
||||
|
||||
#if defined(WEBRTC_POSIX)
|
||||
|
||||
// We don't get recv timestamps on Mac.
|
||||
#if !defined(WEBRTC_MAC)
|
||||
// We don't get recv timestamps on Mac without the experiment
|
||||
// WebRTC-SCM-Timestamp
|
||||
TEST_F(PhysicalSocketTest, TestSocketRecvTimestampIPv4) {
|
||||
MAYBE_SKIP_IPV4;
|
||||
SocketTest::TestSocketRecvTimestampIPv4();
|
||||
@ -472,6 +474,16 @@ TEST_F(PhysicalSocketTest, TestSocketRecvTimestampIPv6) {
|
||||
}
|
||||
#endif
|
||||
|
||||
TEST_F(PhysicalSocketTest, TestSocketRecvTimestampIPv4ScmExperiment) {
|
||||
MAYBE_SKIP_IPV4;
|
||||
webrtc::test::ScopedFieldTrials trial("WebRTC-SCM-Timestamp/Enabled/");
|
||||
SocketTest::TestSocketRecvTimestampIPv4();
|
||||
}
|
||||
|
||||
TEST_F(PhysicalSocketTest, TestSocketRecvTimestampIPv6ScmExperiment) {
|
||||
webrtc::test::ScopedFieldTrials trial("WebRTC-SCM-Timestamp/Enabled/");
|
||||
SocketTest::TestSocketRecvTimestampIPv6();
|
||||
}
|
||||
// Verify that if the socket was unable to be bound to a real network interface
|
||||
// (not loopback), Bind will return an error.
|
||||
TEST_F(PhysicalSocketTest,
|
||||
|
||||
@ -1070,25 +1070,31 @@ void SocketTest::GetSetOptionsInternal(const IPAddress& loopback) {
|
||||
}
|
||||
|
||||
void SocketTest::SocketRecvTimestamp(const IPAddress& loopback) {
|
||||
StreamSink sink;
|
||||
std::unique_ptr<Socket> socket(
|
||||
socket_factory_->CreateSocket(loopback.family(), SOCK_DGRAM));
|
||||
EXPECT_EQ(0, socket->Bind(SocketAddress(loopback, 0)));
|
||||
SocketAddress address = socket->GetLocalAddress();
|
||||
sink.Monitor(socket.get());
|
||||
|
||||
int64_t send_time_1 = TimeMicros();
|
||||
socket->SendTo("foo", 3, address);
|
||||
|
||||
int64_t recv_timestamp_1;
|
||||
// Wait until data is available.
|
||||
EXPECT_TRUE_WAIT(sink.Check(socket.get(), SSE_READ), kTimeout);
|
||||
char buffer[3];
|
||||
socket->RecvFrom(buffer, 3, nullptr, &recv_timestamp_1);
|
||||
EXPECT_GT(recv_timestamp_1, -1);
|
||||
ASSERT_GT(socket->RecvFrom(buffer, 3, nullptr, &recv_timestamp_1), 0);
|
||||
|
||||
const int64_t kTimeBetweenPacketsMs = 100;
|
||||
Thread::SleepMs(kTimeBetweenPacketsMs);
|
||||
|
||||
int64_t send_time_2 = TimeMicros();
|
||||
socket->SendTo("bar", 3, address);
|
||||
// Wait until data is available.
|
||||
EXPECT_TRUE_WAIT(sink.Check(socket.get(), SSE_READ), kTimeout);
|
||||
int64_t recv_timestamp_2;
|
||||
socket->RecvFrom(buffer, 3, nullptr, &recv_timestamp_2);
|
||||
ASSERT_GT(socket->RecvFrom(buffer, 3, nullptr, &recv_timestamp_2), 0);
|
||||
|
||||
int64_t system_time_diff = send_time_2 - send_time_1;
|
||||
int64_t recv_timestamp_diff = recv_timestamp_2 - recv_timestamp_1;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user