diff --git a/webrtc/base/asyncsocket.cc b/webrtc/base/asyncsocket.cc index db451c6382..089802e018 100644 --- a/webrtc/base/asyncsocket.cc +++ b/webrtc/base/asyncsocket.cc @@ -64,12 +64,15 @@ int AsyncSocketAdapter::SendTo(const void* pv, return socket_->SendTo(pv, cb, addr); } -int AsyncSocketAdapter::Recv(void* pv, size_t cb) { - return socket_->Recv(pv, cb); +int AsyncSocketAdapter::Recv(void* pv, size_t cb, int64_t* timestamp) { + return socket_->Recv(pv, cb, timestamp); } -int AsyncSocketAdapter::RecvFrom(void* pv, size_t cb, SocketAddress* paddr) { - return socket_->RecvFrom(pv, cb, paddr); +int AsyncSocketAdapter::RecvFrom(void* pv, + size_t cb, + SocketAddress* paddr, + int64_t* timestamp) { + return socket_->RecvFrom(pv, cb, paddr, timestamp); } int AsyncSocketAdapter::Listen(int backlog) { diff --git a/webrtc/base/asyncsocket.h b/webrtc/base/asyncsocket.h index 7a859be962..5dbffb966c 100644 --- a/webrtc/base/asyncsocket.h +++ b/webrtc/base/asyncsocket.h @@ -56,8 +56,11 @@ class AsyncSocketAdapter : public AsyncSocket, public sigslot::has_slots<> { int Connect(const SocketAddress& addr) override; int Send(const void* pv, size_t cb) override; int SendTo(const void* pv, size_t cb, const SocketAddress& addr) override; - int Recv(void* pv, size_t cb) override; - int RecvFrom(void* pv, size_t cb, SocketAddress* paddr) override; + int Recv(void* pv, size_t cb, int64_t* timestamp) override; + int RecvFrom(void* pv, + size_t cb, + SocketAddress* paddr, + int64_t* timestamp) override; int Listen(int backlog) override; AsyncSocket* Accept(SocketAddress* paddr) override; int Close() override; diff --git a/webrtc/base/asynctcpsocket.cc b/webrtc/base/asynctcpsocket.cc index 9ba46d7abc..60ac7b3159 100644 --- a/webrtc/base/asynctcpsocket.cc +++ b/webrtc/base/asynctcpsocket.cc @@ -208,7 +208,8 @@ void AsyncTCPSocketBase::OnReadEvent(AsyncSocket* socket) { free_size = inbuf_.capacity() - inbuf_.size(); } - int len = socket_->Recv(inbuf_.data() + inbuf_.size(), free_size); + int len = + socket_->Recv(inbuf_.data() + inbuf_.size(), free_size, nullptr); if (len < 0) { // TODO(stefan): Do something better like forwarding the error to the // user. diff --git a/webrtc/base/asyncudpsocket.cc b/webrtc/base/asyncudpsocket.cc index fc7d88712c..232d264ab1 100644 --- a/webrtc/base/asyncudpsocket.cc +++ b/webrtc/base/asyncudpsocket.cc @@ -102,7 +102,8 @@ void AsyncUDPSocket::OnReadEvent(AsyncSocket* socket) { ASSERT(socket_.get() == socket); SocketAddress remote_addr; - int len = socket_->RecvFrom(buf_, size_, &remote_addr); + int64_t timestamp; + int len = socket_->RecvFrom(buf_, size_, &remote_addr, ×tamp); if (len < 0) { // An error here typically means we got an ICMP error in response to our // send datagram, indicating the remote address was unreachable. @@ -116,8 +117,9 @@ void AsyncUDPSocket::OnReadEvent(AsyncSocket* socket) { // TODO: Make sure that we got all of the packet. // If we did not, then we should resize our buffer to be large enough. - SignalReadPacket(this, buf_, static_cast(len), remote_addr, - CreatePacketTime(0)); + SignalReadPacket( + this, buf_, static_cast(len), remote_addr, + (timestamp > -1 ? PacketTime(timestamp, 0) : CreatePacketTime(0))); } void AsyncUDPSocket::OnWriteEvent(AsyncSocket* socket) { diff --git a/webrtc/base/autodetectproxy.cc b/webrtc/base/autodetectproxy.cc index 22950fb2b3..e6174ec96b 100644 --- a/webrtc/base/autodetectproxy.cc +++ b/webrtc/base/autodetectproxy.cc @@ -247,7 +247,7 @@ void AutoDetectProxy::OnConnectEvent(AsyncSocket * socket) { void AutoDetectProxy::OnReadEvent(AsyncSocket * socket) { char data[257]; - int len = socket_->Recv(data, 256); + int len = socket_->Recv(data, 256, nullptr); if (len > 0) { data[len] = 0; LOG(LS_VERBOSE) << "AutoDetectProxy read " << len << " bytes"; diff --git a/webrtc/base/firewallsocketserver.cc b/webrtc/base/firewallsocketserver.cc index 6339017e08..bf3ec42433 100644 --- a/webrtc/base/firewallsocketserver.cc +++ b/webrtc/base/firewallsocketserver.cc @@ -52,14 +52,17 @@ class FirewallSocket : public AsyncSocketAdapter { } return AsyncSocketAdapter::SendTo(pv, cb, addr); } - int Recv(void* pv, size_t cb) override { + int Recv(void* pv, size_t cb, int64_t* timestamp) override { SocketAddress addr; - return RecvFrom(pv, cb, &addr); + return RecvFrom(pv, cb, &addr, timestamp); } - int RecvFrom(void* pv, size_t cb, SocketAddress* paddr) override { + int RecvFrom(void* pv, + size_t cb, + SocketAddress* paddr, + int64_t* timestamp) override { if (type_ == SOCK_DGRAM) { while (true) { - int res = AsyncSocketAdapter::RecvFrom(pv, cb, paddr); + int res = AsyncSocketAdapter::RecvFrom(pv, cb, paddr, timestamp); if (res <= 0) return res; if (server_->Check(FP_UDP, *paddr, GetLocalAddress())) @@ -69,7 +72,7 @@ class FirewallSocket : public AsyncSocketAdapter { << GetLocalAddress().ToSensitiveString() << " dropped"; } } - return AsyncSocketAdapter::RecvFrom(pv, cb, paddr); + return AsyncSocketAdapter::RecvFrom(pv, cb, paddr, timestamp); } int Listen(int backlog) override { diff --git a/webrtc/base/macasyncsocket.cc b/webrtc/base/macasyncsocket.cc index 8f811ea8b6..9f38c2937c 100644 --- a/webrtc/base/macasyncsocket.cc +++ b/webrtc/base/macasyncsocket.cc @@ -188,7 +188,10 @@ int MacAsyncSocket::SendTo(const void* buffer, size_t length, } // Read data received from the remote end we're connected to. -int MacAsyncSocket::Recv(void* buffer, size_t length) { +int MacAsyncSocket::Recv(void* buffer, size_t length, int64_t* timestamp) { + if (timestamp) { + *timestamp = -1; + } int received = ::recv(native_socket_, reinterpret_cast(buffer), length, 0); if (received == SOCKET_ERROR) error_ = errno; @@ -199,8 +202,13 @@ int MacAsyncSocket::Recv(void* buffer, size_t length) { } // Read data received from any remote party -int MacAsyncSocket::RecvFrom(void* buffer, size_t length, - SocketAddress* out_addr) { +int MacAsyncSocket::RecvFrom(void* buffer, + size_t length, + SocketAddress* out_addr, + int64_t* timestamp) { + if (timestamp) { + *timestamp = -1; + } sockaddr_storage saddr; socklen_t addr_len = sizeof(saddr); int received = ::recvfrom(native_socket_, reinterpret_cast(buffer), diff --git a/webrtc/base/macasyncsocket.h b/webrtc/base/macasyncsocket.h index c0f57b948c..1996b351c8 100644 --- a/webrtc/base/macasyncsocket.h +++ b/webrtc/base/macasyncsocket.h @@ -42,8 +42,11 @@ class MacAsyncSocket : public AsyncSocket, public sigslot::has_slots<> { int SendTo(const void* buffer, size_t length, const SocketAddress& addr) override; - int Recv(void* buffer, size_t length) override; - int RecvFrom(void* buffer, size_t length, SocketAddress* out_addr) override; + int Recv(void* buffer, size_t length, int64_t* timestamp) override; + int RecvFrom(void* buffer, + size_t length, + SocketAddress* out_addr, + int64_t* timestamp) override; int Listen(int backlog) override; MacAsyncSocket* Accept(SocketAddress* out_addr) override; int Close() override; diff --git a/webrtc/base/nat_unittest.cc b/webrtc/base/nat_unittest.cc index ca72c9356a..bd630b306e 100644 --- a/webrtc/base/nat_unittest.cc +++ b/webrtc/base/nat_unittest.cc @@ -195,7 +195,7 @@ bool TestConnectivity(const SocketAddress& src, const IPAddress& dst) { const size_t kRecvBufSize = 64; char recvbuf[kRecvBufSize]; Thread::Current()->SleepMs(100); - int received = server->RecvFrom(recvbuf, kRecvBufSize, &addr); + int received = server->RecvFrom(recvbuf, kRecvBufSize, &addr, nullptr); return received == sent && ::memcmp(buf, recvbuf, len) == 0; } diff --git a/webrtc/base/natsocketfactory.cc b/webrtc/base/natsocketfactory.cc index 985748cff9..a92498429c 100644 --- a/webrtc/base/natsocketfactory.cc +++ b/webrtc/base/natsocketfactory.cc @@ -155,14 +155,17 @@ class NATSocket : public AsyncSocket, public sigslot::has_slots<> { return result; } - int Recv(void* data, size_t size) override { + int Recv(void* data, size_t size, int64_t* timestamp) override { SocketAddress addr; - return RecvFrom(data, size, &addr); + return RecvFrom(data, size, &addr, timestamp); } - int RecvFrom(void* data, size_t size, SocketAddress* out_addr) override { + int RecvFrom(void* data, + size_t size, + SocketAddress* out_addr, + int64_t* timestamp) override { if (server_addr_.IsNil() || type_ == SOCK_STREAM) { - return socket_->RecvFrom(data, size, out_addr); + return socket_->RecvFrom(data, size, out_addr, timestamp); } // Make sure we have enough room to read the requested amount plus the // largest possible header address. @@ -170,7 +173,7 @@ class NATSocket : public AsyncSocket, public sigslot::has_slots<> { Grow(size + kNATEncodedIPv6AddressSize); // Read the packet from the socket. - int result = socket_->RecvFrom(buf_, size_, &remote_addr); + int result = socket_->RecvFrom(buf_, size_, &remote_addr, timestamp); if (result >= 0) { ASSERT(remote_addr == server_addr_); @@ -278,7 +281,7 @@ class NATSocket : public AsyncSocket, public sigslot::has_slots<> { // Handles the byte sent back from the server and fires the appropriate event. void HandleConnectReply() { char code; - socket_->Recv(&code, sizeof(code)); + socket_->Recv(&code, sizeof(code), nullptr); if (code == 0) { connected_ = true; SignalConnectEvent(this); diff --git a/webrtc/base/openssladapter.cc b/webrtc/base/openssladapter.cc index 40c352d643..d1cbaa811b 100644 --- a/webrtc/base/openssladapter.cc +++ b/webrtc/base/openssladapter.cc @@ -119,7 +119,7 @@ static int socket_read(BIO* b, char* out, int outl) { return -1; rtc::AsyncSocket* socket = static_cast(b->ptr); BIO_clear_retry_flags(b); - int result = socket->Recv(out, outl); + int result = socket->Recv(out, outl, nullptr); if (result > 0) { return result; } else if (result == 0) { @@ -524,13 +524,12 @@ OpenSSLAdapter::SendTo(const void* pv, size_t cb, const SocketAddress& addr) { return SOCKET_ERROR; } -int -OpenSSLAdapter::Recv(void* pv, size_t cb) { +int OpenSSLAdapter::Recv(void* pv, size_t cb, int64_t* timestamp) { //LOG(LS_INFO) << "OpenSSLAdapter::Recv(" << cb << ")"; switch (state_) { case SSL_NONE: - return AsyncSocketAdapter::Recv(pv, cb); + return AsyncSocketAdapter::Recv(pv, cb, timestamp); case SSL_WAIT: case SSL_CONNECTING: @@ -579,10 +578,12 @@ OpenSSLAdapter::Recv(void* pv, size_t cb) { return SOCKET_ERROR; } -int -OpenSSLAdapter::RecvFrom(void* pv, size_t cb, SocketAddress* paddr) { +int OpenSSLAdapter::RecvFrom(void* pv, + size_t cb, + SocketAddress* paddr, + int64_t* timestamp) { if (socket_->GetState() == Socket::CS_CONNECTED) { - int ret = Recv(pv, cb); + int ret = Recv(pv, cb, timestamp); *paddr = GetRemoteAddress(); diff --git a/webrtc/base/openssladapter.h b/webrtc/base/openssladapter.h index cdf45e603f..554627f58f 100644 --- a/webrtc/base/openssladapter.h +++ b/webrtc/base/openssladapter.h @@ -37,8 +37,11 @@ public: int StartSSL(const char* hostname, bool restartable) override; int Send(const void* pv, size_t cb) override; int SendTo(const void* pv, size_t cb, const SocketAddress& addr) override; - int Recv(void* pv, size_t cb) override; - int RecvFrom(void* pv, size_t cb, SocketAddress* paddr) override; + int Recv(void* pv, size_t cb, int64_t* timestamp) override; + int RecvFrom(void* pv, + size_t cb, + SocketAddress* paddr, + int64_t* timestamp) override; int Close() override; // Note that the socket returns ST_CONNECTING while SSL is being negotiated. diff --git a/webrtc/base/physicalsocketserver.cc b/webrtc/base/physicalsocketserver.cc index 0230077a52..f28b1f5686 100644 --- a/webrtc/base/physicalsocketserver.cc +++ b/webrtc/base/physicalsocketserver.cc @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -55,8 +56,28 @@ #include // for TCP_NODELAY #define IP_MTU 14 // Until this is integrated from linux/in.h to netinet/in.h typedef void* SockOptArg; + #endif // WEBRTC_POSIX +#if defined(WEBRTC_POSIX) && !defined(WEBRTC_MAC) +int64_t GetSocketRecvTimestamp(int socket) { + struct timeval tv_ioctl; + int ret = ioctl(socket, SIOCGSTAMP, &tv_ioctl); + if (ret != 0) + return -1; + int64_t timestamp = + rtc::kNumMicrosecsPerSec * static_cast(tv_ioctl.tv_sec) + + static_cast(tv_ioctl.tv_usec); + return timestamp; +} + +#else + +int64_t GetSocketRecvTimestamp(int socket) { + return -1; +} +#endif + #if defined(WEBRTC_WIN) typedef char* SockOptArg; #endif @@ -324,7 +345,7 @@ int PhysicalSocket::SendTo(const void* buffer, return sent; } -int PhysicalSocket::Recv(void* buffer, size_t length) { +int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) { int received = ::recv(s_, static_cast(buffer), static_cast(length), 0); if ((received == 0) && (length != 0)) { @@ -338,6 +359,9 @@ int PhysicalSocket::Recv(void* buffer, size_t length) { SetError(EWOULDBLOCK); return SOCKET_ERROR; } + if (timestamp) { + *timestamp = GetSocketRecvTimestamp(s_); + } UpdateLastError(); int error = GetError(); bool success = (received >= 0) || IsBlockingError(error); @@ -352,12 +376,16 @@ int PhysicalSocket::Recv(void* buffer, size_t length) { int PhysicalSocket::RecvFrom(void* buffer, size_t length, - SocketAddress* out_addr) { + SocketAddress* out_addr, + int64_t* timestamp) { sockaddr_storage addr_storage; socklen_t addr_len = sizeof(addr_storage); sockaddr* addr = reinterpret_cast(&addr_storage); int received = ::recvfrom(s_, static_cast(buffer), static_cast(length), 0, addr, &addr_len); + if (timestamp) { + *timestamp = GetSocketRecvTimestamp(s_); + } UpdateLastError(); if ((received >= 0) && (out_addr != nullptr)) SocketAddressFromSockAddrStorage(addr_storage, out_addr); diff --git a/webrtc/base/physicalsocketserver.h b/webrtc/base/physicalsocketserver.h index f5867d25f9..3437eb8cb3 100644 --- a/webrtc/base/physicalsocketserver.h +++ b/webrtc/base/physicalsocketserver.h @@ -143,8 +143,11 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { size_t length, const SocketAddress& addr) override; - int Recv(void* buffer, size_t length) override; - int RecvFrom(void* buffer, size_t length, SocketAddress* out_addr) override; + int Recv(void* buffer, size_t length, int64_t* timestamp) override; + int RecvFrom(void* buffer, + size_t length, + SocketAddress* out_addr, + int64_t* timestamp) override; int Listen(int backlog) override; AsyncSocket* Accept(SocketAddress* out_addr) override; diff --git a/webrtc/base/physicalsocketserver_unittest.cc b/webrtc/base/physicalsocketserver_unittest.cc index b069baaeae..bd1acfb9a4 100644 --- a/webrtc/base/physicalsocketserver_unittest.cc +++ b/webrtc/base/physicalsocketserver_unittest.cc @@ -386,6 +386,21 @@ TEST_F(PhysicalSocketTest, TestGetSetOptionsIPv6) { #if defined(WEBRTC_POSIX) +#if !defined(WEBRTC_MAC) +TEST_F(PhysicalSocketTest, TestSocketRecvTimestampIPv4) { + SocketTest::TestSocketRecvTimestamp(); +} + +#if defined(WEBRTC_LINUX) +#define MAYBE_TestSocketRecvTimestampIPv6 DISABLED_TestSocketRecvTimestampIPv6 +#else +#define MAYBE_TestSocketRecvTimestampIPv6 TestSocketRecvTimestampIPv6 +#endif +TEST_F(PhysicalSocketTest, MAYBE_TestSocketRecvTimestampIPv6) { + SocketTest::TestSocketRecvTimestamp(); +} +#endif + class PosixSignalDeliveryTest : public testing::Test { public: static void RecordSignal(int signum) { diff --git a/webrtc/base/proxyserver.cc b/webrtc/base/proxyserver.cc index d91a92fbde..6a1bdcd004 100644 --- a/webrtc/base/proxyserver.cc +++ b/webrtc/base/proxyserver.cc @@ -129,7 +129,7 @@ void ProxyBinding::Read(AsyncSocket* socket, FifoBuffer* buffer) { int read; if (buffer->GetBuffered(&size) && size == 0) { void* p = buffer->GetWriteBuffer(&size); - read = socket->Recv(p, size); + read = socket->Recv(p, size, nullptr); buffer->ConsumeWriteBuffer(std::max(read, 0)); } } diff --git a/webrtc/base/socket.h b/webrtc/base/socket.h index 22326cb997..7db9459de7 100644 --- a/webrtc/base/socket.h +++ b/webrtc/base/socket.h @@ -151,8 +151,11 @@ class Socket { virtual int Connect(const SocketAddress& addr) = 0; virtual int Send(const void *pv, size_t cb) = 0; virtual int SendTo(const void *pv, size_t cb, const SocketAddress& addr) = 0; - virtual int Recv(void *pv, size_t cb) = 0; - virtual int RecvFrom(void *pv, size_t cb, SocketAddress *paddr) = 0; + virtual int Recv(void* pv, size_t cb, int64_t* timestamp) = 0; + virtual int RecvFrom(void* pv, + size_t cb, + SocketAddress* paddr, + int64_t* timestamp) = 0; virtual int Listen(int backlog) = 0; virtual Socket *Accept(SocketAddress *paddr) = 0; virtual int Close() = 0; diff --git a/webrtc/base/socket_unittest.cc b/webrtc/base/socket_unittest.cc index 718903d538..ea37f23f27 100644 --- a/webrtc/base/socket_unittest.cc +++ b/webrtc/base/socket_unittest.cc @@ -184,6 +184,10 @@ void SocketTest::TestGetSetOptionsIPv6() { GetSetOptionsInternal(kIPv6Loopback); } +void SocketTest::TestSocketRecvTimestamp() { + SocketRecvTimestamp(kIPv4Loopback); +} + // For unbound sockets, GetLocalAddress / GetRemoteAddress return AF_UNSPEC // values on Windows, but an empty address of the same family on Linux/MacOS X. bool IsUnspecOrEmptyIP(const IPAddress& address) { @@ -541,7 +545,7 @@ void SocketTest::ServerCloseInternal(const IPAddress& loopback) { // Ensure the data can be read. char buffer[10]; - EXPECT_EQ(1, client->Recv(buffer, sizeof(buffer))); + EXPECT_EQ(1, client->Recv(buffer, sizeof(buffer), nullptr)); EXPECT_EQ('a', buffer[0]); // Now we should close, but the remote address will remain. @@ -673,7 +677,7 @@ void SocketTest::SocketServerWaitInternal(const IPAddress& loopback) { // But should signal when process_io is true. EXPECT_TRUE_WAIT((sink.Check(accepted.get(), testing::SSE_READ)), kTimeout); - EXPECT_LT(0, accepted->Recv(buf, 1024)); + EXPECT_LT(0, accepted->Recv(buf, 1024, nullptr)); } void SocketTest::TcpInternal(const IPAddress& loopback, size_t data_size, @@ -763,7 +767,7 @@ void SocketTest::TcpInternal(const IPAddress& loopback, size_t data_size, // Receive as much as we can get in a single recv call. char recved_data[data_size]; - int recved_size = receiver->Recv(recved_data, data_size); + int recved_size = receiver->Recv(recved_data, data_size, nullptr); if (!recv_called) { // The first Recv() after getting readability should succeed and receive @@ -850,7 +854,7 @@ void SocketTest::SingleFlowControlCallbackInternal(const IPAddress& loopback) { // Pull data. for (int i = 0; i < sends; ++i) { - client->Recv(buf, arraysize(buf)); + client->Recv(buf, arraysize(buf), nullptr); } // Expect at least one additional writable callback. @@ -1023,4 +1027,25 @@ void SocketTest::GetSetOptionsInternal(const IPAddress& loopback) { } } +void SocketTest::SocketRecvTimestamp(const IPAddress& loopback) { + std::unique_ptr socket( + ss_->CreateSocket(loopback.family(), SOCK_DGRAM)); + EXPECT_EQ(0, socket->Bind(SocketAddress(loopback, 0))); + SocketAddress address = socket->GetLocalAddress(); + + socket->SendTo("foo", 3, address); + int64_t timestamp; + char buffer[3]; + socket->RecvFrom(buffer, 3, nullptr, ×tamp); + EXPECT_GT(timestamp, -1); + int64_t prev_timestamp = timestamp; + + const int64_t kTimeBetweenPacketsMs = 10; + Thread::SleepMs(kTimeBetweenPacketsMs); + + socket->SendTo("bar", 3, address); + socket->RecvFrom(buffer, 3, nullptr, ×tamp); + EXPECT_NEAR(timestamp, prev_timestamp + kTimeBetweenPacketsMs * 1000, 2000); +} + } // namespace rtc diff --git a/webrtc/base/socket_unittest.h b/webrtc/base/socket_unittest.h index adc69f1465..41f0a653cb 100644 --- a/webrtc/base/socket_unittest.h +++ b/webrtc/base/socket_unittest.h @@ -57,6 +57,7 @@ class SocketTest : public testing::Test { void TestUdpReadyToSendIPv6(); void TestGetSetOptionsIPv4(); void TestGetSetOptionsIPv6(); + void TestSocketRecvTimestamp(); static const int kTimeout = 5000; // ms const IPAddress kIPv4Loopback; @@ -84,6 +85,7 @@ class SocketTest : public testing::Test { void UdpInternal(const IPAddress& loopback); void UdpReadyToSend(const IPAddress& loopback); void GetSetOptionsInternal(const IPAddress& loopback); + void SocketRecvTimestamp(const IPAddress& loopback); SocketServer* ss_; }; diff --git a/webrtc/base/socketadapters.cc b/webrtc/base/socketadapters.cc index 85c126751c..e1c8a072b2 100644 --- a/webrtc/base/socketadapters.cc +++ b/webrtc/base/socketadapters.cc @@ -36,7 +36,7 @@ #if defined(WEBRTC_WIN) #include "webrtc/base/sec_buffer.h" -#endif // WEBRTC_WIN +#endif // WEBRTC_WIN namespace rtc { @@ -59,7 +59,7 @@ int BufferedReadAdapter::Send(const void *pv, size_t cb) { return AsyncSocketAdapter::Send(pv, cb); } -int BufferedReadAdapter::Recv(void *pv, size_t cb) { +int BufferedReadAdapter::Recv(void* pv, size_t cb, int64_t* timestamp) { if (buffering_) { socket_->SetError(EWOULDBLOCK); return -1; @@ -80,7 +80,7 @@ int BufferedReadAdapter::Recv(void *pv, size_t cb) { // FIX: If cb == 0, we won't generate another read event - int res = AsyncSocketAdapter::Recv(pv, cb); + int res = AsyncSocketAdapter::Recv(pv, cb, timestamp); if (res >= 0) { // Read from socket and possibly buffer; return combined length return res + static_cast(read); @@ -113,7 +113,8 @@ void BufferedReadAdapter::OnReadEvent(AsyncSocket * socket) { data_len_ = 0; } - int len = socket_->Recv(buffer_ + data_len_, buffer_size_ - data_len_); + int len = + socket_->Recv(buffer_ + data_len_, buffer_size_ - data_len_, nullptr); if (len < 0) { // TODO: Do something better like forwarding the error to the user. LOG_ERR(INFO) << "Recv"; @@ -874,15 +875,18 @@ int LoggingSocketAdapter::SendTo(const void *pv, size_t cb, return res; } -int LoggingSocketAdapter::Recv(void *pv, size_t cb) { - int res = AsyncSocketAdapter::Recv(pv, cb); +int LoggingSocketAdapter::Recv(void* pv, size_t cb, int64_t* timestamp) { + int res = AsyncSocketAdapter::Recv(pv, cb, timestamp); if (res > 0) LogMultiline(level_, label_.c_str(), true, pv, res, hex_mode_, &lms_); return res; } -int LoggingSocketAdapter::RecvFrom(void *pv, size_t cb, SocketAddress *paddr) { - int res = AsyncSocketAdapter::RecvFrom(pv, cb, paddr); +int LoggingSocketAdapter::RecvFrom(void* pv, + size_t cb, + SocketAddress* paddr, + int64_t* timestamp) { + int res = AsyncSocketAdapter::RecvFrom(pv, cb, paddr, timestamp); if (res > 0) LogMultiline(level_, label_.c_str(), true, pv, res, hex_mode_, &lms_); return res; diff --git a/webrtc/base/socketadapters.h b/webrtc/base/socketadapters.h index 970a3b5eb6..02f6bca3c9 100644 --- a/webrtc/base/socketadapters.h +++ b/webrtc/base/socketadapters.h @@ -36,7 +36,7 @@ class BufferedReadAdapter : public AsyncSocketAdapter { ~BufferedReadAdapter() override; int Send(const void* pv, size_t cb) override; - int Recv(void* pv, size_t cb) override; + int Recv(void* pv, size_t cb, int64_t* timestamp) override; protected: int DirectSend(const void* pv, size_t cb) { @@ -224,8 +224,11 @@ class LoggingSocketAdapter : public AsyncSocketAdapter { int Send(const void* pv, size_t cb) override; int SendTo(const void* pv, size_t cb, const SocketAddress& addr) override; - int Recv(void* pv, size_t cb) override; - int RecvFrom(void* pv, size_t cb, SocketAddress* paddr) override; + int Recv(void* pv, size_t cb, int64_t* timestamp) override; + int RecvFrom(void* pv, + size_t cb, + SocketAddress* paddr, + int64_t* timestamp) override; int Close() override; protected: diff --git a/webrtc/base/socketstream.cc b/webrtc/base/socketstream.cc index b0acf94c58..9dc8794ebc 100644 --- a/webrtc/base/socketstream.cc +++ b/webrtc/base/socketstream.cc @@ -60,7 +60,7 @@ StreamState SocketStream::GetState() const { StreamResult SocketStream::Read(void* buffer, size_t buffer_len, size_t* read, int* error) { ASSERT(socket_ != NULL); - int result = socket_->Recv(buffer, buffer_len); + int result = socket_->Recv(buffer, buffer_len, nullptr); if (result < 0) { if (socket_->IsBlocking()) return SR_BLOCK; diff --git a/webrtc/base/ssladapter_unittest.cc b/webrtc/base/ssladapter_unittest.cc index 16e5c2ea32..adce5f4f01 100644 --- a/webrtc/base/ssladapter_unittest.cc +++ b/webrtc/base/ssladapter_unittest.cc @@ -102,7 +102,7 @@ class SSLAdapterTestDummyClient : public sigslot::has_slots<> { char buffer[4096] = ""; // Read data received from the server and store it in our internal buffer. - int read = socket->Recv(buffer, sizeof(buffer) - 1); + int read = socket->Recv(buffer, sizeof(buffer) - 1, nullptr); if (read != -1) { buffer[read] = '\0'; diff --git a/webrtc/base/testclient.cc b/webrtc/base/testclient.cc index be4de82ad6..fbb4f0cdf3 100644 --- a/webrtc/base/testclient.cc +++ b/webrtc/base/testclient.cc @@ -19,7 +19,7 @@ namespace rtc { // NextPacket. TestClient::TestClient(AsyncPacketSocket* socket) - : socket_(socket), ready_to_send_(false) { + : socket_(socket), ready_to_send_(false), prev_packet_timestamp_(-1) { packets_ = new std::vector(); socket_->SignalReadPacket.connect(this, &TestClient::OnPacket); socket_->SignalReadyToSend.connect(this, &TestClient::OnReadyToSend); @@ -91,7 +91,8 @@ bool TestClient::CheckNextPacket(const char* buf, size_t size, bool res = false; Packet* packet = NextPacket(kTimeoutMs); if (packet) { - res = (packet->size == size && memcmp(packet->buf, buf, size) == 0); + res = (packet->size == size && memcmp(packet->buf, buf, size) == 0 && + CheckTimestamp(packet->packet_time.timestamp)); if (addr) *addr = packet->addr; delete packet; @@ -99,6 +100,29 @@ bool TestClient::CheckNextPacket(const char* buf, size_t size, return res; } +bool TestClient::CheckTimestamp(int64_t packet_timestamp) { + bool res = true; + if (packet_timestamp == -1) { + res = false; + } + int64_t time_us = rtc::TimeMicros(); + if (prev_packet_timestamp_ != -1) { + if (packet_timestamp < prev_packet_timestamp_) { + res = false; + } + const int64_t kErrorMarginUs = 20000; + if (packet_timestamp - prev_packet_timestamp_ < + time_us - prev_time_us_ - kErrorMarginUs || + packet_timestamp - prev_packet_timestamp_ > + time_us - prev_time_us_ + kErrorMarginUs) { + res = false; + } + } + prev_packet_timestamp_ = packet_timestamp; + prev_time_us_ = time_us; + return res; +} + bool TestClient::CheckNoPacket() { bool res; Packet* packet = NextPacket(kNoPacketTimeoutMs); @@ -123,21 +147,24 @@ void TestClient::OnPacket(AsyncPacketSocket* socket, const char* buf, size_t size, const SocketAddress& remote_addr, const PacketTime& packet_time) { CritScope cs(&crit_); - packets_->push_back(new Packet(remote_addr, buf, size)); + packets_->push_back(new Packet(remote_addr, buf, size, packet_time)); } void TestClient::OnReadyToSend(AsyncPacketSocket* socket) { ready_to_send_ = true; } -TestClient::Packet::Packet(const SocketAddress& a, const char* b, size_t s) - : addr(a), buf(0), size(s) { +TestClient::Packet::Packet(const SocketAddress& a, + const char* b, + size_t s, + const PacketTime& packet_time) + : addr(a), buf(0), size(s), packet_time(packet_time) { buf = new char[size]; memcpy(buf, b, size); } TestClient::Packet::Packet(const Packet& p) - : addr(p.addr), buf(0), size(p.size) { + : addr(p.addr), buf(0), size(p.size), packet_time(p.packet_time) { buf = new char[size]; memcpy(buf, p.buf, size); } diff --git a/webrtc/base/testclient.h b/webrtc/base/testclient.h index df831fefb7..d78e142a2f 100644 --- a/webrtc/base/testclient.h +++ b/webrtc/base/testclient.h @@ -24,13 +24,17 @@ class TestClient : public sigslot::has_slots<> { public: // Records the contents of a packet that was received. struct Packet { - Packet(const SocketAddress& a, const char* b, size_t s); + Packet(const SocketAddress& a, + const char* b, + size_t s, + const PacketTime& packet_time); Packet(const Packet& p); virtual ~Packet(); SocketAddress addr; char* buf; size_t size; + PacketTime packet_time; }; // Default timeout for NextPacket reads. @@ -85,11 +89,14 @@ class TestClient : public sigslot::has_slots<> { const SocketAddress& remote_addr, const PacketTime& packet_time); void OnReadyToSend(AsyncPacketSocket* socket); + bool CheckTimestamp(int64_t packet_timestamp); CriticalSection crit_; AsyncPacketSocket* socket_; std::vector* packets_; bool ready_to_send_; + int64_t prev_packet_timestamp_; + int64_t prev_time_us_; RTC_DISALLOW_COPY_AND_ASSIGN(TestClient); }; diff --git a/webrtc/base/testutils.h b/webrtc/base/testutils.h index e5e571b9bf..c9d5a317e2 100644 --- a/webrtc/base/testutils.h +++ b/webrtc/base/testutils.h @@ -359,7 +359,7 @@ private: } void OnReadEvent(AsyncSocket* socket) { char data[64 * 1024]; - int result = socket_->Recv(data, arraysize(data)); + int result = socket_->Recv(data, arraysize(data), nullptr); if (result > 0) { recv_buffer_.insert(recv_buffer_.end(), data, data + result); } diff --git a/webrtc/base/virtualsocket_unittest.cc b/webrtc/base/virtualsocket_unittest.cc index e63310423f..ce89b5057c 100644 --- a/webrtc/base/virtualsocket_unittest.cc +++ b/webrtc/base/virtualsocket_unittest.cc @@ -464,7 +464,7 @@ class VirtualSocketServerTest : public testing::Test { char buffer[10]; EXPECT_FALSE(sink.Check(b.get(), testing::SSE_READ)); - EXPECT_EQ(-1, b->Recv(buffer, 10)); + EXPECT_EQ(-1, b->Recv(buffer, 10, nullptr)); EXPECT_TRUE(sink.Check(a, testing::SSE_CLOSE)); EXPECT_EQ(a->GetState(), AsyncSocket::CS_CLOSED); @@ -531,7 +531,7 @@ class VirtualSocketServerTest : public testing::Test { EXPECT_TRUE(a->IsBlocking()); // Read a subset of the data - result = b->Recv(recv_buffer + recv_pos, 500); + result = b->Recv(recv_buffer + recv_pos, 500, nullptr); EXPECT_EQ(500, result); recv_pos += result; @@ -546,7 +546,7 @@ class VirtualSocketServerTest : public testing::Test { // Empty the recv buffer while (true) { - result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos); + result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos, nullptr); if (result < 0) { EXPECT_EQ(-1, result); EXPECT_TRUE(b->IsBlocking()); @@ -560,7 +560,7 @@ class VirtualSocketServerTest : public testing::Test { // Continue to empty the recv buffer while (true) { - result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos); + result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos, nullptr); if (result < 0) { EXPECT_EQ(-1, result); EXPECT_TRUE(b->IsBlocking()); @@ -579,7 +579,7 @@ class VirtualSocketServerTest : public testing::Test { // Receive the last of the data while (true) { - result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos); + result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos, nullptr); if (result < 0) { EXPECT_EQ(-1, result); EXPECT_TRUE(b->IsBlocking()); @@ -626,7 +626,7 @@ class VirtualSocketServerTest : public testing::Test { ss_->ProcessMessagesUntilIdle(); for (char i = 0; i < cNumPackets; ++i) { - EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer))); + EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer), nullptr)); EXPECT_EQ(static_cast('0' + i), buffer[0]); } @@ -646,7 +646,7 @@ class VirtualSocketServerTest : public testing::Test { ss_->ProcessMessagesUntilIdle(); for (char i = 0; i < cNumPackets; ++i) { - EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer))); + EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer), nullptr)); EXPECT_EQ(static_cast('A' + i), buffer[0]); } } diff --git a/webrtc/base/virtualsocketserver.cc b/webrtc/base/virtualsocketserver.cc index c76fe42f1e..baeeb8e204 100644 --- a/webrtc/base/virtualsocketserver.cc +++ b/webrtc/base/virtualsocketserver.cc @@ -264,12 +264,18 @@ int VirtualSocket::SendTo(const void* pv, } } -int VirtualSocket::Recv(void* pv, size_t cb) { +int VirtualSocket::Recv(void* pv, size_t cb, int64_t* timestamp) { SocketAddress addr; - return RecvFrom(pv, cb, &addr); + return RecvFrom(pv, cb, &addr, timestamp); } -int VirtualSocket::RecvFrom(void* pv, size_t cb, SocketAddress* paddr) { +int VirtualSocket::RecvFrom(void* pv, + size_t cb, + SocketAddress* paddr, + int64_t* timestamp) { + if (timestamp) { + *timestamp = -1; + } // If we don't have a packet, then either error or wait for one to arrive. if (recv_buffer_.empty()) { if (async_) { diff --git a/webrtc/base/virtualsocketserver.h b/webrtc/base/virtualsocketserver.h index 897ba9e5eb..8673d40f83 100644 --- a/webrtc/base/virtualsocketserver.h +++ b/webrtc/base/virtualsocketserver.h @@ -274,8 +274,11 @@ class VirtualSocket : public AsyncSocket, public MessageHandler { int Close() override; int Send(const void* pv, size_t cb) override; int SendTo(const void* pv, size_t cb, const SocketAddress& addr) override; - int Recv(void* pv, size_t cb) override; - int RecvFrom(void* pv, size_t cb, SocketAddress* paddr) override; + int Recv(void* pv, size_t cb, int64_t* timestamp) override; + int RecvFrom(void* pv, + size_t cb, + SocketAddress* paddr, + int64_t* timestamp) override; int Listen(int backlog) override; VirtualSocket* Accept(SocketAddress* paddr) override; diff --git a/webrtc/base/win32socketserver.cc b/webrtc/base/win32socketserver.cc index 5423eed9bf..ab25312df0 100644 --- a/webrtc/base/win32socketserver.cc +++ b/webrtc/base/win32socketserver.cc @@ -438,7 +438,10 @@ int Win32Socket::SendTo(const void* buffer, size_t length, return sent; } -int Win32Socket::Recv(void* buffer, size_t length) { +int Win32Socket::Recv(void* buffer, size_t length, int64_t* timestamp) { + if (timestamp) { + *timestamp = -1; + } int received = ::recv(socket_, static_cast(buffer), static_cast(length), 0); UpdateLastError(); @@ -447,8 +450,13 @@ int Win32Socket::Recv(void* buffer, size_t length) { return received; } -int Win32Socket::RecvFrom(void* buffer, size_t length, - SocketAddress* out_addr) { +int Win32Socket::RecvFrom(void* buffer, + size_t length, + SocketAddress* out_addr, + int64_t* timestamp) { + if (timestamp) { + *timestamp = -1; + } sockaddr_storage saddr; socklen_t addr_len = sizeof(saddr); int received = ::recvfrom(socket_, static_cast(buffer), diff --git a/webrtc/base/win32socketserver.h b/webrtc/base/win32socketserver.h index f47ed75696..28bee6af06 100644 --- a/webrtc/base/win32socketserver.h +++ b/webrtc/base/win32socketserver.h @@ -44,8 +44,11 @@ class Win32Socket : public AsyncSocket { virtual int Connect(const SocketAddress& addr); virtual int Send(const void *buffer, size_t length); virtual int SendTo(const void *buffer, size_t length, const SocketAddress& addr); - virtual int Recv(void *buffer, size_t length); - virtual int RecvFrom(void *buffer, size_t length, SocketAddress *out_addr); + virtual int Recv(void* buffer, size_t length, int64_t* timestamp); + virtual int RecvFrom(void* buffer, + size_t length, + SocketAddress* out_addr, + int64_t* timestamp); virtual int Listen(int backlog); virtual Win32Socket *Accept(SocketAddress *out_addr); virtual int Close(); diff --git a/webrtc/examples/peerconnection/client/peer_connection_client.cc b/webrtc/examples/peerconnection/client/peer_connection_client.cc index 9875115c4b..86d21193fd 100644 --- a/webrtc/examples/peerconnection/client/peer_connection_client.cc +++ b/webrtc/examples/peerconnection/client/peer_connection_client.cc @@ -296,7 +296,7 @@ bool PeerConnectionClient::ReadIntoBuffer(rtc::AsyncSocket* socket, size_t* content_length) { char buffer[0xffff]; do { - int bytes = socket->Recv(buffer, sizeof(buffer)); + int bytes = socket->Recv(buffer, sizeof(buffer), nullptr); if (bytes <= 0) break; data->append(buffer, bytes); diff --git a/webrtc/libjingle/xmpp/xmppsocket.cc b/webrtc/libjingle/xmpp/xmppsocket.cc index 9c1bf8b735..c42dcd128e 100644 --- a/webrtc/libjingle/xmpp/xmppsocket.cc +++ b/webrtc/libjingle/xmpp/xmppsocket.cc @@ -179,7 +179,7 @@ bool XmppSocket::Connect(const rtc::SocketAddress& addr) { bool XmppSocket::Read(char * data, size_t len, size_t* len_read) { #ifndef USE_SSLSTREAM - int read = cricket_socket_->Recv(data, len); + int read = cricket_socket_->Recv(data, len, nullptr); if (read > 0) { *len_read = (size_t)read; return true; diff --git a/webrtc/libjingle/xmpp/xmppsocket.h b/webrtc/libjingle/xmpp/xmppsocket.h index d862afd7de..02d645383c 100644 --- a/webrtc/libjingle/xmpp/xmppsocket.h +++ b/webrtc/libjingle/xmpp/xmppsocket.h @@ -19,7 +19,7 @@ // The below define selects the SSLStreamAdapter implementation for // SSL, as opposed to the SSLAdapter socket adapter. -// #define USE_SSLSTREAM +// #define USE_SSLSTREAM namespace rtc { class StreamInterface;