pc: Include SCTP queued bytes in buffered_amount

Before this change, calling buffered_amount only included what was
buffered on top of what was already buffered in the SCTP socket. With
the defaults, the SCTP socket can buffer up to 2MB of data (that is not
put on the wire) before the additional external bufferering in
SctpDataChannel will be used. The buffering that I am working on
removing completely.

Until it's removed completely, to avoid the issue reported in
crbug.com/41221056, include the bytes buffered in the SCTP socket to
what is returned when calling RTCDataChannel::buffered_amount.

This means that when this value is zero, it can be safe to know that all
bytes have been sent, but not necessarily acknowledged. And calling
close will not discard any messages.

This is a stopgap solution, but as functional as the proper solution
that removes all additional buffering. Follow-up CLs will merely improve
this solution.

Bug: chromium:41221056
Change-Id: I06edd52188d3bf13a17827381a15a4730722685a
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/342520
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41898}
This commit is contained in:
Victor Boivie 2024-03-11 16:43:31 +01:00 committed by WebRTC LUCI CQ
parent 2f3b75d30d
commit fea41f540c
15 changed files with 64 additions and 8 deletions

View File

@ -118,6 +118,8 @@ class DataChannelTransportInterface {
// Note: the default implementation always returns false (as it assumes no one
// has implemented the interface). This default implementation is temporary.
virtual bool IsReadyToSend() const = 0;
virtual size_t buffered_amount(int channel_id) const = 0;
};
} // namespace webrtc

View File

@ -375,6 +375,12 @@ absl::optional<int> DcSctpTransport::max_inbound_streams() const {
return socket_->options().announced_maximum_incoming_streams;
}
size_t DcSctpTransport::buffered_amount(int sid) const {
if (!socket_)
return 0;
return socket_->buffered_amount(dcsctp::StreamID(sid));
}
void DcSctpTransport::set_debug_name_for_testing(const char* debug_name) {
debug_name_ = debug_name;
}

View File

@ -66,6 +66,7 @@ class DcSctpTransport : public cricket::SctpTransportInternal,
int max_message_size() const override;
absl::optional<int> max_outbound_streams() const override;
absl::optional<int> max_inbound_streams() const override;
size_t buffered_amount(int sid) const override;
void set_debug_name_for_testing(const char* debug_name) override;
private:

View File

@ -140,6 +140,8 @@ class SctpTransportInternal {
virtual absl::optional<int> max_outbound_streams() const = 0;
// Returns the current negotiated max # of inbound streams.
virtual absl::optional<int> max_inbound_streams() const = 0;
// Returns the amount of buffered data in the send queue for a stream.
virtual size_t buffered_amount(int sid) const = 0;
// Helper for debugging.
virtual void set_debug_name_for_testing(const char* debug_name) = 0;

View File

@ -89,6 +89,14 @@ void DataChannelController::OnChannelStateChanged(
}));
}
size_t DataChannelController::buffered_amount(StreamId sid) const {
RTC_DCHECK_RUN_ON(network_thread());
if (!data_channel_transport_) {
return 0;
}
return data_channel_transport_->buffered_amount(sid.stream_id_int());
}
void DataChannelController::OnDataReceived(
int channel_id,
DataMessageType type,

View File

@ -54,6 +54,7 @@ class DataChannelController : public SctpDataChannelControllerInterface,
void RemoveSctpDataStream(StreamId sid) override;
void OnChannelStateChanged(SctpDataChannel* channel,
DataChannelInterface::DataState state) override;
size_t buffered_amount(StreamId sid) const override;
// Implements DataChannelSink.
void OnDataReceived(int channel_id,

View File

@ -41,6 +41,7 @@ class MockDataChannelTransport : public DataChannelTransportInterface {
MOCK_METHOD(RTCError, CloseChannel, (int channel_id), (override));
MOCK_METHOD(void, SetDataSink, (DataChannelSink * sink), (override));
MOCK_METHOD(bool, IsReadyToSend, (), (const, override));
MOCK_METHOD(size_t, buffered_amount, (int channel_id), (const, override));
};
// Convenience class for tests to ensure that shutdown methods for DCC
@ -167,6 +168,20 @@ TEST_F(DataChannelControllerTest, MaxChannels) {
}
}
TEST_F(DataChannelControllerTest, BufferedAmountIncludesFromTransport) {
NiceMock<MockDataChannelTransport> transport;
EXPECT_CALL(transport, buffered_amount(0)).WillOnce(Return(4711));
ON_CALL(*pc_, GetSctpSslRole_n).WillByDefault([&]() {
return rtc::SSL_CLIENT;
});
DataChannelControllerForTest dcc(pc_.get(), &transport);
auto dc = dcc.InternalCreateDataChannelWithProxy(
"label", InternalDataChannelInit(DataChannelInit()))
.MoveValue();
EXPECT_EQ(dc->buffered_amount(), 4711u);
}
// Test that while a data channel is in the `kClosing` state, its StreamId does
// not get re-used for new channels. Only once the state reaches `kClosed`
// should a StreamId be available again for allocation.

View File

@ -1042,14 +1042,16 @@ TEST_P(DataChannelIntegrationTest,
kDefaultTimeout);
// Cause a temporary network outage
virtual_socket_server()->set_drop_probability(1.0);
// Fill the buffer until queued data starts to build
// Fill the SCTP socket buffer until queued data starts to build.
constexpr size_t kBufferedDataInSctpSocket = 2'000'000;
size_t packet_counter = 0;
while (caller()->data_channel()->buffered_amount() < 1 &&
while (caller()->data_channel()->buffered_amount() <
kBufferedDataInSctpSocket &&
packet_counter < 10000) {
packet_counter++;
caller()->data_channel()->Send(DataBuffer("Sent while blocked"));
}
if (caller()->data_channel()->buffered_amount()) {
if (caller()->data_channel()->buffered_amount() > kBufferedDataInSctpSocket) {
RTC_LOG(LS_INFO) << "Buffered data after " << packet_counter << " packets";
} else {
RTC_LOG(LS_INFO) << "No buffered data after " << packet_counter

View File

@ -485,7 +485,11 @@ Priority SctpDataChannel::priority() const {
uint64_t SctpDataChannel::buffered_amount() const {
RTC_DCHECK_RUN_ON(network_thread_);
return queued_send_data_.byte_count();
uint64_t buffered_amount = queued_send_data_.byte_count();
if (controller_ != nullptr && id_n_.has_value()) {
buffered_amount += controller_->buffered_amount(*id_n_);
}
return buffered_amount;
}
void SctpDataChannel::Close() {

View File

@ -55,6 +55,7 @@ class SctpDataChannelControllerInterface {
// Notifies the controller of state changes.
virtual void OnChannelStateChanged(SctpDataChannel* data_channel,
DataChannelInterface::DataState state) = 0;
virtual size_t buffered_amount(StreamId sid) const = 0;
protected:
virtual ~SctpDataChannelControllerInterface() {}

View File

@ -100,6 +100,12 @@ bool SctpTransport::IsReadyToSend() const {
return internal_sctp_transport_->ReadyToSendData();
}
size_t SctpTransport::buffered_amount(int channel_id) const {
RTC_DCHECK_RUN_ON(owner_thread_);
RTC_DCHECK(internal_sctp_transport_);
return internal_sctp_transport_->buffered_amount(channel_id);
}
rtc::scoped_refptr<DtlsTransportInterface> SctpTransport::dtls_transport()
const {
RTC_DCHECK_RUN_ON(owner_thread_);

View File

@ -52,6 +52,7 @@ class SctpTransport : public SctpTransportInterface,
RTCError CloseChannel(int channel_id) override;
void SetDataSink(DataChannelSink* sink) override;
bool IsReadyToSend() const override;
size_t buffered_amount(int channel_id) const override;
// Internal functions
void Clear();

View File

@ -63,6 +63,7 @@ class FakeCricketSctpTransport : public cricket::SctpTransportInternal {
absl::optional<int> max_inbound_streams() const override {
return max_inbound_streams_;
}
size_t buffered_amount(int sid) const override { return 0; }
void SendSignalAssociationChangeCommunicationUp() {
ASSERT_TRUE(on_connected_callback_);
@ -212,5 +213,4 @@ TEST_F(SctpTransportTest, CloseWhenTransportCloses) {
ASSERT_EQ_WAIT(SctpTransportState::kClosed, observer_.State(),
kDefaultTimeout);
}
} // namespace webrtc

View File

@ -128,6 +128,8 @@ class FakeDataChannelController
}
}
size_t buffered_amount(webrtc::StreamId sid) const override { return 0; }
// Set true to emulate the SCTP stream being blocked by congestion control.
void set_send_blocked(bool blocked) {
network_thread_->BlockingCall([&]() {

View File

@ -41,9 +41,14 @@ class FakeSctpTransport : public cricket::SctpTransportInternal {
bool ReadyToSendData() override { return true; }
void set_debug_name_for_testing(const char* debug_name) override {}
int max_message_size() const { return max_message_size_; }
absl::optional<int> max_outbound_streams() const { return absl::nullopt; }
absl::optional<int> max_inbound_streams() const { return absl::nullopt; }
int max_message_size() const override { return max_message_size_; }
absl::optional<int> max_outbound_streams() const override {
return absl::nullopt;
}
absl::optional<int> max_inbound_streams() const override {
return absl::nullopt;
}
size_t buffered_amount(int sid) const override { return 0; }
int local_port() const {
RTC_DCHECK(local_port_);
return *local_port_;