From 5df960d3073630c5619e00d79f89937bf6fabd69 Mon Sep 17 00:00:00 2001
From: Victor Boivie
Date: Thu, 9 Jun 2022 22:06:35 +0000
Subject: [PATCH] Revert "dcsctp: Use stream scheduler in send queue"

This reverts commit d729d12454906d924d5a142deb3432e2d5fa97ae.

Reason for revert: Breaks downstream project.

Original change's description:
> dcsctp: Use stream scheduler in send queue
>
> Changing the currently embedded scheduler that was implemented using a
> revolving pointer, to the parameterized stream scheduler that is
> implemented using a "virtual finish time" approach.
>
> Also renamed StreamCallback to StreamProducer, per review comments.
>
> Bug: webrtc:5696
> Change-Id: I7719678776ddbe05b688ada1b52887e5ca2fb206
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/262160
> Reviewed-by: Harald Alvestrand
> Commit-Queue: Victor Boivie
> Cr-Commit-Position: refs/heads/main@{#37170}

Bug: webrtc:5696
Change-Id: Iaf3608b52a31eb31b4ca604539edb2e8ca89399b
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/265389
Auto-Submit: Victor Boivie
Commit-Queue: Tomas Gunnarsson
Bot-Commit: rubber-stamper@appspot.gserviceaccount.com
Cr-Commit-Position: refs/heads/main@{#37172}
---
 net/dcsctp/tx/BUILD.gn                 |   5 -
 net/dcsctp/tx/rr_send_queue.cc         | 156 ++++++++++++++++---------
 net/dcsctp/tx/rr_send_queue.h          |  44 ++++---
 net/dcsctp/tx/stream_scheduler.cc      |   2 +-
 net/dcsctp/tx/stream_scheduler.h       |  16 +--
 net/dcsctp/tx/stream_scheduler_test.cc |  86 +++++++-------
 6 files changed, 179 insertions(+), 130 deletions(-)

diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn
index ae81db8c67..c2fb12eac8 100644
--- a/net/dcsctp/tx/BUILD.gn
+++ b/net/dcsctp/tx/BUILD.gn
@@ -14,7 +14,6 @@ rtc_source_set("send_queue") {
     "../common:internal_types",
     "../packet:chunk",
     "../packet:data",
-    "../public:socket",
     "../public:types",
   ]
   sources = [ "send_queue.h" ]
@@ -24,12 +23,9 @@ rtc_source_set("send_queue") {
 rtc_library("rr_send_queue") {
   deps = [
     ":send_queue",
-    ":stream_scheduler",
     "../../../api:array_view",
     "../../../rtc_base:checks",
     "../../../rtc_base:logging",
-    "../../../rtc_base/containers:flat_map",
-    "../common:str_join",
     "../packet:data",
     "../public:socket",
     "../public:types",
@@ -184,7 +180,6 @@ if (rtc_include_tests) {
       "../common:sequence_numbers",
       "../packet:chunk",
      "../packet:data",
-      "../packet:sctp_packet",
      "../public:socket",
      "../public:types",
      "../testing:data_generator",
diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc
index 09323d2d76..6127da4e77 100644
--- a/net/dcsctp/tx/rr_send_queue.cc
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -13,14 +13,12 @@
 #include
 #include
 #include
-#include
 #include
 #include

 #include "absl/algorithm/container.h"
 #include "absl/types/optional.h"
 #include "api/array_view.h"
-#include "net/dcsctp/common/str_join.h"
 #include "net/dcsctp/packet/data.h"
 #include "net/dcsctp/public/dcsctp_message.h"
 #include "net/dcsctp/public/dcsctp_socket.h"
@@ -44,18 +42,18 @@ RRSendQueue::RRSendQueue(absl::string_view log_prefix,
   total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold);
 }

-size_t RRSendQueue::OutgoingStream::bytes_to_send_in_next_message() const {
+bool RRSendQueue::OutgoingStream::HasDataToSend() const {
   if (pause_state_ == PauseState::kPaused ||
       pause_state_ == PauseState::kResetting) {
     // The stream has paused (and there is no partially sent message).
-    return 0;
+    return false;
   }
   if (items_.empty()) {
-    return 0;
+    return false;
   }
-  return items_.front().remaining_size;
+  return true;
 }

 void RRSendQueue::OutgoingStream::AddHandoverState(
@@ -63,30 +61,29 @@ void RRSendQueue::OutgoingStream::AddHandoverState(
   state.next_ssn = next_ssn_.value();
   state.next_ordered_mid = next_ordered_mid_.value();
   state.next_unordered_mid = next_unordered_mid_.value();
-  state.priority = *scheduler_stream_->priority();
+  state.priority = *priority_;
 }

 bool RRSendQueue::IsConsistent() const {
-  std::set<StreamID> expected_active_streams;
-  std::set<StreamID> actual_active_streams;
-
   size_t total_buffered_amount = 0;
-  for (const auto& [stream_id, stream] : streams_) {
+  for (const auto& [unused, stream] : streams_) {
     total_buffered_amount += stream.buffered_amount().value();
-    if (stream.bytes_to_send_in_next_message() > 0) {
-      expected_active_streams.emplace(stream_id);
+  }
+
+  if (previous_message_has_ended_) {
+    auto it = streams_.find(current_stream_id_);
+    if (it != streams_.end() && it->second.has_partially_sent_message()) {
+      RTC_DLOG(LS_ERROR)
+          << "Previous message has ended, but still partial message in stream";
+      return false;
+    }
+  } else {
+    auto it = streams_.find(current_stream_id_);
+    if (it == streams_.end() || !it->second.has_partially_sent_message()) {
+      RTC_DLOG(LS_ERROR)
+          << "Previous message has NOT ended, but there is no partial message";
+      return false;
     }
-  }
-  for (const auto& stream : scheduler_.ActiveStreamsForTesting()) {
-    actual_active_streams.emplace(stream->stream_id());
-  }
-  if (expected_active_streams != actual_active_streams) {
-    auto fn = [&](rtc::StringBuilder& sb, const auto& p) { sb << *p; };
-    RTC_DLOG(LS_ERROR) << "Active streams mismatch, is=["
-                       << StrJoin(actual_active_streams, ",", fn)
-                       << "], expected=["
-                       << StrJoin(expected_active_streams, ",", fn) << "]";
-    return false;
   }

   return total_buffered_amount == total_buffered_amount_.value();
@@ -121,15 +118,10 @@ void RRSendQueue::ThresholdWatcher::SetLowThreshold(size_t low_threshold) {
 void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
                                       TimeMs expires_at,
                                       const SendOptions& send_options) {
-  bool was_active = bytes_to_send_in_next_message() > 0;
   buffered_amount_.Increase(message.payload().size());
   total_buffered_amount_.Increase(message.payload().size());
   items_.emplace_back(std::move(message), expires_at, send_options);

-  if (!was_active) {
-    scheduler_stream_->MaybeMakeActive();
-  }
-
   RTC_DCHECK(IsConsistent());
 }

@@ -235,15 +227,8 @@ bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
     total_buffered_amount_.Decrease(item.remaining_size);
     items_.pop_front();

-    // Only partially sent messages are discarded, so if a message was
-    // discarded, then it was the currently sent message.
-    scheduler_stream_->ForceReschedule();
-
     if (pause_state_ == PauseState::kPending) {
       pause_state_ = PauseState::kPaused;
-      scheduler_stream_->MakeInactive();
-    } else if (bytes_to_send_in_next_message() == 0) {
-      scheduler_stream_->MakeInactive();
     }

     // As the item still existed, it had unsent data.
@@ -292,7 +277,6 @@ void RRSendQueue::OutgoingStream::Pause() {
   if (had_pending_items && pause_state_ == PauseState::kPaused) {
     RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id()
                         << " was previously active, but is now paused.";
-    scheduler_stream_->MakeInactive();
   }

   RTC_DCHECK(IsConsistent());
@@ -300,8 +284,11 @@ void RRSendQueue::OutgoingStream::Pause() {

 void RRSendQueue::OutgoingStream::Resume() {
   RTC_DCHECK(pause_state_ == PauseState::kResetting);
+  if (!items_.empty()) {
+    RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id()
+                        << " was previously paused, but is now active.";
+  }
   pause_state_ = PauseState::kNotPaused;
-  scheduler_stream_->MaybeMakeActive();
   RTC_DCHECK(IsConsistent());
 }

@@ -309,11 +296,6 @@ void RRSendQueue::OutgoingStream::Reset() {
   // This can be called both when an outgoing stream reset has been responded
   // to, or when the entire SendQueue is reset due to detecting the peer having
   // restarted. The stream may be in any state at this time.
-  PauseState old_pause_state = pause_state_;
-  pause_state_ = PauseState::kNotPaused;
-  next_ordered_mid_ = MID(0);
-  next_unordered_mid_ = MID(0);
-  next_ssn_ = SSN(0);
   if (!items_.empty()) {
     // If this message has been partially sent, reset it so that it will be
     // re-sent.
@@ -327,11 +309,11 @@ void RRSendQueue::OutgoingStream::Reset() {
     item.message_id = absl::nullopt;
     item.ssn = absl::nullopt;
     item.current_fsn = FSN(0);
-    if (old_pause_state == PauseState::kPaused ||
-        old_pause_state == PauseState::kResetting) {
-      scheduler_stream_->MaybeMakeActive();
-    }
   }
+  pause_state_ = PauseState::kNotPaused;
+  next_ordered_mid_ = MID(0);
+  next_unordered_mid_ = MID(0);
+  next_ssn_ = SSN(0);

   RTC_DCHECK(IsConsistent());
 }

@@ -368,9 +350,67 @@ bool RRSendQueue::IsEmpty() const {
   return total_buffered_amount() == 0;
 }

+std::map<StreamID, RRSendQueue::OutgoingStream>::iterator
+RRSendQueue::GetNextStream() {
+  auto start_it = streams_.lower_bound(StreamID(*current_stream_id_ + 1));
+
+  for (auto it = start_it; it != streams_.end(); ++it) {
+    if (it->second.HasDataToSend()) {
+      current_stream_id_ = it->first;
+      return it;
+    }
+  }
+
+  for (auto it = streams_.begin(); it != start_it; ++it) {
+    if (it->second.HasDataToSend()) {
+      current_stream_id_ = it->first;
+      return it;
+    }
+  }
+  return streams_.end();
+}
+
 absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
                                                            size_t max_size) {
-  return scheduler_.Produce(now, max_size);
+  std::map<StreamID, OutgoingStream>::iterator stream_it;
+
+  for (;;) {
+    if (previous_message_has_ended_) {
+      // Previous message has ended. Round-robin to a different stream, if there
+      // even is one with data to send.
+      stream_it = GetNextStream();
+      if (stream_it == streams_.end()) {
+        RTC_DLOG(LS_VERBOSE)
+            << log_prefix_
+            << "There is no stream with data; Can't produce any data.";
+        return absl::nullopt;
+      }
+    } else {
+      // The previous message has not ended; Continue from the current stream.
+      stream_it = streams_.find(current_stream_id_);
+      RTC_DCHECK(stream_it != streams_.end());
+    }
+
+    absl::optional<SendQueue::DataToSend> data = stream_it->second.Produce(now, max_size);
+    if (!data.has_value()) {
+      continue;
+    }
+    RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type="
+                         << (data->data.is_unordered ? "unordered" : "ordered")
+                         << "::"
+                         << (*data->data.is_beginning && *data->data.is_end
+                                 ? "complete"
+                                 : *data->data.is_beginning ? "first"
+                                 : *data->data.is_end ? "last"
+                                 : "middle")
+                         << ", stream_id=" << *stream_it->first
+                         << ", ppid=" << *data->data.ppid
+                         << ", length=" << data->data.payload.size();
+
+    previous_message_has_ended_ = *data->data.is_end;
+    RTC_DCHECK(IsConsistent());
+    return data;
+  }
 }

 bool RRSendQueue::Discard(IsUnordered unordered,
@@ -378,8 +418,12 @@ bool RRSendQueue::Discard(IsUnordered unordered,
                           MID message_id) {
   bool has_discarded =
       GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
+  if (has_discarded) {
+    // Only partially sent messages are discarded, so if a message was
+    // discarded, then it was the currently sent message.
+    previous_message_has_ended_ = true;
+  }

-  RTC_DCHECK(IsConsistent());
   return has_discarded;
 }

@@ -440,7 +484,7 @@ void RRSendQueue::Reset() {
   for (auto& [unused, stream] : streams_) {
     stream.Reset();
   }
-  scheduler_.ForceReschedule();
+  previous_message_has_ended_ = true;
 }

 size_t RRSendQueue::buffered_amount(StreamID stream_id) const {
@@ -472,9 +516,9 @@ RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo(
   }

   return streams_
-      .emplace(std::piecewise_construct, std::forward_as_tuple(stream_id),
-               std::forward_as_tuple(
-                   &scheduler_, stream_id, default_priority_,
+      .emplace(stream_id,
+               OutgoingStream(
+                   stream_id, default_priority_,
                    [this, stream_id]() { on_buffered_amount_low_(stream_id); },
                    total_buffered_amount_))
       .first->second;
@@ -518,9 +562,9 @@ void RRSendQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
        state.tx.streams) {
     StreamID stream_id(state_stream.id);
     streams_.emplace(
-        std::piecewise_construct, std::forward_as_tuple(stream_id),
-        std::forward_as_tuple(
-            &scheduler_, stream_id, StreamPriority(state_stream.priority),
+        stream_id,
+        OutgoingStream(
+            stream_id, StreamPriority(state_stream.priority),
            [this, stream_id]() { on_buffered_amount_low_(stream_id); },
            total_buffered_amount_, &state_stream));
   }
diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h
index eea814c310..59f0d91aed 100644
--- a/net/dcsctp/tx/rr_send_queue.h
+++ b/net/dcsctp/tx/rr_send_queue.h
@@ -13,7 +13,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -26,7 +25,6 @@
 #include "net/dcsctp/public/dcsctp_socket.h"
 #include "net/dcsctp/public/types.h"
 #include "net/dcsctp/tx/send_queue.h"
-#include "net/dcsctp/tx/stream_scheduler.h"

 namespace dcsctp {

@@ -113,33 +111,32 @@ class RRSendQueue : public SendQueue {
   };

   // Per-stream information.
-  class OutgoingStream : public StreamScheduler::StreamProducer {
+  class OutgoingStream {
    public:
     OutgoingStream(
-        StreamScheduler* scheduler,
         StreamID stream_id,
         StreamPriority priority,
         std::function<void()> on_buffered_amount_low,
         ThresholdWatcher& total_buffered_amount,
         const DcSctpSocketHandoverState::OutgoingStream* state = nullptr)
-        : scheduler_stream_(scheduler->CreateStream(this, stream_id, priority)),
+        : stream_id_(stream_id),
+          priority_(priority),
          next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)),
          next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)),
          next_ssn_(SSN(state ? state->next_ssn : 0)),
          buffered_amount_(std::move(on_buffered_amount_low)),
          total_buffered_amount_(total_buffered_amount) {}

-    StreamID stream_id() const { return scheduler_stream_->stream_id(); }
+    StreamID stream_id() const { return stream_id_; }

     // Enqueues a message to this stream.
     void Add(DcSctpMessage message,
              TimeMs expires_at,
              const SendOptions& send_options);

-    // Implementing `StreamScheduler::StreamProducer`.
-    absl::optional<DataToSend> Produce(TimeMs now,
-                                       size_t max_size) override;
-    size_t bytes_to_send_in_next_message() const override;
+    // Produces a data chunk to send, or `absl::nullopt` if nothing could be
+    // produced, e.g. if all messages have expired.
+    absl::optional<DataToSend> Produce(TimeMs now, size_t max_size);

     const ThresholdWatcher& buffered_amount() const { return buffered_amount_; }
     ThresholdWatcher& buffered_amount() { return buffered_amount_; }
@@ -170,10 +167,12 @@ class RRSendQueue : public SendQueue {
     // Indicates if this stream has a partially sent message in it.
     bool has_partially_sent_message() const;

-    StreamPriority priority() const { return scheduler_stream_->priority(); }
-    void set_priority(StreamPriority priority) {
-      scheduler_stream_->set_priority(priority);
-    }
+    // Indicates if the stream possibly has data to send. Note that it may
+    // return `true` for streams that have enqueued, but expired, messages.
+    bool HasDataToSend() const;
+
+    void set_priority(StreamPriority priority) { priority_ = priority; }
+    StreamPriority priority() const { return priority_; }

     void AddHandoverState(
         DcSctpSocketHandoverState::OutgoingStream& state) const;
@@ -226,8 +225,8 @@ class RRSendQueue : public SendQueue {

     bool IsConsistent() const;

-    const std::unique_ptr<StreamScheduler::Stream> scheduler_stream_;
-
+    const StreamID stream_id_;
+    StreamPriority priority_;
     PauseState pause_state_ = PauseState::kNotPaused;
     // MIDs are different for unordered and ordered messages sent on a stream.
     MID next_unordered_mid_;
@@ -252,10 +251,12 @@ class RRSendQueue : public SendQueue {
                                              TimeMs now,
                                              size_t max_size);

+  // Return the next stream, in round-robin fashion.
+  std::map<StreamID, OutgoingStream>::iterator GetNextStream();
+
   const std::string log_prefix_;
   const size_t buffer_size_;
   const StreamPriority default_priority_;
-  StreamScheduler scheduler_;

   // Called when the buffered amount is below what has been set using
   // `SetBufferedAmountLowThreshold`.
@@ -268,6 +269,15 @@ class RRSendQueue : public SendQueue {
   // The total amount of buffer data, for all streams.
   ThresholdWatcher total_buffered_amount_;

+  // Indicates if the previous fragment sent was the end of a message. For
+  // non-interleaved sending, this means that the next message may come from a
+  // different stream. If not true, the next fragment must be produced from the
+  // same stream as last time.
+  bool previous_message_has_ended_ = true;
+
+  // The current stream to send chunks from. Modified by `GetNextStream`.
+  StreamID current_stream_id_ = StreamID(0);
+
   // All streams, and messages added to those.
   std::map<StreamID, OutgoingStream> streams_;
 };
diff --git a/net/dcsctp/tx/stream_scheduler.cc b/net/dcsctp/tx/stream_scheduler.cc
index 2056dd1cce..22457bfa8e 100644
--- a/net/dcsctp/tx/stream_scheduler.cc
+++ b/net/dcsctp/tx/stream_scheduler.cc
@@ -114,7 +114,7 @@ StreamScheduler::VirtualTime StreamScheduler::Stream::GetNextFinishTime()
 absl::optional<SendQueue::DataToSend> StreamScheduler::Stream::Produce(
     TimeMs now,
     size_t max_size) {
-  absl::optional<SendQueue::DataToSend> data = producer_.Produce(now, max_size);
+  absl::optional<SendQueue::DataToSend> data = callback_.Produce(now, max_size);

   if (data.has_value()) {
     VirtualTime new_current = GetNextFinishTime();
diff --git a/net/dcsctp/tx/stream_scheduler.h b/net/dcsctp/tx/stream_scheduler.h
index d82501f2f8..9feeb61434 100644
--- a/net/dcsctp/tx/stream_scheduler.h
+++ b/net/dcsctp/tx/stream_scheduler.h
@@ -56,9 +56,9 @@ class StreamScheduler {
   };

  public:
-  class StreamProducer {
+  class StreamCallback {
    public:
-    virtual ~StreamProducer() = default;
+    virtual ~StreamCallback() = default;

     // Produces a fragment of data to send. The current wall time is specified
     // as `now` and should be used to skip chunks with expired limited lifetime.
@@ -99,11 +99,11 @@ class StreamScheduler {
     friend class StreamScheduler;

     Stream(StreamScheduler* parent,
-           StreamProducer* producer,
+           StreamCallback* callback,
           StreamID stream_id,
           StreamPriority priority)
        : parent_(*parent),
-          producer_(*producer),
+          callback_(*callback),
          stream_id_(stream_id),
          priority_(priority) {}

@@ -117,14 +117,14 @@ class StreamScheduler {
     VirtualTime current_time() const { return current_virtual_time_; }
     VirtualTime next_finish_time() const { return next_finish_time_; }
     size_t bytes_to_send_in_next_message() const {
-      return producer_.bytes_to_send_in_next_message();
+      return callback_.bytes_to_send_in_next_message();
     }

     // Returns the next virtual finish time for this stream.
     VirtualTime GetNextFinishTime() const;

     StreamScheduler& parent_;
-    StreamProducer& producer_;
+    StreamCallback& callback_;
     const StreamID stream_id_;
     StreamPriority priority_;
     // This outgoing stream's "current" virtual_time.
@@ -132,10 +132,10 @@ class StreamScheduler {
     VirtualTime next_finish_time_ = VirtualTime::Zero();
   };

-  std::unique_ptr<Stream> CreateStream(StreamProducer* producer,
+  std::unique_ptr<Stream> CreateStream(StreamCallback* callback,
                                        StreamID stream_id,
                                        StreamPriority priority) {
-    return absl::WrapUnique(new Stream(this, producer, stream_id, priority));
+    return absl::WrapUnique(new Stream(this, callback, stream_id, priority));
   }

   // Makes the scheduler stop producing message from the current stream and
diff --git a/net/dcsctp/tx/stream_scheduler_test.cc b/net/dcsctp/tx/stream_scheduler_test.cc
index 0c239fe8b0..cd15837c7e 100644
--- a/net/dcsctp/tx/stream_scheduler_test.cc
+++ b/net/dcsctp/tx/stream_scheduler_test.cc
@@ -59,7 +59,7 @@ std::map GetPacketCounts(StreamScheduler& scheduler,
   return packet_counts;
 }

-class MockStreamProducer : public StreamScheduler::StreamProducer {
+class MockStreamCallback : public StreamScheduler::StreamCallback {
  public:
   MOCK_METHOD(absl::optional<SendQueue::DataToSend>,
               Produce,
@@ -74,18 +74,18 @@ class TestStream {
              StreamID stream_id,
              StreamPriority priority,
              size_t packet_size = kPayloadSize) {
-    EXPECT_CALL(producer_, Produce)
+    EXPECT_CALL(callback_, Produce)
         .WillRepeatedly(CreateChunk(stream_id, MID(0), packet_size));
-    EXPECT_CALL(producer_, bytes_to_send_in_next_message)
+    EXPECT_CALL(callback_, bytes_to_send_in_next_message)
         .WillRepeatedly(Return(packet_size));
-    stream_ = scheduler.CreateStream(&producer_, stream_id, priority);
+    stream_ = scheduler.CreateStream(&callback_, stream_id, priority);
     stream_->MaybeMakeActive();
   }

   StreamScheduler::Stream& stream() { return *stream_; }

  private:
-  StrictMock<MockStreamProducer> producer_;
+  StrictMock<MockStreamCallback> callback_;
   std::unique_ptr<StreamScheduler::Stream> stream_;
 };

@@ -100,9 +100,9 @@ TEST(StreamSchedulerTest, HasNoActiveStreams) {
 TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
   StreamScheduler scheduler;

-  StrictMock<MockStreamProducer> producer;
+  StrictMock<MockStreamCallback> callback;
   auto stream =
-      scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2));

   EXPECT_EQ(stream->stream_id(), StreamID(1));
   EXPECT_EQ(stream->priority(), StreamPriority(2));
@@ -115,13 +115,13 @@ TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
 TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
   StreamScheduler scheduler;

-  StrictMock<MockStreamProducer> producer;
-  EXPECT_CALL(producer, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
-  EXPECT_CALL(producer, bytes_to_send_in_next_message)
+  StrictMock<MockStreamCallback> callback;
+  EXPECT_CALL(callback, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
+  EXPECT_CALL(callback, bytes_to_send_in_next_message)
       .WillOnce(Return(kPayloadSize))  // When making active
       .WillOnce(Return(0));
   auto stream =
-      scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2));
   stream->MaybeMakeActive();

   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
@@ -132,32 +132,32 @@ TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
   StreamScheduler scheduler;

-  StrictMock<MockStreamProducer> producer1;
-  EXPECT_CALL(producer1, Produce)
+  StrictMock<MockStreamCallback> callback1;
+  EXPECT_CALL(callback1, Produce)
       .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)))
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
-  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
   auto stream1 =
-      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
   stream1->MaybeMakeActive();

-  StrictMock<MockStreamProducer> producer2;
-  EXPECT_CALL(producer2, Produce)
+  StrictMock<MockStreamCallback> callback2;
+  EXPECT_CALL(callback2, Produce)
      .WillOnce(CreateChunk(StreamID(2), MID(200)))
      .WillOnce(CreateChunk(StreamID(2), MID(201)))
      .WillOnce(CreateChunk(StreamID(2), MID(202)));
-  EXPECT_CALL(producer2, bytes_to_send_in_next_message)
+  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
   auto stream2 =
-      scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
+      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
   stream2->MaybeMakeActive();

   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -174,8 +174,8 @@ TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
 TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
   StreamScheduler scheduler;

-  StrictMock<MockStreamProducer> producer1;
-  EXPECT_CALL(producer1, Produce)
+  StrictMock<MockStreamCallback> callback1;
+  EXPECT_CALL(callback1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce([](...) {
        return SendQueue::DataToSend(
            Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(53),
                 std::vector<uint8_t>(kPayloadSize), Data::IsBeginning(true),
                 Data::IsEnd(false), IsUnordered(true)));
      })
@@ -196,7 +196,7 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
                 Data::IsEnd(true), IsUnordered(true)));
      })
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
-  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
   auto stream1 =
-      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
   stream1->MaybeMakeActive();

-  StrictMock<MockStreamProducer> producer2;
-  EXPECT_CALL(producer2, Produce)
+  StrictMock<MockStreamCallback> callback2;
+  EXPECT_CALL(callback2, Produce)
      .WillOnce(CreateChunk(StreamID(2), MID(200)))
      .WillOnce(CreateChunk(StreamID(2), MID(201)))
      .WillOnce(CreateChunk(StreamID(2), MID(202)));
-  EXPECT_CALL(producer2, bytes_to_send_in_next_message)
+  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
   auto stream2 =
-      scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
+      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
   stream2->MaybeMakeActive();

   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -236,16 +236,16 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
 TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
   StreamScheduler scheduler;

-  StrictMock<MockStreamProducer> producer1;
-  EXPECT_CALL(producer1, Produce)
+  StrictMock<MockStreamCallback> callback1;
+  EXPECT_CALL(callback1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)));
-  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize));  // hints that there is a MID(2) coming.
   auto stream1 =
-      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
   stream1->MaybeMakeActive();

   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -260,20 +260,20 @@ TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
 TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
   StreamScheduler scheduler;

-  StrictMock<MockStreamProducer> producer1;
+  StrictMock<MockStreamCallback> callback1;
   // Callbacks are setup so that they hint that there is a MID(2) coming...
-  EXPECT_CALL(producer1, Produce)
+  EXPECT_CALL(callback1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)))
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
-  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))  // When making active again
      .WillOnce(Return(0));
   auto stream1 =
-      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
   stream1->MaybeMakeActive();

   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -290,33 +290,33 @@ TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
 TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) {
   StreamScheduler scheduler;

-  StrictMock<MockStreamProducer> producer1;
-  EXPECT_CALL(producer1, Produce)
+  StrictMock<MockStreamCallback> callback1;
+  EXPECT_CALL(callback1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)))
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
-  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
   auto stream1 =
-      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
   stream1->MaybeMakeActive();

-  StrictMock<MockStreamProducer> producer2;
-  EXPECT_CALL(producer2, Produce)
+  StrictMock<MockStreamCallback> callback2;
+  EXPECT_CALL(callback2, Produce)
      .WillOnce(CreateChunk(StreamID(2), MID(200)))
      .WillOnce(CreateChunk(StreamID(2), MID(201)))
      .WillOnce(CreateChunk(StreamID(2), MID(202)));
-  EXPECT_CALL(producer2, bytes_to_send_in_next_message)
+  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
   auto stream2 =
-      scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
+      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
   stream2->MaybeMakeActive();

   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));