Add support for detaching and merging of StatsCollection contents.

This is needed in order to be able to update the legacy stats
collector to fetch data channel stats from the network thread, which
is part of an upcoming change to data channels.

Bug: webrtc:11547
Change-Id: Ic205b0314b9f11a024d36d714c223cbddd0f3df3
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/299462
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39732}
This commit is contained in:
Tommi 2023-03-31 12:09:30 +02:00 committed by WebRTC LUCI CQ
parent 30f3d2710d
commit 0fe65102cc
3 changed files with 115 additions and 10 deletions

View File

@ -12,6 +12,8 @@
#include <string.h>
#include <utility>
#include "absl/algorithm/container.h"
#include "api/make_ref_counted.h"
#include "rtc_base/checks.h"
@ -783,28 +785,28 @@ const StatsReport::Value* StatsReport::FindValue(StatsValueName name) const {
StatsCollection::StatsCollection() {}
StatsCollection::~StatsCollection() {
RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK_RUN_ON(&thread_checker_);
for (auto* r : list_)
delete r;
}
StatsCollection::const_iterator StatsCollection::begin() const {
RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK_RUN_ON(&thread_checker_);
return list_.begin();
}
StatsCollection::const_iterator StatsCollection::end() const {
RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK_RUN_ON(&thread_checker_);
return list_.end();
}
size_t StatsCollection::size() const {
RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK_RUN_ON(&thread_checker_);
return list_.size();
}
StatsReport* StatsCollection::InsertNew(const StatsReport::Id& id) {
RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK(Find(id) == nullptr);
StatsReport* report = new StatsReport(id);
list_.push_back(report);
@ -812,13 +814,13 @@ StatsReport* StatsCollection::InsertNew(const StatsReport::Id& id) {
}
StatsReport* StatsCollection::FindOrAddNew(const StatsReport::Id& id) {
RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK_RUN_ON(&thread_checker_);
StatsReport* ret = Find(id);
return ret ? ret : InsertNew(id);
}
StatsReport* StatsCollection::ReplaceOrAddNew(const StatsReport::Id& id) {
RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK(id.get());
Container::iterator it = absl::c_find_if(
list_,
@ -832,10 +834,37 @@ StatsReport* StatsCollection::ReplaceOrAddNew(const StatsReport::Id& id) {
return InsertNew(id);
}
StatsCollection::Container StatsCollection::DetachCollection() {
RTC_DCHECK_RUN_ON(&thread_checker_);
#if RTC_DCHECK_IS_ON
for (auto* report : list_)
report->DetachSequenceCheckers();
#endif
return std::move(list_);
}
void StatsCollection::MergeCollection(Container collection) {
RTC_DCHECK_RUN_ON(&thread_checker_);
for (auto* report : collection) {
#if RTC_DCHECK_IS_ON
report->AttachSequenceCheckers();
#endif
Container::iterator it = absl::c_find_if(list_, [&](const StatsReport* r) {
return r->id()->Equals(report->id());
});
if (it == list_.end()) {
list_.push_back(report);
} else {
delete *it;
*it = report;
}
}
}
// Looks for a report with the given `id`. If one is not found, null
// will be returned.
StatsReport* StatsCollection::Find(const StatsReport::Id& id) {
RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK_RUN_ON(&thread_checker_);
Container::iterator it = absl::c_find_if(
list_,
[&id](const StatsReport* r) -> bool { return r->id()->Equals(id); });

View File

@ -344,8 +344,15 @@ class RTC_EXPORT StatsReport {
// TODO(tommi): Move `name` and `display_name` out of the Value struct.
const StatsValueName name;
protected:
#if RTC_DCHECK_IS_ON
friend class StatsReport;
void DetachSequenceChecker() { thread_checker_.Detach(); }
void AttachSequenceChecker() { RTC_DCHECK_RUN_ON(&thread_checker_); }
#endif
private:
webrtc::SequenceChecker thread_checker_;
webrtc::SequenceChecker thread_checker_{webrtc::SequenceChecker::kDetached};
mutable int ref_count_ RTC_GUARDED_BY(thread_checker_) = 0;
const Type type_;
@ -403,6 +410,19 @@ class RTC_EXPORT StatsReport {
const Value* FindValue(StatsValueName name) const;
#if RTC_DCHECK_IS_ON
void DetachSequenceCheckers() {
for (auto& v : values_) {
v.second->DetachSequenceChecker();
}
}
void AttachSequenceCheckers() {
for (auto& v : values_) {
v.second->AttachSequenceChecker();
}
}
#endif
private:
// The unique identifier for this object.
// This is used as a key for this report in ordered containers,
@ -441,13 +461,16 @@ class StatsCollection {
StatsReport* FindOrAddNew(const StatsReport::Id& id);
StatsReport* ReplaceOrAddNew(const StatsReport::Id& id);
Container DetachCollection();
void MergeCollection(Container collection);
// Looks for a report with the given `id`. If one is not found, null
// will be returned.
StatsReport* Find(const StatsReport::Id& id);
private:
Container list_;
webrtc::SequenceChecker thread_checker_;
webrtc::SequenceChecker thread_checker_{SequenceChecker::kDetached};
};
} // namespace webrtc

View File

@ -40,6 +40,7 @@
#include "rtc_base/fake_ssl_identity.h"
#include "rtc_base/message_digest.h"
#include "rtc_base/net_helper.h"
#include "rtc_base/null_socket_server.h"
#include "rtc_base/rtc_certificate.h"
#include "rtc_base/socket_address.h"
#include "rtc_base/ssl_identity.h"
@ -846,6 +847,58 @@ class StatsCollectorTrackTest : public LegacyStatsCollectorTest,
rtc::scoped_refptr<FakeAudioTrack> audio_track_;
};
TEST(StatsCollectionTest, DetachAndMerge) {
StatsCollection collection;
ASSERT_EQ(collection.size(), 0u);
// Create a new report with some information.
StatsReport::Id id(
StatsReport::NewTypedId(StatsReport::kStatsReportTypeTrack, "track_id"));
StatsReport* report = collection.ReplaceOrAddNew(id);
report->AddString(StatsReport::kStatsValueNameTrackId, "track_id");
ASSERT_TRUE(report);
// Check that looking it up, yields the same report.
ASSERT_EQ(report, collection.FindOrAddNew(id));
// There should be one report now.
ASSERT_EQ(collection.size(), 1u);
// Detach the internal container from the StatsCollection.
StatsCollection::Container container = collection.DetachCollection();
EXPECT_EQ(container.size(), 1u);
EXPECT_EQ(collection.size(), 0u);
EXPECT_EQ(nullptr, collection.Find(id));
// Merge it back and test if we find the same report.
collection.MergeCollection(std::move(container));
EXPECT_EQ(collection.size(), 1u);
EXPECT_EQ(report, collection.Find(id));
}
// Similar to `DetachAndMerge` above but detaches on one thread, merges on
// another to test that we don't trigger sequence checker.
TEST(StatsCollectionTest, DetachAndMergeThreaded) {
rtc::Thread new_thread(std::make_unique<rtc::NullSocketServer>());
new_thread.Start();
StatsReport::Id id(
StatsReport::NewTypedId(StatsReport::kStatsReportTypeTrack, "track_id"));
StatsReport* expected_report = nullptr;
StatsCollection::Container container = new_thread.BlockingCall([&] {
StatsCollection collection;
expected_report = collection.ReplaceOrAddNew(id);
expected_report->AddString(StatsReport::kStatsValueNameTrackId, "track_id");
return collection.DetachCollection();
});
StatsCollection collection;
collection.MergeCollection(std::move(container));
EXPECT_EQ(collection.size(), 1u);
EXPECT_EQ(expected_report, collection.Find(id));
new_thread.Stop();
}
TEST_F(LegacyStatsCollectorTest, FilterOutNegativeDataChannelId) {
auto pc = CreatePeerConnection();
auto stats = CreateStatsCollector(pc.get());