dcsctp: Support lifecycle events in send queue
The send queue is responsible for generating lifecycle events for all messages that are still in the queue. Because, if they are still in the queue, that means that the last fragment of the message hasn't been sent yet (because then it would have been in the retransmission queue instead). And if the last fragment hasn't been sent, the send queue is responsible for generating the `OnLifecycleMessageExpired(/*maybe_sent=*/false)` event. Bug: webrtc:5696 Change-Id: Icd5956d6aa0f392cae54f2a05bd20728d9f7f0a6 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/264144 Commit-Queue: Victor Boivie <boivie@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Cr-Commit-Position: refs/heads/main@{#37419}
This commit is contained in:
parent
d44badf409
commit
4f15246683
@ -119,12 +119,11 @@ void RRSendQueue::ThresholdWatcher::SetLowThreshold(size_t low_threshold) {
|
||||
}
|
||||
|
||||
void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
|
||||
TimeMs expires_at,
|
||||
const SendOptions& send_options) {
|
||||
MessageAttributes attributes) {
|
||||
bool was_active = bytes_to_send_in_next_message() > 0;
|
||||
buffered_amount_.Increase(message.payload().size());
|
||||
parent_.total_buffered_amount_.Increase(message.payload().size());
|
||||
items_.emplace_back(std::move(message), expires_at, send_options);
|
||||
items_.emplace_back(std::move(message), std::move(attributes));
|
||||
|
||||
if (!was_active) {
|
||||
scheduler_stream_->MaybeMakeActive();
|
||||
@ -146,19 +145,18 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
|
||||
// Allocate Message ID and SSN when the first fragment is sent.
|
||||
if (!item.message_id.has_value()) {
|
||||
// Oops, this entire message has already expired. Try the next one.
|
||||
if (item.expires_at <= now) {
|
||||
buffered_amount_.Decrease(item.remaining_size);
|
||||
parent_.total_buffered_amount_.Decrease(item.remaining_size);
|
||||
if (item.attributes.expires_at <= now) {
|
||||
HandleMessageExpired(item);
|
||||
items_.pop_front();
|
||||
continue;
|
||||
}
|
||||
|
||||
MID& mid =
|
||||
item.send_options.unordered ? next_unordered_mid_ : next_ordered_mid_;
|
||||
item.attributes.unordered ? next_unordered_mid_ : next_ordered_mid_;
|
||||
item.message_id = mid;
|
||||
mid = MID(*mid + 1);
|
||||
}
|
||||
if (!item.send_options.unordered && !item.ssn.has_value()) {
|
||||
if (!item.attributes.unordered && !item.ssn.has_value()) {
|
||||
item.ssn = next_ssn_;
|
||||
next_ssn_ = SSN(*next_ssn_ + 1);
|
||||
}
|
||||
@ -189,16 +187,11 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
|
||||
SendQueue::DataToSend chunk(Data(stream_id, item.ssn.value_or(SSN(0)),
|
||||
item.message_id.value(), fsn, ppid,
|
||||
std::move(payload), is_beginning, is_end,
|
||||
item.send_options.unordered));
|
||||
if (item.send_options.max_retransmissions.has_value() &&
|
||||
*item.send_options.max_retransmissions >=
|
||||
std::numeric_limits<MaxRetransmits::UnderlyingType>::min() &&
|
||||
*item.send_options.max_retransmissions <=
|
||||
std::numeric_limits<MaxRetransmits::UnderlyingType>::max()) {
|
||||
chunk.max_retransmissions =
|
||||
MaxRetransmits(*item.send_options.max_retransmissions);
|
||||
}
|
||||
chunk.expires_at = item.expires_at;
|
||||
item.attributes.unordered));
|
||||
chunk.max_retransmissions = item.attributes.max_retransmissions;
|
||||
chunk.expires_at = item.attributes.expires_at;
|
||||
chunk.lifecycle_id =
|
||||
is_end ? item.attributes.lifecycle_id : LifecycleId::NotSet();
|
||||
|
||||
if (is_end) {
|
||||
// The entire message has been sent, and its last data copied to `chunk`,
|
||||
@ -224,15 +217,28 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
|
||||
return absl::nullopt;
|
||||
}
|
||||
|
||||
void RRSendQueue::OutgoingStream::HandleMessageExpired(
|
||||
OutgoingStream::Item& item) {
|
||||
buffered_amount_.Decrease(item.remaining_size);
|
||||
parent_.total_buffered_amount_.Decrease(item.remaining_size);
|
||||
if (item.attributes.lifecycle_id.IsSet()) {
|
||||
RTC_DLOG(LS_VERBOSE) << "Triggering OnLifecycleMessageExpired("
|
||||
<< item.attributes.lifecycle_id.value() << ", false)";
|
||||
|
||||
parent_.callbacks_.OnLifecycleMessageExpired(item.attributes.lifecycle_id,
|
||||
/*maybe_delivered=*/false);
|
||||
parent_.callbacks_.OnLifecycleEnd(item.attributes.lifecycle_id);
|
||||
}
|
||||
}
|
||||
|
||||
bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
|
||||
MID message_id) {
|
||||
bool result = false;
|
||||
if (!items_.empty()) {
|
||||
Item& item = items_.front();
|
||||
if (item.send_options.unordered == unordered &&
|
||||
item.message_id.has_value() && *item.message_id == message_id) {
|
||||
buffered_amount_.Decrease(item.remaining_size);
|
||||
parent_.total_buffered_amount_.Decrease(item.remaining_size);
|
||||
if (item.attributes.unordered == unordered && item.message_id.has_value() &&
|
||||
*item.message_id == message_id) {
|
||||
HandleMessageExpired(item);
|
||||
items_.pop_front();
|
||||
|
||||
// Only partially sent messages are discarded, so if a message was
|
||||
@ -277,8 +283,7 @@ void RRSendQueue::OutgoingStream::Pause() {
|
||||
// the fragments before actually resetting the stream.
|
||||
for (auto it = items_.begin(); it != items_.end();) {
|
||||
if (it->remaining_offset == 0) {
|
||||
buffered_amount_.Decrease(it->remaining_size);
|
||||
parent_.total_buffered_amount_.Decrease(it->remaining_size);
|
||||
HandleMessageExpired(*it);
|
||||
it = items_.erase(it);
|
||||
} else {
|
||||
++it;
|
||||
@ -348,15 +353,23 @@ void RRSendQueue::Add(TimeMs now,
|
||||
RTC_DCHECK(!message.payload().empty());
|
||||
// Any limited lifetime should start counting from now - when the message
|
||||
// has been added to the queue.
|
||||
TimeMs expires_at = TimeMs::InfiniteFuture();
|
||||
if (send_options.lifetime.has_value()) {
|
||||
// `expires_at` is the time when it expires. Which is slightly larger than
|
||||
// the message's lifetime, as the message is alive during its entire
|
||||
// lifetime (which may be zero).
|
||||
expires_at = now + *send_options.lifetime + DurationMs(1);
|
||||
}
|
||||
|
||||
// `expires_at` is the time when it expires. Which is slightly larger than the
|
||||
// message's lifetime, as the message is alive during its entire lifetime
|
||||
// (which may be zero).
|
||||
MessageAttributes attributes = {
|
||||
.unordered = send_options.unordered,
|
||||
.max_retransmissions =
|
||||
send_options.max_retransmissions.has_value()
|
||||
? MaxRetransmits(send_options.max_retransmissions.value())
|
||||
: MaxRetransmits::NoLimit(),
|
||||
.expires_at = send_options.lifetime.has_value()
|
||||
? now + *send_options.lifetime + DurationMs(1)
|
||||
: TimeMs::InfiniteFuture(),
|
||||
.lifecycle_id = send_options.lifecycle_id,
|
||||
};
|
||||
GetOrCreateStreamInfo(message.stream_id())
|
||||
.Add(std::move(message), expires_at, send_options);
|
||||
.Add(std::move(message), std::move(attributes));
|
||||
RTC_DCHECK(IsConsistent());
|
||||
}
|
||||
|
||||
|
||||
@ -45,6 +45,12 @@ namespace dcsctp {
|
||||
// The send queue may trigger callbacks:
|
||||
// * `OnBufferedAmountLow`, `OnTotalBufferedAmountLow`
|
||||
// These will be triggered as defined in their documentation.
|
||||
// * `OnLifecycleMessageExpired(/*maybe_delivered=*/false)`, `OnLifecycleEnd`
|
||||
// These will be triggered when messages have been expired, abandoned or
|
||||
// discarded from the send queue. If a message is fully produced, meaning
|
||||
// that the last fragment has been produced, the responsibility to send
|
||||
// lifecycle events is then transferred to the retransmission queue, which
|
||||
// is the one asking to produce the message.
|
||||
class RRSendQueue : public SendQueue {
|
||||
public:
|
||||
RRSendQueue(absl::string_view log_prefix,
|
||||
@ -96,6 +102,13 @@ class RRSendQueue : public SendQueue {
|
||||
void RestoreFromState(const DcSctpSocketHandoverState& state);
|
||||
|
||||
private:
|
||||
struct MessageAttributes {
|
||||
IsUnordered unordered;
|
||||
MaxRetransmits max_retransmissions;
|
||||
TimeMs expires_at;
|
||||
LifecycleId lifecycle_id;
|
||||
};
|
||||
|
||||
// Represents a value and a "low threshold" that when the value reaches or
|
||||
// goes under the "low threshold", will trigger `on_threshold_reached`
|
||||
// callback.
|
||||
@ -139,9 +152,7 @@ class RRSendQueue : public SendQueue {
|
||||
StreamID stream_id() const { return scheduler_stream_->stream_id(); }
|
||||
|
||||
// Enqueues a message to this stream.
|
||||
void Add(DcSctpMessage message,
|
||||
TimeMs expires_at,
|
||||
const SendOptions& send_options);
|
||||
void Add(DcSctpMessage message, MessageAttributes attributes);
|
||||
|
||||
// Implementing `StreamScheduler::StreamProducer`.
|
||||
absl::optional<SendQueue::DataToSend> Produce(TimeMs now,
|
||||
@ -208,17 +219,13 @@ class RRSendQueue : public SendQueue {
|
||||
|
||||
// An enqueued message and metadata.
|
||||
struct Item {
|
||||
explicit Item(DcSctpMessage msg,
|
||||
TimeMs expires_at,
|
||||
const SendOptions& send_options)
|
||||
explicit Item(DcSctpMessage msg, MessageAttributes attributes)
|
||||
: message(std::move(msg)),
|
||||
expires_at(expires_at),
|
||||
send_options(send_options),
|
||||
attributes(std::move(attributes)),
|
||||
remaining_offset(0),
|
||||
remaining_size(message.payload().size()) {}
|
||||
DcSctpMessage message;
|
||||
TimeMs expires_at;
|
||||
SendOptions send_options;
|
||||
MessageAttributes attributes;
|
||||
// The remaining payload (offset and size) to be sent, when it has been
|
||||
// fragmented.
|
||||
size_t remaining_offset;
|
||||
@ -232,6 +239,7 @@ class RRSendQueue : public SendQueue {
|
||||
};
|
||||
|
||||
bool IsConsistent() const;
|
||||
void HandleMessageExpired(OutgoingStream::Item& item);
|
||||
|
||||
RRSendQueue& parent_;
|
||||
|
||||
|
||||
@ -812,5 +812,55 @@ TEST_F(RRSendQueueTest, WillSendMessagesByPrio) {
|
||||
EXPECT_FALSE(buf_.Produce(kNow, 1).has_value());
|
||||
}
|
||||
|
||||
TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenExpiredInSendQueue) {
|
||||
std::vector<uint8_t> payload(kOneFragmentPacketSize);
|
||||
buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload),
|
||||
SendOptions{.lifetime = DurationMs(1000),
|
||||
.lifecycle_id = LifecycleId(1)});
|
||||
|
||||
EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(1),
|
||||
/*maybe_delivered=*/false));
|
||||
EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(1)));
|
||||
EXPECT_FALSE(buf_.Produce(kNow + DurationMs(1001), kOneFragmentPacketSize)
|
||||
.has_value());
|
||||
}
|
||||
|
||||
TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingDuringPause) {
|
||||
std::vector<uint8_t> payload(120);
|
||||
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload),
|
||||
SendOptions{.lifecycle_id = LifecycleId(1)});
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload),
|
||||
SendOptions{.lifecycle_id = LifecycleId(2)});
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 50);
|
||||
ASSERT_TRUE(chunk_one.has_value());
|
||||
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
|
||||
EXPECT_EQ(buf_.total_buffered_amount(), 2 * payload.size() - 50);
|
||||
|
||||
EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(2),
|
||||
/*maybe_delivered=*/false));
|
||||
EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(2)));
|
||||
buf_.PrepareResetStream(StreamID(1));
|
||||
EXPECT_EQ(buf_.total_buffered_amount(), payload.size() - 50);
|
||||
}
|
||||
|
||||
TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingExplicitly) {
|
||||
std::vector<uint8_t> payload(kOneFragmentPacketSize + 20);
|
||||
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload),
|
||||
SendOptions{.lifecycle_id = LifecycleId(1)});
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_one =
|
||||
buf_.Produce(kNow, kOneFragmentPacketSize);
|
||||
ASSERT_TRUE(chunk_one.has_value());
|
||||
EXPECT_FALSE(chunk_one->data.is_end);
|
||||
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
|
||||
EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(1),
|
||||
/*maybe_delivered=*/false));
|
||||
EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(1)));
|
||||
buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
|
||||
chunk_one->data.message_id);
|
||||
}
|
||||
} // namespace
|
||||
} // namespace dcsctp
|
||||
|
||||
@ -34,6 +34,10 @@ class SendQueue {
|
||||
// Partial reliability - RFC3758
|
||||
MaxRetransmits max_retransmissions = MaxRetransmits::NoLimit();
|
||||
TimeMs expires_at = TimeMs::InfiniteFuture();
|
||||
|
||||
// Lifecycle - set for the last fragment, and `LifecycleId::NotSet()` for
|
||||
// all other fragments.
|
||||
LifecycleId lifecycle_id = LifecycleId::NotSet();
|
||||
};
|
||||
|
||||
virtual ~SendQueue() = default;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user