Introduce StreamInterface::FireEvent for firing stream events

This is a step towards removing StreamInterface::SignalEvent.
Downstream dependency will need to be updated to call FireEvent()
before further changes can land in webrtc.

Bug: webrtc:11943
Change-Id: Ia7d3f1c43fda52b7cf5bfa082aef3f462553cd67
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/347884
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#42143}
This commit is contained in:
Tommi 2024-04-22 16:50:40 +02:00 committed by WebRTC LUCI CQ
parent 622ca1a011
commit d200488646
8 changed files with 50 additions and 24 deletions

View File

@ -79,7 +79,7 @@ StreamInterfaceChannel::StreamInterfaceChannel(
rtc::StreamResult StreamInterfaceChannel::Read(rtc::ArrayView<uint8_t> buffer,
size_t& read,
int& error) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
RTC_DCHECK_RUN_ON(&callback_sequence_);
if (state_ == rtc::SS_CLOSED)
return rtc::SR_EOS;
@ -97,7 +97,7 @@ rtc::StreamResult StreamInterfaceChannel::Write(
rtc::ArrayView<const uint8_t> data,
size_t& written,
int& error) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
RTC_DCHECK_RUN_ON(&callback_sequence_);
// Always succeeds, since this is an unreliable transport anyway.
// TODO(zhihuang): Should this block if ice_transport_'s temporarily
// unwritable?
@ -109,7 +109,7 @@ rtc::StreamResult StreamInterfaceChannel::Write(
}
bool StreamInterfaceChannel::OnPacketReceived(const char* data, size_t size) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
RTC_DCHECK_RUN_ON(&callback_sequence_);
if (packets_.size() > 0) {
RTC_LOG(LS_WARNING) << "Packet already in queue.";
}
@ -121,17 +121,17 @@ bool StreamInterfaceChannel::OnPacketReceived(const char* data, size_t size) {
// packet currently in packets_.
RTC_LOG(LS_ERROR) << "Failed to write packet to queue.";
}
SignalEvent(this, rtc::SE_READ, 0);
FireEvent(rtc::SE_READ, 0);
return ret;
}
rtc::StreamState StreamInterfaceChannel::GetState() const {
RTC_DCHECK_RUN_ON(&sequence_checker_);
RTC_DCHECK_RUN_ON(&callback_sequence_);
return state_;
}
void StreamInterfaceChannel::Close() {
RTC_DCHECK_RUN_ON(&sequence_checker_);
RTC_DCHECK_RUN_ON(&callback_sequence_);
packets_.Clear();
state_ = rtc::SS_CLOSED;
}

View File

@ -58,10 +58,9 @@ class StreamInterfaceChannel : public rtc::StreamInterface {
int& error) override;
private:
RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker sequence_checker_;
IceTransportInternal* const ice_transport_; // owned by DtlsTransport
rtc::StreamState state_ RTC_GUARDED_BY(sequence_checker_);
rtc::BufferQueue packets_ RTC_GUARDED_BY(sequence_checker_);
rtc::StreamState state_ RTC_GUARDED_BY(callback_sequence_);
rtc::BufferQueue packets_ RTC_GUARDED_BY(callback_sequence_);
};
// This class provides a DTLS SSLStreamAdapter inside a TransportChannel-style
@ -235,7 +234,7 @@ class DtlsTransport : public DtlsTransportInternal {
// Sets the DTLS state, signaling if necessary.
void set_dtls_state(webrtc::DtlsTransportState state);
webrtc::SequenceChecker thread_checker_;
RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker thread_checker_;
const int component_;
webrtc::DtlsTransportState dtls_state_ = webrtc::DtlsTransportState::kNew;

View File

@ -1446,8 +1446,11 @@ rtc_library("stream") {
deps = [
":buffer",
":checks",
":logging",
":threading",
"../api:array_view",
"../api:sequence_checker",
"system:no_unique_address",
"system:rtc_export",
"third_party/sigslot",
]

View File

@ -78,9 +78,11 @@ class FifoBuffer final : public StreamInterface {
private:
void PostEvent(int events, int err) {
owner_->PostTask(webrtc::SafeTask(
task_safety_.flag(),
[this, events, err]() { SignalEvent(this, events, err); }));
owner_->PostTask(
webrtc::SafeTask(task_safety_.flag(), [this, events, err]() {
RTC_DCHECK_RUN_ON(&callback_sequence_);
FireEvent(events, err);
}));
}
// Helper method that implements Read. Caller must acquire a lock

View File

@ -744,6 +744,7 @@ StreamState OpenSSLStreamAdapter::GetState() const {
void OpenSSLStreamAdapter::OnEvent(StreamInterface* stream,
int events,
int err) {
RTC_DCHECK_RUN_ON(&callback_sequence_);
int events_to_signal = 0;
int signal_error = 0;
RTC_DCHECK(stream == stream_.get());
@ -800,13 +801,14 @@ void OpenSSLStreamAdapter::OnEvent(StreamInterface* stream,
if (events_to_signal) {
// Note that the adapter presents itself as the origin of the stream events,
// since users of the adapter may not recognize the adapted object.
SignalEvent(this, events_to_signal, signal_error);
FireEvent(events_to_signal, signal_error);
}
}
void OpenSSLStreamAdapter::PostEvent(int events, int err) {
owner_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() {
SignalEvent(this, events, err);
RTC_DCHECK_RUN_ON(&callback_sequence_);
FireEvent(events, err);
}));
}
@ -885,8 +887,9 @@ int OpenSSLStreamAdapter::BeginSSL() {
}
int OpenSSLStreamAdapter::ContinueSSL() {
RTC_DCHECK_RUN_ON(&callback_sequence_);
RTC_DLOG(LS_VERBOSE) << "ContinueSSL";
RTC_DCHECK(state_ == SSL_CONNECTING);
RTC_DCHECK_EQ(state_, SSL_CONNECTING);
// Clear the DTLS timer
timeout_task_.Stop();
@ -911,7 +914,7 @@ int OpenSSLStreamAdapter::ContinueSSL() {
// The caller of ContinueSSL may be the same object listening for these
// events and may not be prepared for reentrancy.
// PostEvent(SE_OPEN | SE_READ | SE_WRITE, 0);
SignalEvent(this, SE_OPEN | SE_READ | SE_WRITE, 0);
FireEvent(SE_OPEN | SE_READ | SE_WRITE, 0);
}
break;
@ -950,13 +953,14 @@ void OpenSSLStreamAdapter::Error(absl::string_view context,
int err,
uint8_t alert,
bool signal) {
RTC_DCHECK_RUN_ON(&callback_sequence_);
RTC_LOG(LS_WARNING) << "OpenSSLStreamAdapter::Error(" << context << ", "
<< err << ", " << static_cast<int>(alert) << ")";
state_ = SSL_ERROR;
ssl_error_code_ = err;
Cleanup(alert);
if (signal) {
SignalEvent(this, SE_CLOSE, err);
FireEvent(SE_CLOSE, err);
}
}

View File

@ -216,21 +216,25 @@ class SocketStream : public rtc::StreamInterface, public sigslot::has_slots<> {
private:
void OnConnectEvent(rtc::Socket* socket) {
RTC_DCHECK_RUN_ON(&callback_sequence_);
RTC_DCHECK_EQ(socket, socket_.get());
SignalEvent(this, rtc::SE_OPEN | rtc::SE_READ | rtc::SE_WRITE, 0);
FireEvent(rtc::SE_OPEN | rtc::SE_READ | rtc::SE_WRITE, 0);
}
void OnReadEvent(rtc::Socket* socket) {
RTC_DCHECK_RUN_ON(&callback_sequence_);
RTC_DCHECK_EQ(socket, socket_.get());
SignalEvent(this, rtc::SE_READ, 0);
FireEvent(rtc::SE_READ, 0);
}
void OnWriteEvent(rtc::Socket* socket) {
RTC_DCHECK_RUN_ON(&callback_sequence_);
RTC_DCHECK_EQ(socket, socket_.get());
SignalEvent(this, rtc::SE_WRITE, 0);
FireEvent(rtc::SE_WRITE, 0);
}
void OnCloseEvent(rtc::Socket* socket, int err) {
RTC_DCHECK_RUN_ON(&callback_sequence_);
RTC_DCHECK_EQ(socket, socket_.get());
SignalEvent(this, rtc::SE_CLOSE, err);
FireEvent(rtc::SE_CLOSE, err);
}
std::unique_ptr<rtc::Socket> socket_;

View File

@ -223,7 +223,8 @@ class SSLDummyStreamBase : public rtc::StreamInterface,
private:
void PostEvent(int events, int err) {
thread_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() {
SignalEvent(this, events, err);
RTC_DCHECK_RUN_ON(&callback_sequence_);
FireEvent(events, err);
}));
}
@ -293,7 +294,8 @@ class BufferQueueStream : public rtc::StreamInterface {
private:
void PostEvent(int events, int err) {
thread_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() {
SignalEvent(this, events, err);
RTC_DCHECK_RUN_ON(&callback_sequence_);
FireEvent(events, err);
}));
}

View File

@ -14,7 +14,9 @@
#include <memory>
#include "api/array_view.h"
#include "api/sequence_checker.h"
#include "rtc_base/buffer.h"
#include "rtc_base/system/no_unique_address.h"
#include "rtc_base/system/rtc_export.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
#include "rtc_base/thread.h"
@ -121,6 +123,16 @@ class RTC_EXPORT StreamInterface {
protected:
StreamInterface();
// Utility function for derived classes.
void FireEvent(int stream_events, int err) RTC_RUN_ON(&callback_sequence_) {
// TODO(tommi): This is for backwards compatibility only while `SignalEvent`
// is being replaced by `SetEventHandler`.
SignalEvent(this, stream_events, err);
}
RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker callback_sequence_{
webrtc::SequenceChecker::kDetached};
};
} // namespace rtc