From 13339483a4d19cdb42296bf62bd351b6ef391cf3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Niels=20M=C3=B6ller?= Date: Thu, 28 Mar 2019 13:30:15 +0100 Subject: [PATCH] Move FifoBuffer to its own file and build target Used only by test code and by pseudo_tcp. Bug: webrtc:6424 Change-Id: I28903e74f7b69cbdd8c368f4444c8a233eb76868 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/128868 Reviewed-by: Karl Wiberg Reviewed-by: Mirko Bonadei Commit-Queue: Niels Moller Cr-Commit-Position: refs/heads/master@{#27341} --- p2p/BUILD.gn | 3 + p2p/base/pseudo_tcp.h | 2 +- rtc_base/BUILD.gn | 3 +- rtc_base/memory/BUILD.gn | 21 ++ rtc_base/memory/fifo_buffer.cc | 247 ++++++++++++++++++ rtc_base/memory/fifo_buffer.h | 135 ++++++++++ .../fifo_buffer_unittest.cc} | 54 +--- rtc_base/proxy_server.h | 2 +- rtc_base/ssl_stream_adapter_unittest.cc | 1 + rtc_base/stream.cc | 232 ---------------- rtc_base/stream.h | 115 -------- 11 files changed, 412 insertions(+), 403 deletions(-) create mode 100644 rtc_base/memory/fifo_buffer.cc create mode 100644 rtc_base/memory/fifo_buffer.h rename rtc_base/{stream_unittest.cc => memory/fifo_buffer_unittest.cc} (88%) diff --git a/p2p/BUILD.gn b/p2p/BUILD.gn index 83b07998e3..ce39958825 100644 --- a/p2p/BUILD.gn +++ b/p2p/BUILD.gn @@ -92,8 +92,11 @@ rtc_static_library("rtc_p2p") { "../logging:rtc_event_log_api", "../rtc_base", "../rtc_base:checks", + + # Needed by pseudo_tcp, which should move to a separate target. "../rtc_base:safe_minmax", "../rtc_base:weak_ptr", + "../rtc_base/memory:fifo_buffer", "../rtc_base/network:sent_packet", "../rtc_base/system:rtc_export", "../rtc_base/third_party/base64", diff --git a/p2p/base/pseudo_tcp.h b/p2p/base/pseudo_tcp.h index 4849c2aec0..375be3b7d7 100644 --- a/p2p/base/pseudo_tcp.h +++ b/p2p/base/pseudo_tcp.h @@ -15,7 +15,7 @@ #include #include -#include "rtc_base/stream.h" +#include "rtc_base/memory/fifo_buffer.h" #include "rtc_base/system/rtc_export.h" namespace cricket { diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index 46c7138892..e22904250a 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -1106,6 +1106,7 @@ rtc_source_set("rtc_base_tests_utils") { ":checks", ":rtc_base", "../api/units:time_delta", + "memory:fifo_buffer", "third_party/sigslot", "//third_party/abseil-cpp/absl/algorithm:container", "//third_party/abseil-cpp/absl/memory", @@ -1383,7 +1384,6 @@ if (rtc_include_tests) { "rtc_certificate_unittest.cc", "signal_thread_unittest.cc", "sigslot_tester_unittest.cc", - "stream_unittest.cc", "test_client_unittest.cc", "thread_unittest.cc", "unique_id_generator_unittest.cc", @@ -1416,6 +1416,7 @@ if (rtc_include_tests) { "../test:field_trial", "../test:fileutils", "../test:test_support", + "memory:fifo_buffer", "synchronization:synchronization_unittests", "third_party/sigslot", "//third_party/abseil-cpp/absl/algorithm:container", diff --git a/rtc_base/memory/BUILD.gn b/rtc_base/memory/BUILD.gn index 05fd7ababc..d151557218 100644 --- a/rtc_base/memory/BUILD.gn +++ b/rtc_base/memory/BUILD.gn @@ -30,15 +30,36 @@ rtc_source_set("aligned_malloc") { deps = [] } +rtc_source_set("fifo_buffer") { + visibility = [ + "../../p2p:rtc_p2p", + "..:rtc_base_tests_utils", + "..:rtc_base_unittests", + ":unittests", + ] + sources = [ + "fifo_buffer.cc", + "fifo_buffer.h", + ] + deps = [ + "..:rtc_base", + ] + if (is_nacl) { + deps += [ "//native_client_sdk/src/libraries/nacl_io" ] + } +} + rtc_source_set("unittests") { testonly = true sources = [ "aligned_array_unittest.cc", "aligned_malloc_unittest.cc", + "fifo_buffer_unittest.cc", ] deps = [ ":aligned_array", ":aligned_malloc", + ":fifo_buffer", "../../test:test_support", ] } diff --git a/rtc_base/memory/fifo_buffer.cc b/rtc_base/memory/fifo_buffer.cc new file mode 100644 index 0000000000..44fb032e57 --- /dev/null +++ b/rtc_base/memory/fifo_buffer.cc @@ -0,0 +1,247 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "rtc_base/memory/fifo_buffer.h" + +#include + +#include "rtc_base/thread.h" + +namespace rtc { + +FifoBuffer::FifoBuffer(size_t size) + : state_(SS_OPEN), + buffer_(new char[size]), + buffer_length_(size), + data_length_(0), + read_position_(0), + owner_(Thread::Current()) { + // all events are done on the owner_ thread +} + +FifoBuffer::FifoBuffer(size_t size, Thread* owner) + : state_(SS_OPEN), + buffer_(new char[size]), + buffer_length_(size), + data_length_(0), + read_position_(0), + owner_(owner) { + // all events are done on the owner_ thread +} + +FifoBuffer::~FifoBuffer() {} + +bool FifoBuffer::GetBuffered(size_t* size) const { + CritScope cs(&crit_); + *size = data_length_; + return true; +} + +bool FifoBuffer::SetCapacity(size_t size) { + CritScope cs(&crit_); + if (data_length_ > size) { + return false; + } + + if (size != buffer_length_) { + char* buffer = new char[size]; + const size_t copy = data_length_; + const size_t tail_copy = std::min(copy, buffer_length_ - read_position_); + memcpy(buffer, &buffer_[read_position_], tail_copy); + memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy); + buffer_.reset(buffer); + read_position_ = 0; + buffer_length_ = size; + } + return true; +} + +StreamResult FifoBuffer::ReadOffset(void* buffer, + size_t bytes, + size_t offset, + size_t* bytes_read) { + CritScope cs(&crit_); + return ReadOffsetLocked(buffer, bytes, offset, bytes_read); +} + +StreamResult FifoBuffer::WriteOffset(const void* buffer, + size_t bytes, + size_t offset, + size_t* bytes_written) { + CritScope cs(&crit_); + return WriteOffsetLocked(buffer, bytes, offset, bytes_written); +} + +StreamState FifoBuffer::GetState() const { + CritScope cs(&crit_); + return state_; +} + +StreamResult FifoBuffer::Read(void* buffer, + size_t bytes, + size_t* bytes_read, + int* error) { + CritScope cs(&crit_); + const bool was_writable = data_length_ < buffer_length_; + size_t copy = 0; + StreamResult result = ReadOffsetLocked(buffer, bytes, 0, ©); + + if (result == SR_SUCCESS) { + // If read was successful then adjust the read position and number of + // bytes buffered. + read_position_ = (read_position_ + copy) % buffer_length_; + data_length_ -= copy; + if (bytes_read) { + *bytes_read = copy; + } + + // if we were full before, and now we're not, post an event + if (!was_writable && copy > 0) { + PostEvent(owner_, SE_WRITE, 0); + } + } + return result; +} + +StreamResult FifoBuffer::Write(const void* buffer, + size_t bytes, + size_t* bytes_written, + int* error) { + CritScope cs(&crit_); + + const bool was_readable = (data_length_ > 0); + size_t copy = 0; + StreamResult result = WriteOffsetLocked(buffer, bytes, 0, ©); + + if (result == SR_SUCCESS) { + // If write was successful then adjust the number of readable bytes. + data_length_ += copy; + if (bytes_written) { + *bytes_written = copy; + } + + // if we didn't have any data to read before, and now we do, post an event + if (!was_readable && copy > 0) { + PostEvent(owner_, SE_READ, 0); + } + } + return result; +} + +void FifoBuffer::Close() { + CritScope cs(&crit_); + state_ = SS_CLOSED; +} + +const void* FifoBuffer::GetReadData(size_t* size) { + CritScope cs(&crit_); + *size = (read_position_ + data_length_ <= buffer_length_) + ? data_length_ + : buffer_length_ - read_position_; + return &buffer_[read_position_]; +} + +void FifoBuffer::ConsumeReadData(size_t size) { + CritScope cs(&crit_); + RTC_DCHECK(size <= data_length_); + const bool was_writable = data_length_ < buffer_length_; + read_position_ = (read_position_ + size) % buffer_length_; + data_length_ -= size; + if (!was_writable && size > 0) { + PostEvent(owner_, SE_WRITE, 0); + } +} + +void* FifoBuffer::GetWriteBuffer(size_t* size) { + CritScope cs(&crit_); + if (state_ == SS_CLOSED) { + return nullptr; + } + + // if empty, reset the write position to the beginning, so we can get + // the biggest possible block + if (data_length_ == 0) { + read_position_ = 0; + } + + const size_t write_position = + (read_position_ + data_length_) % buffer_length_; + *size = (write_position > read_position_ || data_length_ == 0) + ? buffer_length_ - write_position + : read_position_ - write_position; + return &buffer_[write_position]; +} + +void FifoBuffer::ConsumeWriteBuffer(size_t size) { + CritScope cs(&crit_); + RTC_DCHECK(size <= buffer_length_ - data_length_); + const bool was_readable = (data_length_ > 0); + data_length_ += size; + if (!was_readable && size > 0) { + PostEvent(owner_, SE_READ, 0); + } +} + +bool FifoBuffer::GetWriteRemaining(size_t* size) const { + CritScope cs(&crit_); + *size = buffer_length_ - data_length_; + return true; +} + +StreamResult FifoBuffer::ReadOffsetLocked(void* buffer, + size_t bytes, + size_t offset, + size_t* bytes_read) { + if (offset >= data_length_) { + return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS; + } + + const size_t available = data_length_ - offset; + const size_t read_position = (read_position_ + offset) % buffer_length_; + const size_t copy = std::min(bytes, available); + const size_t tail_copy = std::min(copy, buffer_length_ - read_position); + char* const p = static_cast(buffer); + memcpy(p, &buffer_[read_position], tail_copy); + memcpy(p + tail_copy, &buffer_[0], copy - tail_copy); + + if (bytes_read) { + *bytes_read = copy; + } + return SR_SUCCESS; +} + +StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer, + size_t bytes, + size_t offset, + size_t* bytes_written) { + if (state_ == SS_CLOSED) { + return SR_EOS; + } + + if (data_length_ + offset >= buffer_length_) { + return SR_BLOCK; + } + + const size_t available = buffer_length_ - data_length_ - offset; + const size_t write_position = + (read_position_ + data_length_ + offset) % buffer_length_; + const size_t copy = std::min(bytes, available); + const size_t tail_copy = std::min(copy, buffer_length_ - write_position); + const char* const p = static_cast(buffer); + memcpy(&buffer_[write_position], p, tail_copy); + memcpy(&buffer_[0], p + tail_copy, copy - tail_copy); + + if (bytes_written) { + *bytes_written = copy; + } + return SR_SUCCESS; +} + +} // namespace rtc diff --git a/rtc_base/memory/fifo_buffer.h b/rtc_base/memory/fifo_buffer.h new file mode 100644 index 0000000000..f859815c70 --- /dev/null +++ b/rtc_base/memory/fifo_buffer.h @@ -0,0 +1,135 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef RTC_BASE_MEMORY_FIFO_BUFFER_H_ +#define RTC_BASE_MEMORY_FIFO_BUFFER_H_ + +#include + +#include "rtc_base/stream.h" + +namespace rtc { + +// FifoBuffer allows for efficient, thread-safe buffering of data between +// writer and reader. +class FifoBuffer final : public StreamInterface { + public: + // Creates a FIFO buffer with the specified capacity. + explicit FifoBuffer(size_t length); + // Creates a FIFO buffer with the specified capacity and owner + FifoBuffer(size_t length, Thread* owner); + ~FifoBuffer() override; + // Gets the amount of data currently readable from the buffer. + bool GetBuffered(size_t* data_len) const; + // Resizes the buffer to the specified capacity. Fails if data_length_ > size + bool SetCapacity(size_t length); + + // Read into |buffer| with an offset from the current read position, offset + // is specified in number of bytes. + // This method doesn't adjust read position nor the number of available + // bytes, user has to call ConsumeReadData() to do this. + StreamResult ReadOffset(void* buffer, + size_t bytes, + size_t offset, + size_t* bytes_read); + + // Write |buffer| with an offset from the current write position, offset is + // specified in number of bytes. + // This method doesn't adjust the number of buffered bytes, user has to call + // ConsumeWriteBuffer() to do this. + StreamResult WriteOffset(const void* buffer, + size_t bytes, + size_t offset, + size_t* bytes_written); + + // StreamInterface methods + StreamState GetState() const override; + StreamResult Read(void* buffer, + size_t bytes, + size_t* bytes_read, + int* error) override; + StreamResult Write(const void* buffer, + size_t bytes, + size_t* bytes_written, + int* error) override; + void Close() override; + + // Seek to a byte offset from the beginning of the stream. Returns false if + // the stream does not support seeking, or cannot seek to the specified + // position. + bool SetPosition(size_t position); + + // Get the byte offset of the current position from the start of the stream. + // Returns false if the position is not known. + bool GetPosition(size_t* position) const; + + // Seek to the start of the stream. + bool Rewind() { return SetPosition(0); } + + // GetReadData returns a pointer to a buffer which is owned by the stream. + // The buffer contains data_len bytes. null is returned if no data is + // available, or if the method fails. If the caller processes the data, it + // must call ConsumeReadData with the number of processed bytes. GetReadData + // does not require a matching call to ConsumeReadData if the data is not + // processed. Read and ConsumeReadData invalidate the buffer returned by + // GetReadData. + const void* GetReadData(size_t* data_len); + void ConsumeReadData(size_t used); + // GetWriteBuffer returns a pointer to a buffer which is owned by the stream. + // The buffer has a capacity of buf_len bytes. null is returned if there is + // no buffer available, or if the method fails. The call may write data to + // the buffer, and then call ConsumeWriteBuffer with the number of bytes + // written. GetWriteBuffer does not require a matching call to + // ConsumeWriteData if no data is written. Write and + // ConsumeWriteData invalidate the buffer returned by GetWriteBuffer. + void* GetWriteBuffer(size_t* buf_len); + void ConsumeWriteBuffer(size_t used); + + // Return the number of Write()-able bytes remaining before end-of-stream. + // Returns false if not known. + bool GetWriteRemaining(size_t* size) const; + + private: + // Helper method that implements ReadOffset. Caller must acquire a lock + // when calling this method. + StreamResult ReadOffsetLocked(void* buffer, + size_t bytes, + size_t offset, + size_t* bytes_read) + RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_); + + // Helper method that implements WriteOffset. Caller must acquire a lock + // when calling this method. + StreamResult WriteOffsetLocked(const void* buffer, + size_t bytes, + size_t offset, + size_t* bytes_written) + RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_); + + // keeps the opened/closed state of the stream + StreamState state_ RTC_GUARDED_BY(crit_); + // the allocated buffer + std::unique_ptr buffer_ RTC_GUARDED_BY(crit_); + // size of the allocated buffer + size_t buffer_length_ RTC_GUARDED_BY(crit_); + // amount of readable data in the buffer + size_t data_length_ RTC_GUARDED_BY(crit_); + // offset to the readable data + size_t read_position_ RTC_GUARDED_BY(crit_); + // stream callbacks are dispatched on this thread + Thread* owner_; + // object lock + CriticalSection crit_; + RTC_DISALLOW_COPY_AND_ASSIGN(FifoBuffer); +}; + +} // namespace rtc + +#endif // RTC_BASE_MEMORY_FIFO_BUFFER_H_ diff --git a/rtc_base/stream_unittest.cc b/rtc_base/memory/fifo_buffer_unittest.cc similarity index 88% rename from rtc_base/stream_unittest.cc rename to rtc_base/memory/fifo_buffer_unittest.cc index bd6e84f591..c2926b379e 100644 --- a/rtc_base/stream_unittest.cc +++ b/rtc_base/memory/fifo_buffer_unittest.cc @@ -8,65 +8,13 @@ * be found in the AUTHORS file in the root of the source tree. */ -#include "rtc_base/stream.h" - #include +#include "rtc_base/memory/fifo_buffer.h" #include "test/gtest.h" namespace rtc { -/////////////////////////////////////////////////////////////////////////////// -// TestStream -/////////////////////////////////////////////////////////////////////////////// - -class TestStream : public StreamInterface { - public: - TestStream() : pos_(0) {} - - StreamState GetState() const override { return SS_OPEN; } - - StreamResult Read(void* buffer, - size_t buffer_len, - size_t* read, - int* error) override { - unsigned char* uc_buffer = static_cast(buffer); - for (size_t i = 0; i < buffer_len; ++i) { - uc_buffer[i] = static_cast(pos_++); - } - if (read) - *read = buffer_len; - return SR_SUCCESS; - } - - StreamResult Write(const void* data, - size_t data_len, - size_t* written, - int* error) override { - if (error) - *error = -1; - return SR_ERROR; - } - - void Close() override {} - - private: - size_t pos_; -}; - -bool VerifyTestBuffer(unsigned char* buffer, size_t len, unsigned char value) { - bool passed = true; - for (size_t i = 0; i < len; ++i) { - if (buffer[i] != value++) { - passed = false; - break; - } - } - // Ensure that we don't pass again without re-writing - memset(buffer, 0, len); - return passed; -} - TEST(FifoBufferTest, TestAll) { const size_t kSize = 16; const char in[kSize * 2 + 1] = "0123456789ABCDEFGHIJKLMNOPQRSTUV"; diff --git a/rtc_base/proxy_server.h b/rtc_base/proxy_server.h index c7e6078f1d..139cc9138c 100644 --- a/rtc_base/proxy_server.h +++ b/rtc_base/proxy_server.h @@ -17,9 +17,9 @@ #include "absl/memory/memory.h" #include "rtc_base/async_socket.h" #include "rtc_base/constructor_magic.h" +#include "rtc_base/memory/fifo_buffer.h" #include "rtc_base/server_socket_adapters.h" #include "rtc_base/socket_address.h" -#include "rtc_base/stream.h" namespace rtc { diff --git a/rtc_base/ssl_stream_adapter_unittest.cc b/rtc_base/ssl_stream_adapter_unittest.cc index abf9880462..585d080869 100644 --- a/rtc_base/ssl_stream_adapter_unittest.cc +++ b/rtc_base/ssl_stream_adapter_unittest.cc @@ -17,6 +17,7 @@ #include "rtc_base/checks.h" #include "rtc_base/gunit.h" #include "rtc_base/helpers.h" +#include "rtc_base/memory/fifo_buffer.h" #include "rtc_base/memory_stream.h" #include "rtc_base/message_digest.h" #include "rtc_base/ssl_adapter.h" diff --git a/rtc_base/stream.cc b/rtc_base/stream.cc index 6b510ddd3a..4ad005eee8 100644 --- a/rtc_base/stream.cc +++ b/rtc_base/stream.cc @@ -267,236 +267,4 @@ void FileStream::DoClose() { fclose(file_); } -/////////////////////////////////////////////////////////////////////////////// -// FifoBuffer -/////////////////////////////////////////////////////////////////////////////// - -FifoBuffer::FifoBuffer(size_t size) - : state_(SS_OPEN), - buffer_(new char[size]), - buffer_length_(size), - data_length_(0), - read_position_(0), - owner_(Thread::Current()) { - // all events are done on the owner_ thread -} - -FifoBuffer::FifoBuffer(size_t size, Thread* owner) - : state_(SS_OPEN), - buffer_(new char[size]), - buffer_length_(size), - data_length_(0), - read_position_(0), - owner_(owner) { - // all events are done on the owner_ thread -} - -FifoBuffer::~FifoBuffer() {} - -bool FifoBuffer::GetBuffered(size_t* size) const { - CritScope cs(&crit_); - *size = data_length_; - return true; -} - -bool FifoBuffer::SetCapacity(size_t size) { - CritScope cs(&crit_); - if (data_length_ > size) { - return false; - } - - if (size != buffer_length_) { - char* buffer = new char[size]; - const size_t copy = data_length_; - const size_t tail_copy = std::min(copy, buffer_length_ - read_position_); - memcpy(buffer, &buffer_[read_position_], tail_copy); - memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy); - buffer_.reset(buffer); - read_position_ = 0; - buffer_length_ = size; - } - return true; -} - -StreamResult FifoBuffer::ReadOffset(void* buffer, - size_t bytes, - size_t offset, - size_t* bytes_read) { - CritScope cs(&crit_); - return ReadOffsetLocked(buffer, bytes, offset, bytes_read); -} - -StreamResult FifoBuffer::WriteOffset(const void* buffer, - size_t bytes, - size_t offset, - size_t* bytes_written) { - CritScope cs(&crit_); - return WriteOffsetLocked(buffer, bytes, offset, bytes_written); -} - -StreamState FifoBuffer::GetState() const { - CritScope cs(&crit_); - return state_; -} - -StreamResult FifoBuffer::Read(void* buffer, - size_t bytes, - size_t* bytes_read, - int* error) { - CritScope cs(&crit_); - const bool was_writable = data_length_ < buffer_length_; - size_t copy = 0; - StreamResult result = ReadOffsetLocked(buffer, bytes, 0, ©); - - if (result == SR_SUCCESS) { - // If read was successful then adjust the read position and number of - // bytes buffered. - read_position_ = (read_position_ + copy) % buffer_length_; - data_length_ -= copy; - if (bytes_read) { - *bytes_read = copy; - } - - // if we were full before, and now we're not, post an event - if (!was_writable && copy > 0) { - PostEvent(owner_, SE_WRITE, 0); - } - } - return result; -} - -StreamResult FifoBuffer::Write(const void* buffer, - size_t bytes, - size_t* bytes_written, - int* error) { - CritScope cs(&crit_); - - const bool was_readable = (data_length_ > 0); - size_t copy = 0; - StreamResult result = WriteOffsetLocked(buffer, bytes, 0, ©); - - if (result == SR_SUCCESS) { - // If write was successful then adjust the number of readable bytes. - data_length_ += copy; - if (bytes_written) { - *bytes_written = copy; - } - - // if we didn't have any data to read before, and now we do, post an event - if (!was_readable && copy > 0) { - PostEvent(owner_, SE_READ, 0); - } - } - return result; -} - -void FifoBuffer::Close() { - CritScope cs(&crit_); - state_ = SS_CLOSED; -} - -const void* FifoBuffer::GetReadData(size_t* size) { - CritScope cs(&crit_); - *size = (read_position_ + data_length_ <= buffer_length_) - ? data_length_ - : buffer_length_ - read_position_; - return &buffer_[read_position_]; -} - -void FifoBuffer::ConsumeReadData(size_t size) { - CritScope cs(&crit_); - RTC_DCHECK(size <= data_length_); - const bool was_writable = data_length_ < buffer_length_; - read_position_ = (read_position_ + size) % buffer_length_; - data_length_ -= size; - if (!was_writable && size > 0) { - PostEvent(owner_, SE_WRITE, 0); - } -} - -void* FifoBuffer::GetWriteBuffer(size_t* size) { - CritScope cs(&crit_); - if (state_ == SS_CLOSED) { - return nullptr; - } - - // if empty, reset the write position to the beginning, so we can get - // the biggest possible block - if (data_length_ == 0) { - read_position_ = 0; - } - - const size_t write_position = - (read_position_ + data_length_) % buffer_length_; - *size = (write_position > read_position_ || data_length_ == 0) - ? buffer_length_ - write_position - : read_position_ - write_position; - return &buffer_[write_position]; -} - -void FifoBuffer::ConsumeWriteBuffer(size_t size) { - CritScope cs(&crit_); - RTC_DCHECK(size <= buffer_length_ - data_length_); - const bool was_readable = (data_length_ > 0); - data_length_ += size; - if (!was_readable && size > 0) { - PostEvent(owner_, SE_READ, 0); - } -} - -bool FifoBuffer::GetWriteRemaining(size_t* size) const { - CritScope cs(&crit_); - *size = buffer_length_ - data_length_; - return true; -} - -StreamResult FifoBuffer::ReadOffsetLocked(void* buffer, - size_t bytes, - size_t offset, - size_t* bytes_read) { - if (offset >= data_length_) { - return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS; - } - - const size_t available = data_length_ - offset; - const size_t read_position = (read_position_ + offset) % buffer_length_; - const size_t copy = std::min(bytes, available); - const size_t tail_copy = std::min(copy, buffer_length_ - read_position); - char* const p = static_cast(buffer); - memcpy(p, &buffer_[read_position], tail_copy); - memcpy(p + tail_copy, &buffer_[0], copy - tail_copy); - - if (bytes_read) { - *bytes_read = copy; - } - return SR_SUCCESS; -} - -StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer, - size_t bytes, - size_t offset, - size_t* bytes_written) { - if (state_ == SS_CLOSED) { - return SR_EOS; - } - - if (data_length_ + offset >= buffer_length_) { - return SR_BLOCK; - } - - const size_t available = buffer_length_ - data_length_ - offset; - const size_t write_position = - (read_position_ + data_length_ + offset) % buffer_length_; - const size_t copy = std::min(bytes, available); - const size_t tail_copy = std::min(copy, buffer_length_ - write_position); - const char* const p = static_cast(buffer); - memcpy(&buffer_[write_position], p, tail_copy); - memcpy(&buffer_[0], p + tail_copy, copy - tail_copy); - - if (bytes_written) { - *bytes_written = copy; - } - return SR_SUCCESS; -} - } // namespace rtc diff --git a/rtc_base/stream.h b/rtc_base/stream.h index 060985752c..b535f16882 100644 --- a/rtc_base/stream.h +++ b/rtc_base/stream.h @@ -225,121 +225,6 @@ class FileStream : public StreamInterface { RTC_DISALLOW_COPY_AND_ASSIGN(FileStream); }; -// FifoBuffer allows for efficient, thread-safe buffering of data between -// writer and reader. As the data can wrap around the end of the buffer, -// MemoryStreamBase can't help us here. - -class FifoBuffer final : public StreamInterface { - public: - // Creates a FIFO buffer with the specified capacity. - explicit FifoBuffer(size_t length); - // Creates a FIFO buffer with the specified capacity and owner - FifoBuffer(size_t length, Thread* owner); - ~FifoBuffer() override; - // Gets the amount of data currently readable from the buffer. - bool GetBuffered(size_t* data_len) const; - // Resizes the buffer to the specified capacity. Fails if data_length_ > size - bool SetCapacity(size_t length); - - // Read into |buffer| with an offset from the current read position, offset - // is specified in number of bytes. - // This method doesn't adjust read position nor the number of available - // bytes, user has to call ConsumeReadData() to do this. - StreamResult ReadOffset(void* buffer, - size_t bytes, - size_t offset, - size_t* bytes_read); - - // Write |buffer| with an offset from the current write position, offset is - // specified in number of bytes. - // This method doesn't adjust the number of buffered bytes, user has to call - // ConsumeWriteBuffer() to do this. - StreamResult WriteOffset(const void* buffer, - size_t bytes, - size_t offset, - size_t* bytes_written); - - // StreamInterface methods - StreamState GetState() const override; - StreamResult Read(void* buffer, - size_t bytes, - size_t* bytes_read, - int* error) override; - StreamResult Write(const void* buffer, - size_t bytes, - size_t* bytes_written, - int* error) override; - void Close() override; - - // Seek to a byte offset from the beginning of the stream. Returns false if - // the stream does not support seeking, or cannot seek to the specified - // position. - bool SetPosition(size_t position); - - // Get the byte offset of the current position from the start of the stream. - // Returns false if the position is not known. - bool GetPosition(size_t* position) const; - - // Seek to the start of the stream. - bool Rewind() { return SetPosition(0); } - - // GetReadData returns a pointer to a buffer which is owned by the stream. - // The buffer contains data_len bytes. null is returned if no data is - // available, or if the method fails. If the caller processes the data, it - // must call ConsumeReadData with the number of processed bytes. GetReadData - // does not require a matching call to ConsumeReadData if the data is not - // processed. Read and ConsumeReadData invalidate the buffer returned by - // GetReadData. - const void* GetReadData(size_t* data_len); - void ConsumeReadData(size_t used); - // GetWriteBuffer returns a pointer to a buffer which is owned by the stream. - // The buffer has a capacity of buf_len bytes. null is returned if there is - // no buffer available, or if the method fails. The call may write data to - // the buffer, and then call ConsumeWriteBuffer with the number of bytes - // written. GetWriteBuffer does not require a matching call to - // ConsumeWriteData if no data is written. Write and - // ConsumeWriteData invalidate the buffer returned by GetWriteBuffer. - void* GetWriteBuffer(size_t* buf_len); - void ConsumeWriteBuffer(size_t used); - - // Return the number of Write()-able bytes remaining before end-of-stream. - // Returns false if not known. - bool GetWriteRemaining(size_t* size) const; - - private: - // Helper method that implements ReadOffset. Caller must acquire a lock - // when calling this method. - StreamResult ReadOffsetLocked(void* buffer, - size_t bytes, - size_t offset, - size_t* bytes_read) - RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_); - - // Helper method that implements WriteOffset. Caller must acquire a lock - // when calling this method. - StreamResult WriteOffsetLocked(const void* buffer, - size_t bytes, - size_t offset, - size_t* bytes_written) - RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_); - - // keeps the opened/closed state of the stream - StreamState state_ RTC_GUARDED_BY(crit_); - // the allocated buffer - std::unique_ptr buffer_ RTC_GUARDED_BY(crit_); - // size of the allocated buffer - size_t buffer_length_ RTC_GUARDED_BY(crit_); - // amount of readable data in the buffer - size_t data_length_ RTC_GUARDED_BY(crit_); - // offset to the readable data - size_t read_position_ RTC_GUARDED_BY(crit_); - // stream callbacks are dispatched on this thread - Thread* owner_; - // object lock - CriticalSection crit_; - RTC_DISALLOW_COPY_AND_ASSIGN(FifoBuffer); -}; - } // namespace rtc #endif // RTC_BASE_STREAM_H_