diff --git a/media/BUILD.gn b/media/BUILD.gn index 2a8f28a4d4..794f7a1e32 100644 --- a/media/BUILD.gn +++ b/media/BUILD.gn @@ -721,6 +721,7 @@ if (rtc_build_dcsctp) { ":rtc_data_sctp_transport_internal", "../api:array_view", "../api/environment", + "../api:libjingle_peerconnection_api", "../api/task_queue:pending_task_safety_flag", "../api/task_queue:task_queue", "../net/dcsctp/public:factory", diff --git a/media/sctp/dcsctp_transport.cc b/media/sctp/dcsctp_transport.cc index 53a535f245..99ecc94a68 100644 --- a/media/sctp/dcsctp_transport.cc +++ b/media/sctp/dcsctp_transport.cc @@ -19,6 +19,7 @@ #include "absl/strings/string_view.h" #include "absl/types/optional.h" #include "api/array_view.h" +#include "api/data_channel_interface.h" #include "api/environment/environment.h" #include "media/base/media_channel.h" #include "net/dcsctp/public/dcsctp_socket_factory.h" @@ -192,6 +193,10 @@ bool DcSctpTransport::Start(int local_sctp_port, // Don't close the connection automatically on too many retransmissions. options.max_retransmissions = absl::nullopt; options.max_init_retransmits = absl::nullopt; + options.per_stream_send_queue_limit = + DataChannelInterface::MaxSendQueueSize(); + // This is just set to avoid denial-of-service. Practically unlimited. + options.max_send_buffer_size = std::numeric_limits::max(); std::unique_ptr packet_observer; if (RTC_LOG_CHECK_LEVEL(LS_VERBOSE)) { diff --git a/pc/data_channel_unittest.cc b/pc/data_channel_unittest.cc index a7d362728a..ac229d6f9b 100644 --- a/pc/data_channel_unittest.cc +++ b/pc/data_channel_unittest.cc @@ -222,266 +222,6 @@ TEST_F(SctpDataChannelTest, StateTransition) { EXPECT_FALSE(controller_->IsConnected(inner_channel_.get())); } -// Tests that DataChannel::buffered_amount() is correct after the channel is -// blocked. -TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) { - AddObserver(); - SetChannelReady(); - DataBuffer buffer("abcd"); - size_t successful_sends = 0; - auto send_complete = [&](RTCError err) { - EXPECT_TRUE(err.ok()); - ++successful_sends; - }; - channel_->SendAsync(buffer, send_complete); - FlushNetworkThreadAndPendingOperations(); - EXPECT_EQ(channel_->buffered_amount(), 0u); - size_t successful_send_count = 1; - EXPECT_EQ(successful_send_count, successful_sends); - EXPECT_EQ(successful_send_count, - observer_->on_buffered_amount_change_count()); - - controller_->set_send_blocked(true); - const int number_of_packets = 3; - for (int i = 0; i < number_of_packets; ++i) { - channel_->SendAsync(buffer, send_complete); - ++successful_send_count; - } - FlushNetworkThreadAndPendingOperations(); - EXPECT_EQ(buffer.data.size() * number_of_packets, - channel_->buffered_amount()); - EXPECT_EQ(successful_send_count, successful_sends); - - // An event should not have been fired for buffered amount. - EXPECT_EQ(1u, observer_->on_buffered_amount_change_count()); - - // Now buffered amount events should get fired and the value - // get down to 0u. - controller_->set_send_blocked(false); - run_loop_.Flush(); - EXPECT_EQ(channel_->buffered_amount(), 0u); - EXPECT_EQ(successful_send_count, successful_sends); - EXPECT_EQ(successful_send_count, - observer_->on_buffered_amount_change_count()); -} - -// TODO(tommi): This test uses `Send()`. Remove once fully deprecated. -TEST_F(SctpDataChannelTest, DeprecatedBufferedAmountWhenBlocked) { - AddObserver(); - SetChannelReady(); - DataBuffer buffer("abcd"); - EXPECT_TRUE(channel_->Send(buffer)); - size_t successful_send_count = 1; - - run_loop_.Flush(); - EXPECT_EQ(0U, channel_->buffered_amount()); - EXPECT_EQ(successful_send_count, - observer_->on_buffered_amount_change_count()); - - controller_->set_send_blocked(true); - - const int number_of_packets = 3; - for (int i = 0; i < number_of_packets; ++i) { - EXPECT_TRUE(channel_->Send(buffer)); - } - EXPECT_EQ(buffer.data.size() * number_of_packets, - channel_->buffered_amount()); - EXPECT_EQ(successful_send_count, - observer_->on_buffered_amount_change_count()); - - controller_->set_send_blocked(false); - run_loop_.Flush(); - successful_send_count += number_of_packets; - EXPECT_EQ(channel_->buffered_amount(), 0u); - EXPECT_EQ(successful_send_count, - observer_->on_buffered_amount_change_count()); -} - -// Tests that the queued data are sent when the channel transitions from blocked -// to unblocked. -TEST_F(SctpDataChannelTest, QueuedDataSentWhenUnblocked) { - AddObserver(); - SetChannelReady(); - DataBuffer buffer("abcd"); - controller_->set_send_blocked(true); - size_t successful_send = 0u; - auto send_complete = [&](RTCError err) { - EXPECT_TRUE(err.ok()); - ++successful_send; - }; - channel_->SendAsync(buffer, send_complete); - FlushNetworkThreadAndPendingOperations(); - EXPECT_EQ(1U, successful_send); - EXPECT_EQ(0U, observer_->on_buffered_amount_change_count()); - - controller_->set_send_blocked(false); - SetChannelReady(); - EXPECT_EQ(channel_->buffered_amount(), 0u); - EXPECT_EQ(observer_->on_buffered_amount_change_count(), 1u); -} - -// TODO(tommi): This test uses `Send()`. Remove once fully deprecated. -TEST_F(SctpDataChannelTest, DeprecatedQueuedDataSentWhenUnblocked) { - AddObserver(); - SetChannelReady(); - DataBuffer buffer("abcd"); - controller_->set_send_blocked(true); - EXPECT_TRUE(channel_->Send(buffer)); - - EXPECT_EQ(0U, observer_->on_buffered_amount_change_count()); - - controller_->set_send_blocked(false); - SetChannelReady(); - EXPECT_EQ(0U, channel_->buffered_amount()); - EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); -} - -// Tests that no crash when the channel is blocked right away while trying to -// send queued data. -TEST_F(SctpDataChannelTest, BlockedWhenSendQueuedDataNoCrash) { - AddObserver(); - SetChannelReady(); - DataBuffer buffer("abcd"); - controller_->set_send_blocked(true); - size_t successful_send = 0u; - auto send_complete = [&](RTCError err) { - EXPECT_TRUE(err.ok()); - ++successful_send; - }; - channel_->SendAsync(buffer, send_complete); - FlushNetworkThreadAndPendingOperations(); - EXPECT_EQ(1U, successful_send); - EXPECT_EQ(0U, observer_->on_buffered_amount_change_count()); - - // Set channel ready while it is still blocked. - SetChannelReady(); - EXPECT_EQ(buffer.size(), channel_->buffered_amount()); - EXPECT_EQ(0U, observer_->on_buffered_amount_change_count()); - - // Unblock the channel to send queued data again, there should be no crash. - controller_->set_send_blocked(false); - SetChannelReady(); - EXPECT_EQ(0U, channel_->buffered_amount()); - EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); -} - -// TODO(tommi): This test uses `Send()`. Remove once fully deprecated. -TEST_F(SctpDataChannelTest, DeprecatedBlockedWhenSendQueuedDataNoCrash) { - AddObserver(); - SetChannelReady(); - DataBuffer buffer("abcd"); - controller_->set_send_blocked(true); - EXPECT_TRUE(channel_->Send(buffer)); - EXPECT_EQ(0U, observer_->on_buffered_amount_change_count()); - - // Set channel ready while it is still blocked. - SetChannelReady(); - EXPECT_EQ(buffer.size(), channel_->buffered_amount()); - EXPECT_EQ(0U, observer_->on_buffered_amount_change_count()); - - // Unblock the channel to send queued data again, there should be no crash. - controller_->set_send_blocked(false); - SetChannelReady(); - EXPECT_EQ(0U, channel_->buffered_amount()); - EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); -} - -// Tests that DataChannel::messages_sent() and DataChannel::bytes_sent() are -// correct, sending data both while unblocked and while blocked. -TEST_F(SctpDataChannelTest, VerifyMessagesAndBytesSent) { - AddObserver(); - SetChannelReady(); - std::vector buffers({ - DataBuffer("message 1"), - DataBuffer("msg 2"), - DataBuffer("message three"), - DataBuffer("quadra message"), - DataBuffer("fifthmsg"), - DataBuffer("message of the beast"), - }); - - // Default values. - EXPECT_EQ(0U, channel_->messages_sent()); - EXPECT_EQ(0U, channel_->bytes_sent()); - - // Send three buffers while not blocked. - controller_->set_send_blocked(false); - for (int i : {0, 1, 2}) { - channel_->SendAsync(buffers[i], nullptr); - } - FlushNetworkThreadAndPendingOperations(); - - size_t bytes_sent = buffers[0].size() + buffers[1].size() + buffers[2].size(); - EXPECT_EQ_WAIT(0U, channel_->buffered_amount(), kDefaultTimeout); - EXPECT_EQ(3U, channel_->messages_sent()); - EXPECT_EQ(bytes_sent, channel_->bytes_sent()); - - // Send three buffers while blocked, queuing the buffers. - controller_->set_send_blocked(true); - for (int i : {3, 4, 5}) { - channel_->SendAsync(buffers[i], nullptr); - } - FlushNetworkThreadAndPendingOperations(); - size_t bytes_queued = - buffers[3].size() + buffers[4].size() + buffers[5].size(); - EXPECT_EQ(bytes_queued, channel_->buffered_amount()); - EXPECT_EQ(3U, channel_->messages_sent()); - EXPECT_EQ(bytes_sent, channel_->bytes_sent()); - - // Unblock and make sure everything was sent. - controller_->set_send_blocked(false); - EXPECT_EQ_WAIT(0U, channel_->buffered_amount(), kDefaultTimeout); - bytes_sent += bytes_queued; - EXPECT_EQ(6U, channel_->messages_sent()); - EXPECT_EQ(bytes_sent, channel_->bytes_sent()); -} - -// TODO(tommi): This test uses `Send()`. Remove once fully deprecated. -TEST_F(SctpDataChannelTest, DeprecatedVerifyMessagesAndBytesSent) { - AddObserver(); - SetChannelReady(); - std::vector buffers({ - DataBuffer("message 1"), - DataBuffer("msg 2"), - DataBuffer("message three"), - DataBuffer("quadra message"), - DataBuffer("fifthmsg"), - DataBuffer("message of the beast"), - }); - - // Default values. - EXPECT_EQ(0U, channel_->messages_sent()); - EXPECT_EQ(0U, channel_->bytes_sent()); - - // Send three buffers while not blocked. - controller_->set_send_blocked(false); - EXPECT_TRUE(channel_->Send(buffers[0])); - EXPECT_TRUE(channel_->Send(buffers[1])); - EXPECT_TRUE(channel_->Send(buffers[2])); - size_t bytes_sent = buffers[0].size() + buffers[1].size() + buffers[2].size(); - EXPECT_EQ_WAIT(0U, channel_->buffered_amount(), kDefaultTimeout); - EXPECT_EQ(3U, channel_->messages_sent()); - EXPECT_EQ(bytes_sent, channel_->bytes_sent()); - - // Send three buffers while blocked, queuing the buffers. - controller_->set_send_blocked(true); - EXPECT_TRUE(channel_->Send(buffers[3])); - EXPECT_TRUE(channel_->Send(buffers[4])); - EXPECT_TRUE(channel_->Send(buffers[5])); - size_t bytes_queued = - buffers[3].size() + buffers[4].size() + buffers[5].size(); - EXPECT_EQ(bytes_queued, channel_->buffered_amount()); - EXPECT_EQ(3U, channel_->messages_sent()); - EXPECT_EQ(bytes_sent, channel_->bytes_sent()); - - // Unblock and make sure everything was sent. - controller_->set_send_blocked(false); - EXPECT_EQ_WAIT(0U, channel_->buffered_amount(), kDefaultTimeout); - bytes_sent += bytes_queued; - EXPECT_EQ(6U, channel_->messages_sent()); - EXPECT_EQ(bytes_sent, channel_->bytes_sent()); -} - // Tests that the queued control message is sent when channel is ready. TEST_F(SctpDataChannelTest, OpenMessageSent) { // Initially the id is unassigned. @@ -494,16 +234,6 @@ TEST_F(SctpDataChannelTest, OpenMessageSent) { EXPECT_EQ(controller_->last_sid(), channel_->id()); } -TEST_F(SctpDataChannelTest, QueuedOpenMessageSent) { - controller_->set_send_blocked(true); - SetChannelReady(); - controller_->set_send_blocked(false); - - EXPECT_EQ(DataMessageType::kControl, - controller_->last_send_data_params().type); - EXPECT_EQ(controller_->last_sid(), channel_->id()); -} - // Tests that the DataChannel created after transport gets ready can enter OPEN // state. TEST_F(SctpDataChannelTest, LateCreatedChannelTransitionToOpen) { @@ -619,56 +349,6 @@ TEST_F(SctpDataChannelTest, DeprecatedSendUnorderedAfterReceiveData) { EXPECT_FALSE(controller_->last_send_data_params().ordered); } -// Tests that the channel can't open until it's successfully sent the OPEN -// message. -TEST_F(SctpDataChannelTest, OpenWaitsForOpenMesssage) { - DataBuffer buffer("foo"); - - controller_->set_send_blocked(true); - SetChannelReady(); - EXPECT_EQ(DataChannelInterface::kConnecting, channel_->state()); - controller_->set_send_blocked(false); - EXPECT_EQ_WAIT(DataChannelInterface::kOpen, channel_->state(), 1000); - EXPECT_EQ(DataMessageType::kControl, - controller_->last_send_data_params().type); -} - -// Tests that close first makes sure all queued data gets sent. -TEST_F(SctpDataChannelTest, QueuedCloseFlushes) { - DataBuffer buffer("foo"); - - controller_->set_send_blocked(true); - SetChannelReady(); - EXPECT_EQ(DataChannelInterface::kConnecting, channel_->state()); - controller_->set_send_blocked(false); - EXPECT_EQ_WAIT(DataChannelInterface::kOpen, channel_->state(), 1000); - controller_->set_send_blocked(true); - channel_->SendAsync(buffer, nullptr); - channel_->Close(); - controller_->set_send_blocked(false); - EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(), 1000); - EXPECT_TRUE(channel_->error().ok()); - EXPECT_EQ(DataMessageType::kText, controller_->last_send_data_params().type); -} - -// TODO(tommi): This test uses `Send()`. Remove once fully deprecated. -TEST_F(SctpDataChannelTest, DeprecatedQueuedCloseFlushes) { - DataBuffer buffer("foo"); - - controller_->set_send_blocked(true); - SetChannelReady(); - EXPECT_EQ(DataChannelInterface::kConnecting, channel_->state()); - controller_->set_send_blocked(false); - EXPECT_EQ_WAIT(DataChannelInterface::kOpen, channel_->state(), 1000); - controller_->set_send_blocked(true); - channel_->Send(buffer); - channel_->Close(); - controller_->set_send_blocked(false); - EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(), 1000); - EXPECT_TRUE(channel_->error().ok()); - EXPECT_EQ(DataMessageType::kText, controller_->last_send_data_params().type); -} - // Tests that messages are sent with the right id. TEST_F(SctpDataChannelTest, SendDataId) { SetChannelSid(inner_channel_, StreamId(1)); @@ -800,59 +480,6 @@ TEST_F(SctpDataChannelTest, OpenAckRoleInitialization) { EXPECT_EQ(InternalDataChannelInit::kNone, init2.open_handshake_role); } -// Tests that that Send() returns false if the sending buffer is full -// and the channel stays open. -TEST_F(SctpDataChannelTest, OpenWhenSendBufferFull) { - AddObserver(); - SetChannelReady(); - - const size_t packetSize = 1024; - - rtc::CopyOnWriteBuffer buffer(packetSize); - memset(buffer.MutableData(), 0, buffer.size()); - - DataBuffer packet(buffer, true); - controller_->set_send_blocked(true); - size_t successful_send = 0u, failed_send = 0u; - auto send_complete = [&](RTCError err) { - err.ok() ? ++successful_send : ++failed_send; - }; - - size_t count = DataChannelInterface::MaxSendQueueSize() / packetSize; - for (size_t i = 0; i < count; ++i) { - channel_->SendAsync(packet, send_complete); - } - - // The sending buffer should be full, `Send()` returns false. - channel_->SendAsync(packet, std::move(send_complete)); - FlushNetworkThreadAndPendingOperations(); - EXPECT_TRUE(DataChannelInterface::kOpen == channel_->state()); - EXPECT_EQ(successful_send, count); - EXPECT_EQ(failed_send, 1u); -} - -// TODO(tommi): This test uses `Send()`. Remove once fully deprecated. -TEST_F(SctpDataChannelTest, DeprecatedOpenWhenSendBufferFull) { - SetChannelReady(); - - const size_t packetSize = 1024; - - rtc::CopyOnWriteBuffer buffer(packetSize); - memset(buffer.MutableData(), 0, buffer.size()); - - DataBuffer packet(buffer, true); - controller_->set_send_blocked(true); - - for (size_t i = 0; i < DataChannelInterface::MaxSendQueueSize() / packetSize; - ++i) { - EXPECT_TRUE(channel_->Send(packet)); - } - - // The sending buffer should be full, `Send()` returns false. - EXPECT_FALSE(channel_->Send(packet)); - EXPECT_TRUE(DataChannelInterface::kOpen == channel_->state()); -} - // Tests that the DataChannel is closed on transport errors. TEST_F(SctpDataChannelTest, ClosedOnTransportError) { SetChannelReady(); @@ -943,12 +570,11 @@ TEST_F(SctpDataChannelTest, TransportDestroyedWhileDataBuffered) { AddObserver(); SetChannelReady(); - rtc::CopyOnWriteBuffer buffer(1024); + rtc::CopyOnWriteBuffer buffer(100 * 1024); memset(buffer.MutableData(), 0, buffer.size()); DataBuffer packet(buffer, true); - // Send a packet while sending is blocked so it ends up buffered. - controller_->set_send_blocked(true); + // Send a very large packet, forcing the message to become buffered. channel_->SendAsync(packet, nullptr); // Tell the data channel that its transport is being destroyed. @@ -966,33 +592,6 @@ TEST_F(SctpDataChannelTest, TransportDestroyedWhileDataBuffered) { EXPECT_EQ(RTCErrorDetailType::SCTP_FAILURE, channel_->error().error_detail()); } -// TODO(tommi): This test uses `Send()`. Remove once fully deprecated. -TEST_F(SctpDataChannelTest, DeprecatedTransportDestroyedWhileDataBuffered) { - SetChannelReady(); - - rtc::CopyOnWriteBuffer buffer(1024); - memset(buffer.MutableData(), 0, buffer.size()); - DataBuffer packet(buffer, true); - - // Send a packet while sending is blocked so it ends up buffered. - controller_->set_send_blocked(true); - EXPECT_TRUE(channel_->Send(packet)); - - // Tell the data channel that its transport is being destroyed. - // It should then stop using the transport (allowing us to delete it) and - // transition to the "closed" state. - RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, ""); - error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE); - network_thread_.BlockingCall( - [&] { inner_channel_->OnTransportChannelClosed(error); }); - controller_.reset(nullptr); - EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(), - kDefaultTimeout); - EXPECT_FALSE(channel_->error().ok()); - EXPECT_EQ(RTCErrorType::OPERATION_ERROR_WITH_DATA, channel_->error().type()); - EXPECT_EQ(RTCErrorDetailType::SCTP_FAILURE, channel_->error().error_detail()); -} - TEST_F(SctpDataChannelTest, TransportGotErrorCode) { SetChannelReady(); diff --git a/pc/sctp_data_channel.cc b/pc/sctp_data_channel.cc index e496382c35..6956cffa75 100644 --- a/pc/sctp_data_channel.cc +++ b/pc/sctp_data_channel.cc @@ -485,11 +485,10 @@ Priority SctpDataChannel::priority() const { uint64_t SctpDataChannel::buffered_amount() const { RTC_DCHECK_RUN_ON(network_thread_); - uint64_t buffered_amount = queued_send_data_.byte_count(); if (controller_ != nullptr && id_n_.has_value()) { - buffered_amount += controller_->buffered_amount(*id_n_); + return controller_->buffered_amount(*id_n_); } - return buffered_amount; + return 0u; } void SctpDataChannel::Close() { @@ -578,20 +577,14 @@ bool SctpDataChannel::Send(const DataBuffer& buffer) { // RTC_RUN_ON(network_thread_); RTCError SctpDataChannel::SendImpl(DataBuffer buffer) { + // The caller increases the cached `bufferedAmount` even if there are errors. + expected_buffer_amount_ += buffer.size(); + if (state_ != kOpen) { error_ = RTCError(RTCErrorType::INVALID_STATE); return error_; } - // If the queue is non-empty, we're waiting for SignalReadyToSend, - // so just add to the end of the queue and keep waiting. - if (!queued_send_data_.Empty()) { - error_ = QueueSendDataMessage(buffer) - ? RTCError::OK() - : RTCError(RTCErrorType::RESOURCE_EXHAUSTED); - return error_; - } - return SendDataMessage(buffer, true); } @@ -629,8 +622,11 @@ void SctpDataChannel::OnClosingProcedureStartedRemotely() { // Don't bother sending queued data since the side that initiated the // closure wouldn't receive it anyway. See crbug.com/559394 for a lengthy // discussion about this. - queued_send_data_.Clear(); - queued_control_data_.Clear(); + + // Note that this is handled by the SctpTransport, when an incoming stream + // reset notification comes in, the outgoing stream is closed, which + // discards data. + // Just need to change state to kClosing, SctpTransport will handle the // rest of the closing procedure and OnClosingProcedureComplete will be // called later. @@ -644,7 +640,9 @@ void SctpDataChannel::OnClosingProcedureComplete() { // If the closing procedure is complete, we should have finished sending // all pending data and transitioned to kClosing already. RTC_DCHECK_EQ(state_, kClosing); - RTC_DCHECK(queued_send_data_.Empty()); + if (controller_ && id_n_.has_value()) { + RTC_DCHECK_EQ(controller_->buffered_amount(*id_n_), 0); + } SetState(kClosed); } @@ -664,6 +662,7 @@ void SctpDataChannel::OnTransportChannelClosed(RTCError error) { void SctpDataChannel::OnBufferedAmountLow() { RTC_DCHECK_RUN_ON(network_thread_); + MaybeSendOnBufferedAmountChanged(); } DataChannelStats SctpDataChannel::GetStats() const { @@ -739,9 +738,6 @@ void SctpDataChannel::OnTransportReady() { RTC_DCHECK(connected_to_transport()); RTC_DCHECK(id_n_.has_value()); - SendQueuedControlMessages(); - SendQueuedDataMessages(); - UpdateState(); } @@ -754,10 +750,6 @@ void SctpDataChannel::CloseAbruptlyWithError(RTCError error) { network_safety_->SetNotAlive(); - // Closing abruptly means any queued data gets thrown away. - queued_send_data_.Clear(); - queued_control_data_.Clear(); - // Still go to "kClosing" before "kClosed", since observers may be expecting // that. SetState(kClosing); @@ -810,10 +802,10 @@ void SctpDataChannel::UpdateState() { break; } case kClosing: { - if (connected_to_transport() && controller_) { + if (connected_to_transport() && controller_ && id_n_.has_value()) { // Wait for all queued data to be sent before beginning the closing // procedure. - if (queued_send_data_.Empty() && queued_control_data_.Empty()) { + if (controller_->buffered_amount(*id_n_) == 0) { // For SCTP data channels, we need to wait for the closing procedure // to complete; after calling RemoveSctpDataStream, // OnClosingProcedureComplete will end up called asynchronously @@ -826,8 +818,6 @@ void SctpDataChannel::UpdateState() { } else { // When we're not connected to a transport, we'll transition // directly to the `kClosed` state from here. - queued_send_data_.Clear(); - queued_control_data_.Clear(); SetState(kClosed); } break; @@ -866,22 +856,50 @@ void SctpDataChannel::DeliverQueuedReceivedData() { } } -// RTC_RUN_ON(network_thread_). -void SctpDataChannel::SendQueuedDataMessages() { - if (queued_send_data_.Empty()) { +// RTC_RUN_ON(network_thread_) +void SctpDataChannel::MaybeSendOnBufferedAmountChanged() { + // The `buffered_amount` in the signaling thread (RTCDataChannel in Blink) + // has a cached variant of the SCTP socket's buffered_amount, which it + // increases for every data sent and decreased when `OnBufferedAmountChange` + // is sent. + // + // To ensure it's consistent, this object maintains its own view of that value + // and if it changes with a reasonable amount (10kb, or down to zero), send + // the `OnBufferedAmountChange` to update the caller's cached variable. + if (!controller_ || !id_n_.has_value() || !observer_) { return; } - RTC_DCHECK(state_ == kOpen || state_ == kClosing); - - while (!queued_send_data_.Empty()) { - std::unique_ptr buffer = queued_send_data_.PopFront(); - if (!SendDataMessage(*buffer, false).ok()) { - // Return the message to the front of the queue if sending is aborted. - queued_send_data_.PushFront(std::move(buffer)); - break; - } + // This becomes the resolution of how often the bufferedAmount is updated on + // the signaling thread and exists to avoid doing cross-thread communication + // too often. On benchmarks, Chrome handle around 300Mbps, which with this + // size results in a rate of ~400 updates per second - a reasonable number. + static constexpr int64_t kMinBufferedAmountDiffToTriggerCallback = 100 * 1024; + size_t actual_buffer_amount = controller_->buffered_amount(*id_n_); + if (actual_buffer_amount > expected_buffer_amount_) { + RTC_DLOG(LS_ERROR) << "Actual buffer_amount larger than expected"; + return; } + + // Fire OnBufferedAmountChange to decrease the cached view if it represents a + // big enough change (to reduce the frequency of cross-thread communication), + // or if it reaches zero. + if ((actual_buffer_amount == 0 && expected_buffer_amount_ != 0) || + (expected_buffer_amount_ - actual_buffer_amount > + kMinBufferedAmountDiffToTriggerCallback)) { + uint64_t diff = expected_buffer_amount_ - actual_buffer_amount; + expected_buffer_amount_ = actual_buffer_amount; + observer_->OnBufferedAmountChange(diff); + } + + // The threshold is always updated to ensure it's lower than what it's now. + // This ensures that this function will be called again, until the channel is + // completely drained. + controller_->SetBufferedAmountLowThreshold( + *id_n_, + actual_buffer_amount > kMinBufferedAmountDiffToTriggerCallback + ? actual_buffer_amount - kMinBufferedAmountDiffToTriggerCallback + : 0); } // RTC_RUN_ON(network_thread_). @@ -908,25 +926,13 @@ RTCError SctpDataChannel::SendDataMessage(const DataBuffer& buffer, buffer.binary ? DataMessageType::kBinary : DataMessageType::kText; error_ = controller_->SendData(*id_n_, send_params, buffer.data); + MaybeSendOnBufferedAmountChanged(); if (error_.ok()) { ++messages_sent_; bytes_sent_ += buffer.size(); - - if (observer_ && buffer.size() > 0) { - observer_->OnBufferedAmountChange(buffer.size()); - } return error_; } - if (error_.type() == RTCErrorType::RESOURCE_EXHAUSTED) { - if (!queue_if_blocked) - return error_; - - if (QueueSendDataMessage(buffer)) { - error_ = RTCError::OK(); - return error_; - } - } // Close the channel if the error is not SDR_BLOCK, or if queuing the // message failed. RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send data, " @@ -938,30 +944,6 @@ RTCError SctpDataChannel::SendDataMessage(const DataBuffer& buffer, return error_; } -// RTC_RUN_ON(network_thread_). -bool SctpDataChannel::QueueSendDataMessage(const DataBuffer& buffer) { - size_t start_buffered_amount = queued_send_data_.byte_count(); - if (start_buffered_amount + buffer.size() > - DataChannelInterface::MaxSendQueueSize()) { - RTC_LOG(LS_ERROR) << "Can't buffer any more data for the data channel."; - error_ = RTCError(RTCErrorType::RESOURCE_EXHAUSTED); - return false; - } - queued_send_data_.PushBack(std::make_unique(buffer)); - return true; -} - -// RTC_RUN_ON(network_thread_). -void SctpDataChannel::SendQueuedControlMessages() { - PacketQueue control_packets; - control_packets.Swap(&queued_control_data_); - - while (!control_packets.Empty()) { - std::unique_ptr buf = control_packets.PopFront(); - SendControlMessage(buf->data); - } -} - // RTC_RUN_ON(network_thread_). bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) { RTC_DCHECK(connected_to_transport()); @@ -988,8 +970,6 @@ bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) { } else if (handshake_state_ == kHandshakeShouldSendOpen) { handshake_state_ = kHandshakeWaitingForAck; } - } else if (err.type() == RTCErrorType::RESOURCE_EXHAUSTED) { - queued_control_data_.PushBack(std::make_unique(buffer, true)); } else { RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send" " the CONTROL message, send_result = " diff --git a/pc/sctp_data_channel.h b/pc/sctp_data_channel.h index fcd088ce04..e1bd461a8e 100644 --- a/pc/sctp_data_channel.h +++ b/pc/sctp_data_channel.h @@ -257,19 +257,16 @@ class SctpDataChannel : public DataChannelInterface { void DeliverQueuedReceivedData() RTC_RUN_ON(network_thread_); - void SendQueuedDataMessages() RTC_RUN_ON(network_thread_); RTCError SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked) RTC_RUN_ON(network_thread_); - bool QueueSendDataMessage(const DataBuffer& buffer) - RTC_RUN_ON(network_thread_); - void SendQueuedControlMessages() RTC_RUN_ON(network_thread_); bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) RTC_RUN_ON(network_thread_); bool connected_to_transport() const RTC_RUN_ON(network_thread_) { return network_safety_->alive(); } + void MaybeSendOnBufferedAmountChanged() RTC_RUN_ON(network_thread_); rtc::Thread* const signaling_thread_; rtc::Thread* const network_thread_; @@ -283,6 +280,8 @@ class SctpDataChannel : public DataChannelInterface { const absl::optional priority_; const bool negotiated_; const bool ordered_; + // See the body of `MaybeSendOnBufferedAmountChanged`. + size_t expected_buffer_amount_ = 0; DataChannelObserver* observer_ RTC_GUARDED_BY(network_thread_) = nullptr; std::unique_ptr observer_adapter_; @@ -298,11 +297,7 @@ class SctpDataChannel : public DataChannelInterface { kHandshakeInit; // Did we already start the graceful SCTP closing procedure? bool started_closing_procedure_ RTC_GUARDED_BY(network_thread_) = false; - // Control messages that always have to get sent out before any queued - // data. - PacketQueue queued_control_data_ RTC_GUARDED_BY(network_thread_); PacketQueue queued_received_data_ RTC_GUARDED_BY(network_thread_); - PacketQueue queued_send_data_ RTC_GUARDED_BY(network_thread_); rtc::scoped_refptr network_safety_ = PendingTaskSafetyFlag::CreateDetachedInactive(); };