diff --git a/webrtc/base/testclient.cc b/webrtc/base/testclient.cc index fcf64e7d62..361bf36261 100644 --- a/webrtc/base/testclient.cc +++ b/webrtc/base/testclient.cc @@ -19,7 +19,7 @@ namespace rtc { // NextPacket. TestClient::TestClient(AsyncPacketSocket* socket) - : socket_(socket), ready_to_send_(false), prev_packet_timestamp_(-1) { + : socket_(socket), prev_packet_timestamp_(-1) { packets_ = new std::vector(); socket_->SignalReadPacket.connect(this, &TestClient::OnPacket); socket_->SignalReadyToSend.connect(this, &TestClient::OnReadyToSend); @@ -130,10 +130,6 @@ int TestClient::SetOption(Socket::Option opt, int value) { return socket_->SetOption(opt, value); } -bool TestClient::ready_to_send() const { - return ready_to_send_; -} - void TestClient::OnPacket(AsyncPacketSocket* socket, const char* buf, size_t size, const SocketAddress& remote_addr, const PacketTime& packet_time) { @@ -142,7 +138,7 @@ void TestClient::OnPacket(AsyncPacketSocket* socket, const char* buf, } void TestClient::OnReadyToSend(AsyncPacketSocket* socket) { - ready_to_send_ = true; + ++ready_to_send_count_; } TestClient::Packet::Packet(const SocketAddress& a, diff --git a/webrtc/base/testclient.h b/webrtc/base/testclient.h index 6c0cfbecaf..74ef8cc376 100644 --- a/webrtc/base/testclient.h +++ b/webrtc/base/testclient.h @@ -77,7 +77,10 @@ class TestClient : public sigslot::has_slots<> { int GetError(); int SetOption(Socket::Option opt, int value); - bool ready_to_send() const; + bool ready_to_send() const { return ready_to_send_count() > 0; } + + // How many times SignalReadyToSend has been fired. + int ready_to_send_count() const { return ready_to_send_count_; } private: // Timeout for reads when no packet is expected. @@ -94,7 +97,7 @@ class TestClient : public sigslot::has_slots<> { CriticalSection crit_; AsyncPacketSocket* socket_; std::vector* packets_; - bool ready_to_send_; + int ready_to_send_count_ = 0; int64_t prev_packet_timestamp_; RTC_DISALLOW_COPY_AND_ASSIGN(TestClient); }; diff --git a/webrtc/base/virtualsocket_unittest.cc b/webrtc/base/virtualsocket_unittest.cc index f517346f5a..4ee2c75617 100644 --- a/webrtc/base/virtualsocket_unittest.cc +++ b/webrtc/base/virtualsocket_unittest.cc @@ -1018,10 +1018,73 @@ TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv6ToIPv4Any) { true); } +TEST_F(VirtualSocketServerTest, SetSendingBlockedWithUdpSocket) { + AsyncSocket* socket1 = + ss_->CreateAsyncSocket(kIPv4AnyAddress.family(), SOCK_DGRAM); + AsyncSocket* socket2 = + ss_->CreateAsyncSocket(kIPv4AnyAddress.family(), SOCK_DGRAM); + socket1->Bind(kIPv4AnyAddress); + socket2->Bind(kIPv4AnyAddress); + TestClient* client1 = new TestClient(new AsyncUDPSocket(socket1)); + + ss_->SetSendingBlocked(true); + EXPECT_EQ(-1, client1->SendTo("foo", 3, socket2->GetLocalAddress())); + EXPECT_TRUE(socket1->IsBlocking()); + EXPECT_EQ(0, client1->ready_to_send_count()); + + ss_->SetSendingBlocked(false); + EXPECT_EQ(1, client1->ready_to_send_count()); + EXPECT_EQ(3, client1->SendTo("foo", 3, socket2->GetLocalAddress())); +} + +TEST_F(VirtualSocketServerTest, SetSendingBlockedWithTcpSocket) { + constexpr size_t kBufferSize = 1024; + ss_->set_send_buffer_capacity(kBufferSize); + ss_->set_recv_buffer_capacity(kBufferSize); + + testing::StreamSink sink; + AsyncSocket* socket1 = + ss_->CreateAsyncSocket(kIPv4AnyAddress.family(), SOCK_STREAM); + AsyncSocket* socket2 = + ss_->CreateAsyncSocket(kIPv4AnyAddress.family(), SOCK_STREAM); + sink.Monitor(socket1); + sink.Monitor(socket2); + socket1->Bind(kIPv4AnyAddress); + socket2->Bind(kIPv4AnyAddress); + + // Connect sockets. + EXPECT_EQ(0, socket1->Connect(socket2->GetLocalAddress())); + EXPECT_EQ(0, socket2->Connect(socket1->GetLocalAddress())); + ss_->ProcessMessagesUntilIdle(); + + char data[kBufferSize] = {}; + + // First Send call will fill the send buffer but not send anything. + ss_->SetSendingBlocked(true); + EXPECT_EQ(static_cast(kBufferSize), socket1->Send(data, kBufferSize)); + ss_->ProcessMessagesUntilIdle(); + EXPECT_FALSE(sink.Check(socket1, testing::SSE_WRITE)); + EXPECT_FALSE(sink.Check(socket2, testing::SSE_READ)); + EXPECT_FALSE(socket1->IsBlocking()); + + // Since the send buffer is full, next Send will result in EWOULDBLOCK. + EXPECT_EQ(-1, socket1->Send(data, kBufferSize)); + EXPECT_FALSE(sink.Check(socket1, testing::SSE_WRITE)); + EXPECT_FALSE(sink.Check(socket2, testing::SSE_READ)); + EXPECT_TRUE(socket1->IsBlocking()); + + // When sending is unblocked, the buffered data should be sent and + // SignalWriteEvent should fire. + ss_->SetSendingBlocked(false); + ss_->ProcessMessagesUntilIdle(); + EXPECT_TRUE(sink.Check(socket1, testing::SSE_WRITE)); + EXPECT_TRUE(sink.Check(socket2, testing::SSE_READ)); +} + TEST_F(VirtualSocketServerTest, CreatesStandardDistribution) { const uint32_t kTestMean[] = {10, 100, 333, 1000}; const double kTestDev[] = { 0.25, 0.1, 0.01 }; - // TODO: The current code only works for 1000 data points or more. + // TODO(deadbeef): The current code only works for 1000 data points or more. const uint32_t kTestSamples[] = {/*10, 100,*/ 1000}; for (size_t midx = 0; midx < arraysize(kTestMean); ++midx) { for (size_t didx = 0; didx < arraysize(kTestDev); ++didx) { diff --git a/webrtc/base/virtualsocketserver.cc b/webrtc/base/virtualsocketserver.cc index ef3c6e5329..87775cd63c 100644 --- a/webrtc/base/virtualsocketserver.cc +++ b/webrtc/base/virtualsocketserver.cc @@ -65,7 +65,7 @@ class Packet : public MessageData { public: Packet(const char* data, size_t size, const SocketAddress& from) : size_(size), consumed_(0), from_(from) { - ASSERT(NULL != data); + RTC_DCHECK(NULL != data); data_ = new char[size_]; memcpy(data_, data, size_); } @@ -80,7 +80,7 @@ class Packet : public MessageData { // Remove the first size bytes from the data. void Consume(size_t size) { - ASSERT(size + consumed_ < size_); + RTC_DCHECK(size + consumed_ < size_); consumed_ += size; } @@ -105,13 +105,15 @@ VirtualSocket::VirtualSocket(VirtualSocketServer* server, state_(CS_CLOSED), error_(0), listen_queue_(NULL), - write_enabled_(false), network_size_(0), recv_buffer_size_(0), bound_(false), was_any_(false) { - ASSERT((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM)); - ASSERT(async_ || (type_ != SOCK_STREAM)); // We only support async streams + RTC_DCHECK((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM)); + RTC_DCHECK(async_ || + (type_ != SOCK_STREAM)); // We only support async streams + server->SignalReadyToSend.connect(this, + &VirtualSocket::OnSocketServerReadyToSend); } VirtualSocket::~VirtualSocket() { @@ -209,7 +211,7 @@ int VirtualSocket::Close() { server_->msg_queue_->Clear(this, MSG_ID_CONNECT, &msgs); } for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) { - ASSERT(NULL != it->pdata); + RTC_DCHECK(NULL != it->pdata); MessageAddress* data = static_cast(it->pdata); // Lookup remote side. @@ -307,7 +309,7 @@ int VirtualSocket::RecvFrom(void* pv, recv_buffer_size_ -= data_read; if (was_full) { VirtualSocket* sender = server_->LookupBinding(remote_addr_); - ASSERT(NULL != sender); + RTC_DCHECK(NULL != sender); server_->SendTcp(sender); } } @@ -316,13 +318,13 @@ int VirtualSocket::RecvFrom(void* pv, } int VirtualSocket::Listen(int backlog) { - ASSERT(SOCK_STREAM == type_); - ASSERT(CS_CLOSED == state_); + RTC_DCHECK(SOCK_STREAM == type_); + RTC_DCHECK(CS_CLOSED == state_); if (local_addr_.IsNil()) { error_ = EINVAL; return -1; } - ASSERT(NULL == listen_queue_); + RTC_DCHECK(NULL == listen_queue_); listen_queue_ = new ListenQueue; state_ = CS_CONNECTING; return 0; @@ -392,8 +394,7 @@ int VirtualSocket::EstimateMTU(uint16_t* mtu) { void VirtualSocket::OnMessage(Message* pmsg) { if (pmsg->message_id == MSG_ID_PACKET) { - // ASSERT(!local_addr_.IsAnyIP()); - ASSERT(NULL != pmsg->pdata); + RTC_DCHECK(NULL != pmsg->pdata); Packet* packet = static_cast(pmsg->pdata); recv_buffer_.push_back(packet); @@ -402,7 +403,7 @@ void VirtualSocket::OnMessage(Message* pmsg) { SignalReadEvent(this); } } else if (pmsg->message_id == MSG_ID_CONNECT) { - ASSERT(NULL != pmsg->pdata); + RTC_DCHECK(NULL != pmsg->pdata); MessageAddress* data = static_cast(pmsg->pdata); if (listen_queue_ != NULL) { listen_queue_->push_back(data->addr); @@ -417,7 +418,7 @@ void VirtualSocket::OnMessage(Message* pmsg) { } delete data; } else if (pmsg->message_id == MSG_ID_DISCONNECT) { - ASSERT(SOCK_STREAM == type_); + RTC_DCHECK(SOCK_STREAM == type_); if (CS_CLOSED != state_) { int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0; state_ = CS_CLOSED; @@ -429,7 +430,7 @@ void VirtualSocket::OnMessage(Message* pmsg) { } else if (pmsg->message_id == MSG_ID_ADDRESS_BOUND) { SignalAddressReady(this, GetLocalAddress()); } else { - ASSERT(false); + RTC_DCHECK(false); } } @@ -465,7 +466,7 @@ int VirtualSocket::InitiateConnect(const SocketAddress& addr, bool use_delay) { } void VirtualSocket::CompleteConnect(const SocketAddress& addr, bool notify) { - ASSERT(CS_CONNECTING == state_); + RTC_DCHECK(CS_CONNECTING == state_); remote_addr_ = addr; state_ = CS_CONNECTED; server_->AddConnection(remote_addr_, local_addr_, this); @@ -495,7 +496,7 @@ int VirtualSocket::SendUdp(const void* pv, int VirtualSocket::SendTcp(const void* pv, size_t cb) { size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size(); if (0 == capacity) { - write_enabled_ = true; + ready_to_send_ = false; error_ = EWOULDBLOCK; return -1; } @@ -506,6 +507,22 @@ int VirtualSocket::SendTcp(const void* pv, size_t cb) { return static_cast(consumed); } +void VirtualSocket::OnSocketServerReadyToSend() { + if (ready_to_send_) { + // This socket didn't encounter EWOULDBLOCK, so there's nothing to do. + return; + } + if (type_ == SOCK_DGRAM) { + ready_to_send_ = true; + SignalWriteEvent(this); + } else { + RTC_DCHECK(type_ == SOCK_STREAM); + // This will attempt to empty the full send buffer, and will fire + // SignalWriteEvent if successful. + server_->SendTcp(this); + } +} + VirtualSocketServer::VirtualSocketServer(SocketServer* ss) : server_(ss), server_owned_(false), @@ -567,6 +584,19 @@ uint16_t VirtualSocketServer::GetNextPort() { return port; } +void VirtualSocketServer::SetSendingBlocked(bool blocked) { + if (blocked == sending_blocked_) { + // Unchanged; nothing to do. + return; + } + sending_blocked_ = blocked; + if (!sending_blocked_) { + // Sending was blocked, but is now unblocked. This signal gives sockets a + // chance to fire SignalWriteEvent, and for TCP, send buffered data. + SignalReadyToSend(); + } +} + Socket* VirtualSocketServer::CreateSocket(int type) { return CreateSocket(AF_INET, type); } @@ -598,7 +628,7 @@ void VirtualSocketServer::SetMessageQueue(MessageQueue* msg_queue) { } bool VirtualSocketServer::Wait(int cmsWait, bool process_io) { - ASSERT(msg_queue_ == Thread::Current()); + RTC_DCHECK(msg_queue_ == Thread::Current()); if (stop_on_idle_ && Thread::Current()->empty()) { return false; } @@ -610,7 +640,7 @@ void VirtualSocketServer::WakeUp() { } bool VirtualSocketServer::ProcessMessagesUntilIdle() { - ASSERT(msg_queue_ == Thread::Current()); + RTC_DCHECK(msg_queue_ == Thread::Current()); stop_on_idle_ = true; while (!msg_queue_->empty()) { Message msg; @@ -644,10 +674,10 @@ bool VirtualSocketServer::CloseTcpConnections( int VirtualSocketServer::Bind(VirtualSocket* socket, const SocketAddress& addr) { - ASSERT(NULL != socket); + RTC_DCHECK(NULL != socket); // Address must be completely specified at this point - ASSERT(!IPIsUnspec(addr.ipaddr())); - ASSERT(addr.port() != 0); + RTC_DCHECK(!IPIsUnspec(addr.ipaddr())); + RTC_DCHECK(addr.port() != 0); // Normalize the address (turns v6-mapped addresses into v4-addresses). SocketAddress normalized(addr.ipaddr().Normalized(), addr.port()); @@ -657,12 +687,12 @@ int VirtualSocketServer::Bind(VirtualSocket* socket, } int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) { - ASSERT(NULL != socket); + RTC_DCHECK(NULL != socket); if (!IPIsUnspec(addr->ipaddr())) { addr->SetIP(addr->ipaddr().Normalized()); } else { - ASSERT(false); + RTC_DCHECK(false); } if (addr->port() == 0) { @@ -703,7 +733,7 @@ int VirtualSocketServer::Unbind(const SocketAddress& addr, VirtualSocket* socket) { SocketAddress normalized(addr.ipaddr().Normalized(), addr.port()); - ASSERT((*bindings_)[normalized] == socket); + RTC_DCHECK((*bindings_)[normalized] == socket); bindings_->erase(bindings_->find(normalized)); return 0; } @@ -784,6 +814,13 @@ bool VirtualSocketServer::Disconnect(VirtualSocket* socket) { int VirtualSocketServer::SendUdp(VirtualSocket* socket, const char* data, size_t data_size, const SocketAddress& remote_addr) { + if (sending_blocked_) { + CritScope cs(&socket->crit_); + socket->ready_to_send_ = false; + socket->error_ = EWOULDBLOCK; + return -1; + } + // See if we want to drop this packet. if (Random() < drop_prob_) { LOG(LS_VERBOSE) << "Dropping packet: bad luck"; @@ -811,32 +848,40 @@ int VirtualSocketServer::SendUdp(VirtualSocket* socket, return -1; } - CritScope cs(&socket->crit_); + { + CritScope cs(&socket->crit_); - int64_t cur_time = TimeMillis(); - PurgeNetworkPackets(socket, cur_time); + int64_t cur_time = TimeMillis(); + PurgeNetworkPackets(socket, cur_time); - // Determine whether we have enough bandwidth to accept this packet. To do - // this, we need to update the send queue. Once we know it's current size, - // we know whether we can fit this packet. - // - // NOTE: There are better algorithms for maintaining such a queue (such as - // "Derivative Random Drop"); however, this algorithm is a more accurate - // simulation of what a normal network would do. + // Determine whether we have enough bandwidth to accept this packet. To do + // this, we need to update the send queue. Once we know it's current size, + // we know whether we can fit this packet. + // + // NOTE: There are better algorithms for maintaining such a queue (such as + // "Derivative Random Drop"); however, this algorithm is a more accurate + // simulation of what a normal network would do. + + size_t packet_size = data_size + UDP_HEADER_SIZE; + if (socket->network_size_ + packet_size > network_capacity_) { + LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded"; + return static_cast(data_size); + } + + AddPacketToNetwork(socket, recipient, cur_time, data, data_size, + UDP_HEADER_SIZE, false); - size_t packet_size = data_size + UDP_HEADER_SIZE; - if (socket->network_size_ + packet_size > network_capacity_) { - LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded"; return static_cast(data_size); } - - AddPacketToNetwork(socket, recipient, cur_time, data, data_size, - UDP_HEADER_SIZE, false); - - return static_cast(data_size); } void VirtualSocketServer::SendTcp(VirtualSocket* socket) { + if (sending_blocked_) { + // Eventually the socket's buffer will fill and VirtualSocket::SendTcp will + // set EWOULDBLOCK. + return; + } + // TCP can't send more data than will fill up the receiver's buffer. // We track the data that is in the buffer plus data in flight using the // recipient's recv_buffer_size_. Anything beyond that must be stored in the @@ -879,9 +924,9 @@ void VirtualSocketServer::SendTcp(VirtualSocket* socket) { socket->send_buffer_.resize(new_buffer_size); } - if (socket->write_enabled_ - && (socket->send_buffer_.size() < send_buffer_capacity_)) { - socket->write_enabled_ = false; + if (!socket->ready_to_send_ && + (socket->send_buffer_.size() < send_buffer_capacity_)) { + socket->ready_to_send_ = true; socket->SignalWriteEvent(socket); } } @@ -931,7 +976,7 @@ void VirtualSocketServer::PurgeNetworkPackets(VirtualSocket* socket, int64_t cur_time) { while (!socket->network_.empty() && (socket->network_.front().done_time <= cur_time)) { - ASSERT(socket->network_size_ >= socket->network_.front().size); + RTC_DCHECK(socket->network_size_ >= socket->network_.front().size); socket->network_size_ -= socket->network_.front().size; socket->network_.pop_front(); } @@ -1036,7 +1081,7 @@ struct FunctionDomainCmp { }; VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) { - ASSERT(f->size() >= 1); + RTC_DCHECK(f->size() >= 1); double v = 0; for (Function::size_type i = 0; i < f->size() - 1; ++i) { double dx = (*f)[i + 1].first - (*f)[i].first; @@ -1078,7 +1123,7 @@ double VirtualSocketServer::Evaluate(Function* f, double x) { if (iter == f->begin()) { return (*f)[0].second; } else if (iter == f->end()) { - ASSERT(f->size() >= 1); + RTC_DCHECK(f->size() >= 1); return (*f)[f->size() - 1].second; } else if (iter->first == x) { return iter->second; diff --git a/webrtc/base/virtualsocketserver.h b/webrtc/base/virtualsocketserver.h index 8673d40f83..565222bfd2 100644 --- a/webrtc/base/virtualsocketserver.h +++ b/webrtc/base/virtualsocketserver.h @@ -90,6 +90,16 @@ class VirtualSocketServer : public SocketServer, public sigslot::has_slots<> { drop_prob_ = drop_prob; } + // If |blocked| is true, subsequent attempts to send will result in -1 being + // returned, with the socket error set to EWOULDBLOCK. + // + // If this method is later called with |blocked| set to false, any sockets + // that previously failed to send with EWOULDBLOCK will emit SignalWriteEvent. + // + // This can be used to simulate the send buffer on a network interface being + // full, and test functionality related to EWOULDBLOCK/SignalWriteEvent. + void SetSendingBlocked(bool blocked); + // SocketFactory: Socket* CreateSocket(int type) override; Socket* CreateSocket(int family, int type) override; @@ -223,6 +233,9 @@ class VirtualSocketServer : public SocketServer, public sigslot::has_slots<> { private: friend class VirtualSocket; + // Sending was previously blocked, but now isn't. + sigslot::signal0<> SignalReadyToSend; + typedef std::map AddressMap; typedef std::map ConnectionMap; @@ -251,12 +264,15 @@ class VirtualSocketServer : public SocketServer, public sigslot::has_slots<> { CriticalSection delay_crit_; double drop_prob_; + bool sending_blocked_ = false; RTC_DISALLOW_COPY_AND_ASSIGN(VirtualSocketServer); }; // Implements the socket interface using the virtual network. Packets are // passed as messages using the message queue of the socket server. -class VirtualSocket : public AsyncSocket, public MessageHandler { +class VirtualSocket : public AsyncSocket, + public MessageHandler, + public sigslot::has_slots<> { public: VirtualSocket(VirtualSocketServer* server, int family, int type, bool async); ~VirtualSocket() override; @@ -316,6 +332,8 @@ class VirtualSocket : public AsyncSocket, public MessageHandler { // Used by server sockets to set the local address without binding. void SetLocalAddress(const SocketAddress& addr); + void OnSocketServerReadyToSend(); + VirtualSocketServer* server_; int type_; bool async_; @@ -330,7 +348,9 @@ class VirtualSocket : public AsyncSocket, public MessageHandler { // Data which tcp has buffered for sending SendBuffer send_buffer_; - bool write_enabled_; + // Set to false if the last attempt to send resulted in EWOULDBLOCK. + // Set back to true when the socket can send again. + bool ready_to_send_ = true; // Critical section to protect the recv_buffer and queue_ CriticalSection crit_;