[DataChannelController] Associate two methods with the network thread

Make DataChannelController's AddSctpDataStream and
RemoveSctpDataStream be required to be called on the network thread.
This moves blocking calls within those methods over to the
SctpDataChannel class instead.

For production code there's no functional change in this CL. However, this CL:

1) Introduces an actual dedicated network thread to
   DataChannelController and SctpDataChannel tests.
2) Removes two data_channel_transport() checks inside DCC that
   were being done on the wrong thread (signaling) and
3) introduces a network calling block to SctpDataChannel, where more
   network thread related work needs to be done and can be bundled.
   (to be done in follow-up CLs).

Bug: webrtc:11547
Change-Id: I6787ac395e61d4a25ae3a74a123e3357cbb46b54
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/298052
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39688}
This commit is contained in:
Tommi 2023-03-27 12:39:33 +02:00 committed by WebRTC LUCI CQ
parent 8e781a1bb1
commit 55f72800b4
9 changed files with 83 additions and 46 deletions

View File

@ -2461,6 +2461,7 @@ if (rtc_include_tests && !build_with_chromium) {
"../rtc_base:net_helper",
"../rtc_base:network",
"../rtc_base:network_constants",
"../rtc_base:null_socket_server",
"../rtc_base:refcount",
"../rtc_base:rtc_base_tests_utils",
"../rtc_base:rtc_certificate_generator",
@ -2562,6 +2563,7 @@ if (rtc_include_tests && !build_with_chromium) {
":pc_test_utils",
":peer_connection_internal",
":sctp_data_channel",
"../rtc_base:null_socket_server",
"../test:run_loop",
"../test:test_support",
]

View File

@ -44,22 +44,16 @@ RTCError DataChannelController::SendData(
}
void DataChannelController::AddSctpDataStream(StreamId sid) {
RTC_DCHECK_RUN_ON(network_thread());
if (data_channel_transport()) {
network_thread()->BlockingCall([this, sid] {
if (data_channel_transport()) {
data_channel_transport()->OpenChannel(sid.stream_id_int());
}
});
data_channel_transport()->OpenChannel(sid.stream_id_int());
}
}
void DataChannelController::RemoveSctpDataStream(StreamId sid) {
RTC_DCHECK_RUN_ON(network_thread());
if (data_channel_transport()) {
network_thread()->BlockingCall([this, sid] {
if (data_channel_transport()) {
data_channel_transport()->CloseChannel(sid.stream_id_int());
}
});
data_channel_transport()->CloseChannel(sid.stream_id_int());
}
}

View File

@ -15,6 +15,7 @@
#include "pc/peer_connection_internal.h"
#include "pc/sctp_data_channel.h"
#include "pc/test/mock_peer_connection_internal.h"
#include "rtc_base/null_socket_server.h"
#include "test/gmock.h"
#include "test/gtest.h"
#include "test/run_loop.h"
@ -44,17 +45,22 @@ class MockDataChannelTransport : public webrtc::DataChannelTransportInterface {
class DataChannelControllerTest : public ::testing::Test {
protected:
DataChannelControllerTest() {
DataChannelControllerTest()
: network_thread_(std::make_unique<rtc::NullSocketServer>()) {
network_thread_.Start();
pc_ = rtc::make_ref_counted<NiceMock<MockPeerConnectionInternal>>();
ON_CALL(*pc_, signaling_thread)
.WillByDefault(Return(rtc::Thread::Current()));
// TODO(tommi): Return a dedicated thread.
ON_CALL(*pc_, network_thread).WillByDefault(Return(rtc::Thread::Current()));
ON_CALL(*pc_, network_thread).WillByDefault(Return(&network_thread_));
}
~DataChannelControllerTest() override { run_loop_.Flush(); }
~DataChannelControllerTest() override {
run_loop_.Flush();
network_thread_.Stop();
}
test::RunLoop run_loop_;
rtc::Thread network_thread_;
rtc::scoped_refptr<NiceMock<MockPeerConnectionInternal>> pc_;
};

View File

@ -26,9 +26,11 @@
#include "pc/test/fake_data_channel_controller.h"
#include "rtc_base/copy_on_write_buffer.h"
#include "rtc_base/gunit.h"
#include "rtc_base/null_socket_server.h"
#include "rtc_base/ssl_stream_adapter.h"
#include "rtc_base/thread.h"
#include "test/gtest.h"
#include "test/run_loop.h"
namespace webrtc {
@ -71,12 +73,18 @@ class FakeDataChannelObserver : public DataChannelObserver {
size_t on_buffered_amount_change_count_;
};
// TODO(bugs.webrtc.org/11547): Incorporate a dedicated network thread.
class SctpDataChannelTest : public ::testing::Test {
protected:
SctpDataChannelTest()
: controller_(new FakeDataChannelController()),
webrtc_data_channel_(controller_->CreateDataChannel("test", init_)) {}
: network_thread_(std::make_unique<rtc::NullSocketServer>()),
controller_(new FakeDataChannelController(&network_thread_)),
webrtc_data_channel_(controller_->CreateDataChannel("test", init_)) {
network_thread_.Start();
}
~SctpDataChannelTest() override {
run_loop_.Flush();
network_thread_.Stop();
}
void SetChannelReady() {
controller_->set_transport_available(true);
@ -92,7 +100,8 @@ class SctpDataChannelTest : public ::testing::Test {
webrtc_data_channel_->RegisterObserver(observer_.get());
}
rtc::AutoThread main_thread_;
test::RunLoop run_loop_;
rtc::Thread network_thread_;
InternalDataChannelInit init_;
std::unique_ptr<FakeDataChannelController> controller_;
std::unique_ptr<FakeDataChannelObserver> observer_;
@ -154,6 +163,10 @@ TEST_F(SctpDataChannelTest, StateTransition) {
// `Close()` should trigger two state changes, first `kClosing`, then
// `kClose`.
webrtc_data_channel_->Close();
// The (simulated) transport close notifications runs on the network thread
// and posts a completion notification to the signaling (current) thread.
// Allow that ooperation to complete before checking the state.
run_loop_.Flush();
EXPECT_EQ(DataChannelInterface::kClosed, webrtc_data_channel_->state());
EXPECT_EQ(observer_->on_state_change_count(), 3u);
EXPECT_TRUE(webrtc_data_channel_->error().ok());

View File

@ -670,7 +670,8 @@ class RTCStatsCollectorTest : public ::testing::Test {
RTCStatsCollectorTest()
: pc_(rtc::make_ref_counted<FakePeerConnectionForStats>()),
stats_(new RTCStatsCollectorWrapper(pc_)),
data_channel_controller_(new FakeDataChannelController()) {}
data_channel_controller_(
new FakeDataChannelController(pc_->network_thread())) {}
void ExpectReportContainsCertificateInfo(
const rtc::scoped_refptr<const RTCStatsReport>& report,
@ -2122,8 +2123,7 @@ TEST_F(RTCStatsCollectorTest, CollectRTCPeerConnectionStats) {
EXPECT_EQ(expected, report->Get("P")->cast_to<RTCPeerConnectionStats>());
}
// TODO(bugs.webrtc.org/11547): Supply a separate network thread.
FakeDataChannelController controller;
FakeDataChannelController controller(pc_->network_thread());
rtc::scoped_refptr<SctpDataChannel> dummy_channel_a = SctpDataChannel::Create(
controller.weak_ptr(), "DummyChannelA", false, InternalDataChannelInit(),
rtc::Thread::Current(), rtc::Thread::Current());

View File

@ -36,7 +36,7 @@ int GenerateUniqueId() {
}
// Define proxy for DataChannelInterface.
BEGIN_PRIMARY_PROXY_MAP(DataChannel)
BEGIN_PROXY_MAP(DataChannel)
PROXY_PRIMARY_THREAD_DESTRUCTOR()
PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*)
PROXY_METHOD0(void, UnregisterObserver)
@ -164,9 +164,11 @@ rtc::scoped_refptr<SctpDataChannel> SctpDataChannel::Create(
// static
rtc::scoped_refptr<DataChannelInterface> SctpDataChannel::CreateProxy(
rtc::scoped_refptr<SctpDataChannel> channel) {
// TODO(bugs.webrtc.org/11547): incorporate the network thread in the proxy.
// Copy thread params to local variables before `std::move()`.
auto* signaling_thread = channel->signaling_thread_;
return DataChannelProxy::Create(signaling_thread, std::move(channel));
auto* network_thread = channel->network_thread_;
return DataChannelProxy::Create(signaling_thread, network_thread,
std::move(channel));
}
SctpDataChannel::SctpDataChannel(
@ -209,8 +211,9 @@ SctpDataChannel::SctpDataChannel(
// Try to connect to the transport in case the transport channel already
// exists.
if (id_.HasValue()) {
controller_->AddSctpDataStream(id_);
if (id_.HasValue() && connected_to_transport_) {
network_thread_->BlockingCall(
[c = controller_.get(), sid = id_] { c->AddSctpDataStream(sid); });
}
}
@ -349,7 +352,10 @@ void SctpDataChannel::SetSctpSid(const StreamId& sid) {
RTC_DCHECK_EQ(state_, kConnecting);
id_ = sid;
controller_->AddSctpDataStream(sid);
if (connected_to_transport_) {
network_thread_->BlockingCall(
[c = controller_.get(), sid] { c->AddSctpDataStream(sid); });
}
}
void SctpDataChannel::OnClosingProcedureStartedRemotely() {
@ -385,8 +391,9 @@ void SctpDataChannel::OnTransportChannelCreated() {
// The sid may have been unassigned when controller_->ConnectDataChannel was
// done. So always add the streams even if connected_to_transport_ is true.
if (id_.HasValue()) {
controller_->AddSctpDataStream(id_);
if (id_.HasValue() && connected_to_transport_) {
network_thread_->BlockingCall(
[c = controller_.get(), sid = id_] { c->AddSctpDataStream(sid); });
}
}
@ -563,7 +570,9 @@ void SctpDataChannel::UpdateState() {
// afterwards.
if (!started_closing_procedure_ && controller_ && id_.HasValue()) {
started_closing_procedure_ = true;
controller_->RemoveSctpDataStream(id_);
network_thread_->BlockingCall([c = controller_.get(), sid = id_] {
c->RemoveSctpDataStream(sid);
});
}
}
} else {

View File

@ -38,6 +38,10 @@ namespace webrtc {
class SctpDataChannel;
// Interface that acts as a bridge from the data channel to the transport.
// TODO(bugs.webrtc.org/11547): The transport operates on the network thread
// and ultimately all the methods in this interface need to be invoked on the
// network thread. Currently, some are called on the signaling thread.
class SctpDataChannelControllerInterface {
public:
// Sends the data to the transport.
@ -45,9 +49,11 @@ class SctpDataChannelControllerInterface {
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload) = 0;
// Adds the data channel SID to the transport for SCTP.
// Note: Must be called on the network thread.
virtual void AddSctpDataStream(StreamId sid) = 0;
// Begins the closing procedure by sending an outgoing stream reset. Still
// need to wait for callbacks to tell when this completes.
// Note: Must be called on the network thread.
virtual void RemoveSctpDataStream(StreamId sid) = 0;
// Returns true if the transport channel is ready to send data.
virtual bool ReadyToSendData() const = 0;

View File

@ -21,8 +21,10 @@
class FakeDataChannelController
: public webrtc::SctpDataChannelControllerInterface {
public:
FakeDataChannelController()
: send_blocked_(false),
explicit FakeDataChannelController(rtc::Thread* network_thread)
: signaling_thread_(rtc::Thread::Current()),
network_thread_(network_thread),
send_blocked_(false),
transport_available_(false),
ready_to_send_(false),
transport_error_(false) {}
@ -34,15 +36,13 @@ class FakeDataChannelController
rtc::scoped_refptr<webrtc::SctpDataChannel> CreateDataChannel(
absl::string_view label,
webrtc::InternalDataChannelInit init,
rtc::Thread* network_thread = rtc::Thread::Current()) {
rtc::Thread* signaling_thread = rtc::Thread::Current();
webrtc::InternalDataChannelInit init) {
rtc::scoped_refptr<webrtc::SctpDataChannel> channel =
webrtc::SctpDataChannel::Create(weak_ptr(), std::string(label),
transport_available_, init,
signaling_thread, network_thread);
signaling_thread_, network_thread_);
if (ReadyToSendData()) {
signaling_thread->PostTask(
signaling_thread_->PostTask(
SafeTask(signaling_safety_.flag(), [channel = channel] {
if (channel->state() !=
webrtc::DataChannelInterface::DataState::kClosed) {
@ -73,6 +73,7 @@ class FakeDataChannelController
}
void AddSctpDataStream(webrtc::StreamId sid) override {
RTC_DCHECK_RUN_ON(network_thread_);
RTC_CHECK(sid.HasValue());
if (!transport_available_) {
return;
@ -81,16 +82,19 @@ class FakeDataChannelController
}
void RemoveSctpDataStream(webrtc::StreamId sid) override {
RTC_DCHECK_RUN_ON(network_thread_);
RTC_CHECK(sid.HasValue());
known_stream_ids_.erase(sid);
// Unlike the real SCTP transport, act like the closing procedure finished
// instantly, doing the same snapshot thing as below.
auto it = absl::c_find_if(connected_channels_,
[&](const auto* c) { return c->sid() == sid; });
// This path mimics the DCC's OnChannelClosed handler since the FDCC
// (this class) doesn't have a transport that would do that.
if (it != connected_channels_.end())
(*it)->OnClosingProcedureComplete();
signaling_thread_->PostTask(SafeTask(signaling_safety_.flag(), [this, sid] {
// Unlike the real SCTP transport, act like the closing procedure finished
// instantly.
auto it = absl::c_find_if(connected_channels_,
[&](const auto* c) { return c->sid() == sid; });
// This path mimics the DCC's OnChannelClosed handler since the FDCC
// (this class) doesn't have a transport that would do that.
if (it != connected_channels_.end())
(*it)->OnClosingProcedureComplete();
}));
}
bool ReadyToSendData() const override { return ready_to_send_; }
@ -159,6 +163,8 @@ class FakeDataChannelController
int channels_closed() const { return channels_closed_; }
private:
rtc::Thread* const signaling_thread_;
rtc::Thread* const network_thread_;
int last_sid_;
webrtc::SendDataParams last_send_data_params_;
bool send_blocked_;

View File

@ -205,7 +205,8 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase {
dependencies_(MakeDependencies()),
context_(ConnectionContext::Create(&dependencies_)),
local_streams_(StreamCollection::Create()),
remote_streams_(StreamCollection::Create()) {}
remote_streams_(StreamCollection::Create()),
data_channel_controller_(network_thread_) {}
~FakePeerConnectionForStats() {
for (auto transceiver : transceivers_) {