[Sheriff] Revert "Remove MessageHandler[AutoCleanup] dependency from StreamInterface."

This reverts commit eb79dd9ffdc41e4ca86803bfc1317e0961a8a8a6.

Reason for revert: breaks WebRTC roll into Chrome:
https://crrev.com/c/2445696

Sample failure:
https://ci.chromium.org/p/chromium/builders/try/linux-rel/506049
[ RUN      ] PseudoTcpAdapterTest.DeleteOnConnected

Original change's description:
> Remove MessageHandler[AutoCleanup] dependency from StreamInterface.
>
> This includes relying on related types such as MessageData and
> PostEvent functionality inside the StreamInterface itself.
>
> This affects mostly tests but OpenSSLStreamAdapter
> requires special attention.
>
> Bug: webrtc:11988
> Change-Id: Ib5c895f1bdf77bb49e3162bd49718f8a98812d91
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/185505
> Commit-Queue: Tommi <tommi@webrtc.org>
> Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#32290}

TBR=kwiberg@webrtc.org,tommi@webrtc.org

Change-Id: I23d7a311a73c739eba872a21e6123235465c28cc
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Bug: webrtc:11988
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/186564
Commit-Queue: Marina Ciocea <marinaciocea@webrtc.org>
Reviewed-by: Marina Ciocea <marinaciocea@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#32299}
This commit is contained in:
Marina Ciocea 2020-10-02 20:06:22 +00:00 committed by Commit Bot
parent 389bf0feb8
commit af05c833da
9 changed files with 66 additions and 86 deletions

View File

@ -835,7 +835,6 @@ rtc_library("rtc_base") {
"system:no_unique_address",
"system:rtc_export",
"task_utils:pending_task_safety_flag",
"task_utils:repeating_task",
"task_utils:to_queued_task",
"third_party/base64",
"third_party/sigslot",
@ -1426,7 +1425,6 @@ if (rtc_include_tests) {
"memory:fifo_buffer",
"synchronization:mutex",
"synchronization:synchronization_unittests",
"task_utils:pending_task_safety_flag",
"task_utils:to_queued_task",
"third_party/sigslot",
]

View File

@ -20,15 +20,12 @@ rtc_library("aligned_malloc") {
deps = [ "..:checks" ]
}
# Test only utility.
# TODO: Tag with `testonly = true` once all depending targets are correctly
# tagged.
rtc_library("fifo_buffer") {
visibility = [
":unittests",
"../../p2p:rtc_p2p",
"..:rtc_base_tests_utils",
"..:rtc_base_unittests",
"../../p2p:rtc_p2p", # This needs to be fixed.
":unittests",
]
sources = [
"fifo_buffer.cc",
@ -37,8 +34,6 @@ rtc_library("fifo_buffer") {
deps = [
"..:rtc_base",
"../synchronization:mutex",
"../task_utils:pending_task_safety_flag",
"../task_utils:to_queued_task",
]
}

View File

@ -104,7 +104,7 @@ StreamResult FifoBuffer::Read(void* buffer,
// if we were full before, and now we're not, post an event
if (!was_writable && copy > 0) {
PostEvent(SE_WRITE, 0);
PostEvent(owner_, SE_WRITE, 0);
}
}
return result;
@ -129,7 +129,7 @@ StreamResult FifoBuffer::Write(const void* buffer,
// if we didn't have any data to read before, and now we do, post an event
if (!was_readable && copy > 0) {
PostEvent(SE_READ, 0);
PostEvent(owner_, SE_READ, 0);
}
}
return result;
@ -155,7 +155,7 @@ void FifoBuffer::ConsumeReadData(size_t size) {
read_position_ = (read_position_ + size) % buffer_length_;
data_length_ -= size;
if (!was_writable && size > 0) {
PostEvent(SE_WRITE, 0);
PostEvent(owner_, SE_WRITE, 0);
}
}
@ -185,7 +185,7 @@ void FifoBuffer::ConsumeWriteBuffer(size_t size) {
const bool was_readable = (data_length_ > 0);
data_length_ += size;
if (!was_readable && size > 0) {
PostEvent(SE_READ, 0);
PostEvent(owner_, SE_READ, 0);
}
}

View File

@ -15,8 +15,6 @@
#include "rtc_base/stream.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/task_utils/to_queued_task.h"
namespace rtc {
@ -100,12 +98,6 @@ class FifoBuffer final : public StreamInterface {
bool GetWriteRemaining(size_t* size) const;
private:
void PostEvent(int events, int err) {
owner_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
SignalEvent(this, events, err);
}));
}
// Helper method that implements ReadOffset. Caller must acquire a lock
// when calling this method.
StreamResult ReadOffsetLocked(void* buffer,
@ -122,8 +114,6 @@ class FifoBuffer final : public StreamInterface {
size_t* bytes_written)
RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
webrtc::ScopedTaskSafety task_safety_;
// keeps the opened/closed state of the stream
StreamState state_ RTC_GUARDED_BY(mutex_);
// the allocated buffer
@ -135,7 +125,7 @@ class FifoBuffer final : public StreamInterface {
// offset to the readable data
size_t read_position_ RTC_GUARDED_BY(mutex_);
// stream callbacks are dispatched on this thread
Thread* const owner_;
Thread* owner_;
// object lock
mutable webrtc::Mutex mutex_;
RTC_DISALLOW_COPY_AND_ASSIGN(FifoBuffer);

View File

@ -35,7 +35,6 @@
#include "rtc_base/openssl_identity.h"
#include "rtc_base/ssl_certificate.h"
#include "rtc_base/stream.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/thread.h"
#include "rtc_base/time_utils.h"
#include "system_wrappers/include/field_trial.h"
@ -284,7 +283,6 @@ bool ShouldAllowLegacyTLSProtocols() {
OpenSSLStreamAdapter::OpenSSLStreamAdapter(
std::unique_ptr<StreamInterface> stream)
: SSLStreamAdapter(std::move(stream)),
owner_(rtc::Thread::Current()),
state_(SSL_NONE),
role_(SSL_CLIENT),
ssl_read_needs_write_(false),
@ -298,7 +296,6 @@ OpenSSLStreamAdapter::OpenSSLStreamAdapter(
support_legacy_tls_protocols_flag_(ShouldAllowLegacyTLSProtocols()) {}
OpenSSLStreamAdapter::~OpenSSLStreamAdapter() {
timeout_task_.Stop();
Cleanup(0);
}
@ -804,33 +801,6 @@ void OpenSSLStreamAdapter::OnEvent(StreamInterface* stream,
}
}
void OpenSSLStreamAdapter::PostEvent(int events, int err) {
owner_->PostTask(webrtc::ToQueuedTask(
task_safety_, [this, events, err]() { SignalEvent(this, events, err); }));
}
void OpenSSLStreamAdapter::SetTimeout(int delay_ms) {
// We need to accept 0 delay here as well as >0 delay, because
// DTLSv1_get_timeout seems to frequently return 0 ms.
RTC_DCHECK_GE(delay_ms, 0);
RTC_DCHECK(!timeout_task_.Running());
timeout_task_ = webrtc::RepeatingTaskHandle::DelayedStart(
owner_, webrtc::TimeDelta::Millis(delay_ms),
[flag = task_safety_.flag(), this]() {
if (flag->alive()) {
RTC_DLOG(LS_INFO) << "DTLS timeout expired";
timeout_task_.Stop();
DTLSv1_handle_timeout(ssl_);
ContinueSSL();
} else {
RTC_NOTREACHED();
}
// This callback will never run again (stopped above).
return webrtc::TimeDelta::PlusInfinity();
});
}
int OpenSSLStreamAdapter::BeginSSL() {
RTC_DCHECK(state_ == SSL_CONNECTING);
// The underlying stream has opened.
@ -881,7 +851,7 @@ int OpenSSLStreamAdapter::ContinueSSL() {
RTC_DCHECK(state_ == SSL_CONNECTING);
// Clear the DTLS timer
timeout_task_.Stop();
Thread::Current()->Clear(this, MSG_TIMEOUT);
const int code = (role_ == SSL_CLIENT) ? SSL_connect(ssl_) : SSL_accept(ssl_);
const int ssl_error = SSL_get_error(ssl_, code);
@ -913,7 +883,9 @@ int OpenSSLStreamAdapter::ContinueSSL() {
struct timeval timeout;
if (DTLSv1_get_timeout(ssl_, &timeout)) {
int delay = timeout.tv_sec * 1000 + timeout.tv_usec / 1000;
SetTimeout(delay);
Thread::Current()->PostDelayed(RTC_FROM_HERE, delay, this, MSG_TIMEOUT,
0);
}
} break;
@ -990,7 +962,18 @@ void OpenSSLStreamAdapter::Cleanup(uint8_t alert) {
peer_cert_chain_.reset();
// Clear the DTLS timer
timeout_task_.Stop();
Thread::Current()->Clear(this, MSG_TIMEOUT);
}
void OpenSSLStreamAdapter::OnMessage(Message* msg) {
// Process our own messages and then pass others to the superclass
if (MSG_TIMEOUT == msg->message_id) {
RTC_DLOG(LS_INFO) << "DTLS timeout expired";
DTLSv1_handle_timeout(ssl_);
ContinueSSL();
} else {
StreamInterface::OnMessage(msg);
}
}
SSL_CTX* OpenSSLStreamAdapter::SetupSSLContext() {

View File

@ -26,8 +26,6 @@
#include "rtc_base/ssl_stream_adapter.h"
#include "rtc_base/stream.h"
#include "rtc_base/system/rtc_export.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/task_utils/repeating_task.h"
namespace rtc {
@ -147,8 +145,7 @@ class OpenSSLStreamAdapter final : public SSLStreamAdapter {
SSL_CLOSED // Clean close
};
void PostEvent(int events, int err);
void SetTimeout(int delay_ms);
enum { MSG_TIMEOUT = MSG_MAX + 1 };
// The following three methods return 0 on success and a negative
// error code on failure. The error code may be from OpenSSL or -1
@ -172,6 +169,9 @@ class OpenSSLStreamAdapter final : public SSLStreamAdapter {
void Error(const char* context, int err, uint8_t alert, bool signal);
void Cleanup(uint8_t alert);
// Override MessageHandler
void OnMessage(Message* msg) override;
// Flush the input buffers by reading left bytes (for DTLS)
void FlushInput(unsigned int left);
@ -192,10 +192,6 @@ class OpenSSLStreamAdapter final : public SSLStreamAdapter {
!peer_certificate_digest_value_.empty();
}
rtc::Thread* const owner_;
webrtc::ScopedTaskSafety task_safety_;
webrtc::RepeatingTaskHandle timeout_task_;
SSLState state_;
SSLRole role_;
int ssl_error_code_; // valid when state_ == SSL_ERROR or SSL_CLOSED

View File

@ -26,8 +26,6 @@
#include "rtc_base/ssl_identity.h"
#include "rtc_base/ssl_stream_adapter.h"
#include "rtc_base/stream.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "test/field_trial.h"
using ::testing::Combine;
@ -216,15 +214,7 @@ class SSLDummyStreamBase : public rtc::StreamInterface,
out_->Close();
}
private:
void PostEvent(int events, int err) {
thread_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
SignalEvent(this, events, err);
}));
}
webrtc::ScopedTaskSafety task_safety_;
rtc::Thread* const thread_ = rtc::Thread::Current();
protected:
SSLStreamAdapterTestBase* test_base_;
const std::string side_;
rtc::StreamInterface* in_;
@ -286,17 +276,10 @@ class BufferQueueStream : public rtc::StreamInterface {
protected:
void NotifyReadableForTest() { PostEvent(rtc::SE_READ, 0); }
void NotifyWritableForTest() { PostEvent(rtc::SE_WRITE, 0); }
private:
void PostEvent(int events, int err) {
thread_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
SignalEvent(this, events, err);
}));
}
rtc::Thread* const thread_ = rtc::Thread::Current();
webrtc::ScopedTaskSafety task_safety_;
rtc::BufferQueue buffer_;
};

View File

@ -24,6 +24,7 @@ namespace rtc {
///////////////////////////////////////////////////////////////////////////////
// StreamInterface
///////////////////////////////////////////////////////////////////////////////
StreamInterface::~StreamInterface() {}
StreamResult StreamInterface::WriteAll(const void* data,
size_t data_len,
@ -43,12 +44,29 @@ StreamResult StreamInterface::WriteAll(const void* data,
return result;
}
void StreamInterface::PostEvent(Thread* t, int events, int err) {
t->Post(RTC_FROM_HERE, this, MSG_POST_EVENT,
new StreamEventData(events, err));
}
void StreamInterface::PostEvent(int events, int err) {
PostEvent(Thread::Current(), events, err);
}
bool StreamInterface::Flush() {
return false;
}
StreamInterface::StreamInterface() {}
void StreamInterface::OnMessage(Message* msg) {
if (MSG_POST_EVENT == msg->message_id) {
StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
SignalEvent(this, pe->events, pe->error);
delete msg->pdata;
}
}
///////////////////////////////////////////////////////////////////////////////
// StreamAdapterInterface
///////////////////////////////////////////////////////////////////////////////

View File

@ -48,9 +48,16 @@ enum StreamResult { SR_ERROR, SR_SUCCESS, SR_BLOCK, SR_EOS };
// SE_WRITE: Data can be written, so Write is likely to not return SR_BLOCK
enum StreamEvent { SE_OPEN = 1, SE_READ = 2, SE_WRITE = 4, SE_CLOSE = 8 };
class RTC_EXPORT StreamInterface {
struct StreamEventData : public MessageData {
int events, error;
StreamEventData(int ev, int er) : events(ev), error(er) {}
};
class RTC_EXPORT StreamInterface : public MessageHandlerAutoCleanup {
public:
virtual ~StreamInterface() {}
enum { MSG_POST_EVENT = 0xF1F1, MSG_MAX = MSG_POST_EVENT };
~StreamInterface() override;
virtual StreamState GetState() const = 0;
@ -89,6 +96,13 @@ class RTC_EXPORT StreamInterface {
// certain events will be raised in the future.
sigslot::signal3<StreamInterface*, int, int> SignalEvent;
// Like calling SignalEvent, but posts a message to the specified thread,
// which will call SignalEvent. This helps unroll the stack and prevent
// re-entrancy.
void PostEvent(Thread* t, int events, int err);
// Like the aforementioned method, but posts to the current thread.
void PostEvent(int events, int err);
// Return true if flush is successful.
virtual bool Flush();
@ -111,6 +125,9 @@ class RTC_EXPORT StreamInterface {
protected:
StreamInterface();
// MessageHandler Interface
void OnMessage(Message* msg) override;
private:
RTC_DISALLOW_COPY_AND_ASSIGN(StreamInterface);
};