Remove sigslot from PeerConnectionInternal and RTCStatsCollector.

It turns out that there were several sigslot instances across data
channel, pc and stats classes that in practice only served as means
to update two counters in RTCStatsCollector. There's already a
notification path that's suitable.

This also fixes a case where the PC instance sat in the middle
of notifications from datachannels to the datachannel controller.

Bug: webrtc:11943
Change-Id: Ic60b76021584019f82085f6651230fe2fe82d465
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/295781
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39456}
This commit is contained in:
Tommi 2023-03-02 10:51:16 +01:00 committed by WebRTC LUCI CQ
parent 2311f93909
commit d2afbaf33f
15 changed files with 96 additions and 122 deletions

View File

@ -1063,7 +1063,6 @@ rtc_source_set("rtc_stats_collector") {
"../rtc_base:threading",
"../rtc_base:timeutils",
"../rtc_base/synchronization:mutex",
"../rtc_base/third_party/sigslot:sigslot",
]
absl_deps = [
"//third_party/abseil-cpp/absl/functional:bind_front",

View File

@ -103,6 +103,16 @@ bool DataChannelController::ReadyToSendData() const {
return (data_channel_transport() && data_channel_transport_ready_to_send_);
}
void DataChannelController::OnChannelStateChanged(
SctpDataChannel* channel,
DataChannelInterface::DataState state) {
RTC_DCHECK_RUN_ON(signaling_thread());
if (state == DataChannelInterface::DataState::kClosed)
OnSctpDataChannelClosed(channel);
pc_->OnSctpDataChannelStateChanged(channel, state);
}
void DataChannelController::OnDataReceived(
int channel_id,
DataMessageType type,
@ -298,9 +308,6 @@ DataChannelController::InternalCreateSctpDataChannel(
return nullptr;
}
sctp_data_channels_.push_back(channel);
channel->SignalClosed.connect(
pc_, &PeerConnectionInternal::OnSctpDataChannelClosed);
SignalSctpDataChannelCreated_(channel.get());
return channel;
}
@ -381,8 +388,8 @@ bool DataChannelController::DataChannelSendData(
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result) {
// TODO(bugs.webrtc.org/11547): Expect method to be called on the network
// thread instead. Remove the BlockingCall() below and move assocated state to
// the network thread.
// thread instead. Remove the BlockingCall() below and move associated state
// to the network thread.
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_DCHECK(data_channel_transport());

View File

@ -57,6 +57,8 @@ class DataChannelController : public SctpDataChannelControllerInterface,
void AddSctpDataStream(int sid) override;
void RemoveSctpDataStream(int sid) override;
bool ReadyToSendData() const override;
void OnChannelStateChanged(SctpDataChannel* channel,
DataChannelInterface::DataState state) override;
// Implements DataChannelSink.
void OnDataReceived(int channel_id,
@ -99,10 +101,6 @@ class DataChannelController : public SctpDataChannelControllerInterface,
DataChannelTransportInterface* data_channel_transport() const;
void set_data_channel_transport(DataChannelTransportInterface* transport);
sigslot::signal1<SctpDataChannel*>& SignalSctpDataChannelCreated() {
RTC_DCHECK_RUN_ON(signaling_thread());
return SignalSctpDataChannelCreated_;
}
// Called when the transport for the data channels is closed or destroyed.
void OnTransportChannelClosed(RTCError error);
@ -169,9 +167,6 @@ class DataChannelController : public SctpDataChannelControllerInterface,
sigslot::signal1<int> SignalDataChannelTransportChannelClosed_s
RTC_GUARDED_BY(signaling_thread());
sigslot::signal1<SctpDataChannel*> SignalSctpDataChannelCreated_
RTC_GUARDED_BY(signaling_thread());
// Owning PeerConnection.
PeerConnectionInternal* const pc_;
// The weak pointers must be dereferenced and invalidated on the signalling

View File

@ -27,7 +27,6 @@
#include "rtc_base/copy_on_write_buffer.h"
#include "rtc_base/gunit.h"
#include "rtc_base/ssl_stream_adapter.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
#include "rtc_base/thread.h"
#include "test/gtest.h"
@ -107,20 +106,6 @@ class SctpDataChannelTest : public ::testing::Test {
rtc::scoped_refptr<SctpDataChannel> webrtc_data_channel_;
};
class StateSignalsListener : public sigslot::has_slots<> {
public:
int opened_count() const { return opened_count_; }
int closed_count() const { return closed_count_; }
void OnSignalOpened(DataChannelInterface* data_channel) { ++opened_count_; }
void OnSignalClosed(DataChannelInterface* data_channel) { ++closed_count_; }
private:
int opened_count_ = 0;
int closed_count_ = 0;
};
// Verifies that the data channel is connected to the transport after creation.
TEST_F(SctpDataChannelTest, ConnectedToTransportOnCreated) {
controller_->set_transport_available(true);
@ -150,26 +135,22 @@ TEST_F(SctpDataChannelTest, ConnectedAfterTransportBecomesAvailable) {
// Tests the state of the data channel.
TEST_F(SctpDataChannelTest, StateTransition) {
StateSignalsListener state_signals_listener;
webrtc_data_channel_->SignalOpened.connect(
&state_signals_listener, &StateSignalsListener::OnSignalOpened);
webrtc_data_channel_->SignalClosed.connect(
&state_signals_listener, &StateSignalsListener::OnSignalClosed);
EXPECT_EQ(webrtc::DataChannelInterface::kConnecting,
webrtc_data_channel_->state());
EXPECT_EQ(state_signals_listener.opened_count(), 0);
EXPECT_EQ(state_signals_listener.closed_count(), 0);
EXPECT_EQ(controller_->channels_opened(), 0);
EXPECT_EQ(controller_->channels_closed(), 0);
SetChannelReady();
EXPECT_EQ(webrtc::DataChannelInterface::kOpen, webrtc_data_channel_->state());
EXPECT_EQ(state_signals_listener.opened_count(), 1);
EXPECT_EQ(state_signals_listener.closed_count(), 0);
EXPECT_EQ(controller_->channels_opened(), 1);
EXPECT_EQ(controller_->channels_closed(), 0);
webrtc_data_channel_->Close();
EXPECT_EQ(webrtc::DataChannelInterface::kClosed,
webrtc_data_channel_->state());
EXPECT_TRUE(webrtc_data_channel_->error().ok());
EXPECT_EQ(state_signals_listener.opened_count(), 1);
EXPECT_EQ(state_signals_listener.closed_count(), 1);
EXPECT_EQ(controller_->channels_opened(), 1);
EXPECT_EQ(controller_->channels_closed(), 1);
// Verifies that it's disconnected from the transport.
EXPECT_FALSE(controller_->IsConnected(webrtc_data_channel_.get()));
}

View File

@ -2113,11 +2113,12 @@ void PeerConnection::ResetSctpDataMid() {
SetSctpTransportName("");
}
void PeerConnection::OnSctpDataChannelClosed(DataChannelInterface* channel) {
// Since data_channel_controller doesn't do signals, this
// signal is relayed here.
data_channel_controller_.OnSctpDataChannelClosed(
static_cast<SctpDataChannel*>(channel));
void PeerConnection::OnSctpDataChannelStateChanged(
DataChannelInterface* channel,
DataChannelInterface::DataState state) {
RTC_DCHECK_RUN_ON(signaling_thread());
if (stats_collector_)
stats_collector_->OnSctpDataChannelStateChanged(channel, state);
}
PeerConnection::InitializePortAllocatorResult

View File

@ -283,11 +283,6 @@ class PeerConnection : public PeerConnectionInternal,
return rtp_manager()->transceivers()->List();
}
sigslot::signal1<SctpDataChannel*>& SignalSctpDataChannelCreated() override {
RTC_DCHECK_RUN_ON(signaling_thread());
return data_channel_controller_.SignalSctpDataChannelCreated();
}
std::vector<DataChannelStats> GetDataChannelStats() const override;
absl::optional<std::string> sctp_transport_name() const override;
@ -320,8 +315,10 @@ class PeerConnection : public PeerConnectionInternal,
}
// Get current SSL role used by SCTP's underlying transport.
bool GetSctpSslRole(rtc::SSLRole* role) override;
// Handler for the "channel closed" signal
void OnSctpDataChannelClosed(DataChannelInterface* channel) override;
void OnSctpDataChannelStateChanged(
DataChannelInterface* channel,
DataChannelInterface::DataState state) override;
bool ShouldFireNegotiationNeededEvent(uint32_t event_id) override;

View File

@ -130,8 +130,7 @@ class PeerConnectionSdpMethods {
// Functions defined in this class are called by other objects,
// but not by SdpOfferAnswerHandler.
class PeerConnectionInternal : public PeerConnectionInterface,
public PeerConnectionSdpMethods,
public sigslot::has_slots<> {
public PeerConnectionSdpMethods {
public:
virtual rtc::Thread* network_thread() const = 0;
virtual rtc::Thread* worker_thread() const = 0;
@ -143,9 +142,6 @@ class PeerConnectionInternal : public PeerConnectionInterface,
rtc::scoped_refptr<RtpTransceiverProxyWithInternal<RtpTransceiver>>>
GetTransceiversInternal() const = 0;
virtual sigslot::signal1<SctpDataChannel*>&
SignalSctpDataChannelCreated() = 0;
// Call on the network thread to fetch stats for all the data channels.
// TODO(tommi): Make pure virtual after downstream updates.
virtual std::vector<DataChannelStats> GetDataChannelStats() const {
@ -180,8 +176,10 @@ class PeerConnectionInternal : public PeerConnectionInterface,
rtc::SSLRole* role) = 0;
// Functions needed by DataChannelController
virtual void NoteDataAddedEvent() {}
// Handler for the "channel closed" signal
virtual void OnSctpDataChannelClosed(DataChannelInterface* channel) {}
// Handler for sctp data channel state changes.
virtual void OnSctpDataChannelStateChanged(
DataChannelInterface* channel,
DataChannelInterface::DataState state) {}
};
} // namespace webrtc

View File

@ -1390,8 +1390,6 @@ RTCStatsCollector::RTCStatsCollector(PeerConnectionInternal* pc,
RTC_DCHECK(worker_thread_);
RTC_DCHECK(network_thread_);
RTC_DCHECK_GE(cache_lifetime_us_, 0);
pc_->SignalSctpDataChannelCreated().connect(
this, &RTCStatsCollector::OnSctpDataChannelCreated);
}
RTCStatsCollector::~RTCStatsCollector() {
@ -2493,27 +2491,23 @@ void RTCStatsCollector::PrepareTransceiverStatsInfosAndCallStats_s_w_n() {
}
}
void RTCStatsCollector::OnSctpDataChannelCreated(SctpDataChannel* channel) {
channel->SignalOpened.connect(this, &RTCStatsCollector::OnDataChannelOpened);
channel->SignalClosed.connect(this, &RTCStatsCollector::OnDataChannelClosed);
}
void RTCStatsCollector::OnDataChannelOpened(DataChannelInterface* channel) {
void RTCStatsCollector::OnSctpDataChannelStateChanged(
DataChannelInterface* channel,
DataChannelInterface::DataState state) {
RTC_DCHECK_RUN_ON(signaling_thread_);
bool result = internal_record_.opened_data_channels
.insert(reinterpret_cast<uintptr_t>(channel))
.second;
++internal_record_.data_channels_opened;
RTC_DCHECK(result);
}
void RTCStatsCollector::OnDataChannelClosed(DataChannelInterface* channel) {
RTC_DCHECK_RUN_ON(signaling_thread_);
// Only channels that have been fully opened (and have increased the
// `data_channels_opened_` counter) increase the closed counter.
if (internal_record_.opened_data_channels.erase(
reinterpret_cast<uintptr_t>(channel))) {
++internal_record_.data_channels_closed;
if (state == DataChannelInterface::DataState::kOpen) {
bool result = internal_record_.opened_data_channels
.insert(reinterpret_cast<uintptr_t>(channel))
.second;
RTC_DCHECK(result);
++internal_record_.data_channels_opened;
} else if (state == DataChannelInterface::DataState::kClosed) {
// Only channels that have been fully opened (and have increased the
// `data_channels_opened_` counter) increase the closed counter.
if (internal_record_.opened_data_channels.erase(
reinterpret_cast<uintptr_t>(channel))) {
++internal_record_.data_channels_closed;
}
}
}

View File

@ -44,7 +44,6 @@
#include "rtc_base/ssl_certificate.h"
#include "rtc_base/ssl_identity.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
#include "rtc_base/thread.h"
#include "rtc_base/time_utils.h"
@ -57,8 +56,7 @@ class RtpReceiverInternal;
// Stats are gathered on the signaling, worker and network threads
// asynchronously. The callback is invoked on the signaling thread. Resulting
// reports are cached for `cache_lifetime_` ms.
class RTCStatsCollector : public rtc::RefCountInterface,
public sigslot::has_slots<> {
class RTCStatsCollector : public rtc::RefCountInterface {
public:
static rtc::scoped_refptr<RTCStatsCollector> Create(
PeerConnectionInternal* pc,
@ -92,6 +90,10 @@ class RTCStatsCollector : public rtc::RefCountInterface,
// completed. Must be called on the signaling thread.
void WaitForPendingRequest();
// Called by the PeerConnection instance when data channel states change.
void OnSctpDataChannelStateChanged(DataChannelInterface* channel,
DataChannelInterface::DataState state);
protected:
RTCStatsCollector(PeerConnectionInternal* pc, int64_t cache_lifetime_us);
~RTCStatsCollector();
@ -255,12 +257,6 @@ class RTCStatsCollector : public rtc::RefCountInterface,
rtc::scoped_refptr<RtpSenderInternal> sender_selector,
rtc::scoped_refptr<RtpReceiverInternal> receiver_selector);
// Slots for signals (sigslot) that are wired up to `pc_`.
void OnSctpDataChannelCreated(SctpDataChannel* channel);
// Slots for signals (sigslot) that are wired up to `channel`.
void OnDataChannelOpened(DataChannelInterface* channel);
void OnDataChannelClosed(DataChannelInterface* channel);
PeerConnectionInternal* const pc_;
rtc::Thread* const signaling_thread_;
rtc::Thread* const worker_thread_;

View File

@ -2111,15 +2111,15 @@ TEST_F(RTCStatsCollectorTest, CollectRTCPeerConnectionStats) {
rtc::scoped_refptr<SctpDataChannel> dummy_channel_a = SctpDataChannel::Create(
&controller, "DummyChannelA", InternalDataChannelInit(),
rtc::Thread::Current(), rtc::Thread::Current());
pc_->SignalSctpDataChannelCreated()(dummy_channel_a.get());
rtc::scoped_refptr<SctpDataChannel> dummy_channel_b = SctpDataChannel::Create(
&controller, "DummyChannelB", InternalDataChannelInit(),
rtc::Thread::Current(), rtc::Thread::Current());
pc_->SignalSctpDataChannelCreated()(dummy_channel_b.get());
dummy_channel_a->SignalOpened(dummy_channel_a.get());
stats_->stats_collector()->OnSctpDataChannelStateChanged(
dummy_channel_a.get(), DataChannelInterface::DataState::kOpen);
// Closing a channel that is not opened should not affect the counts.
dummy_channel_b->SignalClosed(dummy_channel_b.get());
stats_->stats_collector()->OnSctpDataChannelStateChanged(
dummy_channel_b.get(), DataChannelInterface::DataState::kClosed);
{
rtc::scoped_refptr<const RTCStatsReport> report =
@ -2131,8 +2131,10 @@ TEST_F(RTCStatsCollectorTest, CollectRTCPeerConnectionStats) {
EXPECT_EQ(expected, report->Get("P")->cast_to<RTCPeerConnectionStats>());
}
dummy_channel_b->SignalOpened(dummy_channel_b.get());
dummy_channel_b->SignalClosed(dummy_channel_b.get());
stats_->stats_collector()->OnSctpDataChannelStateChanged(
dummy_channel_b.get(), DataChannelInterface::DataState::kOpen);
stats_->stats_collector()->OnSctpDataChannelStateChanged(
dummy_channel_b.get(), DataChannelInterface::DataState::kClosed);
{
rtc::scoped_refptr<const RTCStatsReport> report =
@ -2146,7 +2148,8 @@ TEST_F(RTCStatsCollectorTest, CollectRTCPeerConnectionStats) {
// Re-opening a data channel (or opening a new data channel that is re-using
// the same address in memory) should increase the opened count.
dummy_channel_b->SignalOpened(dummy_channel_b.get());
stats_->stats_collector()->OnSctpDataChannelStateChanged(
dummy_channel_b.get(), DataChannelInterface::DataState::kOpen);
{
rtc::scoped_refptr<const RTCStatsReport> report =
@ -2158,8 +2161,10 @@ TEST_F(RTCStatsCollectorTest, CollectRTCPeerConnectionStats) {
EXPECT_EQ(expected, report->Get("P")->cast_to<RTCPeerConnectionStats>());
}
dummy_channel_a->SignalClosed(dummy_channel_a.get());
dummy_channel_b->SignalClosed(dummy_channel_b.get());
stats_->stats_collector()->OnSctpDataChannelStateChanged(
dummy_channel_a.get(), DataChannelInterface::DataState::kClosed);
stats_->stats_collector()->OnSctpDataChannelStateChanged(
dummy_channel_b.get(), DataChannelInterface::DataState::kClosed);
{
rtc::scoped_refptr<const RTCStatsReport> report =

View File

@ -564,11 +564,9 @@ void SctpDataChannel::SetState(DataState state) {
if (observer_) {
observer_->OnStateChange();
}
if (state_ == kOpen) {
SignalOpened(this);
} else if (state_ == kClosed) {
SignalClosed(this);
}
if (!controller_detached_)
controller_->OnChannelStateChanged(this, state_);
}
void SctpDataChannel::DisconnectFromTransport() {

View File

@ -55,6 +55,9 @@ class SctpDataChannelControllerInterface {
virtual void RemoveSctpDataStream(int sid) = 0;
// Returns true if the transport channel is ready to send data.
virtual bool ReadyToSendData() const = 0;
// Notifies the controller of state changes.
virtual void OnChannelStateChanged(SctpDataChannel* data_channel,
DataChannelInterface::DataState state) = 0;
protected:
virtual ~SctpDataChannelControllerInterface() {}
@ -214,12 +217,6 @@ class SctpDataChannel : public DataChannelInterface,
DataChannelStats GetStats() const;
// Emitted when state transitions to kOpen.
sigslot::signal1<DataChannelInterface*> SignalOpened;
// Emitted when state transitions to kClosed.
// This signal can be used to tell when the channel's sid is free.
sigslot::signal1<DataChannelInterface*> SignalClosed;
// Reset the allocator for internal ID values for testing, so that
// the internal IDs generated are predictable. Test only.
static void ResetInternalIdAllocatorForTesting(int new_value);

View File

@ -53,7 +53,7 @@ class FakeDataChannelController
if (!transport_available_) {
return false;
}
RTC_LOG(LS_INFO) << "DataChannel connected " << data_channel;
RTC_LOG(LS_VERBOSE) << "DataChannel connected " << data_channel;
connected_channels_.insert(data_channel);
return true;
}
@ -61,7 +61,7 @@ class FakeDataChannelController
void DisconnectDataChannel(webrtc::SctpDataChannel* data_channel) override {
RTC_CHECK(connected_channels_.find(data_channel) !=
connected_channels_.end());
RTC_LOG(LS_INFO) << "DataChannel disconnected " << data_channel;
RTC_LOG(LS_VERBOSE) << "DataChannel disconnected " << data_channel;
connected_channels_.erase(data_channel);
}
@ -90,6 +90,16 @@ class FakeDataChannelController
bool ReadyToSendData() const override { return ready_to_send_; }
void OnChannelStateChanged(
webrtc::SctpDataChannel* data_channel,
webrtc::DataChannelInterface::DataState state) override {
if (state == webrtc::DataChannelInterface::DataState::kOpen) {
++channels_opened_;
} else if (state == webrtc::DataChannelInterface::DataState::kClosed) {
++channels_closed_;
}
}
// Set true to emulate the SCTP stream being blocked by congestion control.
void set_send_blocked(bool blocked) {
send_blocked_ = blocked;
@ -146,6 +156,9 @@ class FakeDataChannelController
return recv_ssrcs_.find(stream) != recv_ssrcs_.end();
}
int channels_opened() const { return channels_opened_; }
int channels_closed() const { return channels_closed_; }
private:
int last_sid_;
webrtc::SendDataParams last_send_data_params_;
@ -153,6 +166,8 @@ class FakeDataChannelController
bool transport_available_;
bool ready_to_send_;
bool transport_error_;
int channels_closed_ = 0;
int channels_opened_ = 0;
std::set<webrtc::SctpDataChannel*> connected_channels_;
std::set<uint32_t> send_ssrcs_;
std::set<uint32_t> recv_ssrcs_;

View File

@ -255,10 +255,6 @@ class FakePeerConnectionBase : public PeerConnectionInternal {
return {};
}
sigslot::signal1<SctpDataChannel*>& SignalSctpDataChannelCreated() override {
return SignalSctpDataChannelCreated_;
}
absl::optional<std::string> sctp_transport_name() const override {
return absl::nullopt;
}
@ -371,7 +367,6 @@ class FakePeerConnectionBase : public PeerConnectionInternal {
protected:
webrtc::test::ScopedKeyValueConfig field_trials_;
sigslot::signal1<SctpDataChannel*> SignalSctpDataChannelCreated_;
};
} // namespace webrtc

View File

@ -282,10 +282,6 @@ class MockPeerConnectionInternal : public PeerConnectionInternal {
GetTransceiversInternal,
(),
(const, override));
MOCK_METHOD(sigslot::signal1<SctpDataChannel*>&,
SignalSctpDataChannelCreated,
(),
(override));
MOCK_METHOD(std::vector<DataChannelStats>,
GetDataChannelStats,
(),
@ -322,8 +318,8 @@ class MockPeerConnectionInternal : public PeerConnectionInternal {
(override));
MOCK_METHOD(void, NoteDataAddedEvent, (), (override));
MOCK_METHOD(void,
OnSctpDataChannelClosed,
(DataChannelInterface*),
OnSctpDataChannelStateChanged,
(DataChannelInterface * channel, DataChannelInterface::DataState),
(override));
};