Add ability to handle data from multiple streams in RateAccCounter.

BUG=webrtc:5283

Review-Url: https://codereview.webrtc.org/2235223002
Cr-Commit-Position: refs/heads/master@{#14864}
This commit is contained in:
asapersson 2016-11-01 00:21:34 -07:00 committed by Commit bot
parent 7eaa83622b
commit fe647f4ab2
3 changed files with 165 additions and 54 deletions

View File

@ -11,6 +11,8 @@
#include "webrtc/video/stats_counter.h"
#include <algorithm>
#include <limits>
#include <map>
#include "webrtc/base/checks.h"
#include "webrtc/system_wrappers/include/clock.h"
@ -20,6 +22,7 @@ namespace webrtc {
namespace {
// Default periodic time interval for processing samples.
const int64_t kDefaultProcessIntervalMs = 2000;
const uint32_t kStreamId0 = 0;
} // namespace
// Class holding periodically computed metrics.
@ -62,19 +65,98 @@ class AggregatedCounter {
AggregatedStats stats_;
};
// Class holding gathered samples within a process interval.
class Samples {
public:
Samples() : total_count_(0) {}
~Samples() {}
void Add(int sample, uint32_t stream_id) {
samples_[stream_id].Add(sample);
++total_count_;
}
void Set(int sample, uint32_t stream_id) {
samples_[stream_id].Set(sample);
++total_count_;
}
int64_t Count() const { return total_count_; }
bool Empty() const { return total_count_ == 0; }
int64_t Sum() const {
int64_t sum = 0;
for (const auto& it : samples_)
sum += it.second.sum_;
return sum;
}
int Max() const {
int max = std::numeric_limits<int>::min();
for (const auto& it : samples_)
max = std::max(it.second.max_, max);
return max;
}
void Reset() {
for (auto& it : samples_)
it.second.Reset();
total_count_ = 0;
}
int64_t Diff() const {
int64_t sum_diff = 0;
int count = 0;
for (const auto& it : samples_) {
if (it.second.count_ > 0) {
int64_t diff = it.second.sum_ - it.second.last_sum_;
if (diff >= 0) {
sum_diff += diff;
++count;
}
}
}
return (count > 0) ? sum_diff : -1;
}
private:
struct Stats {
void Add(int sample) {
sum_ += sample;
++count_;
max_ = std::max(sample, max_);
}
void Set(int sample) {
sum_ = sample;
++count_;
}
void Reset() {
if (count_ > 0)
last_sum_ = sum_;
sum_ = 0;
count_ = 0;
max_ = std::numeric_limits<int>::min();
}
int max_ = std::numeric_limits<int>::min();
int64_t count_ = 0;
int64_t sum_ = 0;
int64_t last_sum_ = 0;
};
int64_t total_count_;
std::map<uint32_t, Stats> samples_; // Gathered samples mapped by stream id.
};
// StatsCounter class.
StatsCounter::StatsCounter(Clock* clock,
int64_t process_intervals_ms,
bool include_empty_intervals,
StatsCounterObserver* observer)
: max_(0),
sum_(0),
num_samples_(0),
last_sum_(0),
aggregated_counter_(new AggregatedCounter()),
: include_empty_intervals_(include_empty_intervals),
process_intervals_ms_(process_intervals_ms),
aggregated_counter_(new AggregatedCounter()),
samples_(new Samples()),
clock_(clock),
include_empty_intervals_(include_empty_intervals),
observer_(observer),
last_process_time_ms_(-1),
paused_(false) {
@ -120,21 +202,15 @@ bool StatsCounter::TimeToProcess(int* elapsed_intervals) {
return true;
}
void StatsCounter::Set(int sample) {
void StatsCounter::Set(int sample, uint32_t stream_id) {
TryProcess();
++num_samples_;
sum_ = sample;
samples_->Set(sample, stream_id);
paused_ = false;
}
void StatsCounter::Add(int sample) {
TryProcess();
++num_samples_;
sum_ += sample;
if (num_samples_ == 1)
max_ = sample;
max_ = std::max(sample, max_);
samples_->Add(sample, kStreamId0);
paused_ = false;
}
@ -164,17 +240,13 @@ void StatsCounter::TryProcess() {
// If there are no samples, all elapsed intervals are empty (otherwise one
// interval contains sample(s), discard this interval).
int empty_intervals =
(num_samples_ == 0) ? elapsed_intervals : (elapsed_intervals - 1);
samples_->Empty() ? elapsed_intervals : (elapsed_intervals - 1);
ReportMetricToAggregatedCounter(GetValueForEmptyInterval(),
empty_intervals);
}
// Reset samples for elapsed interval.
if (num_samples_ > 0)
last_sum_ = sum_;
sum_ = 0;
max_ = 0;
num_samples_ = 0;
samples_->Reset();
}
bool StatsCounter::IncludeEmptyIntervals() const {
@ -195,9 +267,11 @@ void AvgCounter::Add(int sample) {
}
bool AvgCounter::GetMetric(int* metric) const {
if (num_samples_ == 0)
int64_t count = samples_->Count();
if (count == 0)
return false;
*metric = (sum_ + num_samples_ / 2) / num_samples_;
*metric = (samples_->Sum() + count / 2) / count;
return true;
}
@ -218,9 +292,10 @@ void MaxCounter::Add(int sample) {
}
bool MaxCounter::GetMetric(int* metric) const {
if (num_samples_ == 0)
if (samples_->Empty())
return false;
*metric = max_;
*metric = samples_->Max();
return true;
}
@ -240,9 +315,11 @@ void PercentCounter::Add(bool sample) {
}
bool PercentCounter::GetMetric(int* metric) const {
if (num_samples_ == 0)
int64_t count = samples_->Count();
if (count == 0)
return false;
*metric = (sum_ * 100 + num_samples_ / 2) / num_samples_;
*metric = (samples_->Sum() * 100 + count / 2) / count;
return true;
}
@ -262,9 +339,11 @@ void PermilleCounter::Add(bool sample) {
}
bool PermilleCounter::GetMetric(int* metric) const {
if (num_samples_ == 0)
int64_t count = samples_->Count();
if (count == 0)
return false;
*metric = (sum_ * 1000 + num_samples_ / 2) / num_samples_;
*metric = (samples_->Sum() * 1000 + count / 2) / count;
return true;
}
@ -286,9 +365,11 @@ void RateCounter::Add(int sample) {
}
bool RateCounter::GetMetric(int* metric) const {
if (num_samples_ == 0)
if (samples_->Empty())
return false;
*metric = (sum_ * 1000 + process_intervals_ms_ / 2) / process_intervals_ms_;
*metric = (samples_->Sum() * 1000 + process_intervals_ms_ / 2) /
process_intervals_ms_;
return true;
}
@ -304,15 +385,16 @@ RateAccCounter::RateAccCounter(Clock* clock,
include_empty_intervals,
observer) {}
void RateAccCounter::Set(int sample) {
StatsCounter::Set(sample);
void RateAccCounter::Set(int sample, uint32_t stream_id) {
StatsCounter::Set(sample, stream_id);
}
bool RateAccCounter::GetMetric(int* metric) const {
if (num_samples_ == 0 || last_sum_ > sum_)
int64_t diff = samples_->Diff();
if (diff < 0 || (!include_empty_intervals_ && diff == 0))
return false;
*metric = ((sum_ - last_sum_) * 1000 + process_intervals_ms_ / 2) /
process_intervals_ms_;
*metric = (diff * 1000 + process_intervals_ms_ / 2) / process_intervals_ms_;
return true;
}

View File

@ -20,6 +20,7 @@ namespace webrtc {
class AggregatedCounter;
class Clock;
class Samples;
// |StatsCounterObserver| is called periodically when a metric is updated.
class StatsCounterObserver {
@ -104,15 +105,12 @@ class StatsCounter {
StatsCounterObserver* observer);
void Add(int sample);
void Set(int sample);
void Set(int sample, uint32_t stream_id);
int max_;
int64_t sum_;
int64_t num_samples_;
int64_t last_sum_;
const std::unique_ptr<AggregatedCounter> aggregated_counter_;
const bool include_empty_intervals_;
const int64_t process_intervals_ms_;
const std::unique_ptr<AggregatedCounter> aggregated_counter_;
const std::unique_ptr<Samples> samples_;
private:
bool TimeToProcess(int* num_elapsed_intervals);
@ -121,7 +119,6 @@ class StatsCounter {
bool IncludeEmptyIntervals() const;
Clock* const clock_;
const bool include_empty_intervals_;
const std::unique_ptr<StatsCounterObserver> observer_;
int64_t last_process_time_ms_;
bool paused_;
@ -262,7 +259,7 @@ class RateAccCounter : public StatsCounter {
bool include_empty_intervals);
~RateAccCounter() override {}
void Set(int sample);
void Set(int sample, uint32_t stream_id);
private:
bool GetMetric(int* metric) const override;

View File

@ -16,6 +16,7 @@
namespace webrtc {
namespace {
const int kDefaultProcessIntervalMs = 2000;
const uint32_t kStreamId = 123456;
class StatsCounterObserverImpl : public StatsCounterObserver {
public:
@ -42,7 +43,7 @@ class StatsCounterTest : public ::testing::Test {
void SetSampleAndAdvance(int sample,
int interval_ms,
RateAccCounter* counter) {
counter->Set(sample);
counter->Set(sample, kStreamId);
clock_.AdvanceTimeMilliseconds(interval_ms);
}
@ -197,11 +198,11 @@ TEST_F(StatsCounterTest, TestMetric_RateCounter) {
TEST_F(StatsCounterTest, TestMetric_RateAccCounter) {
StatsCounterObserverImpl* observer = new StatsCounterObserverImpl();
RateAccCounter counter(&clock_, observer, true);
counter.Set(175);
counter.Set(188);
counter.Set(175, kStreamId);
counter.Set(188, kStreamId);
clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs);
// Trigger process (sample included in next interval).
counter.Set(192);
counter.Set(192, kStreamId);
// Rate per interval: (188 - 0) / 2 sec = 94 samples/sec
EXPECT_EQ(1, observer->num_calls_);
EXPECT_EQ(94, observer->last_sample_);
@ -212,6 +213,37 @@ TEST_F(StatsCounterTest, TestMetric_RateAccCounter) {
EXPECT_EQ(94, stats.max);
}
TEST_F(StatsCounterTest, TestMetric_RateAccCounterWithMultipleStreamIds) {
StatsCounterObserverImpl* observer = new StatsCounterObserverImpl();
RateAccCounter counter(&clock_, observer, true);
counter.Set(175, kStreamId);
counter.Set(188, kStreamId);
counter.Set(100, kStreamId + 1);
clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs);
// Trigger process (sample included in next interval).
counter.Set(150, kStreamId + 1);
// Rate per interval: ((188 - 0) + (100 - 0)) / 2 sec = 144 samples/sec
EXPECT_EQ(1, observer->num_calls_);
EXPECT_EQ(144, observer->last_sample_);
clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs);
// Trigger process (sample included in next interval).
counter.Set(198, kStreamId);
// Rate per interval: (0 + (150 - 100)) / 2 sec = 25 samples/sec
EXPECT_EQ(2, observer->num_calls_);
EXPECT_EQ(25, observer->last_sample_);
clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs);
// Trigger process (sample included in next interval).
counter.Set(200, kStreamId);
// Rate per interval: ((198 - 188) + (0)) / 2 sec = 5 samples/sec
EXPECT_EQ(3, observer->num_calls_);
EXPECT_EQ(5, observer->last_sample_);
// Aggregated stats.
AggregatedStats stats = counter.GetStats();
EXPECT_EQ(3, stats.num_samples);
EXPECT_EQ(5, stats.min);
EXPECT_EQ(144, stats.max);
}
TEST_F(StatsCounterTest, TestGetStats_MultipleIntervals) {
AvgCounter counter(&clock_, nullptr, false);
const int kSample1 = 1;
@ -266,7 +298,7 @@ TEST_F(StatsCounterTest, TestRateAccCounter_NegativeRateIgnored) {
EXPECT_EQ(1, observer->num_calls_);
EXPECT_EQ(100, observer->last_sample_);
// Trigger process (sample included in next interval).
counter.Set(2000);
counter.Set(2000, kStreamId);
EXPECT_EQ(2, observer->num_calls_);
EXPECT_EQ(300, observer->last_sample_);
// Aggregated stats.
@ -386,7 +418,7 @@ TEST_F(StatsCounterTest, TestRateAccCounter_IntervalsWithoutSamplesIncluded) {
clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs);
VerifyStatsIsNotSet(counter.ProcessAndGetStats());
// Add sample and advance 3 intervals (2 w/o samples -> zero reported).
counter.Set(12);
counter.Set(12, kStreamId);
clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs * 4 - 1);
// Trigger process and verify stats: [0:2][6:1]
counter.ProcessAndGetStats();
@ -399,7 +431,7 @@ TEST_F(StatsCounterTest, TestRateAccCounter_IntervalsWithoutSamplesIncluded) {
EXPECT_EQ(0, observer->last_sample_);
// Insert sample and advance non-complete interval, no change, [0:3][6:1]
clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs - 1);
counter.Set(60);
counter.Set(60, kStreamId);
EXPECT_EQ(4, observer->num_calls_);
// Make next interval pass, [0:3][6:1][24:1]
clock_.AdvanceTimeMilliseconds(1);
@ -415,7 +447,7 @@ TEST_F(StatsCounterTest, TestRateAccCounter_IntervalsWithoutSamplesIgnored) {
StatsCounterObserverImpl* observer = new StatsCounterObserverImpl();
RateAccCounter counter(&clock_, observer, false);
// Add sample and advance 3 intervals (2 w/o samples -> ignored).
counter.Set(12);
counter.Set(12, kStreamId);
clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs * 4 - 1);
// Trigger process and verify stats: [6:1]
counter.ProcessAndGetStats();
@ -427,7 +459,7 @@ TEST_F(StatsCounterTest, TestRateAccCounter_IntervalsWithoutSamplesIgnored) {
EXPECT_EQ(1, observer->num_calls_);
// Insert sample and advance non-complete interval, no change, [6:1]
clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs - 1);
counter.Set(60);
counter.Set(60, kStreamId);
counter.ProcessAndGetStats();
EXPECT_EQ(1, observer->num_calls_);
// Make next interval pass, [6:1][24:1]