Read recv timestamps from socket (posix only).

This helps a lot on Android devices where the user threads can be scheduled with low priority when the app is in the background, causing spurious significantly delayed before a packet can be read from the socket. With this patch the timestamp is taken by the kernel when the packet actually arrives.

R=juberti@chromium.org
TBR=juberti@webrtc.org

BUG=webrtc:5773

Review URL: https://codereview.webrtc.org/1944683002 .

Cr-Commit-Position: refs/heads/master@{#12850}
This commit is contained in:
Stefan Holmer 2016-05-23 18:19:26 +02:00
parent 181310fb6f
commit 9131efdb30
34 changed files with 256 additions and 89 deletions

View File

@ -64,12 +64,15 @@ int AsyncSocketAdapter::SendTo(const void* pv,
return socket_->SendTo(pv, cb, addr);
}
int AsyncSocketAdapter::Recv(void* pv, size_t cb) {
return socket_->Recv(pv, cb);
int AsyncSocketAdapter::Recv(void* pv, size_t cb, int64_t* timestamp) {
return socket_->Recv(pv, cb, timestamp);
}
int AsyncSocketAdapter::RecvFrom(void* pv, size_t cb, SocketAddress* paddr) {
return socket_->RecvFrom(pv, cb, paddr);
int AsyncSocketAdapter::RecvFrom(void* pv,
size_t cb,
SocketAddress* paddr,
int64_t* timestamp) {
return socket_->RecvFrom(pv, cb, paddr, timestamp);
}
int AsyncSocketAdapter::Listen(int backlog) {

View File

@ -56,8 +56,11 @@ class AsyncSocketAdapter : public AsyncSocket, public sigslot::has_slots<> {
int Connect(const SocketAddress& addr) override;
int Send(const void* pv, size_t cb) override;
int SendTo(const void* pv, size_t cb, const SocketAddress& addr) override;
int Recv(void* pv, size_t cb) override;
int RecvFrom(void* pv, size_t cb, SocketAddress* paddr) override;
int Recv(void* pv, size_t cb, int64_t* timestamp) override;
int RecvFrom(void* pv,
size_t cb,
SocketAddress* paddr,
int64_t* timestamp) override;
int Listen(int backlog) override;
AsyncSocket* Accept(SocketAddress* paddr) override;
int Close() override;

View File

@ -208,7 +208,8 @@ void AsyncTCPSocketBase::OnReadEvent(AsyncSocket* socket) {
free_size = inbuf_.capacity() - inbuf_.size();
}
int len = socket_->Recv(inbuf_.data() + inbuf_.size(), free_size);
int len =
socket_->Recv(inbuf_.data() + inbuf_.size(), free_size, nullptr);
if (len < 0) {
// TODO(stefan): Do something better like forwarding the error to the
// user.

View File

@ -102,7 +102,8 @@ void AsyncUDPSocket::OnReadEvent(AsyncSocket* socket) {
ASSERT(socket_.get() == socket);
SocketAddress remote_addr;
int len = socket_->RecvFrom(buf_, size_, &remote_addr);
int64_t timestamp;
int len = socket_->RecvFrom(buf_, size_, &remote_addr, &timestamp);
if (len < 0) {
// An error here typically means we got an ICMP error in response to our
// send datagram, indicating the remote address was unreachable.
@ -116,8 +117,9 @@ void AsyncUDPSocket::OnReadEvent(AsyncSocket* socket) {
// TODO: Make sure that we got all of the packet.
// If we did not, then we should resize our buffer to be large enough.
SignalReadPacket(this, buf_, static_cast<size_t>(len), remote_addr,
CreatePacketTime(0));
SignalReadPacket(
this, buf_, static_cast<size_t>(len), remote_addr,
(timestamp > -1 ? PacketTime(timestamp, 0) : CreatePacketTime(0)));
}
void AsyncUDPSocket::OnWriteEvent(AsyncSocket* socket) {

View File

@ -247,7 +247,7 @@ void AutoDetectProxy::OnConnectEvent(AsyncSocket * socket) {
void AutoDetectProxy::OnReadEvent(AsyncSocket * socket) {
char data[257];
int len = socket_->Recv(data, 256);
int len = socket_->Recv(data, 256, nullptr);
if (len > 0) {
data[len] = 0;
LOG(LS_VERBOSE) << "AutoDetectProxy read " << len << " bytes";

View File

@ -52,14 +52,17 @@ class FirewallSocket : public AsyncSocketAdapter {
}
return AsyncSocketAdapter::SendTo(pv, cb, addr);
}
int Recv(void* pv, size_t cb) override {
int Recv(void* pv, size_t cb, int64_t* timestamp) override {
SocketAddress addr;
return RecvFrom(pv, cb, &addr);
return RecvFrom(pv, cb, &addr, timestamp);
}
int RecvFrom(void* pv, size_t cb, SocketAddress* paddr) override {
int RecvFrom(void* pv,
size_t cb,
SocketAddress* paddr,
int64_t* timestamp) override {
if (type_ == SOCK_DGRAM) {
while (true) {
int res = AsyncSocketAdapter::RecvFrom(pv, cb, paddr);
int res = AsyncSocketAdapter::RecvFrom(pv, cb, paddr, timestamp);
if (res <= 0)
return res;
if (server_->Check(FP_UDP, *paddr, GetLocalAddress()))
@ -69,7 +72,7 @@ class FirewallSocket : public AsyncSocketAdapter {
<< GetLocalAddress().ToSensitiveString() << " dropped";
}
}
return AsyncSocketAdapter::RecvFrom(pv, cb, paddr);
return AsyncSocketAdapter::RecvFrom(pv, cb, paddr, timestamp);
}
int Listen(int backlog) override {

View File

@ -188,7 +188,10 @@ int MacAsyncSocket::SendTo(const void* buffer, size_t length,
}
// Read data received from the remote end we're connected to.
int MacAsyncSocket::Recv(void* buffer, size_t length) {
int MacAsyncSocket::Recv(void* buffer, size_t length, int64_t* timestamp) {
if (timestamp) {
*timestamp = -1;
}
int received = ::recv(native_socket_, reinterpret_cast<char*>(buffer),
length, 0);
if (received == SOCKET_ERROR) error_ = errno;
@ -199,8 +202,13 @@ int MacAsyncSocket::Recv(void* buffer, size_t length) {
}
// Read data received from any remote party
int MacAsyncSocket::RecvFrom(void* buffer, size_t length,
SocketAddress* out_addr) {
int MacAsyncSocket::RecvFrom(void* buffer,
size_t length,
SocketAddress* out_addr,
int64_t* timestamp) {
if (timestamp) {
*timestamp = -1;
}
sockaddr_storage saddr;
socklen_t addr_len = sizeof(saddr);
int received = ::recvfrom(native_socket_, reinterpret_cast<char*>(buffer),

View File

@ -42,8 +42,11 @@ class MacAsyncSocket : public AsyncSocket, public sigslot::has_slots<> {
int SendTo(const void* buffer,
size_t length,
const SocketAddress& addr) override;
int Recv(void* buffer, size_t length) override;
int RecvFrom(void* buffer, size_t length, SocketAddress* out_addr) override;
int Recv(void* buffer, size_t length, int64_t* timestamp) override;
int RecvFrom(void* buffer,
size_t length,
SocketAddress* out_addr,
int64_t* timestamp) override;
int Listen(int backlog) override;
MacAsyncSocket* Accept(SocketAddress* out_addr) override;
int Close() override;

View File

@ -195,7 +195,7 @@ bool TestConnectivity(const SocketAddress& src, const IPAddress& dst) {
const size_t kRecvBufSize = 64;
char recvbuf[kRecvBufSize];
Thread::Current()->SleepMs(100);
int received = server->RecvFrom(recvbuf, kRecvBufSize, &addr);
int received = server->RecvFrom(recvbuf, kRecvBufSize, &addr, nullptr);
return received == sent && ::memcmp(buf, recvbuf, len) == 0;
}

View File

@ -155,14 +155,17 @@ class NATSocket : public AsyncSocket, public sigslot::has_slots<> {
return result;
}
int Recv(void* data, size_t size) override {
int Recv(void* data, size_t size, int64_t* timestamp) override {
SocketAddress addr;
return RecvFrom(data, size, &addr);
return RecvFrom(data, size, &addr, timestamp);
}
int RecvFrom(void* data, size_t size, SocketAddress* out_addr) override {
int RecvFrom(void* data,
size_t size,
SocketAddress* out_addr,
int64_t* timestamp) override {
if (server_addr_.IsNil() || type_ == SOCK_STREAM) {
return socket_->RecvFrom(data, size, out_addr);
return socket_->RecvFrom(data, size, out_addr, timestamp);
}
// Make sure we have enough room to read the requested amount plus the
// largest possible header address.
@ -170,7 +173,7 @@ class NATSocket : public AsyncSocket, public sigslot::has_slots<> {
Grow(size + kNATEncodedIPv6AddressSize);
// Read the packet from the socket.
int result = socket_->RecvFrom(buf_, size_, &remote_addr);
int result = socket_->RecvFrom(buf_, size_, &remote_addr, timestamp);
if (result >= 0) {
ASSERT(remote_addr == server_addr_);
@ -278,7 +281,7 @@ class NATSocket : public AsyncSocket, public sigslot::has_slots<> {
// Handles the byte sent back from the server and fires the appropriate event.
void HandleConnectReply() {
char code;
socket_->Recv(&code, sizeof(code));
socket_->Recv(&code, sizeof(code), nullptr);
if (code == 0) {
connected_ = true;
SignalConnectEvent(this);

View File

@ -119,7 +119,7 @@ static int socket_read(BIO* b, char* out, int outl) {
return -1;
rtc::AsyncSocket* socket = static_cast<rtc::AsyncSocket*>(b->ptr);
BIO_clear_retry_flags(b);
int result = socket->Recv(out, outl);
int result = socket->Recv(out, outl, nullptr);
if (result > 0) {
return result;
} else if (result == 0) {
@ -524,13 +524,12 @@ OpenSSLAdapter::SendTo(const void* pv, size_t cb, const SocketAddress& addr) {
return SOCKET_ERROR;
}
int
OpenSSLAdapter::Recv(void* pv, size_t cb) {
int OpenSSLAdapter::Recv(void* pv, size_t cb, int64_t* timestamp) {
//LOG(LS_INFO) << "OpenSSLAdapter::Recv(" << cb << ")";
switch (state_) {
case SSL_NONE:
return AsyncSocketAdapter::Recv(pv, cb);
return AsyncSocketAdapter::Recv(pv, cb, timestamp);
case SSL_WAIT:
case SSL_CONNECTING:
@ -579,10 +578,12 @@ OpenSSLAdapter::Recv(void* pv, size_t cb) {
return SOCKET_ERROR;
}
int
OpenSSLAdapter::RecvFrom(void* pv, size_t cb, SocketAddress* paddr) {
int OpenSSLAdapter::RecvFrom(void* pv,
size_t cb,
SocketAddress* paddr,
int64_t* timestamp) {
if (socket_->GetState() == Socket::CS_CONNECTED) {
int ret = Recv(pv, cb);
int ret = Recv(pv, cb, timestamp);
*paddr = GetRemoteAddress();

View File

@ -37,8 +37,11 @@ public:
int StartSSL(const char* hostname, bool restartable) override;
int Send(const void* pv, size_t cb) override;
int SendTo(const void* pv, size_t cb, const SocketAddress& addr) override;
int Recv(void* pv, size_t cb) override;
int RecvFrom(void* pv, size_t cb, SocketAddress* paddr) override;
int Recv(void* pv, size_t cb, int64_t* timestamp) override;
int RecvFrom(void* pv,
size_t cb,
SocketAddress* paddr,
int64_t* timestamp) override;
int Close() override;
// Note that the socket returns ST_CONNECTING while SSL is being negotiated.

View File

@ -23,6 +23,7 @@
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <sys/time.h>
#include <sys/select.h>
#include <unistd.h>
@ -55,8 +56,28 @@
#include <netinet/tcp.h> // for TCP_NODELAY
#define IP_MTU 14 // Until this is integrated from linux/in.h to netinet/in.h
typedef void* SockOptArg;
#endif // WEBRTC_POSIX
#if defined(WEBRTC_POSIX) && !defined(WEBRTC_MAC)
int64_t GetSocketRecvTimestamp(int socket) {
struct timeval tv_ioctl;
int ret = ioctl(socket, SIOCGSTAMP, &tv_ioctl);
if (ret != 0)
return -1;
int64_t timestamp =
rtc::kNumMicrosecsPerSec * static_cast<int64_t>(tv_ioctl.tv_sec) +
static_cast<int64_t>(tv_ioctl.tv_usec);
return timestamp;
}
#else
int64_t GetSocketRecvTimestamp(int socket) {
return -1;
}
#endif
#if defined(WEBRTC_WIN)
typedef char* SockOptArg;
#endif
@ -324,7 +345,7 @@ int PhysicalSocket::SendTo(const void* buffer,
return sent;
}
int PhysicalSocket::Recv(void* buffer, size_t length) {
int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) {
int received = ::recv(s_, static_cast<char*>(buffer),
static_cast<int>(length), 0);
if ((received == 0) && (length != 0)) {
@ -338,6 +359,9 @@ int PhysicalSocket::Recv(void* buffer, size_t length) {
SetError(EWOULDBLOCK);
return SOCKET_ERROR;
}
if (timestamp) {
*timestamp = GetSocketRecvTimestamp(s_);
}
UpdateLastError();
int error = GetError();
bool success = (received >= 0) || IsBlockingError(error);
@ -352,12 +376,16 @@ int PhysicalSocket::Recv(void* buffer, size_t length) {
int PhysicalSocket::RecvFrom(void* buffer,
size_t length,
SocketAddress* out_addr) {
SocketAddress* out_addr,
int64_t* timestamp) {
sockaddr_storage addr_storage;
socklen_t addr_len = sizeof(addr_storage);
sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
int received = ::recvfrom(s_, static_cast<char*>(buffer),
static_cast<int>(length), 0, addr, &addr_len);
if (timestamp) {
*timestamp = GetSocketRecvTimestamp(s_);
}
UpdateLastError();
if ((received >= 0) && (out_addr != nullptr))
SocketAddressFromSockAddrStorage(addr_storage, out_addr);

View File

@ -143,8 +143,11 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
size_t length,
const SocketAddress& addr) override;
int Recv(void* buffer, size_t length) override;
int RecvFrom(void* buffer, size_t length, SocketAddress* out_addr) override;
int Recv(void* buffer, size_t length, int64_t* timestamp) override;
int RecvFrom(void* buffer,
size_t length,
SocketAddress* out_addr,
int64_t* timestamp) override;
int Listen(int backlog) override;
AsyncSocket* Accept(SocketAddress* out_addr) override;

View File

@ -386,6 +386,21 @@ TEST_F(PhysicalSocketTest, TestGetSetOptionsIPv6) {
#if defined(WEBRTC_POSIX)
#if !defined(WEBRTC_MAC)
TEST_F(PhysicalSocketTest, TestSocketRecvTimestampIPv4) {
SocketTest::TestSocketRecvTimestamp();
}
#if defined(WEBRTC_LINUX)
#define MAYBE_TestSocketRecvTimestampIPv6 DISABLED_TestSocketRecvTimestampIPv6
#else
#define MAYBE_TestSocketRecvTimestampIPv6 TestSocketRecvTimestampIPv6
#endif
TEST_F(PhysicalSocketTest, MAYBE_TestSocketRecvTimestampIPv6) {
SocketTest::TestSocketRecvTimestamp();
}
#endif
class PosixSignalDeliveryTest : public testing::Test {
public:
static void RecordSignal(int signum) {

View File

@ -129,7 +129,7 @@ void ProxyBinding::Read(AsyncSocket* socket, FifoBuffer* buffer) {
int read;
if (buffer->GetBuffered(&size) && size == 0) {
void* p = buffer->GetWriteBuffer(&size);
read = socket->Recv(p, size);
read = socket->Recv(p, size, nullptr);
buffer->ConsumeWriteBuffer(std::max(read, 0));
}
}

View File

@ -151,8 +151,11 @@ class Socket {
virtual int Connect(const SocketAddress& addr) = 0;
virtual int Send(const void *pv, size_t cb) = 0;
virtual int SendTo(const void *pv, size_t cb, const SocketAddress& addr) = 0;
virtual int Recv(void *pv, size_t cb) = 0;
virtual int RecvFrom(void *pv, size_t cb, SocketAddress *paddr) = 0;
virtual int Recv(void* pv, size_t cb, int64_t* timestamp) = 0;
virtual int RecvFrom(void* pv,
size_t cb,
SocketAddress* paddr,
int64_t* timestamp) = 0;
virtual int Listen(int backlog) = 0;
virtual Socket *Accept(SocketAddress *paddr) = 0;
virtual int Close() = 0;

View File

@ -184,6 +184,10 @@ void SocketTest::TestGetSetOptionsIPv6() {
GetSetOptionsInternal(kIPv6Loopback);
}
void SocketTest::TestSocketRecvTimestamp() {
SocketRecvTimestamp(kIPv4Loopback);
}
// For unbound sockets, GetLocalAddress / GetRemoteAddress return AF_UNSPEC
// values on Windows, but an empty address of the same family on Linux/MacOS X.
bool IsUnspecOrEmptyIP(const IPAddress& address) {
@ -541,7 +545,7 @@ void SocketTest::ServerCloseInternal(const IPAddress& loopback) {
// Ensure the data can be read.
char buffer[10];
EXPECT_EQ(1, client->Recv(buffer, sizeof(buffer)));
EXPECT_EQ(1, client->Recv(buffer, sizeof(buffer), nullptr));
EXPECT_EQ('a', buffer[0]);
// Now we should close, but the remote address will remain.
@ -673,7 +677,7 @@ void SocketTest::SocketServerWaitInternal(const IPAddress& loopback) {
// But should signal when process_io is true.
EXPECT_TRUE_WAIT((sink.Check(accepted.get(), testing::SSE_READ)), kTimeout);
EXPECT_LT(0, accepted->Recv(buf, 1024));
EXPECT_LT(0, accepted->Recv(buf, 1024, nullptr));
}
void SocketTest::TcpInternal(const IPAddress& loopback, size_t data_size,
@ -763,7 +767,7 @@ void SocketTest::TcpInternal(const IPAddress& loopback, size_t data_size,
// Receive as much as we can get in a single recv call.
char recved_data[data_size];
int recved_size = receiver->Recv(recved_data, data_size);
int recved_size = receiver->Recv(recved_data, data_size, nullptr);
if (!recv_called) {
// The first Recv() after getting readability should succeed and receive
@ -850,7 +854,7 @@ void SocketTest::SingleFlowControlCallbackInternal(const IPAddress& loopback) {
// Pull data.
for (int i = 0; i < sends; ++i) {
client->Recv(buf, arraysize(buf));
client->Recv(buf, arraysize(buf), nullptr);
}
// Expect at least one additional writable callback.
@ -1023,4 +1027,25 @@ void SocketTest::GetSetOptionsInternal(const IPAddress& loopback) {
}
}
void SocketTest::SocketRecvTimestamp(const IPAddress& loopback) {
std::unique_ptr<Socket> socket(
ss_->CreateSocket(loopback.family(), SOCK_DGRAM));
EXPECT_EQ(0, socket->Bind(SocketAddress(loopback, 0)));
SocketAddress address = socket->GetLocalAddress();
socket->SendTo("foo", 3, address);
int64_t timestamp;
char buffer[3];
socket->RecvFrom(buffer, 3, nullptr, &timestamp);
EXPECT_GT(timestamp, -1);
int64_t prev_timestamp = timestamp;
const int64_t kTimeBetweenPacketsMs = 10;
Thread::SleepMs(kTimeBetweenPacketsMs);
socket->SendTo("bar", 3, address);
socket->RecvFrom(buffer, 3, nullptr, &timestamp);
EXPECT_NEAR(timestamp, prev_timestamp + kTimeBetweenPacketsMs * 1000, 2000);
}
} // namespace rtc

View File

@ -57,6 +57,7 @@ class SocketTest : public testing::Test {
void TestUdpReadyToSendIPv6();
void TestGetSetOptionsIPv4();
void TestGetSetOptionsIPv6();
void TestSocketRecvTimestamp();
static const int kTimeout = 5000; // ms
const IPAddress kIPv4Loopback;
@ -84,6 +85,7 @@ class SocketTest : public testing::Test {
void UdpInternal(const IPAddress& loopback);
void UdpReadyToSend(const IPAddress& loopback);
void GetSetOptionsInternal(const IPAddress& loopback);
void SocketRecvTimestamp(const IPAddress& loopback);
SocketServer* ss_;
};

View File

@ -36,7 +36,7 @@
#if defined(WEBRTC_WIN)
#include "webrtc/base/sec_buffer.h"
#endif // WEBRTC_WIN
#endif // WEBRTC_WIN
namespace rtc {
@ -59,7 +59,7 @@ int BufferedReadAdapter::Send(const void *pv, size_t cb) {
return AsyncSocketAdapter::Send(pv, cb);
}
int BufferedReadAdapter::Recv(void *pv, size_t cb) {
int BufferedReadAdapter::Recv(void* pv, size_t cb, int64_t* timestamp) {
if (buffering_) {
socket_->SetError(EWOULDBLOCK);
return -1;
@ -80,7 +80,7 @@ int BufferedReadAdapter::Recv(void *pv, size_t cb) {
// FIX: If cb == 0, we won't generate another read event
int res = AsyncSocketAdapter::Recv(pv, cb);
int res = AsyncSocketAdapter::Recv(pv, cb, timestamp);
if (res >= 0) {
// Read from socket and possibly buffer; return combined length
return res + static_cast<int>(read);
@ -113,7 +113,8 @@ void BufferedReadAdapter::OnReadEvent(AsyncSocket * socket) {
data_len_ = 0;
}
int len = socket_->Recv(buffer_ + data_len_, buffer_size_ - data_len_);
int len =
socket_->Recv(buffer_ + data_len_, buffer_size_ - data_len_, nullptr);
if (len < 0) {
// TODO: Do something better like forwarding the error to the user.
LOG_ERR(INFO) << "Recv";
@ -874,15 +875,18 @@ int LoggingSocketAdapter::SendTo(const void *pv, size_t cb,
return res;
}
int LoggingSocketAdapter::Recv(void *pv, size_t cb) {
int res = AsyncSocketAdapter::Recv(pv, cb);
int LoggingSocketAdapter::Recv(void* pv, size_t cb, int64_t* timestamp) {
int res = AsyncSocketAdapter::Recv(pv, cb, timestamp);
if (res > 0)
LogMultiline(level_, label_.c_str(), true, pv, res, hex_mode_, &lms_);
return res;
}
int LoggingSocketAdapter::RecvFrom(void *pv, size_t cb, SocketAddress *paddr) {
int res = AsyncSocketAdapter::RecvFrom(pv, cb, paddr);
int LoggingSocketAdapter::RecvFrom(void* pv,
size_t cb,
SocketAddress* paddr,
int64_t* timestamp) {
int res = AsyncSocketAdapter::RecvFrom(pv, cb, paddr, timestamp);
if (res > 0)
LogMultiline(level_, label_.c_str(), true, pv, res, hex_mode_, &lms_);
return res;

View File

@ -36,7 +36,7 @@ class BufferedReadAdapter : public AsyncSocketAdapter {
~BufferedReadAdapter() override;
int Send(const void* pv, size_t cb) override;
int Recv(void* pv, size_t cb) override;
int Recv(void* pv, size_t cb, int64_t* timestamp) override;
protected:
int DirectSend(const void* pv, size_t cb) {
@ -224,8 +224,11 @@ class LoggingSocketAdapter : public AsyncSocketAdapter {
int Send(const void* pv, size_t cb) override;
int SendTo(const void* pv, size_t cb, const SocketAddress& addr) override;
int Recv(void* pv, size_t cb) override;
int RecvFrom(void* pv, size_t cb, SocketAddress* paddr) override;
int Recv(void* pv, size_t cb, int64_t* timestamp) override;
int RecvFrom(void* pv,
size_t cb,
SocketAddress* paddr,
int64_t* timestamp) override;
int Close() override;
protected:

View File

@ -60,7 +60,7 @@ StreamState SocketStream::GetState() const {
StreamResult SocketStream::Read(void* buffer, size_t buffer_len,
size_t* read, int* error) {
ASSERT(socket_ != NULL);
int result = socket_->Recv(buffer, buffer_len);
int result = socket_->Recv(buffer, buffer_len, nullptr);
if (result < 0) {
if (socket_->IsBlocking())
return SR_BLOCK;

View File

@ -102,7 +102,7 @@ class SSLAdapterTestDummyClient : public sigslot::has_slots<> {
char buffer[4096] = "";
// Read data received from the server and store it in our internal buffer.
int read = socket->Recv(buffer, sizeof(buffer) - 1);
int read = socket->Recv(buffer, sizeof(buffer) - 1, nullptr);
if (read != -1) {
buffer[read] = '\0';

View File

@ -19,7 +19,7 @@ namespace rtc {
// NextPacket.
TestClient::TestClient(AsyncPacketSocket* socket)
: socket_(socket), ready_to_send_(false) {
: socket_(socket), ready_to_send_(false), prev_packet_timestamp_(-1) {
packets_ = new std::vector<Packet*>();
socket_->SignalReadPacket.connect(this, &TestClient::OnPacket);
socket_->SignalReadyToSend.connect(this, &TestClient::OnReadyToSend);
@ -91,7 +91,8 @@ bool TestClient::CheckNextPacket(const char* buf, size_t size,
bool res = false;
Packet* packet = NextPacket(kTimeoutMs);
if (packet) {
res = (packet->size == size && memcmp(packet->buf, buf, size) == 0);
res = (packet->size == size && memcmp(packet->buf, buf, size) == 0 &&
CheckTimestamp(packet->packet_time.timestamp));
if (addr)
*addr = packet->addr;
delete packet;
@ -99,6 +100,29 @@ bool TestClient::CheckNextPacket(const char* buf, size_t size,
return res;
}
bool TestClient::CheckTimestamp(int64_t packet_timestamp) {
bool res = true;
if (packet_timestamp == -1) {
res = false;
}
int64_t time_us = rtc::TimeMicros();
if (prev_packet_timestamp_ != -1) {
if (packet_timestamp < prev_packet_timestamp_) {
res = false;
}
const int64_t kErrorMarginUs = 20000;
if (packet_timestamp - prev_packet_timestamp_ <
time_us - prev_time_us_ - kErrorMarginUs ||
packet_timestamp - prev_packet_timestamp_ >
time_us - prev_time_us_ + kErrorMarginUs) {
res = false;
}
}
prev_packet_timestamp_ = packet_timestamp;
prev_time_us_ = time_us;
return res;
}
bool TestClient::CheckNoPacket() {
bool res;
Packet* packet = NextPacket(kNoPacketTimeoutMs);
@ -123,21 +147,24 @@ void TestClient::OnPacket(AsyncPacketSocket* socket, const char* buf,
size_t size, const SocketAddress& remote_addr,
const PacketTime& packet_time) {
CritScope cs(&crit_);
packets_->push_back(new Packet(remote_addr, buf, size));
packets_->push_back(new Packet(remote_addr, buf, size, packet_time));
}
void TestClient::OnReadyToSend(AsyncPacketSocket* socket) {
ready_to_send_ = true;
}
TestClient::Packet::Packet(const SocketAddress& a, const char* b, size_t s)
: addr(a), buf(0), size(s) {
TestClient::Packet::Packet(const SocketAddress& a,
const char* b,
size_t s,
const PacketTime& packet_time)
: addr(a), buf(0), size(s), packet_time(packet_time) {
buf = new char[size];
memcpy(buf, b, size);
}
TestClient::Packet::Packet(const Packet& p)
: addr(p.addr), buf(0), size(p.size) {
: addr(p.addr), buf(0), size(p.size), packet_time(p.packet_time) {
buf = new char[size];
memcpy(buf, p.buf, size);
}

View File

@ -24,13 +24,17 @@ class TestClient : public sigslot::has_slots<> {
public:
// Records the contents of a packet that was received.
struct Packet {
Packet(const SocketAddress& a, const char* b, size_t s);
Packet(const SocketAddress& a,
const char* b,
size_t s,
const PacketTime& packet_time);
Packet(const Packet& p);
virtual ~Packet();
SocketAddress addr;
char* buf;
size_t size;
PacketTime packet_time;
};
// Default timeout for NextPacket reads.
@ -85,11 +89,14 @@ class TestClient : public sigslot::has_slots<> {
const SocketAddress& remote_addr,
const PacketTime& packet_time);
void OnReadyToSend(AsyncPacketSocket* socket);
bool CheckTimestamp(int64_t packet_timestamp);
CriticalSection crit_;
AsyncPacketSocket* socket_;
std::vector<Packet*>* packets_;
bool ready_to_send_;
int64_t prev_packet_timestamp_;
int64_t prev_time_us_;
RTC_DISALLOW_COPY_AND_ASSIGN(TestClient);
};

View File

@ -359,7 +359,7 @@ private:
}
void OnReadEvent(AsyncSocket* socket) {
char data[64 * 1024];
int result = socket_->Recv(data, arraysize(data));
int result = socket_->Recv(data, arraysize(data), nullptr);
if (result > 0) {
recv_buffer_.insert(recv_buffer_.end(), data, data + result);
}

View File

@ -464,7 +464,7 @@ class VirtualSocketServerTest : public testing::Test {
char buffer[10];
EXPECT_FALSE(sink.Check(b.get(), testing::SSE_READ));
EXPECT_EQ(-1, b->Recv(buffer, 10));
EXPECT_EQ(-1, b->Recv(buffer, 10, nullptr));
EXPECT_TRUE(sink.Check(a, testing::SSE_CLOSE));
EXPECT_EQ(a->GetState(), AsyncSocket::CS_CLOSED);
@ -531,7 +531,7 @@ class VirtualSocketServerTest : public testing::Test {
EXPECT_TRUE(a->IsBlocking());
// Read a subset of the data
result = b->Recv(recv_buffer + recv_pos, 500);
result = b->Recv(recv_buffer + recv_pos, 500, nullptr);
EXPECT_EQ(500, result);
recv_pos += result;
@ -546,7 +546,7 @@ class VirtualSocketServerTest : public testing::Test {
// Empty the recv buffer
while (true) {
result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos, nullptr);
if (result < 0) {
EXPECT_EQ(-1, result);
EXPECT_TRUE(b->IsBlocking());
@ -560,7 +560,7 @@ class VirtualSocketServerTest : public testing::Test {
// Continue to empty the recv buffer
while (true) {
result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos, nullptr);
if (result < 0) {
EXPECT_EQ(-1, result);
EXPECT_TRUE(b->IsBlocking());
@ -579,7 +579,7 @@ class VirtualSocketServerTest : public testing::Test {
// Receive the last of the data
while (true) {
result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos, nullptr);
if (result < 0) {
EXPECT_EQ(-1, result);
EXPECT_TRUE(b->IsBlocking());
@ -626,7 +626,7 @@ class VirtualSocketServerTest : public testing::Test {
ss_->ProcessMessagesUntilIdle();
for (char i = 0; i < cNumPackets; ++i) {
EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer), nullptr));
EXPECT_EQ(static_cast<char>('0' + i), buffer[0]);
}
@ -646,7 +646,7 @@ class VirtualSocketServerTest : public testing::Test {
ss_->ProcessMessagesUntilIdle();
for (char i = 0; i < cNumPackets; ++i) {
EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer), nullptr));
EXPECT_EQ(static_cast<char>('A' + i), buffer[0]);
}
}

View File

@ -264,12 +264,18 @@ int VirtualSocket::SendTo(const void* pv,
}
}
int VirtualSocket::Recv(void* pv, size_t cb) {
int VirtualSocket::Recv(void* pv, size_t cb, int64_t* timestamp) {
SocketAddress addr;
return RecvFrom(pv, cb, &addr);
return RecvFrom(pv, cb, &addr, timestamp);
}
int VirtualSocket::RecvFrom(void* pv, size_t cb, SocketAddress* paddr) {
int VirtualSocket::RecvFrom(void* pv,
size_t cb,
SocketAddress* paddr,
int64_t* timestamp) {
if (timestamp) {
*timestamp = -1;
}
// If we don't have a packet, then either error or wait for one to arrive.
if (recv_buffer_.empty()) {
if (async_) {

View File

@ -274,8 +274,11 @@ class VirtualSocket : public AsyncSocket, public MessageHandler {
int Close() override;
int Send(const void* pv, size_t cb) override;
int SendTo(const void* pv, size_t cb, const SocketAddress& addr) override;
int Recv(void* pv, size_t cb) override;
int RecvFrom(void* pv, size_t cb, SocketAddress* paddr) override;
int Recv(void* pv, size_t cb, int64_t* timestamp) override;
int RecvFrom(void* pv,
size_t cb,
SocketAddress* paddr,
int64_t* timestamp) override;
int Listen(int backlog) override;
VirtualSocket* Accept(SocketAddress* paddr) override;

View File

@ -438,7 +438,10 @@ int Win32Socket::SendTo(const void* buffer, size_t length,
return sent;
}
int Win32Socket::Recv(void* buffer, size_t length) {
int Win32Socket::Recv(void* buffer, size_t length, int64_t* timestamp) {
if (timestamp) {
*timestamp = -1;
}
int received = ::recv(socket_, static_cast<char*>(buffer),
static_cast<int>(length), 0);
UpdateLastError();
@ -447,8 +450,13 @@ int Win32Socket::Recv(void* buffer, size_t length) {
return received;
}
int Win32Socket::RecvFrom(void* buffer, size_t length,
SocketAddress* out_addr) {
int Win32Socket::RecvFrom(void* buffer,
size_t length,
SocketAddress* out_addr,
int64_t* timestamp) {
if (timestamp) {
*timestamp = -1;
}
sockaddr_storage saddr;
socklen_t addr_len = sizeof(saddr);
int received = ::recvfrom(socket_, static_cast<char*>(buffer),

View File

@ -44,8 +44,11 @@ class Win32Socket : public AsyncSocket {
virtual int Connect(const SocketAddress& addr);
virtual int Send(const void *buffer, size_t length);
virtual int SendTo(const void *buffer, size_t length, const SocketAddress& addr);
virtual int Recv(void *buffer, size_t length);
virtual int RecvFrom(void *buffer, size_t length, SocketAddress *out_addr);
virtual int Recv(void* buffer, size_t length, int64_t* timestamp);
virtual int RecvFrom(void* buffer,
size_t length,
SocketAddress* out_addr,
int64_t* timestamp);
virtual int Listen(int backlog);
virtual Win32Socket *Accept(SocketAddress *out_addr);
virtual int Close();

View File

@ -296,7 +296,7 @@ bool PeerConnectionClient::ReadIntoBuffer(rtc::AsyncSocket* socket,
size_t* content_length) {
char buffer[0xffff];
do {
int bytes = socket->Recv(buffer, sizeof(buffer));
int bytes = socket->Recv(buffer, sizeof(buffer), nullptr);
if (bytes <= 0)
break;
data->append(buffer, bytes);

View File

@ -179,7 +179,7 @@ bool XmppSocket::Connect(const rtc::SocketAddress& addr) {
bool XmppSocket::Read(char * data, size_t len, size_t* len_read) {
#ifndef USE_SSLSTREAM
int read = cricket_socket_->Recv(data, len);
int read = cricket_socket_->Recv(data, len, nullptr);
if (read > 0) {
*len_read = (size_t)read;
return true;

View File

@ -19,7 +19,7 @@
// The below define selects the SSLStreamAdapter implementation for
// SSL, as opposed to the SSLAdapter socket adapter.
// #define USE_SSLSTREAM
// #define USE_SSLSTREAM
namespace rtc {
class StreamInterface;