From 8e4cda7de0087f8a8de151e7964e6fa87c03d342 Mon Sep 17 00:00:00 2001 From: Tomas Gunnarsson Date: Mon, 5 Oct 2020 13:44:39 +0200 Subject: [PATCH] Give PseudoTcp its own FifoBuffer. The PseudoTcp test class is being used outside of WebRTC in ways that WebRTC itself doesn't, which caused this revert: https://webrtc-review.googlesource.com/c/src/+/186564 As it happens though, PseudoTcp doesn't actually use the StreamInterface part of FifoBuffer, so this CL cuts the dependency from PseudoTcp on FifoBuffer. Moving forward, we could just remove this class from WebRTC. Bug: webrtc:11988 Change-Id: Id34a2a6305e8fe37d705ba5e8876dd6398515125 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/186665 Commit-Queue: Tommi Reviewed-by: Mirko Bonadei Cr-Commit-Position: refs/heads/master@{#32309} --- p2p/BUILD.gn | 1 + p2p/base/pseudo_tcp.cc | 187 ++++++++++++++++++++++++++++++++++------- p2p/base/pseudo_tcp.h | 51 ++++++++++- 3 files changed, 207 insertions(+), 32 deletions(-) diff --git a/p2p/BUILD.gn b/p2p/BUILD.gn index 65408f4a2b..a376c74f89 100644 --- a/p2p/BUILD.gn +++ b/p2p/BUILD.gn @@ -107,6 +107,7 @@ rtc_library("rtc_p2p") { "../rtc_base:weak_ptr", "../rtc_base/memory:fifo_buffer", "../rtc_base/network:sent_packet", + "../rtc_base/synchronization:mutex", "../rtc_base/system:rtc_export", "../rtc_base/task_utils:to_queued_task", "../rtc_base/third_party/base64", diff --git a/p2p/base/pseudo_tcp.cc b/p2p/base/pseudo_tcp.cc index 56e6b9b6ad..13e7a2214f 100644 --- a/p2p/base/pseudo_tcp.cc +++ b/p2p/base/pseudo_tcp.cc @@ -12,6 +12,7 @@ #include #include +#include #include #include @@ -402,9 +403,7 @@ uint32_t PseudoTcp::GetBytesInFlight() const { } uint32_t PseudoTcp::GetBytesBufferedNotSent() const { - size_t buffered_bytes = 0; - m_sbuf.GetBuffered(&buffered_bytes); - return static_cast(m_snd_una + buffered_bytes - m_snd_nxt); + return static_cast(m_snd_una + m_sbuf.GetBuffered() - m_snd_nxt); } uint32_t PseudoTcp::GetRoundTripTimeEstimateMs() const { @@ -422,15 +421,11 @@ int PseudoTcp::Recv(char* buffer, size_t len) { } size_t read = 0; - rtc::StreamResult result = m_rbuf.Read(buffer, len, &read, NULL); - - // If there's no data in |m_rbuf|. - if (result == rtc::SR_BLOCK) { + if (!m_rbuf.Read(buffer, len, &read)) { m_bReadEnable = true; m_error = EWOULDBLOCK; return SOCKET_ERROR; } - RTC_DCHECK(result == rtc::SR_SUCCESS); size_t available_space = 0; m_rbuf.GetWriteRemaining(&available_space); @@ -497,14 +492,13 @@ uint32_t PseudoTcp::queue(const char* data, uint32_t len, bool bCtrl) { (m_slist.back().xmit == 0)) { m_slist.back().len += len; } else { - size_t snd_buffered = 0; - m_sbuf.GetBuffered(&snd_buffered); - SSegment sseg(static_cast(m_snd_una + snd_buffered), len, bCtrl); + SSegment sseg(static_cast(m_snd_una + m_sbuf.GetBuffered()), len, + bCtrl); m_slist.push_back(sseg); } size_t written = 0; - m_sbuf.Write(data, len, &written, NULL); + m_sbuf.Write(data, len, &written); return static_cast(written); } @@ -532,9 +526,9 @@ IPseudoTcpNotify::WriteResult PseudoTcp::packet(uint32_t seq, if (len) { size_t bytes_read = 0; - rtc::StreamResult result = + bool result = m_sbuf.ReadOffset(buffer.get() + HEADER_SIZE, len, offset, &bytes_read); - RTC_DCHECK(result == rtc::SR_SUCCESS); + RTC_DCHECK(result); RTC_DCHECK(static_cast(bytes_read) == len); } @@ -601,11 +595,9 @@ bool PseudoTcp::clock_check(uint32_t now, long& nTimeout) { if (m_shutdown == SD_FORCEFUL) return false; - size_t snd_buffered = 0; - m_sbuf.GetBuffered(&snd_buffered); if ((m_shutdown == SD_GRACEFUL) && ((m_state != TCP_ESTABLISHED) || - ((snd_buffered == 0) && (m_t_ack == 0)))) { + ((m_sbuf.GetBuffered() == 0) && (m_t_ack == 0)))) { return false; } @@ -830,10 +822,8 @@ bool PseudoTcp::process(Segment& seg) { // The goal it to make sure we always have at least enough data to fill the // window. We'd like to notify the app when we are halfway to that point. const uint32_t kIdealRefillSize = (m_sbuf_len + m_rbuf_len) / 2; - size_t snd_buffered = 0; - m_sbuf.GetBuffered(&snd_buffered); if (m_bWriteEnable && - static_cast(snd_buffered) < kIdealRefillSize) { + static_cast(m_sbuf.GetBuffered()) < kIdealRefillSize) { m_bWriteEnable = false; if (m_notify) { m_notify->OnTcpWriteable(this); @@ -912,8 +902,7 @@ bool PseudoTcp::process(Segment& seg) { // there's not already data ready to read, but this should always be // true in the problematic scenario, since control frames are always // sent first in the stream. - size_t rcv_buffered; - if (m_rbuf.GetBuffered(&rcv_buffered) && rcv_buffered == 0) { + if (m_rbuf.GetBuffered() == 0) { m_rbuf.ConsumeWriteBuffer(seg.len); m_rbuf.ConsumeReadData(seg.len); // After shifting the position in the buffer, we may have @@ -924,15 +913,11 @@ bool PseudoTcp::process(Segment& seg) { } else { uint32_t nOffset = seg.seq - m_rcv_nxt; - rtc::StreamResult result = - m_rbuf.WriteOffset(seg.data, seg.len, nOffset, NULL); - if (result == rtc::SR_BLOCK) { + if (!m_rbuf.WriteOffset(seg.data, seg.len, nOffset, NULL)) { // Ignore incoming packets outside of the receive window. return false; } - RTC_DCHECK(result == rtc::SR_SUCCESS); - if (seg.seq == m_rcv_nxt) { m_rbuf.ConsumeWriteBuffer(seg.len); m_rcv_nxt += seg.len; @@ -1078,8 +1063,7 @@ void PseudoTcp::attemptSend(SendFlags sflags) { uint32_t nInFlight = m_snd_nxt - m_snd_una; uint32_t nUseable = (nInFlight < nWindow) ? (nWindow - nInFlight) : 0; - size_t snd_buffered = 0; - m_sbuf.GetBuffered(&snd_buffered); + size_t snd_buffered = m_sbuf.GetBuffered(); uint32_t nAvailable = std::min(static_cast(snd_buffered) - nInFlight, m_mss); @@ -1300,4 +1284,149 @@ void PseudoTcp::resizeReceiveBuffer(uint32_t new_size) { m_rcv_wnd = static_cast(available_space); } +PseudoTcp::LockedFifoBuffer::LockedFifoBuffer(size_t size) + : buffer_(new char[size]), + buffer_length_(size), + data_length_(0), + read_position_(0) {} + +PseudoTcp::LockedFifoBuffer::~LockedFifoBuffer() {} + +size_t PseudoTcp::LockedFifoBuffer::GetBuffered() const { + webrtc::MutexLock lock(&mutex_); + return data_length_; +} + +bool PseudoTcp::LockedFifoBuffer::SetCapacity(size_t size) { + webrtc::MutexLock lock(&mutex_); + if (data_length_ > size) + return false; + + if (size != buffer_length_) { + char* buffer = new char[size]; + const size_t copy = data_length_; + const size_t tail_copy = std::min(copy, buffer_length_ - read_position_); + memcpy(buffer, &buffer_[read_position_], tail_copy); + memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy); + buffer_.reset(buffer); + read_position_ = 0; + buffer_length_ = size; + } + + return true; +} + +bool PseudoTcp::LockedFifoBuffer::ReadOffset(void* buffer, + size_t bytes, + size_t offset, + size_t* bytes_read) { + webrtc::MutexLock lock(&mutex_); + return ReadOffsetLocked(buffer, bytes, offset, bytes_read); +} + +bool PseudoTcp::LockedFifoBuffer::WriteOffset(const void* buffer, + size_t bytes, + size_t offset, + size_t* bytes_written) { + webrtc::MutexLock lock(&mutex_); + return WriteOffsetLocked(buffer, bytes, offset, bytes_written); +} + +bool PseudoTcp::LockedFifoBuffer::Read(void* buffer, + size_t bytes, + size_t* bytes_read) { + webrtc::MutexLock lock(&mutex_); + size_t copy = 0; + if (!ReadOffsetLocked(buffer, bytes, 0, ©)) + return false; + + // If read was successful then adjust the read position and number of + // bytes buffered. + read_position_ = (read_position_ + copy) % buffer_length_; + data_length_ -= copy; + if (bytes_read) + *bytes_read = copy; + + return true; +} + +bool PseudoTcp::LockedFifoBuffer::Write(const void* buffer, + size_t bytes, + size_t* bytes_written) { + webrtc::MutexLock lock(&mutex_); + size_t copy = 0; + if (!WriteOffsetLocked(buffer, bytes, 0, ©)) + return false; + + // If write was successful then adjust the number of readable bytes. + data_length_ += copy; + if (bytes_written) { + *bytes_written = copy; + } + + return true; +} + +void PseudoTcp::LockedFifoBuffer::ConsumeReadData(size_t size) { + webrtc::MutexLock lock(&mutex_); + RTC_DCHECK(size <= data_length_); + read_position_ = (read_position_ + size) % buffer_length_; + data_length_ -= size; +} + +void PseudoTcp::LockedFifoBuffer::ConsumeWriteBuffer(size_t size) { + webrtc::MutexLock lock(&mutex_); + RTC_DCHECK(size <= buffer_length_ - data_length_); + data_length_ += size; +} + +bool PseudoTcp::LockedFifoBuffer::GetWriteRemaining(size_t* size) const { + webrtc::MutexLock lock(&mutex_); + *size = buffer_length_ - data_length_; + return true; +} + +bool PseudoTcp::LockedFifoBuffer::ReadOffsetLocked(void* buffer, + size_t bytes, + size_t offset, + size_t* bytes_read) { + if (offset >= data_length_) + return false; + + const size_t available = data_length_ - offset; + const size_t read_position = (read_position_ + offset) % buffer_length_; + const size_t copy = std::min(bytes, available); + const size_t tail_copy = std::min(copy, buffer_length_ - read_position); + char* const p = static_cast(buffer); + memcpy(p, &buffer_[read_position], tail_copy); + memcpy(p + tail_copy, &buffer_[0], copy - tail_copy); + + if (bytes_read) + *bytes_read = copy; + + return true; +} + +bool PseudoTcp::LockedFifoBuffer::WriteOffsetLocked(const void* buffer, + size_t bytes, + size_t offset, + size_t* bytes_written) { + if (data_length_ + offset >= buffer_length_) + return false; + + const size_t available = buffer_length_ - data_length_ - offset; + const size_t write_position = + (read_position_ + data_length_ + offset) % buffer_length_; + const size_t copy = std::min(bytes, available); + const size_t tail_copy = std::min(copy, buffer_length_ - write_position); + const char* const p = static_cast(buffer); + memcpy(&buffer_[write_position], p, tail_copy); + memcpy(&buffer_[0], p + tail_copy, copy - tail_copy); + + if (bytes_written) + *bytes_written = copy; + + return true; +} + } // namespace cricket diff --git a/p2p/base/pseudo_tcp.h b/p2p/base/pseudo_tcp.h index cb6d974496..74ffee631c 100644 --- a/p2p/base/pseudo_tcp.h +++ b/p2p/base/pseudo_tcp.h @@ -15,8 +15,9 @@ #include #include +#include -#include "rtc_base/memory/fifo_buffer.h" +#include "rtc_base/synchronization/mutex.h" #include "rtc_base/system/rtc_export.h" namespace cricket { @@ -196,6 +197,50 @@ class RTC_EXPORT PseudoTcp { // window scale factor |m_swnd_scale| accordingly. void resizeReceiveBuffer(uint32_t new_size); + class LockedFifoBuffer final { + public: + explicit LockedFifoBuffer(size_t size); + ~LockedFifoBuffer(); + + size_t GetBuffered() const; + bool SetCapacity(size_t size); + bool ReadOffset(void* buffer, + size_t bytes, + size_t offset, + size_t* bytes_read); + bool WriteOffset(const void* buffer, + size_t bytes, + size_t offset, + size_t* bytes_written); + bool Read(void* buffer, size_t bytes, size_t* bytes_read); + bool Write(const void* buffer, size_t bytes, size_t* bytes_written); + void ConsumeReadData(size_t size); + void ConsumeWriteBuffer(size_t size); + bool GetWriteRemaining(size_t* size) const; + + private: + bool ReadOffsetLocked(void* buffer, + size_t bytes, + size_t offset, + size_t* bytes_read) + RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_); + bool WriteOffsetLocked(const void* buffer, + size_t bytes, + size_t offset, + size_t* bytes_written) + RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_); + + // the allocated buffer + std::unique_ptr buffer_ RTC_GUARDED_BY(mutex_); + // size of the allocated buffer + size_t buffer_length_ RTC_GUARDED_BY(mutex_); + // amount of readable data in the buffer + size_t data_length_ RTC_GUARDED_BY(mutex_); + // offset to the readable data + size_t read_position_ RTC_GUARDED_BY(mutex_); + mutable webrtc::Mutex mutex_; + }; + IPseudoTcpNotify* m_notify; enum Shutdown { SD_NONE, SD_GRACEFUL, SD_FORCEFUL } m_shutdown; int m_error; @@ -211,13 +256,13 @@ class RTC_EXPORT PseudoTcp { RList m_rlist; uint32_t m_rbuf_len, m_rcv_nxt, m_rcv_wnd, m_lastrecv; uint8_t m_rwnd_scale; // Window scale factor. - rtc::FifoBuffer m_rbuf; + LockedFifoBuffer m_rbuf; // Outgoing data SList m_slist; uint32_t m_sbuf_len, m_snd_nxt, m_snd_wnd, m_lastsend, m_snd_una; uint8_t m_swnd_scale; // Window scale factor. - rtc::FifoBuffer m_sbuf; + LockedFifoBuffer m_sbuf; // Maximum segment size, estimated protocol level, largest segment sent uint32_t m_mss, m_msslevel, m_largest, m_mtu_advise;