diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn index c2fb12eac8..ae81db8c67 100644 --- a/net/dcsctp/tx/BUILD.gn +++ b/net/dcsctp/tx/BUILD.gn @@ -14,6 +14,7 @@ rtc_source_set("send_queue") { "../common:internal_types", "../packet:chunk", "../packet:data", + "../public:socket", "../public:types", ] sources = [ "send_queue.h" ] @@ -23,9 +24,12 @@ rtc_source_set("send_queue") { rtc_library("rr_send_queue") { deps = [ ":send_queue", + ":stream_scheduler", "../../../api:array_view", "../../../rtc_base:checks", "../../../rtc_base:logging", + "../../../rtc_base/containers:flat_map", + "../common:str_join", "../packet:data", "../public:socket", "../public:types", @@ -180,6 +184,7 @@ if (rtc_include_tests) { "../common:sequence_numbers", "../packet:chunk", "../packet:data", + "../packet:sctp_packet", "../public:socket", "../public:types", "../testing:data_generator", diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc index 6127da4e77..ee8bf828d0 100644 --- a/net/dcsctp/tx/rr_send_queue.cc +++ b/net/dcsctp/tx/rr_send_queue.cc @@ -13,12 +13,14 @@ #include #include #include +#include #include #include #include "absl/algorithm/container.h" #include "absl/types/optional.h" #include "api/array_view.h" +#include "net/dcsctp/common/str_join.h" #include "net/dcsctp/packet/data.h" #include "net/dcsctp/public/dcsctp_message.h" #include "net/dcsctp/public/dcsctp_socket.h" @@ -42,18 +44,18 @@ RRSendQueue::RRSendQueue(absl::string_view log_prefix, total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold); } -bool RRSendQueue::OutgoingStream::HasDataToSend() const { +size_t RRSendQueue::OutgoingStream::bytes_to_send_in_next_message() const { if (pause_state_ == PauseState::kPaused || pause_state_ == PauseState::kResetting) { // The stream has paused (and there is no partially sent message). - return false; + return 0; } if (items_.empty()) { - return false; + return 0; } - return true; + return items_.front().remaining_size; } void RRSendQueue::OutgoingStream::AddHandoverState( @@ -61,30 +63,29 @@ void RRSendQueue::OutgoingStream::AddHandoverState( state.next_ssn = next_ssn_.value(); state.next_ordered_mid = next_ordered_mid_.value(); state.next_unordered_mid = next_unordered_mid_.value(); - state.priority = *priority_; + state.priority = *scheduler_stream_->priority(); } bool RRSendQueue::IsConsistent() const { - size_t total_buffered_amount = 0; - for (const auto& [unused, stream] : streams_) { - total_buffered_amount += stream.buffered_amount().value(); - } + std::set expected_active_streams; + std::set actual_active_streams = + scheduler_.ActiveStreamsForTesting(); - if (previous_message_has_ended_) { - auto it = streams_.find(current_stream_id_); - if (it != streams_.end() && it->second.has_partially_sent_message()) { - RTC_DLOG(LS_ERROR) - << "Previous message has ended, but still partial message in stream"; - return false; - } - } else { - auto it = streams_.find(current_stream_id_); - if (it == streams_.end() || !it->second.has_partially_sent_message()) { - RTC_DLOG(LS_ERROR) - << "Previous message has NOT ended, but there is no partial message"; - return false; + size_t total_buffered_amount = 0; + for (const auto& [stream_id, stream] : streams_) { + total_buffered_amount += stream.buffered_amount().value(); + if (stream.bytes_to_send_in_next_message() > 0) { + expected_active_streams.emplace(stream_id); } } + if (expected_active_streams != actual_active_streams) { + auto fn = [&](rtc::StringBuilder& sb, const auto& p) { sb << *p; }; + RTC_DLOG(LS_ERROR) << "Active streams mismatch, is=[" + << StrJoin(actual_active_streams, ",", fn) + << "], expected=[" + << StrJoin(expected_active_streams, ",", fn) << "]"; + return false; + } return total_buffered_amount == total_buffered_amount_.value(); } @@ -118,10 +119,15 @@ void RRSendQueue::ThresholdWatcher::SetLowThreshold(size_t low_threshold) { void RRSendQueue::OutgoingStream::Add(DcSctpMessage message, TimeMs expires_at, const SendOptions& send_options) { + bool was_active = bytes_to_send_in_next_message() > 0; buffered_amount_.Increase(message.payload().size()); total_buffered_amount_.Increase(message.payload().size()); items_.emplace_back(std::move(message), expires_at, send_options); + if (!was_active) { + scheduler_stream_->MaybeMakeActive(); + } + RTC_DCHECK(IsConsistent()); } @@ -227,8 +233,15 @@ bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered, total_buffered_amount_.Decrease(item.remaining_size); items_.pop_front(); + // Only partially sent messages are discarded, so if a message was + // discarded, then it was the currently sent message. + scheduler_stream_->ForceReschedule(); + if (pause_state_ == PauseState::kPending) { pause_state_ = PauseState::kPaused; + scheduler_stream_->MakeInactive(); + } else if (bytes_to_send_in_next_message() == 0) { + scheduler_stream_->MakeInactive(); } // As the item still existed, it had unsent data. @@ -277,6 +290,7 @@ void RRSendQueue::OutgoingStream::Pause() { if (had_pending_items && pause_state_ == PauseState::kPaused) { RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id() << " was previously active, but is now paused."; + scheduler_stream_->MakeInactive(); } RTC_DCHECK(IsConsistent()); @@ -284,11 +298,8 @@ void RRSendQueue::OutgoingStream::Pause() { void RRSendQueue::OutgoingStream::Resume() { RTC_DCHECK(pause_state_ == PauseState::kResetting); - if (!items_.empty()) { - RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id() - << " was previously paused, but is now active."; - } pause_state_ = PauseState::kNotPaused; + scheduler_stream_->MaybeMakeActive(); RTC_DCHECK(IsConsistent()); } @@ -296,6 +307,11 @@ void RRSendQueue::OutgoingStream::Reset() { // This can be called both when an outgoing stream reset has been responded // to, or when the entire SendQueue is reset due to detecting the peer having // restarted. The stream may be in any state at this time. + PauseState old_pause_state = pause_state_; + pause_state_ = PauseState::kNotPaused; + next_ordered_mid_ = MID(0); + next_unordered_mid_ = MID(0); + next_ssn_ = SSN(0); if (!items_.empty()) { // If this message has been partially sent, reset it so that it will be // re-sent. @@ -309,11 +325,11 @@ void RRSendQueue::OutgoingStream::Reset() { item.message_id = absl::nullopt; item.ssn = absl::nullopt; item.current_fsn = FSN(0); + if (old_pause_state == PauseState::kPaused || + old_pause_state == PauseState::kResetting) { + scheduler_stream_->MaybeMakeActive(); + } } - pause_state_ = PauseState::kNotPaused; - next_ordered_mid_ = MID(0); - next_unordered_mid_ = MID(0); - next_ssn_ = SSN(0); RTC_DCHECK(IsConsistent()); } @@ -350,67 +366,9 @@ bool RRSendQueue::IsEmpty() const { return total_buffered_amount() == 0; } -std::map::iterator -RRSendQueue::GetNextStream() { - auto start_it = streams_.lower_bound(StreamID(*current_stream_id_ + 1)); - - for (auto it = start_it; it != streams_.end(); ++it) { - if (it->second.HasDataToSend()) { - current_stream_id_ = it->first; - return it; - } - } - - for (auto it = streams_.begin(); it != start_it; ++it) { - if (it->second.HasDataToSend()) { - current_stream_id_ = it->first; - return it; - } - } - return streams_.end(); -} - absl::optional RRSendQueue::Produce(TimeMs now, size_t max_size) { - std::map::iterator stream_it; - - for (;;) { - if (previous_message_has_ended_) { - // Previous message has ended. Round-robin to a different stream, if there - // even is one with data to send. - stream_it = GetNextStream(); - if (stream_it == streams_.end()) { - RTC_DLOG(LS_VERBOSE) - << log_prefix_ - << "There is no stream with data; Can't produce any data."; - return absl::nullopt; - } - } else { - // The previous message has not ended; Continue from the current stream. - stream_it = streams_.find(current_stream_id_); - RTC_DCHECK(stream_it != streams_.end()); - } - - absl::optional data = stream_it->second.Produce(now, max_size); - if (!data.has_value()) { - continue; - } - RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type=" - << (data->data.is_unordered ? "unordered" : "ordered") - << "::" - << (*data->data.is_beginning && *data->data.is_end - ? "complete" - : *data->data.is_beginning ? "first" - : *data->data.is_end ? "last" - : "middle") - << ", stream_id=" << *stream_it->first - << ", ppid=" << *data->data.ppid - << ", length=" << data->data.payload.size(); - - previous_message_has_ended_ = *data->data.is_end; - RTC_DCHECK(IsConsistent()); - return data; - } + return scheduler_.Produce(now, max_size); } bool RRSendQueue::Discard(IsUnordered unordered, @@ -418,12 +376,8 @@ bool RRSendQueue::Discard(IsUnordered unordered, MID message_id) { bool has_discarded = GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id); - if (has_discarded) { - // Only partially sent messages are discarded, so if a message was - // discarded, then it was the currently sent message. - previous_message_has_ended_ = true; - } + RTC_DCHECK(IsConsistent()); return has_discarded; } @@ -484,7 +438,7 @@ void RRSendQueue::Reset() { for (auto& [unused, stream] : streams_) { stream.Reset(); } - previous_message_has_ended_ = true; + scheduler_.ForceReschedule(); } size_t RRSendQueue::buffered_amount(StreamID stream_id) const { @@ -516,9 +470,9 @@ RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo( } return streams_ - .emplace(stream_id, - OutgoingStream( - stream_id, default_priority_, + .emplace(std::piecewise_construct, std::forward_as_tuple(stream_id), + std::forward_as_tuple( + &scheduler_, stream_id, default_priority_, [this, stream_id]() { on_buffered_amount_low_(stream_id); }, total_buffered_amount_)) .first->second; @@ -562,9 +516,9 @@ void RRSendQueue::RestoreFromState(const DcSctpSocketHandoverState& state) { state.tx.streams) { StreamID stream_id(state_stream.id); streams_.emplace( - stream_id, - OutgoingStream( - stream_id, StreamPriority(state_stream.priority), + std::piecewise_construct, std::forward_as_tuple(stream_id), + std::forward_as_tuple( + &scheduler_, stream_id, StreamPriority(state_stream.priority), [this, stream_id]() { on_buffered_amount_low_(stream_id); }, total_buffered_amount_, &state_stream)); } diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h index 59f0d91aed..eea814c310 100644 --- a/net/dcsctp/tx/rr_send_queue.h +++ b/net/dcsctp/tx/rr_send_queue.h @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -25,6 +26,7 @@ #include "net/dcsctp/public/dcsctp_socket.h" #include "net/dcsctp/public/types.h" #include "net/dcsctp/tx/send_queue.h" +#include "net/dcsctp/tx/stream_scheduler.h" namespace dcsctp { @@ -111,32 +113,33 @@ class RRSendQueue : public SendQueue { }; // Per-stream information. - class OutgoingStream { + class OutgoingStream : public StreamScheduler::StreamProducer { public: OutgoingStream( + StreamScheduler* scheduler, StreamID stream_id, StreamPriority priority, std::function on_buffered_amount_low, ThresholdWatcher& total_buffered_amount, const DcSctpSocketHandoverState::OutgoingStream* state = nullptr) - : stream_id_(stream_id), - priority_(priority), + : scheduler_stream_(scheduler->CreateStream(this, stream_id, priority)), next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)), next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)), next_ssn_(SSN(state ? state->next_ssn : 0)), buffered_amount_(std::move(on_buffered_amount_low)), total_buffered_amount_(total_buffered_amount) {} - StreamID stream_id() const { return stream_id_; } + StreamID stream_id() const { return scheduler_stream_->stream_id(); } // Enqueues a message to this stream. void Add(DcSctpMessage message, TimeMs expires_at, const SendOptions& send_options); - // Produces a data chunk to send, or `absl::nullopt` if nothing could be - // produced, e.g. if all messages have expired. - absl::optional Produce(TimeMs now, size_t max_size); + // Implementing `StreamScheduler::StreamProducer`. + absl::optional Produce(TimeMs now, + size_t max_size) override; + size_t bytes_to_send_in_next_message() const override; const ThresholdWatcher& buffered_amount() const { return buffered_amount_; } ThresholdWatcher& buffered_amount() { return buffered_amount_; } @@ -167,12 +170,10 @@ class RRSendQueue : public SendQueue { // Indicates if this stream has a partially sent message in it. bool has_partially_sent_message() const; - // Indicates if the stream possibly has data to send. Note that it may - // return `true` for streams that have enqueued, but expired, messages. - bool HasDataToSend() const; - - void set_priority(StreamPriority priority) { priority_ = priority; } - StreamPriority priority() const { return priority_; } + StreamPriority priority() const { return scheduler_stream_->priority(); } + void set_priority(StreamPriority priority) { + scheduler_stream_->set_priority(priority); + } void AddHandoverState( DcSctpSocketHandoverState::OutgoingStream& state) const; @@ -225,8 +226,8 @@ class RRSendQueue : public SendQueue { bool IsConsistent() const; - const StreamID stream_id_; - StreamPriority priority_; + const std::unique_ptr scheduler_stream_; + PauseState pause_state_ = PauseState::kNotPaused; // MIDs are different for unordered and ordered messages sent on a stream. MID next_unordered_mid_; @@ -251,12 +252,10 @@ class RRSendQueue : public SendQueue { TimeMs now, size_t max_size); - // Return the next stream, in round-robin fashion. - std::map::iterator GetNextStream(); - const std::string log_prefix_; const size_t buffer_size_; const StreamPriority default_priority_; + StreamScheduler scheduler_; // Called when the buffered amount is below what has been set using // `SetBufferedAmountLowThreshold`. @@ -269,15 +268,6 @@ class RRSendQueue : public SendQueue { // The total amount of buffer data, for all streams. ThresholdWatcher total_buffered_amount_; - // Indicates if the previous fragment sent was the end of a message. For - // non-interleaved sending, this means that the next message may come from a - // different stream. If not true, the next fragment must be produced from the - // same stream as last time. - bool previous_message_has_ended_ = true; - - // The current stream to send chunks from. Modified by `GetNextStream`. - StreamID current_stream_id_ = StreamID(0); - // All streams, and messages added to those. std::map streams_; }; diff --git a/net/dcsctp/tx/stream_scheduler.cc b/net/dcsctp/tx/stream_scheduler.cc index 22457bfa8e..1d15aec4da 100644 --- a/net/dcsctp/tx/stream_scheduler.cc +++ b/net/dcsctp/tx/stream_scheduler.cc @@ -114,7 +114,7 @@ StreamScheduler::VirtualTime StreamScheduler::Stream::GetNextFinishTime() absl::optional StreamScheduler::Stream::Produce( TimeMs now, size_t max_size) { - absl::optional data = callback_.Produce(now, max_size); + absl::optional data = producer_.Produce(now, max_size); if (data.has_value()) { VirtualTime new_current = GetNextFinishTime(); @@ -172,4 +172,12 @@ void StreamScheduler::Stream::MakeInactive() { [&](const auto* s) { return s == this; }); } +std::set StreamScheduler::ActiveStreamsForTesting() const { + std::set stream_ids; + for (const auto& stream : active_streams_) { + stream_ids.insert(stream->stream_id()); + } + return stream_ids; +} + } // namespace dcsctp diff --git a/net/dcsctp/tx/stream_scheduler.h b/net/dcsctp/tx/stream_scheduler.h index 9feeb61434..e76f474248 100644 --- a/net/dcsctp/tx/stream_scheduler.h +++ b/net/dcsctp/tx/stream_scheduler.h @@ -56,9 +56,9 @@ class StreamScheduler { }; public: - class StreamCallback { + class StreamProducer { public: - virtual ~StreamCallback() = default; + virtual ~StreamProducer() = default; // Produces a fragment of data to send. The current wall time is specified // as `now` and should be used to skip chunks with expired limited lifetime. @@ -99,11 +99,11 @@ class StreamScheduler { friend class StreamScheduler; Stream(StreamScheduler* parent, - StreamCallback* callback, + StreamProducer* producer, StreamID stream_id, StreamPriority priority) : parent_(*parent), - callback_(*callback), + producer_(*producer), stream_id_(stream_id), priority_(priority) {} @@ -117,14 +117,14 @@ class StreamScheduler { VirtualTime current_time() const { return current_virtual_time_; } VirtualTime next_finish_time() const { return next_finish_time_; } size_t bytes_to_send_in_next_message() const { - return callback_.bytes_to_send_in_next_message(); + return producer_.bytes_to_send_in_next_message(); } // Returns the next virtual finish time for this stream. VirtualTime GetNextFinishTime() const; StreamScheduler& parent_; - StreamCallback& callback_; + StreamProducer& producer_; const StreamID stream_id_; StreamPriority priority_; // This outgoing stream's "current" virtual_time. @@ -132,10 +132,10 @@ class StreamScheduler { VirtualTime next_finish_time_ = VirtualTime::Zero(); }; - std::unique_ptr CreateStream(StreamCallback* callback, + std::unique_ptr CreateStream(StreamProducer* producer, StreamID stream_id, StreamPriority priority) { - return absl::WrapUnique(new Stream(this, callback, stream_id, priority)); + return absl::WrapUnique(new Stream(this, producer, stream_id, priority)); } // Makes the scheduler stop producing message from the current stream and @@ -148,10 +148,7 @@ class StreamScheduler { // may be returned. If no data can be produced, `absl::nullopt` is returned. absl::optional Produce(TimeMs now, size_t max_size); - rtc::ArrayView ActiveStreamsForTesting() const { - return rtc::MakeArrayView(&*active_streams_.cbegin(), - active_streams_.size()); - } + std::set ActiveStreamsForTesting() const; private: struct ActiveStreamComparator { diff --git a/net/dcsctp/tx/stream_scheduler_test.cc b/net/dcsctp/tx/stream_scheduler_test.cc index cd15837c7e..0c239fe8b0 100644 --- a/net/dcsctp/tx/stream_scheduler_test.cc +++ b/net/dcsctp/tx/stream_scheduler_test.cc @@ -59,7 +59,7 @@ std::map GetPacketCounts(StreamScheduler& scheduler, return packet_counts; } -class MockStreamCallback : public StreamScheduler::StreamCallback { +class MockStreamProducer : public StreamScheduler::StreamProducer { public: MOCK_METHOD(absl::optional, Produce, @@ -74,18 +74,18 @@ class TestStream { StreamID stream_id, StreamPriority priority, size_t packet_size = kPayloadSize) { - EXPECT_CALL(callback_, Produce) + EXPECT_CALL(producer_, Produce) .WillRepeatedly(CreateChunk(stream_id, MID(0), packet_size)); - EXPECT_CALL(callback_, bytes_to_send_in_next_message) + EXPECT_CALL(producer_, bytes_to_send_in_next_message) .WillRepeatedly(Return(packet_size)); - stream_ = scheduler.CreateStream(&callback_, stream_id, priority); + stream_ = scheduler.CreateStream(&producer_, stream_id, priority); stream_->MaybeMakeActive(); } StreamScheduler::Stream& stream() { return *stream_; } private: - StrictMock callback_; + StrictMock producer_; std::unique_ptr stream_; }; @@ -100,9 +100,9 @@ TEST(StreamSchedulerTest, HasNoActiveStreams) { TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) { StreamScheduler scheduler; - StrictMock callback; + StrictMock producer; auto stream = - scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2)); + scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2)); EXPECT_EQ(stream->stream_id(), StreamID(1)); EXPECT_EQ(stream->priority(), StreamPriority(2)); @@ -115,13 +115,13 @@ TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) { TEST(StreamSchedulerTest, CanProduceFromSingleStream) { StreamScheduler scheduler; - StrictMock callback; - EXPECT_CALL(callback, Produce).WillOnce(CreateChunk(StreamID(1), MID(0))); - EXPECT_CALL(callback, bytes_to_send_in_next_message) + StrictMock producer; + EXPECT_CALL(producer, Produce).WillOnce(CreateChunk(StreamID(1), MID(0))); + EXPECT_CALL(producer, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(0)); auto stream = - scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2)); + scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2)); stream->MaybeMakeActive(); EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); @@ -132,32 +132,32 @@ TEST(StreamSchedulerTest, CanProduceFromSingleStream) { TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) { StreamScheduler scheduler; - StrictMock callback1; - EXPECT_CALL(callback1, Produce) + StrictMock producer1; + EXPECT_CALL(producer1, Produce) .WillOnce(CreateChunk(StreamID(1), MID(100))) .WillOnce(CreateChunk(StreamID(1), MID(101))) .WillOnce(CreateChunk(StreamID(1), MID(102))); - EXPECT_CALL(callback1, bytes_to_send_in_next_message) + EXPECT_CALL(producer1, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(kPayloadSize)) .WillOnce(Return(kPayloadSize)) .WillOnce(Return(0)); auto stream1 = - scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2)); + scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2)); stream1->MaybeMakeActive(); - StrictMock callback2; - EXPECT_CALL(callback2, Produce) + StrictMock producer2; + EXPECT_CALL(producer2, Produce) .WillOnce(CreateChunk(StreamID(2), MID(200))) .WillOnce(CreateChunk(StreamID(2), MID(201))) .WillOnce(CreateChunk(StreamID(2), MID(202))); - EXPECT_CALL(callback2, bytes_to_send_in_next_message) + EXPECT_CALL(producer2, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(kPayloadSize)) .WillOnce(Return(kPayloadSize)) .WillOnce(Return(0)); auto stream2 = - scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2)); + scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2)); stream2->MaybeMakeActive(); EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); @@ -174,8 +174,8 @@ TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) { TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) { StreamScheduler scheduler; - StrictMock callback1; - EXPECT_CALL(callback1, Produce) + StrictMock producer1; + EXPECT_CALL(producer1, Produce) .WillOnce(CreateChunk(StreamID(1), MID(100))) .WillOnce([](...) { return SendQueue::DataToSend( @@ -196,7 +196,7 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) { Data::IsEnd(true), IsUnordered(true))); }) .WillOnce(CreateChunk(StreamID(1), MID(102))); - EXPECT_CALL(callback1, bytes_to_send_in_next_message) + EXPECT_CALL(producer1, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(kPayloadSize)) .WillOnce(Return(kPayloadSize)) @@ -204,21 +204,21 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) { .WillOnce(Return(kPayloadSize)) .WillOnce(Return(0)); auto stream1 = - scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2)); + scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2)); stream1->MaybeMakeActive(); - StrictMock callback2; - EXPECT_CALL(callback2, Produce) + StrictMock producer2; + EXPECT_CALL(producer2, Produce) .WillOnce(CreateChunk(StreamID(2), MID(200))) .WillOnce(CreateChunk(StreamID(2), MID(201))) .WillOnce(CreateChunk(StreamID(2), MID(202))); - EXPECT_CALL(callback2, bytes_to_send_in_next_message) + EXPECT_CALL(producer2, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(kPayloadSize)) .WillOnce(Return(kPayloadSize)) .WillOnce(Return(0)); auto stream2 = - scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2)); + scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2)); stream2->MaybeMakeActive(); EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); @@ -236,16 +236,16 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) { TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) { StreamScheduler scheduler; - StrictMock callback1; - EXPECT_CALL(callback1, Produce) + StrictMock producer1; + EXPECT_CALL(producer1, Produce) .WillOnce(CreateChunk(StreamID(1), MID(100))) .WillOnce(CreateChunk(StreamID(1), MID(101))); - EXPECT_CALL(callback1, bytes_to_send_in_next_message) + EXPECT_CALL(producer1, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(kPayloadSize)) .WillOnce(Return(kPayloadSize)); // hints that there is a MID(2) coming. auto stream1 = - scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2)); + scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2)); stream1->MaybeMakeActive(); EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); @@ -260,20 +260,20 @@ TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) { TEST(StreamSchedulerTest, SingleStreamCanBeResumed) { StreamScheduler scheduler; - StrictMock callback1; + StrictMock producer1; // Callbacks are setup so that they hint that there is a MID(2) coming... - EXPECT_CALL(callback1, Produce) + EXPECT_CALL(producer1, Produce) .WillOnce(CreateChunk(StreamID(1), MID(100))) .WillOnce(CreateChunk(StreamID(1), MID(101))) .WillOnce(CreateChunk(StreamID(1), MID(102))); - EXPECT_CALL(callback1, bytes_to_send_in_next_message) + EXPECT_CALL(producer1, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(kPayloadSize)) .WillOnce(Return(kPayloadSize)) .WillOnce(Return(kPayloadSize)) // When making active again .WillOnce(Return(0)); auto stream1 = - scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2)); + scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2)); stream1->MaybeMakeActive(); EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); @@ -290,33 +290,33 @@ TEST(StreamSchedulerTest, SingleStreamCanBeResumed) { TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) { StreamScheduler scheduler; - StrictMock callback1; - EXPECT_CALL(callback1, Produce) + StrictMock producer1; + EXPECT_CALL(producer1, Produce) .WillOnce(CreateChunk(StreamID(1), MID(100))) .WillOnce(CreateChunk(StreamID(1), MID(101))) .WillOnce(CreateChunk(StreamID(1), MID(102))); - EXPECT_CALL(callback1, bytes_to_send_in_next_message) + EXPECT_CALL(producer1, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(kPayloadSize)) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(kPayloadSize)) .WillOnce(Return(0)); auto stream1 = - scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2)); + scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2)); stream1->MaybeMakeActive(); - StrictMock callback2; - EXPECT_CALL(callback2, Produce) + StrictMock producer2; + EXPECT_CALL(producer2, Produce) .WillOnce(CreateChunk(StreamID(2), MID(200))) .WillOnce(CreateChunk(StreamID(2), MID(201))) .WillOnce(CreateChunk(StreamID(2), MID(202))); - EXPECT_CALL(callback2, bytes_to_send_in_next_message) + EXPECT_CALL(producer2, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(kPayloadSize)) .WillOnce(Return(kPayloadSize)) .WillOnce(Return(0)); auto stream2 = - scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2)); + scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2)); stream2->MaybeMakeActive(); EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));