Add offset to timestamp from socket

This is to ensure Epoch is the same if transport switch to TCP or another transport.
First packet received will always be timestamped with rtc::TimeMicros.
Other packet timstamps will use the kernel timestamp as an offset from the first packet timestamp.
For BWE, it is important that there is not a large time base diff if transport change.

This change is protected by the experiment WebRTC-SCM-Timestamp.

Bug: webrtc:14066
Change-Id: Iaeb49831e7019e21601bc90895ac56003a54e206
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/281000
Reviewed-by: Evan Shrubsole <eshr@webrtc.org>
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38587}
This commit is contained in:
Per Kjellander 2022-11-08 18:23:43 +01:00 committed by WebRTC LUCI CQ
parent 7dbd77c8b9
commit 3daf696188
5 changed files with 83 additions and 14 deletions

View File

@ -19,10 +19,15 @@
#include "rtc_base/network/sent_packet.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
#include "rtc_base/time_utils.h"
#include "system_wrappers/include/field_trial.h"
namespace rtc {
static const int BUF_SIZE = 64 * 1024;
// Returns true if the the client is in the experiment to get timestamps
// from the socket implementation.
static bool IsScmTimeStampExperimentEnabled() {
return webrtc::field_trial::IsEnabled("WebRTC-SCM-Timestamp");
}
AsyncUDPSocket* AsyncUDPSocket::Create(Socket* socket,
const SocketAddress& bind_address) {
@ -43,18 +48,12 @@ AsyncUDPSocket* AsyncUDPSocket::Create(SocketFactory* factory,
}
AsyncUDPSocket::AsyncUDPSocket(Socket* socket) : socket_(socket) {
size_ = BUF_SIZE;
buf_ = new char[size_];
sequence_checker_.Detach();
// The socket should start out readable but not writable.
socket_->SignalReadEvent.connect(this, &AsyncUDPSocket::OnReadEvent);
socket_->SignalWriteEvent.connect(this, &AsyncUDPSocket::OnWriteEvent);
}
AsyncUDPSocket::~AsyncUDPSocket() {
delete[] buf_;
}
SocketAddress AsyncUDPSocket::GetLocalAddress() const {
return socket_->GetLocalAddress();
}
@ -112,10 +111,12 @@ void AsyncUDPSocket::SetError(int error) {
void AsyncUDPSocket::OnReadEvent(Socket* socket) {
RTC_DCHECK(socket_.get() == socket);
RTC_DCHECK_RUN_ON(&sequence_checker_);
SocketAddress remote_addr;
int64_t timestamp;
int len = socket_->RecvFrom(buf_, size_, &remote_addr, &timestamp);
int64_t timestamp = -1;
int len = socket_->RecvFrom(buf_, BUF_SIZE, &remote_addr, &timestamp);
if (len < 0) {
// An error here typically means we got an ICMP error in response to our
// send datagram, indicating the remote address was unreachable.
@ -126,11 +127,21 @@ void AsyncUDPSocket::OnReadEvent(Socket* socket) {
<< "] receive failed with error " << socket_->GetError();
return;
}
if (timestamp == -1) {
// Timestamp from socket is not available.
timestamp = TimeMicros();
} else {
if (!socket_time_offset_) {
socket_time_offset_ =
IsScmTimeStampExperimentEnabled() ? TimeMicros() - timestamp : 0;
}
timestamp += *socket_time_offset_;
}
// TODO: Make sure that we got all of the packet.
// If we did not, then we should resize our buffer to be large enough.
SignalReadPacket(this, buf_, static_cast<size_t>(len), remote_addr,
(timestamp > -1 ? timestamp : TimeMicros()));
timestamp);
}
void AsyncUDPSocket::OnWriteEvent(Socket* socket) {

View File

@ -13,12 +13,16 @@
#include <stddef.h>
#include <cstdint>
#include <memory>
#include "absl/types/optional.h"
#include "api/sequence_checker.h"
#include "rtc_base/async_packet_socket.h"
#include "rtc_base/socket.h"
#include "rtc_base/socket_address.h"
#include "rtc_base/socket_factory.h"
#include "rtc_base/thread_annotations.h"
namespace rtc {
@ -36,7 +40,7 @@ class AsyncUDPSocket : public AsyncPacketSocket {
static AsyncUDPSocket* Create(SocketFactory* factory,
const SocketAddress& bind_address);
explicit AsyncUDPSocket(Socket* socket);
~AsyncUDPSocket() override;
~AsyncUDPSocket() = default;
SocketAddress GetLocalAddress() const override;
SocketAddress GetRemoteAddress() const override;
@ -61,9 +65,11 @@ class AsyncUDPSocket : public AsyncPacketSocket {
// Called when the underlying socket is ready to send.
void OnWriteEvent(Socket* socket);
RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker sequence_checker_;
std::unique_ptr<Socket> socket_;
char* buf_;
size_t size_;
static constexpr int BUF_SIZE = 64 * 1024;
char buf_[BUF_SIZE] RTC_GUARDED_BY(sequence_checker_);
absl::optional<int64_t> socket_time_offset_ RTC_GUARDED_BY(sequence_checker_);
};
} // namespace rtc

View File

@ -484,6 +484,7 @@ TEST_F(PhysicalSocketTest, TestSocketRecvTimestampIPv6ScmExperiment) {
webrtc::test::ScopedFieldTrials trial("WebRTC-SCM-Timestamp/Enabled/");
SocketTest::TestSocketRecvTimestampIPv6();
}
// Verify that if the socket was unable to be bound to a real network interface
// (not loopback), Bind will return an error.
TEST_F(PhysicalSocketTest,
@ -523,4 +524,15 @@ TEST_F(PhysicalSocketTest,
#endif
TEST_F(PhysicalSocketTest, UdpSocketRecvTimestampUseRtcEpochIPv4ScmExperiment) {
MAYBE_SKIP_IPV4;
webrtc::test::ScopedFieldTrials trial("WebRTC-SCM-Timestamp/Enabled/");
SocketTest::TestUdpSocketRecvTimestampUseRtcEpochIPv4();
}
TEST_F(PhysicalSocketTest, UdpSocketRecvTimestampUseRtcEpochIPv6ScmExperiment) {
webrtc::test::ScopedFieldTrials trial("WebRTC-SCM-Timestamp/Enabled/");
SocketTest::TestUdpSocketRecvTimestampUseRtcEpochIPv6();
}
} // namespace rtc

View File

@ -14,6 +14,8 @@
#include <stdint.h>
#include <string.h>
#include <cstddef>
#include <cstdint>
#include <memory>
#include "absl/memory/memory.h"
@ -221,6 +223,15 @@ void SocketTest::TestSocketRecvTimestampIPv6() {
SocketRecvTimestamp(kIPv6Loopback);
}
void SocketTest::TestUdpSocketRecvTimestampUseRtcEpochIPv4() {
UdpSocketRecvTimestampUseRtcEpoch(kIPv4Loopback);
}
void SocketTest::TestUdpSocketRecvTimestampUseRtcEpochIPv6() {
MAYBE_SKIP_IPV6;
UdpSocketRecvTimestampUseRtcEpoch(kIPv6Loopback);
}
// For unbound sockets, GetLocalAddress / GetRemoteAddress return AF_UNSPEC
// values on Windows, but an empty address of the same family on Linux/MacOS X.
bool IsUnspecOrEmptyIP(const IPAddress& address) {
@ -1103,4 +1114,30 @@ void SocketTest::SocketRecvTimestamp(const IPAddress& loopback) {
EXPECT_NEAR(system_time_diff, recv_timestamp_diff, 10000);
}
void SocketTest::UdpSocketRecvTimestampUseRtcEpoch(const IPAddress& loopback) {
SocketAddress empty = EmptySocketAddressWithFamily(loopback.family());
std::unique_ptr<Socket> socket(
socket_factory_->CreateSocket(loopback.family(), SOCK_DGRAM));
ASSERT_EQ(socket->Bind(SocketAddress(loopback, 0)), 0);
SocketAddress address = socket->GetLocalAddress();
socket = nullptr;
auto client1 = std::make_unique<TestClient>(
absl::WrapUnique(AsyncUDPSocket::Create(socket_factory_, address)));
auto client2 = std::make_unique<TestClient>(
absl::WrapUnique(AsyncUDPSocket::Create(socket_factory_, empty)));
SocketAddress addr2;
client2->SendTo("foo", 3, address);
std::unique_ptr<TestClient::Packet> packet_1 = client1->NextPacket(10000);
ASSERT_TRUE(packet_1 != nullptr);
EXPECT_NEAR(packet_1->packet_time_us, rtc::TimeMicros(), 1000'000);
Thread::SleepMs(100);
client2->SendTo("bar", 3, address);
std::unique_ptr<TestClient::Packet> packet_2 = client1->NextPacket(10000);
ASSERT_TRUE(packet_2 != nullptr);
EXPECT_GT(packet_2->packet_time_us, packet_1->packet_time_us);
EXPECT_NEAR(packet_2->packet_time_us, rtc::TimeMicros(), 1000'000);
}
} // namespace rtc

View File

@ -62,6 +62,8 @@ class SocketTest : public ::testing::Test {
void TestGetSetOptionsIPv6();
void TestSocketRecvTimestampIPv4();
void TestSocketRecvTimestampIPv6();
void TestUdpSocketRecvTimestampUseRtcEpochIPv4();
void TestUdpSocketRecvTimestampUseRtcEpochIPv6();
static const int kTimeout = 5000; // ms
const IPAddress kIPv4Loopback;
@ -92,6 +94,7 @@ class SocketTest : public ::testing::Test {
void UdpReadyToSend(const IPAddress& loopback);
void GetSetOptionsInternal(const IPAddress& loopback);
void SocketRecvTimestamp(const IPAddress& loopback);
void UdpSocketRecvTimestampUseRtcEpoch(const IPAddress& loopback);
SocketFactory* socket_factory_;
};