Delete leftovers of synchronous code path in VirtualSocketServer

Followup to https://webrtc-review.googlesource.com/c/src/+/227031.

Bug: webrtc:13065
Change-Id: Ifa8943e81bd90c19807d2fc55768201c915726d8
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/229185
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34815}
This commit is contained in:
Niels Möller 2021-08-19 10:13:31 +02:00 committed by WebRTC LUCI CQ
parent 8167c2fa59
commit ea423a5b8d
2 changed files with 134 additions and 160 deletions

View File

@ -92,13 +92,9 @@ struct MessageAddress : public MessageData {
SocketAddress addr;
};
VirtualSocket::VirtualSocket(VirtualSocketServer* server,
int family,
int type,
bool async)
VirtualSocket::VirtualSocket(VirtualSocketServer* server, int family, int type)
: server_(server),
type_(type),
async_(async),
state_(CS_CLOSED),
error_(0),
listen_queue_(nullptr),
@ -107,8 +103,6 @@ VirtualSocket::VirtualSocket(VirtualSocketServer* server,
bound_(false),
was_any_(false) {
RTC_DCHECK((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM));
RTC_DCHECK(async_ ||
(type_ != SOCK_STREAM)); // We only support async streams
server->SignalReadyToSend.connect(this,
&VirtualSocket::OnSocketServerReadyToSend);
}
@ -235,13 +229,8 @@ int VirtualSocket::RecvFrom(void* pv,
webrtc::MutexLock lock(&mutex_);
// If we don't have a packet, then either error or wait for one to arrive.
if (recv_buffer_.empty()) {
if (async_) {
error_ = EAGAIN;
return -1;
}
while (recv_buffer_.empty()) {
server_->ProcessOneMessage();
}
error_ = EAGAIN;
return -1;
}
// Return the packet at the front of the queue.
@ -295,7 +284,7 @@ VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) {
return nullptr;
}
while (!listen_queue_->empty()) {
VirtualSocket* socket = new VirtualSocket(server_, AF_INET, type_, async_);
VirtualSocket* socket = new VirtualSocket(server_, AF_INET, type_);
// Set the new local address to the same as this server socket.
socket->SetLocalAddress(local_addr_);
@ -356,16 +345,16 @@ void VirtualSocket::OnMessage(Message* pmsg) {
Packet* packet = static_cast<Packet*>(pmsg->pdata);
recv_buffer_.push_back(packet);
signal_read_event = async_;
signal_read_event = true;
} else if (pmsg->message_id == MSG_ID_CONNECT) {
RTC_DCHECK(nullptr != pmsg->pdata);
MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata);
if (listen_queue_ != nullptr) {
listen_queue_->push_back(data->addr);
signal_read_event = async_;
signal_read_event = true;
} else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) {
CompleteConnect(data->addr);
signal_connect_event = async_;
signal_connect_event = true;
} else {
RTC_LOG(LS_VERBOSE)
<< "Socket at " << local_addr_.ToString() << " is not listening";
@ -378,7 +367,7 @@ void VirtualSocket::OnMessage(Message* pmsg) {
error_to_signal = (CS_CONNECTING == state_) ? ECONNREFUSED : 0;
state_ = CS_CLOSED;
remote_addr_.Clear();
signal_close_event = async_;
signal_close_event = true;
}
} else if (pmsg->message_id == MSG_ID_SIGNALREADEVENT) {
signal_read_event = !recv_buffer_.empty();
@ -610,12 +599,8 @@ void VirtualSocketServer::SetSendingBlocked(bool blocked) {
}
}
Socket* VirtualSocketServer::CreateSocket(int family, int type) {
return CreateSocketInternal(family, type);
}
VirtualSocket* VirtualSocketServer::CreateSocketInternal(int family, int type) {
return new VirtualSocket(this, family, type, true);
VirtualSocket* VirtualSocketServer::CreateSocket(int family, int type) {
return new VirtualSocket(this, family, type);
}
void VirtualSocketServer::SetMessageQueue(Thread* msg_queue) {
@ -881,12 +866,6 @@ void VirtualSocketServer::Clear(VirtualSocket* socket) {
}
}
void VirtualSocketServer::ProcessOneMessage() {
Message msg;
msg_queue_->Get(&msg);
msg_queue_->Dispatch(&msg);
}
void VirtualSocketServer::PostSignalReadEvent(VirtualSocket* socket) {
// Clear the message so it doesn't end up posted multiple times.
msg_queue_->Clear(socket, MSG_ID_SIGNALREADEVENT);
@ -928,7 +907,7 @@ int VirtualSocketServer::SendUdp(VirtualSocket* socket,
if (!recipient) {
// Make a fake recipient for address family checking.
std::unique_ptr<VirtualSocket> dummy_socket(
CreateSocketInternal(AF_INET, SOCK_DGRAM));
CreateSocket(AF_INET, SOCK_DGRAM));
dummy_socket->SetLocalAddress(remote_addr);
if (!CanInteractWith(socket, dummy_socket.get())) {
RTC_LOG(LS_VERBOSE) << "Incompatible address families: "

View File

@ -26,9 +26,130 @@
namespace rtc {
class Packet;
class VirtualSocket;
class VirtualSocketServer;
class SocketAddressPair;
// Implements the socket interface using the virtual network. Packets are
// passed as messages using the message queue of the socket server.
class VirtualSocket : public Socket,
public MessageHandler,
public sigslot::has_slots<> {
public:
VirtualSocket(VirtualSocketServer* server, int family, int type);
~VirtualSocket() override;
SocketAddress GetLocalAddress() const override;
SocketAddress GetRemoteAddress() const override;
int Bind(const SocketAddress& addr) override;
int Connect(const SocketAddress& addr) override;
int Close() override;
int Send(const void* pv, size_t cb) override;
int SendTo(const void* pv, size_t cb, const SocketAddress& addr) override;
int Recv(void* pv, size_t cb, int64_t* timestamp) override;
int RecvFrom(void* pv,
size_t cb,
SocketAddress* paddr,
int64_t* timestamp) override;
int Listen(int backlog) override;
VirtualSocket* Accept(SocketAddress* paddr) override;
int GetError() const override;
void SetError(int error) override;
ConnState GetState() const override;
int GetOption(Option opt, int* value) override;
int SetOption(Option opt, int value) override;
void OnMessage(Message* pmsg) override;
size_t recv_buffer_size() const { return recv_buffer_size_; }
size_t send_buffer_size() const { return send_buffer_.size(); }
const char* send_buffer_data() const { return send_buffer_.data(); }
// Used by server sockets to set the local address without binding.
void SetLocalAddress(const SocketAddress& addr);
bool was_any() { return was_any_; }
void set_was_any(bool was_any) { was_any_ = was_any; }
void SetToBlocked();
void UpdateRecv(size_t data_size);
void UpdateSend(size_t data_size);
void MaybeSignalWriteEvent(size_t capacity);
// Adds a packet to be sent. Returns delay, based on network_size_.
uint32_t AddPacket(int64_t cur_time, size_t packet_size);
int64_t UpdateOrderedDelivery(int64_t ts);
// Removes stale packets from the network. Returns current size.
size_t PurgeNetworkPackets(int64_t cur_time);
private:
struct NetworkEntry {
size_t size;
int64_t done_time;
};
typedef std::deque<SocketAddress> ListenQueue;
typedef std::deque<NetworkEntry> NetworkQueue;
typedef std::vector<char> SendBuffer;
typedef std::list<Packet*> RecvBuffer;
typedef std::map<Option, int> OptionsMap;
int InitiateConnect(const SocketAddress& addr, bool use_delay);
void CompleteConnect(const SocketAddress& addr);
int SendUdp(const void* pv, size_t cb, const SocketAddress& addr);
int SendTcp(const void* pv, size_t cb);
void OnSocketServerReadyToSend();
VirtualSocketServer* const server_;
const int type_;
ConnState state_;
int error_;
SocketAddress local_addr_;
SocketAddress remote_addr_;
// Pending sockets which can be Accepted
std::unique_ptr<ListenQueue> listen_queue_ RTC_GUARDED_BY(mutex_)
RTC_PT_GUARDED_BY(mutex_);
// Data which tcp has buffered for sending
SendBuffer send_buffer_;
// Set to false if the last attempt to send resulted in EWOULDBLOCK.
// Set back to true when the socket can send again.
bool ready_to_send_ = true;
// Mutex to protect the recv_buffer and listen_queue_
webrtc::Mutex mutex_;
// Network model that enforces bandwidth and capacity constraints
NetworkQueue network_;
size_t network_size_;
// The scheduled delivery time of the last packet sent on this socket.
// It is used to ensure ordered delivery of packets sent on this socket.
int64_t last_delivery_time_ = 0;
// Data which has been received from the network
RecvBuffer recv_buffer_ RTC_GUARDED_BY(mutex_);
// The amount of data which is in flight or in recv_buffer_
size_t recv_buffer_size_;
// Is this socket bound?
bool bound_;
// When we bind a socket to Any, VSS's Bind gives it another address. For
// dual-stack sockets, we want to distinguish between sockets that were
// explicitly given a particular address and sockets that had one picked
// for them by VSS.
bool was_any_;
// Store the options that are set
OptionsMap options_map_;
};
// Simulates a network in the same manner as a loopback interface. The
// interface can create as many addresses as you want. All of the sockets
// created by this network will be able to communicate with one another, unless
@ -115,7 +236,7 @@ class VirtualSocketServer : public SocketServer {
void SetSendingBlocked(bool blocked);
// SocketFactory:
Socket* CreateSocket(int family, int type) override;
VirtualSocket* CreateSocket(int family, int type) override;
// SocketServer:
void SetMessageQueue(Thread* queue) override;
@ -209,8 +330,6 @@ class VirtualSocketServer : public SocketServer {
// Clear incoming messages for a socket that is being closed.
void Clear(VirtualSocket* socket);
void ProcessOneMessage();
void PostSignalReadEvent(VirtualSocket* socket);
// Sending was previously blocked, but now isn't.
@ -226,8 +345,6 @@ class VirtualSocketServer : public SocketServer {
private:
uint16_t GetNextPort();
VirtualSocket* CreateSocketInternal(int family, int type);
// Find the socket pair corresponding to this server address.
VirtualSocket* LookupConnection(const SocketAddress& client,
const SocketAddress& server);
@ -323,128 +440,6 @@ class VirtualSocketServer : public SocketServer {
RTC_DISALLOW_COPY_AND_ASSIGN(VirtualSocketServer);
};
// Implements the socket interface using the virtual network. Packets are
// passed as messages using the message queue of the socket server.
class VirtualSocket : public Socket,
public MessageHandler,
public sigslot::has_slots<> {
public:
VirtualSocket(VirtualSocketServer* server, int family, int type, bool async);
~VirtualSocket() override;
SocketAddress GetLocalAddress() const override;
SocketAddress GetRemoteAddress() const override;
int Bind(const SocketAddress& addr) override;
int Connect(const SocketAddress& addr) override;
int Close() override;
int Send(const void* pv, size_t cb) override;
int SendTo(const void* pv, size_t cb, const SocketAddress& addr) override;
int Recv(void* pv, size_t cb, int64_t* timestamp) override;
int RecvFrom(void* pv,
size_t cb,
SocketAddress* paddr,
int64_t* timestamp) override;
int Listen(int backlog) override;
VirtualSocket* Accept(SocketAddress* paddr) override;
int GetError() const override;
void SetError(int error) override;
ConnState GetState() const override;
int GetOption(Option opt, int* value) override;
int SetOption(Option opt, int value) override;
void OnMessage(Message* pmsg) override;
size_t recv_buffer_size() const { return recv_buffer_size_; }
size_t send_buffer_size() const { return send_buffer_.size(); }
const char* send_buffer_data() const { return send_buffer_.data(); }
// Used by server sockets to set the local address without binding.
void SetLocalAddress(const SocketAddress& addr);
bool was_any() { return was_any_; }
void set_was_any(bool was_any) { was_any_ = was_any; }
void SetToBlocked();
void UpdateRecv(size_t data_size);
void UpdateSend(size_t data_size);
void MaybeSignalWriteEvent(size_t capacity);
// Adds a packet to be sent. Returns delay, based on network_size_.
uint32_t AddPacket(int64_t cur_time, size_t packet_size);
int64_t UpdateOrderedDelivery(int64_t ts);
// Removes stale packets from the network. Returns current size.
size_t PurgeNetworkPackets(int64_t cur_time);
private:
struct NetworkEntry {
size_t size;
int64_t done_time;
};
typedef std::deque<SocketAddress> ListenQueue;
typedef std::deque<NetworkEntry> NetworkQueue;
typedef std::vector<char> SendBuffer;
typedef std::list<Packet*> RecvBuffer;
typedef std::map<Option, int> OptionsMap;
int InitiateConnect(const SocketAddress& addr, bool use_delay);
void CompleteConnect(const SocketAddress& addr);
int SendUdp(const void* pv, size_t cb, const SocketAddress& addr);
int SendTcp(const void* pv, size_t cb);
void OnSocketServerReadyToSend();
VirtualSocketServer* const server_;
const int type_;
const bool async_;
ConnState state_;
int error_;
SocketAddress local_addr_;
SocketAddress remote_addr_;
// Pending sockets which can be Accepted
std::unique_ptr<ListenQueue> listen_queue_ RTC_GUARDED_BY(mutex_)
RTC_PT_GUARDED_BY(mutex_);
// Data which tcp has buffered for sending
SendBuffer send_buffer_;
// Set to false if the last attempt to send resulted in EWOULDBLOCK.
// Set back to true when the socket can send again.
bool ready_to_send_ = true;
// Mutex to protect the recv_buffer and listen_queue_
webrtc::Mutex mutex_;
// Network model that enforces bandwidth and capacity constraints
NetworkQueue network_;
size_t network_size_;
// The scheduled delivery time of the last packet sent on this socket.
// It is used to ensure ordered delivery of packets sent on this socket.
int64_t last_delivery_time_ = 0;
// Data which has been received from the network
RecvBuffer recv_buffer_ RTC_GUARDED_BY(mutex_);
// The amount of data which is in flight or in recv_buffer_
size_t recv_buffer_size_;
// Is this socket bound?
bool bound_;
// When we bind a socket to Any, VSS's Bind gives it another address. For
// dual-stack sockets, we want to distinguish between sockets that were
// explicitly given a particular address and sockets that had one picked
// for them by VSS.
bool was_any_;
// Store the options that are set
OptionsMap options_map_;
};
} // namespace rtc
#endif // RTC_BASE_VIRTUAL_SOCKET_SERVER_H_