diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc
index 37c6974c3a..4bfbaf718b 100644
--- a/net/dcsctp/tx/rr_send_queue.cc
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -27,28 +27,30 @@
 namespace dcsctp {
 
-RRSendQueue::OutgoingStream::Item*
-RRSendQueue::OutgoingStream::GetFirstNonExpiredMessage(TimeMs now) {
+bool RRSendQueue::OutgoingStream::HasDataToSend(TimeMs now) {
   while (!items_.empty()) {
     RRSendQueue::OutgoingStream::Item& item = items_.front();
-    // An entire item can be discarded iff:
-    // 1) It hasn't been partially sent (has been allocated a message_id).
-    // 2) It has a non-negative expiry time.
-    // 3) And that expiry time has passed.
-    if (!item.message_id.has_value() && item.expires_at.has_value() &&
-        *item.expires_at <= now) {
-      // TODO(boivie): This should be reported to the client.
+    if (item.message_id.has_value()) {
+      // Already partially sent messages can always continue to be sent.
+      return true;
+    }
+
+    // Message has expired. Remove it and inspect the next one.
+    if (item.expires_at.has_value() && *item.expires_at <= now) {
       buffered_amount_.Decrease(item.remaining_size);
       total_buffered_amount_.Decrease(item.remaining_size);
       items_.pop_front();
+      RTC_DCHECK(IsConsistent());
       continue;
     }
-    RTC_DCHECK(IsConsistent());
-    return &item;
+    if (is_paused_) {
+      // The stream has paused (and there is no partially sent message).
+      return false;
+    }
+    return true;
   }
-  RTC_DCHECK(IsConsistent());
-  return nullptr;
+  return false;
 }
 
 bool RRSendQueue::IsConsistent() const {
@@ -56,6 +58,23 @@ bool RRSendQueue::IsConsistent() const {
   for (const auto& stream_entry : streams_) {
     total_buffered_amount += stream_entry.second.buffered_amount().value();
   }
+
+  if (previous_message_has_ended_) {
+    auto it = streams_.find(current_stream_id_);
+    if (it != streams_.end() && it->second.has_partially_sent_message()) {
+      RTC_DLOG(LS_ERROR)
+          << "Previous message has ended, but still partial message in stream";
+      return false;
+    }
+  } else {
+    auto it = streams_.find(current_stream_id_);
+    if (it == streams_.end() || !it->second.has_partially_sent_message()) {
+      RTC_DLOG(LS_ERROR)
+          << "Previous message has NOT ended, but there is no partial message";
+      return false;
+    }
+  }
+
   return total_buffered_amount == total_buffered_amount_.value();
 }
@@ -98,19 +117,9 @@ void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
 
 absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
     TimeMs now,
     size_t max_size) {
-  Item* item = GetFirstNonExpiredMessage(now);
-  if (item == nullptr) {
-    RTC_DCHECK(IsConsistent());
-    return absl::nullopt;
-  }
-
-  // If a stream is paused, it will allow sending all partially sent messages
-  // but will not start sending new fragments of completely unsent messages.
-  if (is_paused_ && !item->message_id.has_value()) {
-    RTC_DCHECK(IsConsistent());
-    return absl::nullopt;
-  }
+  RTC_DCHECK(!items_.empty());
+  Item* item = &items_.front();
 
   DcSctpMessage& message = item->message;
   if (item->remaining_size > max_size && max_size < kMinimumFragmentedPayload) {
@@ -269,50 +278,79 @@ bool RRSendQueue::IsEmpty() const {
   return total_buffered_amount() == 0;
 }
 
-absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(
-    std::map<StreamID, OutgoingStream>::iterator it,
-    TimeMs now,
-    size_t max_size) {
-  absl::optional<DataToSend> data = it->second.Produce(now, max_size);
-  if (data.has_value()) {
-    RTC_DLOG(LS_VERBOSE) << log_prefix_ << "tx-msg: Producing chunk of "
-                         << data->data.size() << " bytes (max: " << max_size
-                         << ")";
+std::map<StreamID, RRSendQueue::OutgoingStream>::iterator
+RRSendQueue::GetNextStream(TimeMs now) {
+  auto start_it = streams_.lower_bound(StreamID(*current_stream_id_ + 1));
 
-    if (data->data.is_end) {
-      // No more fragments. Continue with the next stream next time.
-      next_stream_id_ = StreamID(*it->first + 1);
-    }
-  }
-  RTC_DCHECK(IsConsistent());
-  return data;
-}
-
-absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
-                                                           size_t max_size) {
-  auto start_it = streams_.lower_bound(next_stream_id_);
   for (auto it = start_it; it != streams_.end(); ++it) {
-    next_stream_id_ = it->first;
-    absl::optional<DataToSend> ret = Produce(it, now, max_size);
-    if (ret.has_value()) {
-      return ret;
+    if (it->second.HasDataToSend(now)) {
+      current_stream_id_ = it->first;
+      return it;
     }
   }
 
   for (auto it = streams_.begin(); it != start_it; ++it) {
-    next_stream_id_ = it->first;
-    absl::optional<DataToSend> ret = Produce(it, now, max_size);
-    if (ret.has_value()) {
-      return ret;
+    if (it->second.HasDataToSend(now)) {
+      current_stream_id_ = it->first;
+      return it;
     }
   }
-  return absl::nullopt;
+
+  return streams_.end();
+}
+
+absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
+                                                           size_t max_size) {
+  std::map<StreamID, OutgoingStream>::iterator stream_it;
+
+  if (previous_message_has_ended_) {
+    // Previous message has ended. Round-robin to a different stream, if there
+    // even is one with data to send.
+    stream_it = GetNextStream(now);
+    if (stream_it == streams_.end()) {
+      RTC_DLOG(LS_VERBOSE)
+          << log_prefix_
+          << "There is no stream with data; Can't produce any data.";
+      return absl::nullopt;
+    }
+  } else {
+    // The previous message has not ended; Continue from the current stream.
+    stream_it = streams_.find(current_stream_id_);
+    RTC_DCHECK(stream_it != streams_.end());
+  }
+
+  absl::optional<DataToSend> data = stream_it->second.Produce(now, max_size);
+  if (data.has_value()) {
+    RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type="
+                         << (data->data.is_unordered ? "unordered" : "ordered")
+                         << "::"
+                         << (*data->data.is_beginning && *data->data.is_end
+                                 ? "complete"
+                                 : *data->data.is_beginning
+                                       ? "first"
+                                       : *data->data.is_end ? "last" : "middle")
+                         << ", stream_id=" << *stream_it->first
+                         << ", ppid=" << *data->data.ppid
+                         << ", length=" << data->data.payload.size();
+
+    previous_message_has_ended_ = *data->data.is_end;
+  }
+
+  RTC_DCHECK(IsConsistent());
+  return data;
 }
 
 bool RRSendQueue::Discard(IsUnordered unordered,
                           StreamID stream_id,
                           MID message_id) {
-  return GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
+  bool has_discarded =
+      GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
+  if (has_discarded) {
+    // Only partially sent messages are discarded, so if a message was
+    // discarded, then it was the currently sent message.
+    previous_message_has_ended_ = true;
+  }
+
+  return has_discarded;
 }
 
 void RRSendQueue::PrepareResetStreams(rtc::ArrayView<const StreamID> streams) {
@@ -353,6 +391,7 @@ void RRSendQueue::Reset() {
     OutgoingStream& stream = stream_entry.second;
     stream.Reset();
   }
+  previous_message_has_ended_ = true;
 }
 
 size_t RRSendQueue::buffered_amount(StreamID stream_id) const {
diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h
index bd96bb9e8b..3ec45af17d 100644
--- a/net/dcsctp/tx/rr_send_queue.h
+++ b/net/dcsctp/tx/rr_send_queue.h
@@ -147,6 +147,10 @@ class RRSendQueue : public SendQueue {
     // Indicates if this stream has a partially sent message in it.
     bool has_partially_sent_message() const;
 
+    // Indicates if the stream has data to send. It will also try to remove any
+    // expired non-partially sent message.
+    bool HasDataToSend(TimeMs now);
+
    private:
     // An enqueued message and metadata.
     struct Item {
@@ -173,8 +177,6 @@ class RRSendQueue : public SendQueue {
       FSN current_fsn = FSN(0);
     };
 
-    // Returns the first non-expired message, or nullptr if there isn't one.
-    Item* GetFirstNonExpiredMessage(TimeMs now);
     bool IsConsistent() const;
 
     // Streams are pause when they are about to be reset.
@@ -202,6 +204,9 @@ class RRSendQueue : public SendQueue {
                                      TimeMs now,
                                      size_t max_size);
 
+  // Return the next stream, in round-robin fashion.
+  std::map<StreamID, OutgoingStream>::iterator GetNextStream(TimeMs now);
+
   const std::string log_prefix_;
   const size_t buffer_size_;
 
@@ -216,8 +221,14 @@ class RRSendQueue : public SendQueue {
   // The total amount of buffer data, for all streams.
   ThresholdWatcher total_buffered_amount_;
 
-  // The next stream to send chunks from.
-  StreamID next_stream_id_ = StreamID(0);
+  // Indicates if the previous fragment sent was the end of a message. For
+  // non-interleaved sending, this means that the next message may come from a
+  // different stream. If not true, the next fragment must be produced from the
+  // same stream as last time.
+  bool previous_message_has_ended_ = true;
+
+  // The current stream to send chunks from. Modified by `GetNextStream`.
+  StreamID current_stream_id_ = StreamID(0);
 
   // All streams, and messages added to those.
   std::map<StreamID, OutgoingStream> streams_;
diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc
index 7a031098a0..682c16af0b 100644
--- a/net/dcsctp/tx/rr_send_queue_test.cc
+++ b/net/dcsctp/tx/rr_send_queue_test.cc
@@ -703,5 +703,40 @@ TEST_F(RRSendQueueTest, WillStayInAStreamAsLongAsThatMessageIsSending) {
   EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
 }
+
+TEST_F(RRSendQueueTest, WillStayInStreamWhenOnlySmallFragmentRemaining) {
+  buf_.Add(kNow,
+           DcSctpMessage(StreamID(5), kPPID,
+                         std::vector<uint8_t>(kOneFragmentPacketSize * 2)));
+  buf_.Add(kNow, DcSctpMessage(StreamID(6), kPPID, std::vector<uint8_t>(1)));
+
+  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+                              buf_.Produce(kNow, kOneFragmentPacketSize));
+  EXPECT_EQ(chunk1.data.stream_id, StreamID(5));
+  EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize));
+
+  // Now assume that there will be a lot of previous chunks that need to be
+  // retransmitted, which fills up the next packet and there is little space
+  // left in the packet for new chunks. What it should NOT do right now is to
+  // try to send a message from StreamID 6. And it should not try to send a very
+  // small fragment from StreamID 5 either. So just skip this one.
+  EXPECT_FALSE(buf_.Produce(kNow, 8).has_value());
+
+  // When the next produce request comes with a large buffer to fill, continue
+  // sending from StreamID 5.
+
+  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+                              buf_.Produce(kNow, kOneFragmentPacketSize));
+  EXPECT_EQ(chunk2.data.stream_id, StreamID(5));
+  EXPECT_THAT(chunk2.data.payload, SizeIs(kOneFragmentPacketSize));
+
+  // Lastly, produce a message on StreamID 6.
+  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
+                              buf_.Produce(kNow, kOneFragmentPacketSize));
+  EXPECT_EQ(chunk3.data.stream_id, StreamID(6));
+  EXPECT_THAT(chunk3.data.payload, SizeIs(1));
+
+  EXPECT_FALSE(buf_.Produce(kNow, 8).has_value());
+}
 
 }  // namespace
 }  // namespace dcsctp