From 5bbfb00d1da78a9c5dc437e03d5dc977bfe87890 Mon Sep 17 00:00:00 2001 From: Tommi Date: Sat, 4 Mar 2023 16:47:53 +0100 Subject: [PATCH] Check for sctp open messages on the network thread. Previously we did this on the worker thread, but are transitioning network traffic away from thread hopping and this is one step. Bug: webrtc:11547 Change-Id: Ia6fd6540f31a5383c70bb2bf46695e0ee526c4f7 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/296081 Reviewed-by: Harald Alvestrand Commit-Queue: Tomas Gunnarsson Cr-Commit-Position: refs/heads/main@{#39478} --- pc/data_channel_controller.cc | 51 +++++++++++++++++++---------------- pc/data_channel_controller.h | 6 ++--- pc/sctp_utils.cc | 4 +-- 3 files changed, 33 insertions(+), 28 deletions(-) diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc index a13066b27c..6338c55b5b 100644 --- a/pc/data_channel_controller.cc +++ b/pc/data_channel_controller.cc @@ -114,21 +114,19 @@ void DataChannelController::OnDataReceived( cricket::ReceiveDataParams params; params.sid = channel_id; params.type = type; + + if (HandleOpenMessage_n(params, buffer)) + return; + signaling_thread()->PostTask( [self = weak_factory_.GetWeakPtr(), params, buffer] { if (self) { RTC_DCHECK_RUN_ON(self->signaling_thread()); // TODO(bugs.webrtc.org/11547): The data being received should be - // delivered on the network thread. The way HandleOpenMessage_s works - // right now is that it's called for all types of buffers and operates - // as a selector function. Change this so that it's only called for - // buffers that it should be able to handle. Once we do that, we can - // deliver all other buffers on the network thread (change + // delivered on the network thread (change // SignalDataChannelTransportReceivedData_s to // SignalDataChannelTransportReceivedData_n). - if (!self->HandleOpenMessage_s(params, buffer)) { - self->SignalDataChannelTransportReceivedData_s(params, buffer); - } + self->SignalDataChannelTransportReceivedData_s(params, buffer); } }); } @@ -222,25 +220,32 @@ std::vector DataChannelController::GetDataChannelStats() return stats; } -bool DataChannelController::HandleOpenMessage_s( +bool DataChannelController::HandleOpenMessage_n( const cricket::ReceiveDataParams& params, const rtc::CopyOnWriteBuffer& buffer) { - if (params.type == DataMessageType::kControl && IsOpenMessage(buffer)) { - // Received OPEN message; parse and signal that a new data channel should - // be created. - std::string label; - InternalDataChannelInit config; - config.id = params.sid; - if (!ParseDataChannelOpenMessage(buffer, &label, &config)) { - RTC_LOG(LS_WARNING) << "Failed to parse the OPEN message for sid " - << params.sid; - return true; - } + if (params.type != DataMessageType::kControl || !IsOpenMessage(buffer)) + return false; + + // Received OPEN message; parse and signal that a new data channel should + // be created. + std::string label; + InternalDataChannelInit config; + config.id = params.sid; + if (!ParseDataChannelOpenMessage(buffer, &label, &config)) { + RTC_LOG(LS_WARNING) << "Failed to parse the OPEN message for sid " + << params.sid; + } else { config.open_handshake_role = InternalDataChannelInit::kAcker; - OnDataChannelOpenMessage(label, config); - return true; + signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), + label = std::move(label), + config = std::move(config)] { + if (self) { + RTC_DCHECK_RUN_ON(self->signaling_thread()); + self->OnDataChannelOpenMessage(label, config); + } + }); } - return false; + return true; } void DataChannelController::OnDataChannelOpenMessage( diff --git a/pc/data_channel_controller.h b/pc/data_channel_controller.h index f2b6425961..aa3f54fb8d 100644 --- a/pc/data_channel_controller.h +++ b/pc/data_channel_controller.h @@ -113,10 +113,10 @@ class DataChannelController : public SctpDataChannelControllerInterface, config) /* RTC_RUN_ON(signaling_thread()) */; // Parses and handles open messages. Returns true if the message is an open - // message, false otherwise. - bool HandleOpenMessage_s(const cricket::ReceiveDataParams& params, + // message and should be considered to be handled, false otherwise. + bool HandleOpenMessage_n(const cricket::ReceiveDataParams& params, const rtc::CopyOnWriteBuffer& buffer) - RTC_RUN_ON(signaling_thread()); + RTC_RUN_ON(network_thread()); // Called when a valid data channel OPEN message is received. void OnDataChannelOpenMessage(const std::string& label, const InternalDataChannelInit& config) diff --git a/pc/sctp_utils.cc b/pc/sctp_utils.cc index c60e339b08..dc83da4f62 100644 --- a/pc/sctp_utils.cc +++ b/pc/sctp_utils.cc @@ -48,9 +48,9 @@ enum DataChannelPriority { bool IsOpenMessage(const rtc::CopyOnWriteBuffer& payload) { // Format defined at - // http://tools.ietf.org/html/draft-jesup-rtcweb-data-protocol-04 + // https://www.rfc-editor.org/rfc/rfc8832#section-5.1 if (payload.size() < 1) { - RTC_LOG(LS_WARNING) << "Could not read OPEN message type."; + RTC_DLOG(LS_WARNING) << "Could not read OPEN message type."; return false; }