New PacketQueue2 which prioritize packets in a round-robin fashion.
Bug: webrtc:8287, webrtc:8288 Change-Id: I69ab846851c308d51e23e2b3bc3b55b19d1a07e8 Reviewed-on: https://webrtc-review.googlesource.com/7300 Commit-Queue: Philip Eliasson <philipel@webrtc.org> Reviewed-by: Stefan Holmer <stefan@webrtc.org> Cr-Commit-Position: refs/heads/master@{#20281}
This commit is contained in:
parent
09d90147fb
commit
881829b145
@ -21,6 +21,8 @@ rtc_static_library("pacing") {
|
||||
"pacer.h",
|
||||
"packet_queue.cc",
|
||||
"packet_queue.h",
|
||||
"packet_queue2.cc",
|
||||
"packet_queue2.h",
|
||||
"packet_router.cc",
|
||||
"packet_router.h",
|
||||
]
|
||||
|
||||
212
modules/pacing/packet_queue2.cc
Normal file
212
modules/pacing/packet_queue2.cc
Normal file
@ -0,0 +1,212 @@
|
||||
/*
|
||||
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
|
||||
#include "modules/pacing/packet_queue2.h"
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
#include "rtc_base/checks.h"
|
||||
#include "system_wrappers/include/clock.h"
|
||||
|
||||
namespace webrtc {
|
||||
|
||||
PacketQueue2::Stream::Stream() : bytes(0) {}
|
||||
PacketQueue2::Stream::~Stream() {}
|
||||
|
||||
PacketQueue2::Packet::Packet(RtpPacketSender::Priority priority,
|
||||
uint32_t ssrc,
|
||||
uint16_t seq_number,
|
||||
int64_t capture_time_ms,
|
||||
int64_t enqueue_time_ms,
|
||||
size_t length_in_bytes,
|
||||
bool retransmission,
|
||||
uint64_t enqueue_order)
|
||||
: priority(priority),
|
||||
ssrc(ssrc),
|
||||
sequence_number(seq_number),
|
||||
capture_time_ms(capture_time_ms),
|
||||
enqueue_time_ms(enqueue_time_ms),
|
||||
bytes(length_in_bytes),
|
||||
retransmission(retransmission),
|
||||
enqueue_order(enqueue_order) {}
|
||||
|
||||
PacketQueue2::Packet::Packet(const Packet& other) = default;
|
||||
|
||||
PacketQueue2::Packet::~Packet() {}
|
||||
|
||||
PacketQueue2::PacketQueue2(const Clock* clock)
|
||||
: clock_(clock), time_last_updated_(clock_->TimeInMilliseconds()) {}
|
||||
|
||||
PacketQueue2::~PacketQueue2() {}
|
||||
|
||||
void PacketQueue2::Push(const Packet& packet_to_insert) {
|
||||
Packet packet(packet_to_insert);
|
||||
|
||||
auto stream_info_it = streams_.find(packet.ssrc);
|
||||
if (stream_info_it == streams_.end()) {
|
||||
stream_info_it = streams_.emplace(packet.ssrc, Stream()).first;
|
||||
stream_info_it->second.priority_it = stream_priorities_.end();
|
||||
stream_info_it->second.ssrc = packet.ssrc;
|
||||
}
|
||||
|
||||
Stream* streams_ = &stream_info_it->second;
|
||||
|
||||
if (streams_->priority_it == stream_priorities_.end()) {
|
||||
// If the SSRC is not currently scheduled, add it to |stream_priorities_|.
|
||||
RTC_CHECK(!IsSsrcScheduled(streams_->ssrc));
|
||||
streams_->priority_it = stream_priorities_.emplace(
|
||||
StreamPrioKey(packet.priority, streams_->bytes), packet.ssrc);
|
||||
} else if (packet.priority < streams_->priority_it->first.priority) {
|
||||
// If the priority of this SSRC increased, remove the outdated StreamPrioKey
|
||||
// and insert a new one with the new priority. Note that
|
||||
// RtpPacketSender::Priority uses lower ordinal for higher priority.
|
||||
stream_priorities_.erase(streams_->priority_it);
|
||||
streams_->priority_it = stream_priorities_.emplace(
|
||||
StreamPrioKey(packet.priority, streams_->bytes), packet.ssrc);
|
||||
}
|
||||
RTC_CHECK(streams_->priority_it != stream_priorities_.end());
|
||||
|
||||
packet.enqueue_time_it = enqueue_times_.insert(packet.enqueue_time_ms);
|
||||
|
||||
// In order to figure out how much time a packet has spent in the queue while
|
||||
// not in a paused state, we subtract the total amount of time the queue has
|
||||
// been paused so far, and when the packet is poped we subtract the total
|
||||
// amount of time the queue has been paused at that moment. This way we
|
||||
// subtract the total amount of time the packet has spent in the queue while
|
||||
// in a paused state.
|
||||
UpdateQueueTime(packet.enqueue_time_ms);
|
||||
packet.enqueue_time_ms -= pause_time_sum_ms_;
|
||||
streams_->packet_queue.push(packet);
|
||||
|
||||
size_packets_ += 1;
|
||||
size_bytes_ += packet.bytes;
|
||||
}
|
||||
|
||||
const PacketQueue2::Packet& PacketQueue2::Top() {
|
||||
return GetHighestPriorityStream()->packet_queue.top();
|
||||
}
|
||||
|
||||
void PacketQueue2::Pop() {
|
||||
RTC_CHECK(!paused_);
|
||||
if (!Empty()) {
|
||||
Stream* streams_ = GetHighestPriorityStream();
|
||||
stream_priorities_.erase(streams_->priority_it);
|
||||
const Packet& packet = streams_->packet_queue.top();
|
||||
|
||||
// Calculate the total amount of time spent by this packet in the queue
|
||||
// while in a non-paused state. Note that the |pause_time_sum_ms_| was
|
||||
// subtracted from |packet.enqueue_time_ms| when the packet was pushed, and
|
||||
// by subtracting it now we effectively remove the time spent in in the
|
||||
// queue while in a paused state.
|
||||
int64_t time_in_non_paused_state_ms =
|
||||
time_last_updated_ - packet.enqueue_time_ms - pause_time_sum_ms_;
|
||||
queue_time_sum_ms_ -= time_in_non_paused_state_ms;
|
||||
|
||||
RTC_CHECK(packet.enqueue_time_it != enqueue_times_.end());
|
||||
enqueue_times_.erase(packet.enqueue_time_it);
|
||||
|
||||
// Update |bytes| of this stream. The general idea is that the stream that
|
||||
// has sent the least amount of bytes should have the highest priority.
|
||||
// The problem with that is if streams send with different rates, in which
|
||||
// case a "budget" will be built up for the stream sending at the lower
|
||||
// rate. To avoid building a too large budget we limit |bytes| to be within
|
||||
// kMaxLeading bytes of the stream that has sent the most amount of bytes.
|
||||
streams_->bytes =
|
||||
std::max(streams_->bytes + packet.bytes, max_bytes_ - kMaxLeadingBytes);
|
||||
max_bytes_ = std::max(max_bytes_, streams_->bytes);
|
||||
|
||||
size_bytes_ -= packet.bytes;
|
||||
size_packets_ -= 1;
|
||||
RTC_CHECK(size_packets_ > 0 || queue_time_sum_ms_ == 0);
|
||||
streams_->packet_queue.pop();
|
||||
|
||||
// If there are packets left to be sent, schedule the stream again.
|
||||
RTC_CHECK(!IsSsrcScheduled(streams_->ssrc));
|
||||
if (streams_->packet_queue.empty()) {
|
||||
streams_->priority_it = stream_priorities_.end();
|
||||
} else {
|
||||
RtpPacketSender::Priority priority =
|
||||
streams_->packet_queue.top().priority;
|
||||
streams_->priority_it = stream_priorities_.emplace(
|
||||
StreamPrioKey(priority, streams_->bytes), streams_->ssrc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool PacketQueue2::Empty() const {
|
||||
RTC_CHECK((!stream_priorities_.empty() && size_packets_ > 0) ||
|
||||
(stream_priorities_.empty() && size_packets_ == 0));
|
||||
return stream_priorities_.empty();
|
||||
}
|
||||
|
||||
size_t PacketQueue2::SizeInPackets() const {
|
||||
return size_packets_;
|
||||
}
|
||||
|
||||
uint64_t PacketQueue2::SizeInBytes() const {
|
||||
return size_bytes_;
|
||||
}
|
||||
|
||||
int64_t PacketQueue2::OldestEnqueueTimeMs() const {
|
||||
if (Empty())
|
||||
return 0;
|
||||
RTC_CHECK(!enqueue_times_.empty());
|
||||
return *enqueue_times_.begin();
|
||||
}
|
||||
|
||||
void PacketQueue2::UpdateQueueTime(int64_t timestamp_ms) {
|
||||
RTC_CHECK_GE(timestamp_ms, time_last_updated_);
|
||||
if (timestamp_ms == time_last_updated_)
|
||||
return;
|
||||
|
||||
int64_t delta_ms = timestamp_ms - time_last_updated_;
|
||||
|
||||
if (paused_) {
|
||||
pause_time_sum_ms_ += delta_ms;
|
||||
} else {
|
||||
queue_time_sum_ms_ += delta_ms * size_packets_;
|
||||
}
|
||||
|
||||
time_last_updated_ = timestamp_ms;
|
||||
}
|
||||
|
||||
void PacketQueue2::SetPauseState(bool paused, int64_t timestamp_ms) {
|
||||
if (paused_ == paused)
|
||||
return;
|
||||
UpdateQueueTime(timestamp_ms);
|
||||
paused_ = paused;
|
||||
}
|
||||
|
||||
int64_t PacketQueue2::AverageQueueTimeMs() const {
|
||||
if (Empty())
|
||||
return 0;
|
||||
return queue_time_sum_ms_ / size_packets_;
|
||||
}
|
||||
|
||||
PacketQueue2::Stream* PacketQueue2::GetHighestPriorityStream() {
|
||||
RTC_CHECK(!stream_priorities_.empty());
|
||||
uint32_t ssrc = stream_priorities_.begin()->second;
|
||||
|
||||
auto stream_info_it = streams_.find(ssrc);
|
||||
RTC_CHECK(stream_info_it != streams_.end());
|
||||
RTC_CHECK(stream_info_it->second.priority_it == stream_priorities_.begin());
|
||||
RTC_CHECK(!stream_info_it->second.packet_queue.empty());
|
||||
return &stream_info_it->second;
|
||||
}
|
||||
|
||||
bool PacketQueue2::IsSsrcScheduled(uint32_t ssrc) const {
|
||||
for (const auto& scheduled_stream : stream_priorities_) {
|
||||
if (scheduled_stream.second == ssrc)
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
} // namespace webrtc
|
||||
138
modules/pacing/packet_queue2.h
Normal file
138
modules/pacing/packet_queue2.h
Normal file
@ -0,0 +1,138 @@
|
||||
/*
|
||||
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
|
||||
#ifndef MODULES_PACING_PACKET_QUEUE2_H_
|
||||
#define MODULES_PACING_PACKET_QUEUE2_H_
|
||||
|
||||
#include <map>
|
||||
#include <queue>
|
||||
#include <set>
|
||||
|
||||
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
|
||||
|
||||
namespace webrtc {
|
||||
|
||||
class PacketQueue2 {
|
||||
public:
|
||||
explicit PacketQueue2(const Clock* clock);
|
||||
virtual ~PacketQueue2();
|
||||
|
||||
struct Packet {
|
||||
Packet(RtpPacketSender::Priority priority,
|
||||
uint32_t ssrc,
|
||||
uint16_t seq_number,
|
||||
int64_t capture_time_ms,
|
||||
int64_t enqueue_time_ms,
|
||||
size_t length_in_bytes,
|
||||
bool retransmission,
|
||||
uint64_t enqueue_order);
|
||||
|
||||
Packet(const Packet& other);
|
||||
|
||||
virtual ~Packet();
|
||||
|
||||
bool operator<(const Packet& other) const {
|
||||
if (priority != other.priority)
|
||||
return priority > other.priority;
|
||||
if (retransmission != other.retransmission)
|
||||
return other.retransmission;
|
||||
|
||||
return enqueue_order > other.enqueue_order;
|
||||
}
|
||||
|
||||
RtpPacketSender::Priority priority;
|
||||
uint32_t ssrc;
|
||||
uint16_t sequence_number;
|
||||
int64_t capture_time_ms; // Absolute time of frame capture.
|
||||
int64_t enqueue_time_ms; // Absolute time of pacer queue entry.
|
||||
size_t bytes;
|
||||
bool retransmission;
|
||||
uint64_t enqueue_order;
|
||||
std::multiset<int64_t>::iterator enqueue_time_it;
|
||||
};
|
||||
|
||||
void Push(const Packet& packet);
|
||||
const Packet& Top();
|
||||
void Pop();
|
||||
|
||||
bool Empty() const;
|
||||
size_t SizeInPackets() const;
|
||||
uint64_t SizeInBytes() const;
|
||||
|
||||
int64_t OldestEnqueueTimeMs() const;
|
||||
int64_t AverageQueueTimeMs() const;
|
||||
void UpdateQueueTime(int64_t timestamp_ms);
|
||||
void SetPauseState(bool paused, int64_t timestamp_ms);
|
||||
|
||||
struct StreamPrioKey {
|
||||
StreamPrioKey() = default;
|
||||
StreamPrioKey(RtpPacketSender::Priority priority, int64_t bytes)
|
||||
: priority(priority), bytes(bytes) {}
|
||||
|
||||
bool operator<(const StreamPrioKey& other) const {
|
||||
if (priority != other.priority)
|
||||
return priority < other.priority;
|
||||
return bytes > other.bytes;
|
||||
}
|
||||
|
||||
const RtpPacketSender::Priority priority;
|
||||
const size_t bytes;
|
||||
};
|
||||
|
||||
struct Stream {
|
||||
Stream();
|
||||
|
||||
virtual ~Stream();
|
||||
|
||||
size_t bytes;
|
||||
uint32_t ssrc;
|
||||
std::priority_queue<Packet> packet_queue;
|
||||
|
||||
// Whenever a packet is inserted for this stream we check if |priority_it|
|
||||
// points to an element in |stream_priorities_|, and if it does it means
|
||||
// this stream has already been scheduled, and if the scheduled priority is
|
||||
// lower than the priority of the incoming packet we reschedule this stream
|
||||
// with the higher priority.
|
||||
std::multimap<StreamPrioKey, uint32_t>::iterator priority_it;
|
||||
};
|
||||
|
||||
private:
|
||||
static constexpr size_t kMaxLeadingBytes = 1400;
|
||||
|
||||
Stream* GetHighestPriorityStream();
|
||||
|
||||
// Just used to verify correctness.
|
||||
bool IsSsrcScheduled(uint32_t ssrc) const;
|
||||
|
||||
const Clock* const clock_;
|
||||
bool paused_ = false;
|
||||
size_t size_packets_ = 0;
|
||||
size_t size_bytes_ = 0;
|
||||
size_t max_bytes_ = kMaxLeadingBytes;
|
||||
int64_t time_last_updated_;
|
||||
int64_t queue_time_sum_ms_ = 0;
|
||||
int64_t pause_time_sum_ms_ = 0;
|
||||
|
||||
// A map of streams used to prioritize from which stream to send next. We use
|
||||
// a multimap instead of a priority_queue since the priority of a stream can
|
||||
// change as a new packet is inserted, and a multimap allows us to remove and
|
||||
// then reinsert a StreamPrioKey if the priority has increased.
|
||||
std::multimap<StreamPrioKey, uint32_t> stream_priorities_;
|
||||
|
||||
// A map of SSRCs to Streams.
|
||||
std::map<uint32_t, Stream> streams_;
|
||||
|
||||
// The enqueue time of every packet currently in the queue. Used to figure out
|
||||
// the age of the oldest packet in the queue.
|
||||
std::multiset<int64_t> enqueue_times_;
|
||||
};
|
||||
} // namespace webrtc
|
||||
|
||||
#endif // MODULES_PACING_PACKET_QUEUE2_H_
|
||||
Loading…
x
Reference in New Issue
Block a user