From d200488646d855665d4877d67d45738f5a30e8b6 Mon Sep 17 00:00:00 2001 From: Tommi Date: Mon, 22 Apr 2024 16:50:40 +0200 Subject: [PATCH] Introduce StreamInterface::FireEvent for firing stream events This is a step towards removing StreamInterface::SignalEvent. Downstream dependency will need to be updated to call FireEvent() before further changes can land in webrtc. Bug: webrtc:11943 Change-Id: Ia7d3f1c43fda52b7cf5bfa082aef3f462553cd67 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/347884 Commit-Queue: Tomas Gunnarsson Reviewed-by: Harald Alvestrand Cr-Commit-Position: refs/heads/main@{#42143} --- p2p/base/dtls_transport.cc | 12 ++++++------ p2p/base/dtls_transport.h | 7 +++---- rtc_base/BUILD.gn | 3 +++ rtc_base/memory/fifo_buffer.h | 8 +++++--- rtc_base/openssl_stream_adapter.cc | 14 +++++++++----- rtc_base/ssl_adapter_unittest.cc | 12 ++++++++---- rtc_base/ssl_stream_adapter_unittest.cc | 6 ++++-- rtc_base/stream.h | 12 ++++++++++++ 8 files changed, 50 insertions(+), 24 deletions(-) diff --git a/p2p/base/dtls_transport.cc b/p2p/base/dtls_transport.cc index 42353e2c22..f6f6847a96 100644 --- a/p2p/base/dtls_transport.cc +++ b/p2p/base/dtls_transport.cc @@ -79,7 +79,7 @@ StreamInterfaceChannel::StreamInterfaceChannel( rtc::StreamResult StreamInterfaceChannel::Read(rtc::ArrayView buffer, size_t& read, int& error) { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(&callback_sequence_); if (state_ == rtc::SS_CLOSED) return rtc::SR_EOS; @@ -97,7 +97,7 @@ rtc::StreamResult StreamInterfaceChannel::Write( rtc::ArrayView data, size_t& written, int& error) { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(&callback_sequence_); // Always succeeds, since this is an unreliable transport anyway. // TODO(zhihuang): Should this block if ice_transport_'s temporarily // unwritable? @@ -109,7 +109,7 @@ rtc::StreamResult StreamInterfaceChannel::Write( } bool StreamInterfaceChannel::OnPacketReceived(const char* data, size_t size) { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(&callback_sequence_); if (packets_.size() > 0) { RTC_LOG(LS_WARNING) << "Packet already in queue."; } @@ -121,17 +121,17 @@ bool StreamInterfaceChannel::OnPacketReceived(const char* data, size_t size) { // packet currently in packets_. RTC_LOG(LS_ERROR) << "Failed to write packet to queue."; } - SignalEvent(this, rtc::SE_READ, 0); + FireEvent(rtc::SE_READ, 0); return ret; } rtc::StreamState StreamInterfaceChannel::GetState() const { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(&callback_sequence_); return state_; } void StreamInterfaceChannel::Close() { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(&callback_sequence_); packets_.Clear(); state_ = rtc::SS_CLOSED; } diff --git a/p2p/base/dtls_transport.h b/p2p/base/dtls_transport.h index f479325258..109dbf58c9 100644 --- a/p2p/base/dtls_transport.h +++ b/p2p/base/dtls_transport.h @@ -58,10 +58,9 @@ class StreamInterfaceChannel : public rtc::StreamInterface { int& error) override; private: - RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker sequence_checker_; IceTransportInternal* const ice_transport_; // owned by DtlsTransport - rtc::StreamState state_ RTC_GUARDED_BY(sequence_checker_); - rtc::BufferQueue packets_ RTC_GUARDED_BY(sequence_checker_); + rtc::StreamState state_ RTC_GUARDED_BY(callback_sequence_); + rtc::BufferQueue packets_ RTC_GUARDED_BY(callback_sequence_); }; // This class provides a DTLS SSLStreamAdapter inside a TransportChannel-style @@ -235,7 +234,7 @@ class DtlsTransport : public DtlsTransportInternal { // Sets the DTLS state, signaling if necessary. void set_dtls_state(webrtc::DtlsTransportState state); - webrtc::SequenceChecker thread_checker_; + RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker thread_checker_; const int component_; webrtc::DtlsTransportState dtls_state_ = webrtc::DtlsTransportState::kNew; diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index c5828633e8..69b434394f 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -1446,8 +1446,11 @@ rtc_library("stream") { deps = [ ":buffer", ":checks", + ":logging", ":threading", "../api:array_view", + "../api:sequence_checker", + "system:no_unique_address", "system:rtc_export", "third_party/sigslot", ] diff --git a/rtc_base/memory/fifo_buffer.h b/rtc_base/memory/fifo_buffer.h index a225c688ac..658a3c7df1 100644 --- a/rtc_base/memory/fifo_buffer.h +++ b/rtc_base/memory/fifo_buffer.h @@ -78,9 +78,11 @@ class FifoBuffer final : public StreamInterface { private: void PostEvent(int events, int err) { - owner_->PostTask(webrtc::SafeTask( - task_safety_.flag(), - [this, events, err]() { SignalEvent(this, events, err); })); + owner_->PostTask( + webrtc::SafeTask(task_safety_.flag(), [this, events, err]() { + RTC_DCHECK_RUN_ON(&callback_sequence_); + FireEvent(events, err); + })); } // Helper method that implements Read. Caller must acquire a lock diff --git a/rtc_base/openssl_stream_adapter.cc b/rtc_base/openssl_stream_adapter.cc index e2c242bf0e..357510ba10 100644 --- a/rtc_base/openssl_stream_adapter.cc +++ b/rtc_base/openssl_stream_adapter.cc @@ -744,6 +744,7 @@ StreamState OpenSSLStreamAdapter::GetState() const { void OpenSSLStreamAdapter::OnEvent(StreamInterface* stream, int events, int err) { + RTC_DCHECK_RUN_ON(&callback_sequence_); int events_to_signal = 0; int signal_error = 0; RTC_DCHECK(stream == stream_.get()); @@ -800,13 +801,14 @@ void OpenSSLStreamAdapter::OnEvent(StreamInterface* stream, if (events_to_signal) { // Note that the adapter presents itself as the origin of the stream events, // since users of the adapter may not recognize the adapted object. - SignalEvent(this, events_to_signal, signal_error); + FireEvent(events_to_signal, signal_error); } } void OpenSSLStreamAdapter::PostEvent(int events, int err) { owner_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() { - SignalEvent(this, events, err); + RTC_DCHECK_RUN_ON(&callback_sequence_); + FireEvent(events, err); })); } @@ -885,8 +887,9 @@ int OpenSSLStreamAdapter::BeginSSL() { } int OpenSSLStreamAdapter::ContinueSSL() { + RTC_DCHECK_RUN_ON(&callback_sequence_); RTC_DLOG(LS_VERBOSE) << "ContinueSSL"; - RTC_DCHECK(state_ == SSL_CONNECTING); + RTC_DCHECK_EQ(state_, SSL_CONNECTING); // Clear the DTLS timer timeout_task_.Stop(); @@ -911,7 +914,7 @@ int OpenSSLStreamAdapter::ContinueSSL() { // The caller of ContinueSSL may be the same object listening for these // events and may not be prepared for reentrancy. // PostEvent(SE_OPEN | SE_READ | SE_WRITE, 0); - SignalEvent(this, SE_OPEN | SE_READ | SE_WRITE, 0); + FireEvent(SE_OPEN | SE_READ | SE_WRITE, 0); } break; @@ -950,13 +953,14 @@ void OpenSSLStreamAdapter::Error(absl::string_view context, int err, uint8_t alert, bool signal) { + RTC_DCHECK_RUN_ON(&callback_sequence_); RTC_LOG(LS_WARNING) << "OpenSSLStreamAdapter::Error(" << context << ", " << err << ", " << static_cast(alert) << ")"; state_ = SSL_ERROR; ssl_error_code_ = err; Cleanup(alert); if (signal) { - SignalEvent(this, SE_CLOSE, err); + FireEvent(SE_CLOSE, err); } } diff --git a/rtc_base/ssl_adapter_unittest.cc b/rtc_base/ssl_adapter_unittest.cc index d0aedb8c35..084594f6b9 100644 --- a/rtc_base/ssl_adapter_unittest.cc +++ b/rtc_base/ssl_adapter_unittest.cc @@ -216,21 +216,25 @@ class SocketStream : public rtc::StreamInterface, public sigslot::has_slots<> { private: void OnConnectEvent(rtc::Socket* socket) { + RTC_DCHECK_RUN_ON(&callback_sequence_); RTC_DCHECK_EQ(socket, socket_.get()); - SignalEvent(this, rtc::SE_OPEN | rtc::SE_READ | rtc::SE_WRITE, 0); + FireEvent(rtc::SE_OPEN | rtc::SE_READ | rtc::SE_WRITE, 0); } void OnReadEvent(rtc::Socket* socket) { + RTC_DCHECK_RUN_ON(&callback_sequence_); RTC_DCHECK_EQ(socket, socket_.get()); - SignalEvent(this, rtc::SE_READ, 0); + FireEvent(rtc::SE_READ, 0); } void OnWriteEvent(rtc::Socket* socket) { + RTC_DCHECK_RUN_ON(&callback_sequence_); RTC_DCHECK_EQ(socket, socket_.get()); - SignalEvent(this, rtc::SE_WRITE, 0); + FireEvent(rtc::SE_WRITE, 0); } void OnCloseEvent(rtc::Socket* socket, int err) { + RTC_DCHECK_RUN_ON(&callback_sequence_); RTC_DCHECK_EQ(socket, socket_.get()); - SignalEvent(this, rtc::SE_CLOSE, err); + FireEvent(rtc::SE_CLOSE, err); } std::unique_ptr socket_; diff --git a/rtc_base/ssl_stream_adapter_unittest.cc b/rtc_base/ssl_stream_adapter_unittest.cc index 338921824d..fc6532c1f0 100644 --- a/rtc_base/ssl_stream_adapter_unittest.cc +++ b/rtc_base/ssl_stream_adapter_unittest.cc @@ -223,7 +223,8 @@ class SSLDummyStreamBase : public rtc::StreamInterface, private: void PostEvent(int events, int err) { thread_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() { - SignalEvent(this, events, err); + RTC_DCHECK_RUN_ON(&callback_sequence_); + FireEvent(events, err); })); } @@ -293,7 +294,8 @@ class BufferQueueStream : public rtc::StreamInterface { private: void PostEvent(int events, int err) { thread_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() { - SignalEvent(this, events, err); + RTC_DCHECK_RUN_ON(&callback_sequence_); + FireEvent(events, err); })); } diff --git a/rtc_base/stream.h b/rtc_base/stream.h index e02349aed3..4b2236a86e 100644 --- a/rtc_base/stream.h +++ b/rtc_base/stream.h @@ -14,7 +14,9 @@ #include #include "api/array_view.h" +#include "api/sequence_checker.h" #include "rtc_base/buffer.h" +#include "rtc_base/system/no_unique_address.h" #include "rtc_base/system/rtc_export.h" #include "rtc_base/third_party/sigslot/sigslot.h" #include "rtc_base/thread.h" @@ -121,6 +123,16 @@ class RTC_EXPORT StreamInterface { protected: StreamInterface(); + + // Utility function for derived classes. + void FireEvent(int stream_events, int err) RTC_RUN_ON(&callback_sequence_) { + // TODO(tommi): This is for backwards compatibility only while `SignalEvent` + // is being replaced by `SetEventHandler`. + SignalEvent(this, stream_events, err); + } + + RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker callback_sequence_{ + webrtc::SequenceChecker::kDetached}; }; } // namespace rtc