diff --git a/api/data_channel_interface.h b/api/data_channel_interface.h index 91a9804003..1bd874e936 100644 --- a/api/data_channel_interface.h +++ b/api/data_channel_interface.h @@ -87,7 +87,7 @@ class DataChannelObserver { // A data buffer was successfully received. virtual void OnMessage(const DataBuffer& buffer) = 0; // The data channel's buffered_amount has changed. - virtual void OnBufferedAmountChange(uint64_t previous_amount) {} + virtual void OnBufferedAmountChange(uint64_t sent_data_size) {} protected: virtual ~DataChannelObserver() = default; diff --git a/pc/data_channel.cc b/pc/data_channel.cc index f854defd1b..e4727f25de 100644 --- a/pc/data_channel.cc +++ b/pc/data_channel.cc @@ -127,6 +127,7 @@ DataChannel::DataChannel(DataChannelProviderInterface* provider, bytes_sent_(0), messages_received_(0), bytes_received_(0), + buffered_amount_(0), data_channel_type_(dct), provider_(provider), handshake_state_(kHandshakeInit), @@ -210,7 +211,7 @@ bool DataChannel::reliable() const { } uint64_t DataChannel::buffered_amount() const { - return queued_send_data_.byte_count(); + return buffered_amount_; } void DataChannel::Close() { @@ -224,6 +225,7 @@ void DataChannel::Close() { } bool DataChannel::Send(const DataBuffer& buffer) { + buffered_amount_ += buffer.size(); if (state_ != kOpen) { return false; } @@ -429,6 +431,7 @@ void DataChannel::CloseAbruptly() { // Closing abruptly means any queued data gets thrown away. queued_send_data_.Clear(); + buffered_amount_ = 0; queued_control_data_.Clear(); // Still go to "kClosing" before "kClosed", since observers may be expecting @@ -548,7 +551,6 @@ void DataChannel::SendQueuedDataMessages() { RTC_DCHECK(state_ == kOpen || state_ == kClosing); - uint64_t start_buffered_amount = buffered_amount(); while (!queued_send_data_.Empty()) { std::unique_ptr buffer = queued_send_data_.PopFront(); if (!SendDataMessage(*buffer, false)) { @@ -557,10 +559,6 @@ void DataChannel::SendQueuedDataMessages() { break; } } - - if (observer_ && buffered_amount() < start_buffered_amount) { - observer_->OnBufferedAmountChange(start_buffered_amount); - } } bool DataChannel::SendDataMessage(const DataBuffer& buffer, @@ -591,6 +589,12 @@ bool DataChannel::SendDataMessage(const DataBuffer& buffer, if (success) { ++messages_sent_; bytes_sent_ += buffer.size(); + + RTC_DCHECK(buffered_amount_ >= buffer.size()); + buffered_amount_ -= buffer.size(); + if (observer_ && buffer.size() > 0) { + observer_->OnBufferedAmountChange(buffer.size()); + } return true; } @@ -614,17 +618,12 @@ bool DataChannel::SendDataMessage(const DataBuffer& buffer, } bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) { - size_t start_buffered_amount = buffered_amount(); - if (start_buffered_amount >= kMaxQueuedSendDataBytes) { + size_t start_buffered_amount = queued_send_data_.byte_count(); + if (start_buffered_amount + buffer.size() > kMaxQueuedSendDataBytes) { RTC_LOG(LS_ERROR) << "Can't buffer any more data for the data channel."; return false; } queued_send_data_.PushBack(absl::make_unique(buffer)); - - // The buffer can have length zero, in which case there is no change. - if (observer_ && buffered_amount() > start_buffered_amount) { - observer_->OnBufferedAmountChange(start_buffered_amount); - } return true; } diff --git a/pc/data_channel.h b/pc/data_channel.h index a07c4fb57b..fa5c0f5f43 100644 --- a/pc/data_channel.h +++ b/pc/data_channel.h @@ -269,6 +269,9 @@ class DataChannel : public DataChannelInterface, public sigslot::has_slots<> { uint64_t bytes_sent_; uint32_t messages_received_; uint64_t bytes_received_; + // Number of bytes of data that have been queued using Send(). Increased + // before each transport send and decreased after each successful send. + uint64_t buffered_amount_; cricket::DataChannelType data_channel_type_; DataChannelProviderInterface* provider_; HandshakeState handshake_state_; diff --git a/pc/data_channel_unittest.cc b/pc/data_channel_unittest.cc index 4aa9dee787..7ce40fb088 100644 --- a/pc/data_channel_unittest.cc +++ b/pc/data_channel_unittest.cc @@ -163,9 +163,11 @@ TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) { SetChannelReady(); webrtc::DataBuffer buffer("abcd"); EXPECT_TRUE(webrtc_data_channel_->Send(buffer)); + size_t successful_send_count = 1; EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount()); - EXPECT_EQ(0U, observer_->on_buffered_amount_change_count()); + EXPECT_EQ(successful_send_count, + observer_->on_buffered_amount_change_count()); provider_->set_send_blocked(true); @@ -175,7 +177,13 @@ TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) { } EXPECT_EQ(buffer.data.size() * number_of_packets, webrtc_data_channel_->buffered_amount()); - EXPECT_EQ(rtc::checked_cast(number_of_packets), + EXPECT_EQ(successful_send_count, + observer_->on_buffered_amount_change_count()); + + provider_->set_send_blocked(false); + successful_send_count += number_of_packets; + EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount()); + EXPECT_EQ(successful_send_count, observer_->on_buffered_amount_change_count()); } @@ -188,12 +196,12 @@ TEST_F(SctpDataChannelTest, QueuedDataSentWhenUnblocked) { provider_->set_send_blocked(true); EXPECT_TRUE(webrtc_data_channel_->Send(buffer)); - EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); + EXPECT_EQ(0U, observer_->on_buffered_amount_change_count()); provider_->set_send_blocked(false); SetChannelReady(); EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount()); - EXPECT_EQ(2U, observer_->on_buffered_amount_change_count()); + EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); } // Tests that no crash when the channel is blocked right away while trying to @@ -204,18 +212,18 @@ TEST_F(SctpDataChannelTest, BlockedWhenSendQueuedDataNoCrash) { webrtc::DataBuffer buffer("abcd"); provider_->set_send_blocked(true); EXPECT_TRUE(webrtc_data_channel_->Send(buffer)); - EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); + EXPECT_EQ(0U, observer_->on_buffered_amount_change_count()); // Set channel ready while it is still blocked. SetChannelReady(); EXPECT_EQ(buffer.size(), webrtc_data_channel_->buffered_amount()); - EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); + EXPECT_EQ(0U, observer_->on_buffered_amount_change_count()); // Unblock the channel to send queued data again, there should be no crash. provider_->set_send_blocked(false); SetChannelReady(); EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount()); - EXPECT_EQ(2U, observer_->on_buffered_amount_change_count()); + EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); } // Tests that DataChannel::messages_sent() and DataChannel::bytes_sent() are