Fix unsynchronized access to mid_to_transport_ in JsepTransportController

* Added several thread checks to JTC to help with programmer errors.
* Avoid a few Invokes() to the network thread here and there such
  as for fetching sctp transport name for getStats(). The transport
  name is now cached when it changes on the network thread.
* JsepTransportController instances now get deleted on the network
  thread rather than on the signaling thread + issuing an Invoke()
  in the dtor.
* Moved some thread hops from JTC over to PC which is where the problem
  exists and also (imho) makes it easier to see where hops happen in
  the PC code.
* The sctp transport is now started asynchronously when we push down the
  media description.
* PeerConnection proxy calls GetSctpTransport directly on the network
  thread instead of to the signaling thread + blocking on the network
  thread.
* The above changes simplified things for webrtc::SctpTransport which
  allowed for removing locking from that class and delete some code.

Bug: webrtc:9987, webrtc:12445
Change-Id: Ic89a9426e314e1b93c81751d4f732f05fa448fbc
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/205620
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33191}
This commit is contained in:
Tomas Gunnarsson 2021-02-08 14:20:08 +01:00 committed by Commit Bot
parent 54ea85c2cd
commit 6cd4058504
11 changed files with 238 additions and 142 deletions

View File

@ -20,9 +20,12 @@
namespace webrtc { namespace webrtc {
// PeerConnection proxy objects will be constructed with two thread pointers,
// signaling and network. The proxy macros don't have 'network' specific macros
// and support for a secondary thread is provided via 'WORKER' macros.
// TODO(deadbeef): Move this to .cc file and out of api/. What threads methods // TODO(deadbeef): Move this to .cc file and out of api/. What threads methods
// are called on is an implementation detail. // are called on is an implementation detail.
BEGIN_SIGNALING_PROXY_MAP(PeerConnection) BEGIN_PROXY_MAP(PeerConnection)
PROXY_SIGNALING_THREAD_DESTRUCTOR() PROXY_SIGNALING_THREAD_DESTRUCTOR()
PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, local_streams) PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, local_streams)
PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, remote_streams) PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, remote_streams)
@ -133,7 +136,10 @@ PROXY_METHOD1(void, SetAudioRecording, bool)
PROXY_METHOD1(rtc::scoped_refptr<DtlsTransportInterface>, PROXY_METHOD1(rtc::scoped_refptr<DtlsTransportInterface>,
LookupDtlsTransportByMid, LookupDtlsTransportByMid,
const std::string&) const std::string&)
PROXY_CONSTMETHOD0(rtc::scoped_refptr<SctpTransportInterface>, GetSctpTransport) // This method will be invoked on the network thread. See
// PeerConnectionFactory::CreatePeerConnectionOrError for more details.
PROXY_WORKER_CONSTMETHOD0(rtc::scoped_refptr<SctpTransportInterface>,
GetSctpTransport)
PROXY_METHOD0(SignalingState, signaling_state) PROXY_METHOD0(SignalingState, signaling_state)
PROXY_METHOD0(IceConnectionState, ice_connection_state) PROXY_METHOD0(IceConnectionState, ice_connection_state)
PROXY_METHOD0(IceConnectionState, standardized_ice_connection_state) PROXY_METHOD0(IceConnectionState, standardized_ice_connection_state)

View File

@ -105,10 +105,8 @@ JsepTransportController::JsepTransportController(
JsepTransportController::~JsepTransportController() { JsepTransportController::~JsepTransportController() {
// Channel destructors may try to send packets, so this needs to happen on // Channel destructors may try to send packets, so this needs to happen on
// the network thread. // the network thread.
network_thread_->Invoke<void>(RTC_FROM_HERE, [this] { RTC_DCHECK_RUN_ON(network_thread_);
RTC_DCHECK_RUN_ON(network_thread_); DestroyAllJsepTransports_n();
DestroyAllJsepTransports_n();
});
} }
RTCError JsepTransportController::SetLocalDescription( RTCError JsepTransportController::SetLocalDescription(
@ -145,6 +143,7 @@ RTCError JsepTransportController::SetRemoteDescription(
RtpTransportInternal* JsepTransportController::GetRtpTransport( RtpTransportInternal* JsepTransportController::GetRtpTransport(
const std::string& mid) const { const std::string& mid) const {
RTC_DCHECK_RUN_ON(network_thread_);
auto jsep_transport = GetJsepTransportForMid(mid); auto jsep_transport = GetJsepTransportForMid(mid);
if (!jsep_transport) { if (!jsep_transport) {
return nullptr; return nullptr;
@ -154,6 +153,7 @@ RtpTransportInternal* JsepTransportController::GetRtpTransport(
DataChannelTransportInterface* JsepTransportController::GetDataChannelTransport( DataChannelTransportInterface* JsepTransportController::GetDataChannelTransport(
const std::string& mid) const { const std::string& mid) const {
RTC_DCHECK_RUN_ON(network_thread_);
auto jsep_transport = GetJsepTransportForMid(mid); auto jsep_transport = GetJsepTransportForMid(mid);
if (!jsep_transport) { if (!jsep_transport) {
return nullptr; return nullptr;
@ -163,6 +163,7 @@ DataChannelTransportInterface* JsepTransportController::GetDataChannelTransport(
cricket::DtlsTransportInternal* JsepTransportController::GetDtlsTransport( cricket::DtlsTransportInternal* JsepTransportController::GetDtlsTransport(
const std::string& mid) { const std::string& mid) {
RTC_DCHECK_RUN_ON(network_thread_);
auto jsep_transport = GetJsepTransportForMid(mid); auto jsep_transport = GetJsepTransportForMid(mid);
if (!jsep_transport) { if (!jsep_transport) {
return nullptr; return nullptr;
@ -172,6 +173,7 @@ cricket::DtlsTransportInternal* JsepTransportController::GetDtlsTransport(
const cricket::DtlsTransportInternal* const cricket::DtlsTransportInternal*
JsepTransportController::GetRtcpDtlsTransport(const std::string& mid) const { JsepTransportController::GetRtcpDtlsTransport(const std::string& mid) const {
RTC_DCHECK_RUN_ON(network_thread_);
auto jsep_transport = GetJsepTransportForMid(mid); auto jsep_transport = GetJsepTransportForMid(mid);
if (!jsep_transport) { if (!jsep_transport) {
return nullptr; return nullptr;
@ -181,6 +183,7 @@ JsepTransportController::GetRtcpDtlsTransport(const std::string& mid) const {
rtc::scoped_refptr<webrtc::DtlsTransport> rtc::scoped_refptr<webrtc::DtlsTransport>
JsepTransportController::LookupDtlsTransportByMid(const std::string& mid) { JsepTransportController::LookupDtlsTransportByMid(const std::string& mid) {
RTC_DCHECK_RUN_ON(network_thread_);
auto jsep_transport = GetJsepTransportForMid(mid); auto jsep_transport = GetJsepTransportForMid(mid);
if (!jsep_transport) { if (!jsep_transport) {
return nullptr; return nullptr;
@ -190,6 +193,7 @@ JsepTransportController::LookupDtlsTransportByMid(const std::string& mid) {
rtc::scoped_refptr<SctpTransport> JsepTransportController::GetSctpTransport( rtc::scoped_refptr<SctpTransport> JsepTransportController::GetSctpTransport(
const std::string& mid) const { const std::string& mid) const {
RTC_DCHECK_RUN_ON(network_thread_);
auto jsep_transport = GetJsepTransportForMid(mid); auto jsep_transport = GetJsepTransportForMid(mid);
if (!jsep_transport) { if (!jsep_transport) {
return nullptr; return nullptr;
@ -236,11 +240,16 @@ bool JsepTransportController::NeedsIceRestart(
absl::optional<rtc::SSLRole> JsepTransportController::GetDtlsRole( absl::optional<rtc::SSLRole> JsepTransportController::GetDtlsRole(
const std::string& mid) const { const std::string& mid) const {
// TODO(tommi): Remove this hop. Currently it's called from the signaling
// thread during negotiations, potentially multiple times.
// WebRtcSessionDescriptionFactory::InternalCreateAnswer is one example.
if (!network_thread_->IsCurrent()) { if (!network_thread_->IsCurrent()) {
return network_thread_->Invoke<absl::optional<rtc::SSLRole>>( return network_thread_->Invoke<absl::optional<rtc::SSLRole>>(
RTC_FROM_HERE, [&] { return GetDtlsRole(mid); }); RTC_FROM_HERE, [&] { return GetDtlsRole(mid); });
} }
RTC_DCHECK_RUN_ON(network_thread_);
const cricket::JsepTransport* t = GetJsepTransportForMid(mid); const cricket::JsepTransport* t = GetJsepTransportForMid(mid);
if (!t) { if (!t) {
return absl::optional<rtc::SSLRole>(); return absl::optional<rtc::SSLRole>();
@ -846,24 +855,34 @@ bool JsepTransportController::HandleBundledContent(
bool JsepTransportController::SetTransportForMid( bool JsepTransportController::SetTransportForMid(
const std::string& mid, const std::string& mid,
cricket::JsepTransport* jsep_transport) { cricket::JsepTransport* jsep_transport) {
RTC_DCHECK(jsep_transport);
if (mid_to_transport_[mid] == jsep_transport) {
return true;
}
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
RTC_DCHECK(jsep_transport);
auto it = mid_to_transport_.find(mid);
if (it != mid_to_transport_.end() && it->second == jsep_transport)
return true;
pending_mids_.push_back(mid); pending_mids_.push_back(mid);
mid_to_transport_[mid] = jsep_transport;
if (it == mid_to_transport_.end()) {
mid_to_transport_.insert(std::make_pair(mid, jsep_transport));
} else {
it->second = jsep_transport;
}
return config_.transport_observer->OnTransportChanged( return config_.transport_observer->OnTransportChanged(
mid, jsep_transport->rtp_transport(), jsep_transport->RtpDtlsTransport(), mid, jsep_transport->rtp_transport(), jsep_transport->RtpDtlsTransport(),
jsep_transport->data_channel_transport()); jsep_transport->data_channel_transport());
} }
void JsepTransportController::RemoveTransportForMid(const std::string& mid) { void JsepTransportController::RemoveTransportForMid(const std::string& mid) {
RTC_DCHECK_RUN_ON(network_thread_);
bool ret = config_.transport_observer->OnTransportChanged(mid, nullptr, bool ret = config_.transport_observer->OnTransportChanged(mid, nullptr,
nullptr, nullptr); nullptr, nullptr);
// Calling OnTransportChanged with nullptr should always succeed, since it is // Calling OnTransportChanged with nullptr should always succeed, since it is
// only expected to fail when adding media to a transport (not removing). // only expected to fail when adding media to a transport (not removing).
RTC_DCHECK(ret); RTC_DCHECK(ret);
mid_to_transport_.erase(mid); mid_to_transport_.erase(mid);
} }

View File

@ -363,8 +363,9 @@ class JsepTransportController : public sigslot::has_slots<> {
// transports are bundled on (In current implementation, it is the first // transports are bundled on (In current implementation, it is the first
// content in the BUNDLE group). // content in the BUNDLE group).
const cricket::JsepTransport* GetJsepTransportForMid( const cricket::JsepTransport* GetJsepTransportForMid(
const std::string& mid) const; const std::string& mid) const RTC_RUN_ON(network_thread_);
cricket::JsepTransport* GetJsepTransportForMid(const std::string& mid); cricket::JsepTransport* GetJsepTransportForMid(const std::string& mid)
RTC_RUN_ON(network_thread_);
// Get the JsepTransport without considering the BUNDLE group. Return nullptr // Get the JsepTransport without considering the BUNDLE group. Return nullptr
// if the JsepTransport is destroyed. // if the JsepTransport is destroyed.
@ -460,7 +461,8 @@ class JsepTransportController : public sigslot::has_slots<> {
jsep_transports_by_name_ RTC_GUARDED_BY(network_thread_); jsep_transports_by_name_ RTC_GUARDED_BY(network_thread_);
// This keeps track of the mapping between media section // This keeps track of the mapping between media section
// (BaseChannel/SctpTransport) and the JsepTransport underneath. // (BaseChannel/SctpTransport) and the JsepTransport underneath.
std::map<std::string, cricket::JsepTransport*> mid_to_transport_; std::map<std::string, cricket::JsepTransport*> mid_to_transport_
RTC_GUARDED_BY(network_thread_);
// Keep track of mids that have been mapped to transports. Used for rollback. // Keep track of mids that have been mapped to transports. Used for rollback.
std::vector<std::string> pending_mids_ RTC_GUARDED_BY(network_thread_); std::vector<std::string> pending_mids_ RTC_GUARDED_BY(network_thread_);
// Aggregate states for Transports. // Aggregate states for Transports.

View File

@ -904,6 +904,9 @@ TEST_F(JsepTransportControllerTest, IceSignalingOccursOnSignalingThread) {
EXPECT_EQ(2, candidates_signal_count_); EXPECT_EQ(2, candidates_signal_count_);
EXPECT_TRUE(!signaled_on_non_signaling_thread_); EXPECT_TRUE(!signaled_on_non_signaling_thread_);
network_thread_->Invoke<void>(RTC_FROM_HERE,
[&] { transport_controller_.reset(); });
} }
// Test that if the TransportController was created with the // Test that if the TransportController was created with the

View File

@ -489,12 +489,17 @@ PeerConnection::~PeerConnection() {
sdp_handler_->ResetSessionDescFactory(); sdp_handler_->ResetSessionDescFactory();
} }
transport_controller_.reset();
// port_allocator_ lives on the network thread and should be destroyed there. // port_allocator_ and transport_controller_ live on the network thread and
// should be destroyed there.
network_thread()->Invoke<void>(RTC_FROM_HERE, [this] { network_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK_RUN_ON(network_thread());
transport_controller_.reset();
port_allocator_.reset(); port_allocator_.reset();
if (network_thread_safety_) {
network_thread_safety_->SetNotAlive();
network_thread_safety_ = nullptr;
}
}); });
// call_ and event_log_ must be destroyed on the worker thread. // call_ and event_log_ must be destroyed on the worker thread.
worker_thread()->Invoke<void>(RTC_FROM_HERE, [this] { worker_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
@ -527,13 +532,15 @@ RTCError PeerConnection::Initialize(
} }
// The port allocator lives on the network thread and should be initialized // The port allocator lives on the network thread and should be initialized
// there. // there. Also set up the task safety flag for canceling pending tasks on
// the network thread when closing.
// TODO(bugs.webrtc.org/12427): See if we can piggyback on this call and // TODO(bugs.webrtc.org/12427): See if we can piggyback on this call and
// initialize all the |transport_controller_->Subscribe*| calls below on the // initialize all the |transport_controller_->Subscribe*| calls below on the
// network thread via this invoke. // network thread via this invoke.
const auto pa_result = const auto pa_result =
network_thread()->Invoke<InitializePortAllocatorResult>( network_thread()->Invoke<InitializePortAllocatorResult>(
RTC_FROM_HERE, [this, &stun_servers, &turn_servers, &configuration] { RTC_FROM_HERE, [this, &stun_servers, &turn_servers, &configuration] {
network_thread_safety_ = PendingTaskSafetyFlag::Create();
return InitializePortAllocator_n(stun_servers, turn_servers, return InitializePortAllocator_n(stun_servers, turn_servers,
configuration); configuration);
}); });
@ -832,6 +839,16 @@ PeerConnection::AddTransceiver(
return AddTransceiver(track, RtpTransceiverInit()); return AddTransceiver(track, RtpTransceiverInit());
} }
RtpTransportInternal* PeerConnection::GetRtpTransport(const std::string& mid) {
RTC_DCHECK_RUN_ON(signaling_thread());
return network_thread()->Invoke<RtpTransportInternal*>(
RTC_FROM_HERE, [this, &mid] {
auto rtp_transport = transport_controller_->GetRtpTransport(mid);
RTC_DCHECK(rtp_transport);
return rtp_transport;
});
}
RTCErrorOr<rtc::scoped_refptr<RtpTransceiverInterface>> RTCErrorOr<rtc::scoped_refptr<RtpTransceiverInterface>>
PeerConnection::AddTransceiver( PeerConnection::AddTransceiver(
rtc::scoped_refptr<MediaStreamTrackInterface> track, rtc::scoped_refptr<MediaStreamTrackInterface> track,
@ -1588,11 +1605,11 @@ PeerConnection::LookupDtlsTransportByMidInternal(const std::string& mid) {
rtc::scoped_refptr<SctpTransportInterface> PeerConnection::GetSctpTransport() rtc::scoped_refptr<SctpTransportInterface> PeerConnection::GetSctpTransport()
const { const {
RTC_DCHECK_RUN_ON(signaling_thread()); RTC_DCHECK_RUN_ON(network_thread());
if (!sctp_mid_s_) { if (!sctp_mid_n_)
return nullptr; return nullptr;
}
return transport_controller_->GetSctpTransport(*sctp_mid_s_); return transport_controller_->GetSctpTransport(*sctp_mid_n_);
} }
const SessionDescriptionInterface* PeerConnection::local_description() const { const SessionDescriptionInterface* PeerConnection::local_description() const {
@ -1673,11 +1690,16 @@ void PeerConnection::Close() {
// WebRTC session description factory, the session description factory would // WebRTC session description factory, the session description factory would
// call the transport controller. // call the transport controller.
sdp_handler_->ResetSessionDescFactory(); sdp_handler_->ResetSessionDescFactory();
transport_controller_.reset();
rtp_manager_->Close(); rtp_manager_->Close();
network_thread()->Invoke<void>( network_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
RTC_FROM_HERE, [this] { port_allocator_->DiscardCandidatePool(); }); transport_controller_.reset();
port_allocator_->DiscardCandidatePool();
if (network_thread_safety_) {
network_thread_safety_->SetNotAlive();
network_thread_safety_ = nullptr;
}
});
worker_thread()->Invoke<void>(RTC_FROM_HERE, [this] { worker_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
RTC_DCHECK_RUN_ON(worker_thread()); RTC_DCHECK_RUN_ON(worker_thread());
@ -1810,6 +1832,17 @@ absl::optional<std::string> PeerConnection::GetDataMid() const {
} }
} }
void PeerConnection::SetSctpDataMid(const std::string& mid) {
RTC_DCHECK_RUN_ON(signaling_thread());
sctp_mid_s_ = mid;
}
void PeerConnection::ResetSctpDataMid() {
RTC_DCHECK_RUN_ON(signaling_thread());
sctp_mid_s_.reset();
sctp_transport_name_s_.clear();
}
void PeerConnection::OnSctpDataChannelClosed(DataChannelInterface* channel) { void PeerConnection::OnSctpDataChannelClosed(DataChannelInterface* channel) {
// Since data_channel_controller doesn't do signals, this // Since data_channel_controller doesn't do signals, this
// signal is relayed here. // signal is relayed here.
@ -2023,13 +2056,8 @@ std::vector<DataChannelStats> PeerConnection::GetDataChannelStats() const {
absl::optional<std::string> PeerConnection::sctp_transport_name() const { absl::optional<std::string> PeerConnection::sctp_transport_name() const {
RTC_DCHECK_RUN_ON(signaling_thread()); RTC_DCHECK_RUN_ON(signaling_thread());
if (sctp_mid_s_ && transport_controller_) { if (sctp_mid_s_ && transport_controller_)
auto dtls_transport = transport_controller_->GetDtlsTransport(*sctp_mid_s_); return sctp_transport_name_s_;
if (dtls_transport) {
return dtls_transport->transport_name();
}
return absl::optional<std::string>();
}
return absl::optional<std::string>(); return absl::optional<std::string>();
} }
@ -2268,6 +2296,15 @@ bool PeerConnection::SetupDataChannelTransport_n(const std::string& mid) {
data_channel_controller_.set_data_channel_transport(transport); data_channel_controller_.set_data_channel_transport(transport);
data_channel_controller_.SetupDataChannelTransport_n(); data_channel_controller_.SetupDataChannelTransport_n();
sctp_mid_n_ = mid; sctp_mid_n_ = mid;
auto dtls_transport = transport_controller_->GetDtlsTransport(mid);
if (dtls_transport) {
signaling_thread()->PostTask(
ToQueuedTask(signaling_thread_safety_.flag(),
[this, name = dtls_transport->transport_name()] {
RTC_DCHECK_RUN_ON(signaling_thread());
sctp_transport_name_s_ = std::move(name);
}));
}
// Note: setting the data sink and checking initial state must be done last, // Note: setting the data sink and checking initial state must be done last,
// after setting up the data channel. Setting the data sink may trigger // after setting up the data channel. Setting the data sink may trigger
@ -2612,9 +2649,19 @@ bool PeerConnection::OnTransportChanged(
if (base_channel) { if (base_channel) {
ret = base_channel->SetRtpTransport(rtp_transport); ret = base_channel->SetRtpTransport(rtp_transport);
} }
if (mid == sctp_mid_n_) { if (mid == sctp_mid_n_) {
data_channel_controller_.OnTransportChanged(data_channel_transport); data_channel_controller_.OnTransportChanged(data_channel_transport);
if (dtls_transport) {
signaling_thread()->PostTask(ToQueuedTask(
signaling_thread_safety_.flag(),
[this, name = dtls_transport->internal()->transport_name()] {
RTC_DCHECK_RUN_ON(signaling_thread());
sctp_transport_name_s_ = std::move(name);
}));
}
} }
return ret; return ret;
} }
@ -2624,6 +2671,23 @@ PeerConnectionObserver* PeerConnection::Observer() const {
return observer_; return observer_;
} }
void PeerConnection::StartSctpTransport(int local_port,
int remote_port,
int max_message_size) {
RTC_DCHECK_RUN_ON(signaling_thread());
if (!sctp_mid_s_)
return;
network_thread()->PostTask(ToQueuedTask(
network_thread_safety_,
[this, mid = *sctp_mid_s_, local_port, remote_port, max_message_size] {
rtc::scoped_refptr<SctpTransport> sctp_transport =
transport_controller()->GetSctpTransport(mid);
if (sctp_transport)
sctp_transport->Start(local_port, remote_port, max_message_size);
}));
}
CryptoOptions PeerConnection::GetCryptoOptions() { CryptoOptions PeerConnection::GetCryptoOptions() {
RTC_DCHECK_RUN_ON(signaling_thread()); RTC_DCHECK_RUN_ON(signaling_thread());
// TODO(bugs.webrtc.org/9891) - Remove PeerConnectionFactory::CryptoOptions // TODO(bugs.webrtc.org/9891) - Remove PeerConnectionFactory::CryptoOptions

View File

@ -404,14 +404,15 @@ class PeerConnection : public PeerConnectionInternal,
// channels are configured this will return nullopt. // channels are configured this will return nullopt.
absl::optional<std::string> GetDataMid() const; absl::optional<std::string> GetDataMid() const;
void SetSctpDataMid(const std::string& mid) { void SetSctpDataMid(const std::string& mid);
RTC_DCHECK_RUN_ON(signaling_thread());
sctp_mid_s_ = mid; void ResetSctpDataMid();
}
void ResetSctpDataMid() { // Asynchronously calls SctpTransport::Start() on the network thread for
RTC_DCHECK_RUN_ON(signaling_thread()); // |sctp_mid()| if set. Called as part of setting the local description.
sctp_mid_s_.reset(); void StartSctpTransport(int local_port,
} int remote_port,
int max_message_size);
// Returns the CryptoOptions for this PeerConnection. This will always // Returns the CryptoOptions for this PeerConnection. This will always
// return the RTCConfiguration.crypto_options if set and will only default // return the RTCConfiguration.crypto_options if set and will only default
@ -427,12 +428,7 @@ class PeerConnection : public PeerConnectionInternal,
bool fire_callback = true); bool fire_callback = true);
// Returns rtp transport, result can not be nullptr. // Returns rtp transport, result can not be nullptr.
RtpTransportInternal* GetRtpTransport(const std::string& mid) { RtpTransportInternal* GetRtpTransport(const std::string& mid);
RTC_DCHECK_RUN_ON(signaling_thread());
auto rtp_transport = transport_controller_->GetRtpTransport(mid);
RTC_DCHECK(rtp_transport);
return rtp_transport;
}
// Returns true if SRTP (either using DTLS-SRTP or SDES) is required by // Returns true if SRTP (either using DTLS-SRTP or SDES) is required by
// this session. // this session.
@ -648,6 +644,8 @@ class PeerConnection : public PeerConnectionInternal,
// The unique_ptr belongs to the worker thread, but the Call object manages // The unique_ptr belongs to the worker thread, but the Call object manages
// its own thread safety. // its own thread safety.
std::unique_ptr<Call> call_ RTC_GUARDED_BY(worker_thread()); std::unique_ptr<Call> call_ RTC_GUARDED_BY(worker_thread());
ScopedTaskSafety signaling_thread_safety_;
rtc::scoped_refptr<PendingTaskSafetyFlag> network_thread_safety_;
std::unique_ptr<ScopedTaskSafety> call_safety_ std::unique_ptr<ScopedTaskSafety> call_safety_
RTC_GUARDED_BY(worker_thread()); RTC_GUARDED_BY(worker_thread());
@ -677,6 +675,7 @@ class PeerConnection : public PeerConnectionInternal,
// thread, but applied first on the networking thread via an invoke(). // thread, but applied first on the networking thread via an invoke().
absl::optional<std::string> sctp_mid_s_ RTC_GUARDED_BY(signaling_thread()); absl::optional<std::string> sctp_mid_s_ RTC_GUARDED_BY(signaling_thread());
absl::optional<std::string> sctp_mid_n_ RTC_GUARDED_BY(network_thread()); absl::optional<std::string> sctp_mid_n_ RTC_GUARDED_BY(network_thread());
std::string sctp_transport_name_s_ RTC_GUARDED_BY(signaling_thread());
// The machinery for handling offers and answers. Const after initialization. // The machinery for handling offers and answers. Const after initialization.
std::unique_ptr<SdpOfferAnswerHandler> sdp_handler_ std::unique_ptr<SdpOfferAnswerHandler> sdp_handler_

View File

@ -265,8 +265,15 @@ PeerConnectionFactory::CreatePeerConnectionOrError(
if (!result.ok()) { if (!result.ok()) {
return result.MoveError(); return result.MoveError();
} }
// We configure the proxy with a pointer to the network thread for methods
// that need to be invoked there rather than on the signaling thread.
// Internally, the proxy object has a member variable named |worker_thread_|
// which will point to the network thread (and not the factory's
// worker_thread()). All such methods have thread checks though, so the code
// should still be clear (outside of macro expansion).
rtc::scoped_refptr<PeerConnectionInterface> result_proxy = rtc::scoped_refptr<PeerConnectionInterface> result_proxy =
PeerConnectionProxy::Create(signaling_thread(), result.MoveValue()); PeerConnectionProxy::Create(signaling_thread(), network_thread(),
result.MoveValue());
return result_proxy; return result_proxy;
} }

View File

@ -5944,9 +5944,11 @@ TEST_F(PeerConnectionIntegrationTestUnifiedPlan,
callee()->AddAudioVideoTracks(); callee()->AddAudioVideoTracks();
caller()->CreateAndSetAndSignalOffer(); caller()->CreateAndSetAndSignalOffer();
ASSERT_TRUE_WAIT(SignalingStateStable(), kDefaultTimeout); ASSERT_TRUE_WAIT(SignalingStateStable(), kDefaultTimeout);
ASSERT_EQ_WAIT(SctpTransportState::kConnected, network_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
caller()->pc()->GetSctpTransport()->Information().state(), ASSERT_EQ_WAIT(SctpTransportState::kConnected,
kDefaultTimeout); caller()->pc()->GetSctpTransport()->Information().state(),
kDefaultTimeout);
});
ASSERT_TRUE_WAIT(callee()->data_channel(), kDefaultTimeout); ASSERT_TRUE_WAIT(callee()->data_channel(), kDefaultTimeout);
ASSERT_TRUE_WAIT(callee()->data_observer()->IsOpen(), kDefaultTimeout); ASSERT_TRUE_WAIT(callee()->data_observer()->IsOpen(), kDefaultTimeout);
} }

View File

@ -45,7 +45,7 @@ SctpTransport::~SctpTransport() {
} }
SctpTransportInformation SctpTransport::Information() const { SctpTransportInformation SctpTransport::Information() const {
MutexLock lock(&lock_); RTC_DCHECK_RUN_ON(owner_thread_);
return info_; return info_;
} }
@ -71,103 +71,78 @@ rtc::scoped_refptr<DtlsTransportInterface> SctpTransport::dtls_transport()
void SctpTransport::Clear() { void SctpTransport::Clear() {
RTC_DCHECK_RUN_ON(owner_thread_); RTC_DCHECK_RUN_ON(owner_thread_);
RTC_DCHECK(internal()); RTC_DCHECK(internal());
{ // Note that we delete internal_sctp_transport_, but
MutexLock lock(&lock_); // only drop the reference to dtls_transport_.
// Note that we delete internal_sctp_transport_, but dtls_transport_ = nullptr;
// only drop the reference to dtls_transport_. internal_sctp_transport_ = nullptr;
dtls_transport_ = nullptr;
internal_sctp_transport_ = nullptr;
}
UpdateInformation(SctpTransportState::kClosed); UpdateInformation(SctpTransportState::kClosed);
} }
void SctpTransport::SetDtlsTransport( void SctpTransport::SetDtlsTransport(
rtc::scoped_refptr<DtlsTransport> transport) { rtc::scoped_refptr<DtlsTransport> transport) {
RTC_DCHECK_RUN_ON(owner_thread_); RTC_DCHECK_RUN_ON(owner_thread_);
SctpTransportState next_state; SctpTransportState next_state = info_.state();
{ dtls_transport_ = transport;
MutexLock lock(&lock_); if (internal_sctp_transport_) {
next_state = info_.state(); if (transport) {
dtls_transport_ = transport; internal_sctp_transport_->SetDtlsTransport(transport->internal());
if (internal_sctp_transport_) { transport->internal()->SignalDtlsState.connect(
if (transport) { this, &SctpTransport::OnDtlsStateChange);
internal_sctp_transport_->SetDtlsTransport(transport->internal()); if (info_.state() == SctpTransportState::kNew) {
transport->internal()->SignalDtlsState.connect( next_state = SctpTransportState::kConnecting;
this, &SctpTransport::OnDtlsStateChange);
if (info_.state() == SctpTransportState::kNew) {
next_state = SctpTransportState::kConnecting;
}
} else {
internal_sctp_transport_->SetDtlsTransport(nullptr);
} }
} else {
internal_sctp_transport_->SetDtlsTransport(nullptr);
} }
} }
UpdateInformation(next_state); UpdateInformation(next_state);
} }
void SctpTransport::Start(int local_port, void SctpTransport::Start(int local_port,
int remote_port, int remote_port,
int max_message_size) { int max_message_size) {
{ RTC_DCHECK_RUN_ON(owner_thread_);
MutexLock lock(&lock_); info_ = SctpTransportInformation(info_.state(), info_.dtls_transport(),
// Record max message size on calling thread. max_message_size, info_.MaxChannels());
info_ = SctpTransportInformation(info_.state(), info_.dtls_transport(),
max_message_size, info_.MaxChannels()); if (!internal()->Start(local_port, remote_port, max_message_size)) {
} RTC_LOG(LS_ERROR) << "Failed to push down SCTP parameters, closing.";
if (owner_thread_->IsCurrent()) { UpdateInformation(SctpTransportState::kClosed);
if (!internal()->Start(local_port, remote_port, max_message_size)) {
RTC_LOG(LS_ERROR) << "Failed to push down SCTP parameters, closing.";
UpdateInformation(SctpTransportState::kClosed);
}
} else {
owner_thread_->Invoke<void>(
RTC_FROM_HERE, [this, local_port, remote_port, max_message_size] {
Start(local_port, remote_port, max_message_size);
});
} }
} }
void SctpTransport::UpdateInformation(SctpTransportState state) { void SctpTransport::UpdateInformation(SctpTransportState state) {
RTC_DCHECK_RUN_ON(owner_thread_); RTC_DCHECK_RUN_ON(owner_thread_);
bool must_send_update; bool must_send_update = (state != info_.state());
SctpTransportInformation info_copy(SctpTransportState::kNew); // TODO(https://bugs.webrtc.org/10358): Update max channels from internal
{ // SCTP transport when available.
MutexLock lock(&lock_); if (internal_sctp_transport_) {
must_send_update = (state != info_.state()); info_ = SctpTransportInformation(
// TODO(https://bugs.webrtc.org/10358): Update max channels from internal state, dtls_transport_, info_.MaxMessageSize(), info_.MaxChannels());
// SCTP transport when available. } else {
if (internal_sctp_transport_) { info_ = SctpTransportInformation(
info_ = SctpTransportInformation( state, dtls_transport_, info_.MaxMessageSize(), info_.MaxChannels());
state, dtls_transport_, info_.MaxMessageSize(), info_.MaxChannels());
} else {
info_ = SctpTransportInformation(
state, dtls_transport_, info_.MaxMessageSize(), info_.MaxChannels());
}
if (observer_ && must_send_update) {
info_copy = info_;
}
} }
// We call the observer without holding the lock.
if (observer_ && must_send_update) { if (observer_ && must_send_update) {
observer_->OnStateChange(info_copy); observer_->OnStateChange(info_);
} }
} }
void SctpTransport::OnAssociationChangeCommunicationUp() { void SctpTransport::OnAssociationChangeCommunicationUp() {
RTC_DCHECK_RUN_ON(owner_thread_); RTC_DCHECK_RUN_ON(owner_thread_);
{ RTC_DCHECK(internal_sctp_transport_);
MutexLock lock(&lock_); if (internal_sctp_transport_->max_outbound_streams() &&
RTC_DCHECK(internal_sctp_transport_); internal_sctp_transport_->max_inbound_streams()) {
if (internal_sctp_transport_->max_outbound_streams() && int max_channels =
internal_sctp_transport_->max_inbound_streams()) { std::min(*(internal_sctp_transport_->max_outbound_streams()),
int max_channels = *(internal_sctp_transport_->max_inbound_streams()));
std::min(*(internal_sctp_transport_->max_outbound_streams()), // Record max channels.
*(internal_sctp_transport_->max_inbound_streams())); info_ = SctpTransportInformation(info_.state(), info_.dtls_transport(),
// Record max channels. info_.MaxMessageSize(), max_channels);
info_ = SctpTransportInformation(info_.state(), info_.dtls_transport(),
info_.MaxMessageSize(), max_channels);
}
} }
UpdateInformation(SctpTransportState::kConnected); UpdateInformation(SctpTransportState::kConnected);
} }

View File

@ -20,7 +20,6 @@
#include "media/sctp/sctp_transport_internal.h" #include "media/sctp/sctp_transport_internal.h"
#include "p2p/base/dtls_transport_internal.h" #include "p2p/base/dtls_transport_internal.h"
#include "pc/dtls_transport.h" #include "pc/dtls_transport.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/third_party/sigslot/sigslot.h" #include "rtc_base/third_party/sigslot/sigslot.h"
#include "rtc_base/thread.h" #include "rtc_base/thread.h"
#include "rtc_base/thread_annotations.h" #include "rtc_base/thread_annotations.h"
@ -54,12 +53,12 @@ class SctpTransport : public SctpTransportInterface,
// internal() to be functions on the webrtc::SctpTransport interface, // internal() to be functions on the webrtc::SctpTransport interface,
// and make the internal() function private. // and make the internal() function private.
cricket::SctpTransportInternal* internal() { cricket::SctpTransportInternal* internal() {
MutexLock lock(&lock_); RTC_DCHECK_RUN_ON(owner_thread_);
return internal_sctp_transport_.get(); return internal_sctp_transport_.get();
} }
const cricket::SctpTransportInternal* internal() const { const cricket::SctpTransportInternal* internal() const {
MutexLock lock(&lock_); RTC_DCHECK_RUN_ON(owner_thread_);
return internal_sctp_transport_.get(); return internal_sctp_transport_.get();
} }
@ -75,15 +74,12 @@ class SctpTransport : public SctpTransportInterface,
void OnDtlsStateChange(cricket::DtlsTransportInternal* transport, void OnDtlsStateChange(cricket::DtlsTransportInternal* transport,
cricket::DtlsTransportState state); cricket::DtlsTransportState state);
// Note - owner_thread never changes, but can't be const if we do // NOTE: |owner_thread_| is the thread that the SctpTransport object is
// Invoke() on it. // constructed on. In the context of PeerConnection, it's the network thread.
rtc::Thread* owner_thread_; rtc::Thread* const owner_thread_;
mutable Mutex lock_; SctpTransportInformation info_ RTC_GUARDED_BY(owner_thread_);
// Variables accessible off-thread, guarded by lock_
SctpTransportInformation info_ RTC_GUARDED_BY(lock_);
std::unique_ptr<cricket::SctpTransportInternal> internal_sctp_transport_ std::unique_ptr<cricket::SctpTransportInternal> internal_sctp_transport_
RTC_GUARDED_BY(lock_); RTC_GUARDED_BY(owner_thread_);
// Variables only accessed on-thread
SctpTransportObserverInterface* observer_ RTC_GUARDED_BY(owner_thread_) = SctpTransportObserverInterface* observer_ RTC_GUARDED_BY(owner_thread_) =
nullptr; nullptr;
rtc::scoped_refptr<DtlsTransport> dtls_transport_ rtc::scoped_refptr<DtlsTransport> dtls_transport_

View File

@ -729,6 +729,21 @@ bool CanAddLocalMediaStream(webrtc::StreamCollectionInterface* current_streams,
return true; return true;
} }
rtc::scoped_refptr<webrtc::DtlsTransport> LookupDtlsTransportByMid(
rtc::Thread* network_thread,
JsepTransportController* controller,
const std::string& mid) {
// TODO(tommi): Can we post this (and associated operations where this
// function is called) to the network thread and avoid this Invoke?
// We might be able to simplify a few things if we set the transport on
// the network thread and then update the implementation to check that
// the set_ and relevant get methods are always called on the network
// thread (we'll need to update proxy maps).
return network_thread->Invoke<rtc::scoped_refptr<webrtc::DtlsTransport>>(
RTC_FROM_HERE,
[controller, &mid] { return controller->LookupDtlsTransportByMid(mid); });
}
} // namespace } // namespace
// Used by parameterless SetLocalDescription() to create an offer or answer. // Used by parameterless SetLocalDescription() to create an offer or answer.
@ -1308,8 +1323,8 @@ RTCError SdpOfferAnswerHandler::ApplyLocalDescription(
// Note that code paths that don't set MID won't be able to use // Note that code paths that don't set MID won't be able to use
// information about DTLS transports. // information about DTLS transports.
if (transceiver->mid()) { if (transceiver->mid()) {
auto dtls_transport = transport_controller()->LookupDtlsTransportByMid( auto dtls_transport = LookupDtlsTransportByMid(
*transceiver->mid()); pc_->network_thread(), transport_controller(), *transceiver->mid());
transceiver->internal()->sender_internal()->set_transport( transceiver->internal()->sender_internal()->set_transport(
dtls_transport); dtls_transport);
transceiver->internal()->receiver_internal()->set_transport( transceiver->internal()->receiver_internal()->set_transport(
@ -1725,9 +1740,9 @@ RTCError SdpOfferAnswerHandler::ApplyRemoteDescription(
transceiver->internal()->set_current_direction(local_direction); transceiver->internal()->set_current_direction(local_direction);
// 2.2.8.1.11.[3-6]: Set the transport internal slots. // 2.2.8.1.11.[3-6]: Set the transport internal slots.
if (transceiver->mid()) { if (transceiver->mid()) {
auto dtls_transport = auto dtls_transport = LookupDtlsTransportByMid(pc_->network_thread(),
transport_controller()->LookupDtlsTransportByMid( transport_controller(),
*transceiver->mid()); *transceiver->mid());
transceiver->internal()->sender_internal()->set_transport( transceiver->internal()->sender_internal()->set_transport(
dtls_transport); dtls_transport);
transceiver->internal()->receiver_internal()->set_transport( transceiver->internal()->receiver_internal()->set_transport(
@ -4276,13 +4291,11 @@ RTCError SdpOfferAnswerHandler::PushdownMediaDescription(
// Need complete offer/answer with an SCTP m= section before starting SCTP, // Need complete offer/answer with an SCTP m= section before starting SCTP,
// according to https://tools.ietf.org/html/draft-ietf-mmusic-sctp-sdp-19 // according to https://tools.ietf.org/html/draft-ietf-mmusic-sctp-sdp-19
if (pc_->sctp_mid() && local_description() && remote_description()) { if (pc_->sctp_mid() && local_description() && remote_description()) {
rtc::scoped_refptr<SctpTransport> sctp_transport =
transport_controller()->GetSctpTransport(*(pc_->sctp_mid()));
auto local_sctp_description = cricket::GetFirstSctpDataContentDescription( auto local_sctp_description = cricket::GetFirstSctpDataContentDescription(
local_description()->description()); local_description()->description());
auto remote_sctp_description = cricket::GetFirstSctpDataContentDescription( auto remote_sctp_description = cricket::GetFirstSctpDataContentDescription(
remote_description()->description()); remote_description()->description());
if (sctp_transport && local_sctp_description && remote_sctp_description) { if (local_sctp_description && remote_sctp_description) {
int max_message_size; int max_message_size;
// A remote max message size of zero means "any size supported". // A remote max message size of zero means "any size supported".
// We configure the connection with our own max message size. // We configure the connection with our own max message size.
@ -4293,8 +4306,9 @@ RTCError SdpOfferAnswerHandler::PushdownMediaDescription(
std::min(local_sctp_description->max_message_size(), std::min(local_sctp_description->max_message_size(),
remote_sctp_description->max_message_size()); remote_sctp_description->max_message_size());
} }
sctp_transport->Start(local_sctp_description->port(), pc_->StartSctpTransport(local_sctp_description->port(),
remote_sctp_description->port(), max_message_size); remote_sctp_description->port(),
max_message_size);
} }
} }
@ -4520,8 +4534,16 @@ bool SdpOfferAnswerHandler::ReadyToUseRemoteCandidate(
return false; return false;
} }
std::string transport_name = GetTransportName(result.value()->name); bool has_transport = false;
return !transport_name.empty(); cricket::ChannelInterface* channel = pc_->GetChannel(result.value()->name);
if (channel) {
has_transport = !channel->transport_name().empty();
} else if (data_channel_controller()->data_channel_transport()) {
auto sctp_mid = pc_->sctp_mid();
RTC_DCHECK(sctp_mid);
has_transport = (result.value()->name == *sctp_mid);
}
return has_transport;
} }
void SdpOfferAnswerHandler::ReportRemoteIceCandidateAdded( void SdpOfferAnswerHandler::ReportRemoteIceCandidateAdded(
@ -4644,6 +4666,7 @@ cricket::VoiceChannel* SdpOfferAnswerHandler::CreateVoiceChannel(
cricket::VideoChannel* SdpOfferAnswerHandler::CreateVideoChannel( cricket::VideoChannel* SdpOfferAnswerHandler::CreateVideoChannel(
const std::string& mid) { const std::string& mid) {
RTC_DCHECK_RUN_ON(signaling_thread()); RTC_DCHECK_RUN_ON(signaling_thread());
// NOTE: This involves a non-ideal hop (Invoke) over to the network thread.
RtpTransportInternal* rtp_transport = pc_->GetRtpTransport(mid); RtpTransportInternal* rtp_transport = pc_->GetRtpTransport(mid);
// TODO(bugs.webrtc.org/11992): CreateVideoChannel internally switches to the // TODO(bugs.webrtc.org/11992): CreateVideoChannel internally switches to the