Give PseudoTcp its own FifoBuffer.

The PseudoTcp test class is being used outside of WebRTC in ways
that WebRTC itself doesn't, which caused this revert:
https://webrtc-review.googlesource.com/c/src/+/186564

As it happens though, PseudoTcp doesn't actually use the
StreamInterface part of FifoBuffer, so this CL cuts the dependency
from PseudoTcp on FifoBuffer.

Moving forward, we could just remove this class from WebRTC.

Bug: webrtc:11988
Change-Id: Id34a2a6305e8fe37d705ba5e8876dd6398515125
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/186665
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#32309}
This commit is contained in:
Tomas Gunnarsson 2020-10-05 13:44:39 +02:00 committed by Commit Bot
parent c401923f3e
commit 8e4cda7de0
3 changed files with 207 additions and 32 deletions

View File

@ -107,6 +107,7 @@ rtc_library("rtc_p2p") {
"../rtc_base:weak_ptr",
"../rtc_base/memory:fifo_buffer",
"../rtc_base/network:sent_packet",
"../rtc_base/synchronization:mutex",
"../rtc_base/system:rtc_export",
"../rtc_base/task_utils:to_queued_task",
"../rtc_base/third_party/base64",

View File

@ -12,6 +12,7 @@
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <algorithm>
#include <cstdint>
@ -402,9 +403,7 @@ uint32_t PseudoTcp::GetBytesInFlight() const {
}
uint32_t PseudoTcp::GetBytesBufferedNotSent() const {
size_t buffered_bytes = 0;
m_sbuf.GetBuffered(&buffered_bytes);
return static_cast<uint32_t>(m_snd_una + buffered_bytes - m_snd_nxt);
return static_cast<uint32_t>(m_snd_una + m_sbuf.GetBuffered() - m_snd_nxt);
}
uint32_t PseudoTcp::GetRoundTripTimeEstimateMs() const {
@ -422,15 +421,11 @@ int PseudoTcp::Recv(char* buffer, size_t len) {
}
size_t read = 0;
rtc::StreamResult result = m_rbuf.Read(buffer, len, &read, NULL);
// If there's no data in |m_rbuf|.
if (result == rtc::SR_BLOCK) {
if (!m_rbuf.Read(buffer, len, &read)) {
m_bReadEnable = true;
m_error = EWOULDBLOCK;
return SOCKET_ERROR;
}
RTC_DCHECK(result == rtc::SR_SUCCESS);
size_t available_space = 0;
m_rbuf.GetWriteRemaining(&available_space);
@ -497,14 +492,13 @@ uint32_t PseudoTcp::queue(const char* data, uint32_t len, bool bCtrl) {
(m_slist.back().xmit == 0)) {
m_slist.back().len += len;
} else {
size_t snd_buffered = 0;
m_sbuf.GetBuffered(&snd_buffered);
SSegment sseg(static_cast<uint32_t>(m_snd_una + snd_buffered), len, bCtrl);
SSegment sseg(static_cast<uint32_t>(m_snd_una + m_sbuf.GetBuffered()), len,
bCtrl);
m_slist.push_back(sseg);
}
size_t written = 0;
m_sbuf.Write(data, len, &written, NULL);
m_sbuf.Write(data, len, &written);
return static_cast<uint32_t>(written);
}
@ -532,9 +526,9 @@ IPseudoTcpNotify::WriteResult PseudoTcp::packet(uint32_t seq,
if (len) {
size_t bytes_read = 0;
rtc::StreamResult result =
bool result =
m_sbuf.ReadOffset(buffer.get() + HEADER_SIZE, len, offset, &bytes_read);
RTC_DCHECK(result == rtc::SR_SUCCESS);
RTC_DCHECK(result);
RTC_DCHECK(static_cast<uint32_t>(bytes_read) == len);
}
@ -601,11 +595,9 @@ bool PseudoTcp::clock_check(uint32_t now, long& nTimeout) {
if (m_shutdown == SD_FORCEFUL)
return false;
size_t snd_buffered = 0;
m_sbuf.GetBuffered(&snd_buffered);
if ((m_shutdown == SD_GRACEFUL) &&
((m_state != TCP_ESTABLISHED) ||
((snd_buffered == 0) && (m_t_ack == 0)))) {
((m_sbuf.GetBuffered() == 0) && (m_t_ack == 0)))) {
return false;
}
@ -830,10 +822,8 @@ bool PseudoTcp::process(Segment& seg) {
// The goal it to make sure we always have at least enough data to fill the
// window. We'd like to notify the app when we are halfway to that point.
const uint32_t kIdealRefillSize = (m_sbuf_len + m_rbuf_len) / 2;
size_t snd_buffered = 0;
m_sbuf.GetBuffered(&snd_buffered);
if (m_bWriteEnable &&
static_cast<uint32_t>(snd_buffered) < kIdealRefillSize) {
static_cast<uint32_t>(m_sbuf.GetBuffered()) < kIdealRefillSize) {
m_bWriteEnable = false;
if (m_notify) {
m_notify->OnTcpWriteable(this);
@ -912,8 +902,7 @@ bool PseudoTcp::process(Segment& seg) {
// there's not already data ready to read, but this should always be
// true in the problematic scenario, since control frames are always
// sent first in the stream.
size_t rcv_buffered;
if (m_rbuf.GetBuffered(&rcv_buffered) && rcv_buffered == 0) {
if (m_rbuf.GetBuffered() == 0) {
m_rbuf.ConsumeWriteBuffer(seg.len);
m_rbuf.ConsumeReadData(seg.len);
// After shifting the position in the buffer, we may have
@ -924,15 +913,11 @@ bool PseudoTcp::process(Segment& seg) {
} else {
uint32_t nOffset = seg.seq - m_rcv_nxt;
rtc::StreamResult result =
m_rbuf.WriteOffset(seg.data, seg.len, nOffset, NULL);
if (result == rtc::SR_BLOCK) {
if (!m_rbuf.WriteOffset(seg.data, seg.len, nOffset, NULL)) {
// Ignore incoming packets outside of the receive window.
return false;
}
RTC_DCHECK(result == rtc::SR_SUCCESS);
if (seg.seq == m_rcv_nxt) {
m_rbuf.ConsumeWriteBuffer(seg.len);
m_rcv_nxt += seg.len;
@ -1078,8 +1063,7 @@ void PseudoTcp::attemptSend(SendFlags sflags) {
uint32_t nInFlight = m_snd_nxt - m_snd_una;
uint32_t nUseable = (nInFlight < nWindow) ? (nWindow - nInFlight) : 0;
size_t snd_buffered = 0;
m_sbuf.GetBuffered(&snd_buffered);
size_t snd_buffered = m_sbuf.GetBuffered();
uint32_t nAvailable =
std::min(static_cast<uint32_t>(snd_buffered) - nInFlight, m_mss);
@ -1300,4 +1284,149 @@ void PseudoTcp::resizeReceiveBuffer(uint32_t new_size) {
m_rcv_wnd = static_cast<uint32_t>(available_space);
}
PseudoTcp::LockedFifoBuffer::LockedFifoBuffer(size_t size)
: buffer_(new char[size]),
buffer_length_(size),
data_length_(0),
read_position_(0) {}
PseudoTcp::LockedFifoBuffer::~LockedFifoBuffer() {}
size_t PseudoTcp::LockedFifoBuffer::GetBuffered() const {
webrtc::MutexLock lock(&mutex_);
return data_length_;
}
bool PseudoTcp::LockedFifoBuffer::SetCapacity(size_t size) {
webrtc::MutexLock lock(&mutex_);
if (data_length_ > size)
return false;
if (size != buffer_length_) {
char* buffer = new char[size];
const size_t copy = data_length_;
const size_t tail_copy = std::min(copy, buffer_length_ - read_position_);
memcpy(buffer, &buffer_[read_position_], tail_copy);
memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
buffer_.reset(buffer);
read_position_ = 0;
buffer_length_ = size;
}
return true;
}
bool PseudoTcp::LockedFifoBuffer::ReadOffset(void* buffer,
size_t bytes,
size_t offset,
size_t* bytes_read) {
webrtc::MutexLock lock(&mutex_);
return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
}
bool PseudoTcp::LockedFifoBuffer::WriteOffset(const void* buffer,
size_t bytes,
size_t offset,
size_t* bytes_written) {
webrtc::MutexLock lock(&mutex_);
return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
}
bool PseudoTcp::LockedFifoBuffer::Read(void* buffer,
size_t bytes,
size_t* bytes_read) {
webrtc::MutexLock lock(&mutex_);
size_t copy = 0;
if (!ReadOffsetLocked(buffer, bytes, 0, &copy))
return false;
// If read was successful then adjust the read position and number of
// bytes buffered.
read_position_ = (read_position_ + copy) % buffer_length_;
data_length_ -= copy;
if (bytes_read)
*bytes_read = copy;
return true;
}
bool PseudoTcp::LockedFifoBuffer::Write(const void* buffer,
size_t bytes,
size_t* bytes_written) {
webrtc::MutexLock lock(&mutex_);
size_t copy = 0;
if (!WriteOffsetLocked(buffer, bytes, 0, &copy))
return false;
// If write was successful then adjust the number of readable bytes.
data_length_ += copy;
if (bytes_written) {
*bytes_written = copy;
}
return true;
}
void PseudoTcp::LockedFifoBuffer::ConsumeReadData(size_t size) {
webrtc::MutexLock lock(&mutex_);
RTC_DCHECK(size <= data_length_);
read_position_ = (read_position_ + size) % buffer_length_;
data_length_ -= size;
}
void PseudoTcp::LockedFifoBuffer::ConsumeWriteBuffer(size_t size) {
webrtc::MutexLock lock(&mutex_);
RTC_DCHECK(size <= buffer_length_ - data_length_);
data_length_ += size;
}
bool PseudoTcp::LockedFifoBuffer::GetWriteRemaining(size_t* size) const {
webrtc::MutexLock lock(&mutex_);
*size = buffer_length_ - data_length_;
return true;
}
bool PseudoTcp::LockedFifoBuffer::ReadOffsetLocked(void* buffer,
size_t bytes,
size_t offset,
size_t* bytes_read) {
if (offset >= data_length_)
return false;
const size_t available = data_length_ - offset;
const size_t read_position = (read_position_ + offset) % buffer_length_;
const size_t copy = std::min(bytes, available);
const size_t tail_copy = std::min(copy, buffer_length_ - read_position);
char* const p = static_cast<char*>(buffer);
memcpy(p, &buffer_[read_position], tail_copy);
memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
if (bytes_read)
*bytes_read = copy;
return true;
}
bool PseudoTcp::LockedFifoBuffer::WriteOffsetLocked(const void* buffer,
size_t bytes,
size_t offset,
size_t* bytes_written) {
if (data_length_ + offset >= buffer_length_)
return false;
const size_t available = buffer_length_ - data_length_ - offset;
const size_t write_position =
(read_position_ + data_length_ + offset) % buffer_length_;
const size_t copy = std::min(bytes, available);
const size_t tail_copy = std::min(copy, buffer_length_ - write_position);
const char* const p = static_cast<const char*>(buffer);
memcpy(&buffer_[write_position], p, tail_copy);
memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
if (bytes_written)
*bytes_written = copy;
return true;
}
} // namespace cricket

View File

@ -15,8 +15,9 @@
#include <stdint.h>
#include <list>
#include <memory>
#include "rtc_base/memory/fifo_buffer.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/system/rtc_export.h"
namespace cricket {
@ -196,6 +197,50 @@ class RTC_EXPORT PseudoTcp {
// window scale factor |m_swnd_scale| accordingly.
void resizeReceiveBuffer(uint32_t new_size);
class LockedFifoBuffer final {
public:
explicit LockedFifoBuffer(size_t size);
~LockedFifoBuffer();
size_t GetBuffered() const;
bool SetCapacity(size_t size);
bool ReadOffset(void* buffer,
size_t bytes,
size_t offset,
size_t* bytes_read);
bool WriteOffset(const void* buffer,
size_t bytes,
size_t offset,
size_t* bytes_written);
bool Read(void* buffer, size_t bytes, size_t* bytes_read);
bool Write(const void* buffer, size_t bytes, size_t* bytes_written);
void ConsumeReadData(size_t size);
void ConsumeWriteBuffer(size_t size);
bool GetWriteRemaining(size_t* size) const;
private:
bool ReadOffsetLocked(void* buffer,
size_t bytes,
size_t offset,
size_t* bytes_read)
RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
bool WriteOffsetLocked(const void* buffer,
size_t bytes,
size_t offset,
size_t* bytes_written)
RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
// the allocated buffer
std::unique_ptr<char[]> buffer_ RTC_GUARDED_BY(mutex_);
// size of the allocated buffer
size_t buffer_length_ RTC_GUARDED_BY(mutex_);
// amount of readable data in the buffer
size_t data_length_ RTC_GUARDED_BY(mutex_);
// offset to the readable data
size_t read_position_ RTC_GUARDED_BY(mutex_);
mutable webrtc::Mutex mutex_;
};
IPseudoTcpNotify* m_notify;
enum Shutdown { SD_NONE, SD_GRACEFUL, SD_FORCEFUL } m_shutdown;
int m_error;
@ -211,13 +256,13 @@ class RTC_EXPORT PseudoTcp {
RList m_rlist;
uint32_t m_rbuf_len, m_rcv_nxt, m_rcv_wnd, m_lastrecv;
uint8_t m_rwnd_scale; // Window scale factor.
rtc::FifoBuffer m_rbuf;
LockedFifoBuffer m_rbuf;
// Outgoing data
SList m_slist;
uint32_t m_sbuf_len, m_snd_nxt, m_snd_wnd, m_lastsend, m_snd_una;
uint8_t m_swnd_scale; // Window scale factor.
rtc::FifoBuffer m_sbuf;
LockedFifoBuffer m_sbuf;
// Maximum segment size, estimated protocol level, largest segment sent
uint32_t m_mss, m_msslevel, m_largest, m_mtu_advise;