Bug: b/231397778 Change-Id: I8ce1486f91f6c84e246e043f2a4e2dd94fc29d06 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/265809 Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org> Commit-Queue: Artem Titov <titovartem@webrtc.org> Cr-Commit-Position: refs/heads/main@{#37220}
169 lines
5.1 KiB
C++
169 lines
5.1 KiB
C++
/*
|
|
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
|
|
*
|
|
* Use of this source code is governed by a BSD-style license
|
|
* that can be found in the LICENSE file in the root of the source
|
|
* tree. An additional intellectual property rights grant can be found
|
|
* in the file PATENTS. All contributing project authors may
|
|
* be found in the AUTHORS file in the root of the source tree.
|
|
*/
|
|
|
|
#ifndef TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_
|
|
#define TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_
|
|
|
|
#include <deque>
#include <memory>
#include <set>
#include <unordered_map>
#include <utility>

#include "absl/types/optional.h"
#include "rtc_base/checks.h"
|
|
|
|
namespace webrtc {
|
|
|
|
// Represents the queue which can be read by multiple readers. Each reader reads
|
|
// from its own queue head. When an element is added it will become visible for
|
|
// all readers. When an element will be removed by all the readers, the element
|
|
// will be removed from the queue.
|
|
template <typename T>
|
|
class MultiReaderQueue {
|
|
public:
|
|
// Creates queue with exactly `readers_count` readers named from 0 to
|
|
// `readers_count - 1`.
|
|
explicit MultiReaderQueue(size_t readers_count) {
|
|
for (size_t i = 0; i < readers_count; ++i) {
|
|
heads_[i] = 0;
|
|
}
|
|
}
|
|
// Creates queue with specified readers.
|
|
explicit MultiReaderQueue(std::set<size_t> readers) {
|
|
for (size_t reader : readers) {
|
|
heads_[reader] = 0;
|
|
}
|
|
}
|
|
|
|
// Adds a new `reader`, initializing its reading position (the reader's head)
|
|
// equal to the one of `reader_to_copy`.
|
|
// Complexity O(MultiReaderQueue::size(reader_to_copy)).
|
|
void AddReader(size_t reader, size_t reader_to_copy) {
|
|
size_t pos = GetHeadPositionOrDie(reader_to_copy);
|
|
|
|
auto it = heads_.find(reader);
|
|
RTC_CHECK(it == heads_.end())
|
|
<< "Reader " << reader << " is already in the queue";
|
|
heads_[reader] = heads_[reader_to_copy];
|
|
for (size_t i = pos; i < queue_.size(); ++i) {
|
|
in_queues_[i]++;
|
|
}
|
|
}
|
|
|
|
// Adds a new `reader`, initializing its reading position equal to first
|
|
// element in the queue.
|
|
// Complexity O(MultiReaderQueue::size()).
|
|
void AddReader(size_t reader) {
|
|
auto it = heads_.find(reader);
|
|
RTC_CHECK(it == heads_.end())
|
|
<< "Reader " << reader << " is already in the queue";
|
|
heads_[reader] = removed_elements_count_;
|
|
for (size_t i = 0; i < queue_.size(); ++i) {
|
|
in_queues_[i]++;
|
|
}
|
|
}
|
|
|
|
// Removes specified `reader` from the queue.
|
|
// Complexity O(MultiReaderQueue::size(reader)).
|
|
void RemoveReader(size_t reader) {
|
|
size_t pos = GetHeadPositionOrDie(reader);
|
|
for (size_t i = pos; i < queue_.size(); ++i) {
|
|
in_queues_[i]--;
|
|
}
|
|
while (!in_queues_.empty() && in_queues_[0] == 0) {
|
|
PopFront();
|
|
}
|
|
heads_.erase(reader);
|
|
}
|
|
|
|
// Add value to the end of the queue. Complexity O(1).
|
|
void PushBack(T value) {
|
|
queue_.push_back(value);
|
|
in_queues_.push_back(heads_.size());
|
|
}
|
|
|
|
// Extract element from specified head. Complexity O(1).
|
|
absl::optional<T> PopFront(size_t reader) {
|
|
size_t pos = GetHeadPositionOrDie(reader);
|
|
if (pos >= queue_.size()) {
|
|
return absl::nullopt;
|
|
}
|
|
|
|
T out = queue_[pos];
|
|
|
|
in_queues_[pos]--;
|
|
heads_[reader]++;
|
|
|
|
if (in_queues_[pos] == 0) {
|
|
RTC_DCHECK_EQ(pos, 0);
|
|
PopFront();
|
|
}
|
|
return out;
|
|
}
|
|
|
|
// Returns element at specified head. Complexity O(1).
|
|
absl::optional<T> Front(size_t reader) const {
|
|
size_t pos = GetHeadPositionOrDie(reader);
|
|
if (pos >= queue_.size()) {
|
|
return absl::nullopt;
|
|
}
|
|
return queue_[pos];
|
|
}
|
|
|
|
// Returns true if for specified head there are no more elements in the queue
|
|
// or false otherwise. Complexity O(1).
|
|
bool IsEmpty(size_t reader) const {
|
|
size_t pos = GetHeadPositionOrDie(reader);
|
|
return pos >= queue_.size();
|
|
}
|
|
|
|
// Returns size of the longest queue between all readers.
|
|
// Complexity O(1).
|
|
size_t size() const { return queue_.size(); }
|
|
|
|
// Returns size of the specified queue. Complexity O(1).
|
|
size_t size(size_t reader) const {
|
|
size_t pos = GetHeadPositionOrDie(reader);
|
|
return queue_.size() - pos;
|
|
}
|
|
|
|
// Complexity O(1).
|
|
size_t readers_count() const { return heads_.size(); }
|
|
|
|
private:
|
|
size_t GetHeadPositionOrDie(size_t reader) const {
|
|
auto it = heads_.find(reader);
|
|
RTC_CHECK(it != heads_.end()) << "No queue for reader " << reader;
|
|
return it->second - removed_elements_count_;
|
|
}
|
|
|
|
void PopFront() {
|
|
RTC_DCHECK(!queue_.empty());
|
|
RTC_DCHECK_EQ(in_queues_[0], 0);
|
|
queue_.pop_front();
|
|
in_queues_.pop_front();
|
|
removed_elements_count_++;
|
|
}
|
|
|
|
// Number of the elements that were removed from the queue. It is used to
|
|
// subtract from each head to compute the right index inside `queue_`;
|
|
size_t removed_elements_count_ = 0;
|
|
std::deque<T> queue_;
|
|
// In how may queues the element at index `i` is. An element can be removed
|
|
// from the front if and only if it is in 0 queues.
|
|
std::deque<size_t> in_queues_;
|
|
// Map from the reader to the head position in the queue.
|
|
std::unordered_map<size_t, size_t> heads_;
|
|
};
|
|
|
|
} // namespace webrtc
|
|
|
|
#endif // TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_
|