Fix unsynchronized access to jsep_transports_by_name_.

Also removing need for lock for ice restart flag, fix call paths and
add information about how JsepTransportController's events could live
fully on the network thread and complexity around signaling thread
should be handled by PeerConnection (more details in webrtc:12427).

Bug: webrtc:12426, webrtc:12427
Change-Id: I9b1fae8acf16d90d9716054fc3c390700877a82a
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/205221
Reviewed-by: Niels Moller <nisse@webrtc.org>
Commit-Queue: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33159}
This commit is contained in:
Tomas Gunnarsson 2021-02-04 10:22:50 +01:00 committed by Commit Bot
parent 426b6e49de
commit 20f7456da9
6 changed files with 81 additions and 58 deletions

View File

@ -232,13 +232,11 @@ webrtc::RTCError JsepTransport::SetLocalJsepTransportDescription(
local_description_.reset();
return error;
}
{
webrtc::MutexLock lock(&accessor_lock_);
if (needs_ice_restart_ && ice_restarting) {
needs_ice_restart_ = false;
RTC_LOG(LS_VERBOSE) << "needs-ice-restart flag cleared for transport "
<< mid();
}
if (needs_ice_restart_ && ice_restarting) {
needs_ice_restart_ = false;
RTC_LOG(LS_VERBOSE) << "needs-ice-restart flag cleared for transport "
<< mid();
}
return webrtc::RTCError::OK();
@ -341,7 +339,7 @@ webrtc::RTCError JsepTransport::AddRemoteCandidates(
}
void JsepTransport::SetNeedsIceRestartFlag() {
webrtc::MutexLock lock(&accessor_lock_);
RTC_DCHECK_RUN_ON(network_thread_);
if (!needs_ice_restart_) {
needs_ice_restart_ = true;
RTC_LOG(LS_VERBOSE) << "needs-ice-restart flag set for transport " << mid();

View File

@ -141,16 +141,14 @@ class JsepTransport : public sigslot::has_slots<> {
// set, offers should generate new ufrags/passwords until an ICE restart
// occurs.
//
// This and the below method can be called safely from any thread as long as
// SetXTransportDescription is not in progress.
// TODO(tommi): Investigate on which threads (network or signal?) we really
// need to access the needs_ice_restart flag.
void SetNeedsIceRestartFlag() RTC_LOCKS_EXCLUDED(accessor_lock_);
// This and |needs_ice_restart()| must be called on the network thread.
void SetNeedsIceRestartFlag();
// Returns true if the ICE restart flag above was set, and no ICE restart has
// occurred yet for this transport (by applying a local description with
// changed ufrag/password).
bool needs_ice_restart() const RTC_LOCKS_EXCLUDED(accessor_lock_) {
webrtc::MutexLock lock(&accessor_lock_);
bool needs_ice_restart() const {
RTC_DCHECK_RUN_ON(network_thread_);
return needs_ice_restart_;
}
@ -335,7 +333,7 @@ class JsepTransport : public sigslot::has_slots<> {
mutable webrtc::Mutex accessor_lock_;
const std::string mid_;
// needs-ice-restart bit as described in JSEP.
bool needs_ice_restart_ RTC_GUARDED_BY(accessor_lock_) = false;
bool needs_ice_restart_ RTC_GUARDED_BY(network_thread_) = false;
rtc::scoped_refptr<rtc::RTCCertificate> local_certificate_
RTC_GUARDED_BY(network_thread_);
std::unique_ptr<JsepTransportDescription> local_description_

View File

@ -210,6 +210,7 @@ void JsepTransportController::SetIceConfig(const cricket::IceConfig& config) {
}
void JsepTransportController::SetNeedsIceRestartFlag() {
RTC_DCHECK_RUN_ON(network_thread_);
for (auto& kv : jsep_transports_by_name_) {
kv.second->SetNeedsIceRestartFlag();
}
@ -217,6 +218,14 @@ void JsepTransportController::SetNeedsIceRestartFlag() {
bool JsepTransportController::NeedsIceRestart(
const std::string& transport_name) const {
if (!network_thread_->IsCurrent()) {
RTC_DCHECK_RUN_ON(signaling_thread_);
return network_thread_->Invoke<bool>(
RTC_FROM_HERE, [&] { return NeedsIceRestart(transport_name); });
}
RTC_DCHECK_RUN_ON(network_thread_);
const cricket::JsepTransport* transport =
GetJsepTransportByName(transport_name);
if (!transport) {
@ -246,6 +255,8 @@ bool JsepTransportController::SetLocalCertificate(
RTC_FROM_HERE, [&] { return SetLocalCertificate(certificate); });
}
RTC_DCHECK_RUN_ON(network_thread_);
// Can't change a certificate, or set a null certificate.
if (certificate_ || !certificate) {
return false;
@ -273,6 +284,8 @@ JsepTransportController::GetLocalCertificate(
RTC_FROM_HERE, [&] { return GetLocalCertificate(transport_name); });
}
RTC_DCHECK_RUN_ON(network_thread_);
const cricket::JsepTransport* t = GetJsepTransportByName(transport_name);
if (!t) {
return nullptr;
@ -287,6 +300,7 @@ JsepTransportController::GetRemoteSSLCertChain(
return network_thread_->Invoke<std::unique_ptr<rtc::SSLCertChain>>(
RTC_FROM_HERE, [&] { return GetRemoteSSLCertChain(transport_name); });
}
RTC_DCHECK_RUN_ON(network_thread_);
// Get the certificate from the RTP transport's DTLS handshake. Should be
// identical to the RTCP transport's, since they were given the same remote
@ -324,6 +338,8 @@ RTCError JsepTransportController::AddRemoteCandidates(
});
}
RTC_DCHECK_RUN_ON(network_thread_);
// Verify each candidate before passing down to the transport layer.
RTCError error = VerifyCandidates(candidates);
if (!error.ok()) {
@ -345,6 +361,8 @@ RTCError JsepTransportController::RemoveRemoteCandidates(
RTC_FROM_HERE, [&] { return RemoveRemoteCandidates(candidates); });
}
RTC_DCHECK_RUN_ON(network_thread_);
// Verify each candidate before passing down to the transport layer.
RTCError error = VerifyCandidates(candidates);
if (!error.ok()) {
@ -392,6 +410,8 @@ bool JsepTransportController::GetStats(const std::string& transport_name,
RTC_FROM_HERE, [=] { return GetStats(transport_name, stats); });
}
RTC_DCHECK_RUN_ON(network_thread_);
cricket::JsepTransport* transport = GetJsepTransportByName(transport_name);
if (!transport) {
return false;
@ -450,7 +470,7 @@ std::unique_ptr<cricket::DtlsTransportInternal>
JsepTransportController::CreateDtlsTransport(
const cricket::ContentInfo& content_info,
cricket::IceTransportInternal* ice) {
RTC_DCHECK(network_thread_->IsCurrent());
RTC_DCHECK_RUN_ON(network_thread_);
std::unique_ptr<cricket::DtlsTransportInternal> dtls;
@ -504,7 +524,7 @@ JsepTransportController::CreateUnencryptedRtpTransport(
const std::string& transport_name,
rtc::PacketTransportInternal* rtp_packet_transport,
rtc::PacketTransportInternal* rtcp_packet_transport) {
RTC_DCHECK(network_thread_->IsCurrent());
RTC_DCHECK_RUN_ON(network_thread_);
auto unencrypted_rtp_transport =
std::make_unique<RtpTransport>(rtcp_packet_transport == nullptr);
unencrypted_rtp_transport->SetRtpPacketTransport(rtp_packet_transport);
@ -519,7 +539,7 @@ JsepTransportController::CreateSdesTransport(
const std::string& transport_name,
cricket::DtlsTransportInternal* rtp_dtls_transport,
cricket::DtlsTransportInternal* rtcp_dtls_transport) {
RTC_DCHECK(network_thread_->IsCurrent());
RTC_DCHECK_RUN_ON(network_thread_);
auto srtp_transport =
std::make_unique<webrtc::SrtpTransport>(rtcp_dtls_transport == nullptr);
RTC_DCHECK(rtp_dtls_transport);
@ -555,6 +575,7 @@ JsepTransportController::CreateDtlsSrtpTransport(
std::vector<cricket::DtlsTransportInternal*>
JsepTransportController::GetDtlsTransports() {
RTC_DCHECK_RUN_ON(network_thread_);
std::vector<cricket::DtlsTransportInternal*> dtls_transports;
for (auto it = jsep_transports_by_name_.begin();
it != jsep_transports_by_name_.end(); ++it) {
@ -1066,8 +1087,6 @@ void JsepTransportController::MaybeDestroyJsepTransport(
}
void JsepTransportController::DestroyAllJsepTransports_n() {
RTC_DCHECK(network_thread_->IsCurrent());
for (const auto& jsep_transport : jsep_transports_by_name_) {
config_.transport_observer->OnTransportChanged(jsep_transport.first,
nullptr, nullptr, nullptr);
@ -1077,10 +1096,9 @@ void JsepTransportController::DestroyAllJsepTransports_n() {
}
void JsepTransportController::SetIceRole_n(cricket::IceRole ice_role) {
RTC_DCHECK(network_thread_->IsCurrent());
ice_role_ = ice_role;
for (auto& dtls : GetDtlsTransports()) {
auto dtls_transports = GetDtlsTransports();
for (auto& dtls : dtls_transports) {
dtls->ice_transport()->SetIceRole(ice_role_);
}
}
@ -1135,7 +1153,6 @@ cricket::IceRole JsepTransportController::DetermineIceRole(
void JsepTransportController::OnTransportWritableState_n(
rtc::PacketTransportInternal* transport) {
RTC_DCHECK(network_thread_->IsCurrent());
RTC_LOG(LS_INFO) << " Transport " << transport->transport_name()
<< " writability changed to " << transport->writable()
<< ".";
@ -1144,27 +1161,25 @@ void JsepTransportController::OnTransportWritableState_n(
void JsepTransportController::OnTransportReceivingState_n(
rtc::PacketTransportInternal* transport) {
RTC_DCHECK(network_thread_->IsCurrent());
UpdateAggregateStates_n();
}
void JsepTransportController::OnTransportGatheringState_n(
cricket::IceTransportInternal* transport) {
RTC_DCHECK(network_thread_->IsCurrent());
UpdateAggregateStates_n();
}
void JsepTransportController::OnTransportCandidateGathered_n(
cricket::IceTransportInternal* transport,
const cricket::Candidate& candidate) {
RTC_DCHECK(network_thread_->IsCurrent());
// We should never signal peer-reflexive candidates.
if (candidate.type() == cricket::PRFLX_PORT_TYPE) {
RTC_NOTREACHED();
return;
}
std::string transport_name = transport->transport_name();
// TODO(bugs.webrtc.org/12427): See if we can get rid of this. We should be
// able to just call this directly here.
invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread_, [this, transport_name, candidate] {
signal_ice_candidates_gathered_.Send(
@ -1175,8 +1190,6 @@ void JsepTransportController::OnTransportCandidateGathered_n(
void JsepTransportController::OnTransportCandidateError_n(
cricket::IceTransportInternal* transport,
const cricket::IceCandidateErrorEvent& event) {
RTC_DCHECK(network_thread_->IsCurrent());
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_, [this, event] {
signal_ice_candidate_error_.Send(event);
});
@ -1197,7 +1210,6 @@ void JsepTransportController::OnTransportCandidatePairChanged_n(
void JsepTransportController::OnTransportRoleConflict_n(
cricket::IceTransportInternal* transport) {
RTC_DCHECK(network_thread_->IsCurrent());
// Note: since the role conflict is handled entirely on the network thread,
// we don't need to worry about role conflicts occurring on two ports at
// once. The first one encountered should immediately reverse the role.
@ -1214,7 +1226,6 @@ void JsepTransportController::OnTransportRoleConflict_n(
void JsepTransportController::OnTransportStateChanged_n(
cricket::IceTransportInternal* transport) {
RTC_DCHECK(network_thread_->IsCurrent());
RTC_LOG(LS_INFO) << transport->transport_name() << " Transport "
<< transport->component()
<< " state changed. Check if state is complete.";
@ -1222,8 +1233,6 @@ void JsepTransportController::OnTransportStateChanged_n(
}
void JsepTransportController::UpdateAggregateStates_n() {
RTC_DCHECK(network_thread_->IsCurrent());
auto dtls_transports = GetDtlsTransports();
cricket::IceConnectionState new_connection_state =
cricket::kIceConnectionConnecting;

View File

@ -227,6 +227,8 @@ class JsepTransportController : public sigslot::has_slots<> {
// F: void(const std::string&, const std::vector<cricket::Candidate>&)
template <typename F>
void SubscribeIceCandidateGathered(F&& callback) {
// TODO(bugs.webrtc.org/12427): Post this subscription to the network
// thread.
signal_ice_candidates_gathered_.AddReceiver(std::forward<F>(callback));
}
@ -294,6 +296,7 @@ class JsepTransportController : public sigslot::has_slots<> {
CallbackList<cricket::IceGatheringState> signal_ice_gathering_state_;
// [mid, candidates]
// TODO(bugs.webrtc.org/12427): Protect this with network_thread_.
CallbackList<const std::string&, const std::vector<cricket::Candidate>&>
signal_ice_candidates_gathered_;
@ -366,9 +369,9 @@ class JsepTransportController : public sigslot::has_slots<> {
// Get the JsepTransport without considering the BUNDLE group. Return nullptr
// if the JsepTransport is destroyed.
const cricket::JsepTransport* GetJsepTransportByName(
const std::string& transport_name) const;
const std::string& transport_name) const RTC_RUN_ON(network_thread_);
cricket::JsepTransport* GetJsepTransportByName(
const std::string& transport_name);
const std::string& transport_name) RTC_RUN_ON(network_thread_);
// Creates jsep transport. Noop if transport is already created.
// Transport is created either during SetLocalDescription (|local| == true) or
@ -454,7 +457,7 @@ class JsepTransportController : public sigslot::has_slots<> {
AsyncResolverFactory* const async_resolver_factory_ = nullptr;
std::map<std::string, std::unique_ptr<cricket::JsepTransport>>
jsep_transports_by_name_;
jsep_transports_by_name_ RTC_GUARDED_BY(network_thread_);
// This keeps track of the mapping between media section
// (BaseChannel/SctpTransport) and the JsepTransport underneath.
std::map<std::string, cricket::JsepTransport*> mid_to_transport_;

View File

@ -528,6 +528,9 @@ RTCError PeerConnection::Initialize(
// The port allocator lives on the network thread and should be initialized
// there.
// TODO(bugs.webrtc.org/12427): See if we can piggyback on this call and
// initialize all the |transport_controller_->Subscribe*| calls below on the
// network thread via this invoke.
const auto pa_result =
network_thread()->Invoke<InitializePortAllocatorResult>(
RTC_FROM_HERE, [this, &stun_servers, &turn_servers, &configuration] {
@ -620,6 +623,10 @@ RTCError PeerConnection::Initialize(
// due to lack of unit tests which trigger these scenarios.
// TODO(bugs.webrtc.org/12160): Remove above comments.
// callbacks for signaling_thread.
// TODO(bugs.webrtc.org/12427): If we can't piggyback on the above network
// Invoke(), then perhaps we could post these subscription calls to the
// network thread so that the transport controller doesn't have to do the
// signaling/network handling internally and use AsyncInvoker.
transport_controller_->SubscribeIceConnectionState(
[this](cricket::IceConnectionState s) {
RTC_DCHECK_RUN_ON(signaling_thread());
@ -1379,10 +1386,29 @@ RTCError PeerConnection::SetConfiguration(
const bool has_local_description = local_description() != nullptr;
// In theory this shouldn't fail.
const bool needs_ice_restart =
modified_config.servers != configuration_.servers ||
NeedIceRestart(
configuration_.surface_ice_candidates_on_ice_transport_type_changed,
configuration_.type, modified_config.type) ||
modified_config.GetTurnPortPrunePolicy() !=
configuration_.GetTurnPortPrunePolicy();
cricket::IceConfig ice_config = ParseIceConfig(modified_config);
// Apply part of the configuration on the network thread. In theory this
// shouldn't fail.
if (!network_thread()->Invoke<bool>(
RTC_FROM_HERE, [this, &stun_servers, &turn_servers, &modified_config,
has_local_description] {
RTC_FROM_HERE,
[this, needs_ice_restart, &ice_config, &stun_servers, &turn_servers,
&modified_config, has_local_description] {
// As described in JSEP, calling setConfiguration with new ICE
// servers or candidate policy must set a "needs-ice-restart" bit so
// that the next offer triggers an ICE restart which will pick up
// the changes.
if (needs_ice_restart)
transport_controller_->SetNeedsIceRestartFlag();
transport_controller_->SetIceConfig(ice_config);
return ReconfigurePortAllocator_n(
stun_servers, turn_servers, modified_config.type,
modified_config.ice_candidate_pool_size,
@ -1395,20 +1421,6 @@ RTCError PeerConnection::SetConfiguration(
"Failed to apply configuration to PortAllocator.");
}
// As described in JSEP, calling setConfiguration with new ICE servers or
// candidate policy must set a "needs-ice-restart" bit so that the next offer
// triggers an ICE restart which will pick up the changes.
if (modified_config.servers != configuration_.servers ||
NeedIceRestart(
configuration_.surface_ice_candidates_on_ice_transport_type_changed,
configuration_.type, modified_config.type) ||
modified_config.GetTurnPortPrunePolicy() !=
configuration_.GetTurnPortPrunePolicy()) {
transport_controller_->SetNeedsIceRestartFlag();
}
transport_controller_->SetIceConfig(ParseIceConfig(modified_config));
if (configuration_.active_reset_srtp_params !=
modified_config.active_reset_srtp_params) {
transport_controller_->SetActiveResetSrtpParams(
@ -2155,6 +2167,8 @@ void PeerConnection::OnTransportControllerConnectionState(
void PeerConnection::OnTransportControllerCandidatesGathered(
const std::string& transport_name,
const cricket::Candidates& candidates) {
// TODO(bugs.webrtc.org/12427): Expect this to come in on the network thread
// (not signaling as it currently does), handle appropriately.
int sdp_mline_index;
if (!GetLocalCandidateMediaIndex(transport_name, &sdp_mline_index)) {
RTC_LOG(LS_ERROR)

View File

@ -194,7 +194,7 @@ WebRtcSessionDescriptionFactory::WebRtcSessionDescriptionFactory(
}
WebRtcSessionDescriptionFactory::~WebRtcSessionDescriptionFactory() {
RTC_DCHECK(signaling_thread_->IsCurrent());
RTC_DCHECK_RUN_ON(signaling_thread_);
// Fail any requests that were asked for before identity generation completed.
FailPendingRequests(kFailedDueToSessionShutdown);
@ -222,6 +222,7 @@ void WebRtcSessionDescriptionFactory::CreateOffer(
CreateSessionDescriptionObserver* observer,
const PeerConnectionInterface::RTCOfferAnswerOptions& options,
const cricket::MediaSessionOptions& session_options) {
RTC_DCHECK_RUN_ON(signaling_thread_);
std::string error = "CreateOffer";
if (certificate_request_state_ == CERTIFICATE_FAILED) {
error += kFailedDueToIdentityFailed;
@ -441,7 +442,7 @@ void WebRtcSessionDescriptionFactory::InternalCreateAnswer(
void WebRtcSessionDescriptionFactory::FailPendingRequests(
const std::string& reason) {
RTC_DCHECK(signaling_thread_->IsCurrent());
RTC_DCHECK_RUN_ON(signaling_thread_);
while (!create_session_description_requests_.empty()) {
const CreateSessionDescriptionRequest& request =
create_session_description_requests_.front();
@ -476,7 +477,7 @@ void WebRtcSessionDescriptionFactory::PostCreateSessionDescriptionSucceeded(
}
void WebRtcSessionDescriptionFactory::OnCertificateRequestFailed() {
RTC_DCHECK(signaling_thread_->IsCurrent());
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_LOG(LS_ERROR) << "Asynchronous certificate generation request failed.";
certificate_request_state_ = CERTIFICATE_FAILED;