From 27883a2593cd7690f262067e7e997dffa3eab3ad Mon Sep 17 00:00:00 2001 From: Harald Alvestrand Date: Thu, 26 Nov 2020 07:24:32 +0000 Subject: [PATCH] Annotate cricket::BaseChannel with thread guards This CL also adds commentary to member variables that couldn't be guarded because they're accessed from multiple threads. Bug: webrtc:12230 Change-Id: I5193a7ef36ab25588c76ee6a1863de6a844be1dc Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/195331 Reviewed-by: Tommi Commit-Queue: Harald Alvestrand Cr-Commit-Position: refs/heads/master@{#32705} --- pc/channel.cc | 77 +++++++++++++++++++++--------------- pc/channel.h | 106 ++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 121 insertions(+), 62 deletions(-) diff --git a/pc/channel.cc b/pc/channel.cc index 02ee9d2492..6ab43711b6 100644 --- a/pc/channel.cc +++ b/pc/channel.cc @@ -160,6 +160,9 @@ BaseChannel::~BaseChannel() { } std::string BaseChannel::ToString() const { + // TODO(bugs.webrtc.org/12230): When media_channel_ is guarded by + // worker_thread(), rewrite this debug printout to not print the + // media type when called from non-worker-thread. rtc::StringBuilder sb; sb << "{mid: " << content_name_; if (media_channel_) { @@ -170,8 +173,9 @@ std::string BaseChannel::ToString() const { } bool BaseChannel::ConnectToRtpTransport() { + RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK(rtp_transport_); - if (!RegisterRtpDemuxerSink()) { + if (!RegisterRtpDemuxerSink_n()) { RTC_LOG(LS_ERROR) << "Failed to set up demuxing for " << ToString(); return false; } @@ -187,6 +191,7 @@ bool BaseChannel::ConnectToRtpTransport() { } void BaseChannel::DisconnectFromRtpTransport() { + RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK(rtp_transport_); rtp_transport_->UnregisterRtpDemuxerSink(this); rtp_transport_->SignalReadyToSend.disconnect(this); @@ -196,7 +201,7 @@ void BaseChannel::DisconnectFromRtpTransport() { } void BaseChannel::Init_w(webrtc::RtpTransportInternal* rtp_transport) { - RTC_DCHECK_RUN_ON(worker_thread_); + RTC_DCHECK_RUN_ON(worker_thread()); network_thread_->Invoke( RTC_FROM_HERE, [this, rtp_transport] { SetRtpTransport(rtp_transport); }); @@ -213,6 +218,7 @@ void BaseChannel::Deinit() { // functions, so need to stop this process in Deinit that is called in // derived classes destructor. network_thread_->Invoke(RTC_FROM_HERE, [&] { + RTC_DCHECK_RUN_ON(network_thread()); FlushRtcpMessages_n(); if (rtp_transport_) { @@ -225,15 +231,15 @@ void BaseChannel::Deinit() { } bool BaseChannel::SetRtpTransport(webrtc::RtpTransportInternal* rtp_transport) { - if (rtp_transport == rtp_transport_) { - return true; - } - if (!network_thread_->IsCurrent()) { return network_thread_->Invoke(RTC_FROM_HERE, [this, rtp_transport] { return SetRtpTransport(rtp_transport); }); } + RTC_DCHECK_RUN_ON(network_thread()); + if (rtp_transport == rtp_transport_) { + return true; + } if (rtp_transport_) { DisconnectFromRtpTransport(); @@ -338,7 +344,6 @@ int BaseChannel::SetOption(SocketType type, int BaseChannel::SetOption_n(SocketType type, rtc::Socket::Option opt, int value) { - RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK(rtp_transport_); switch (type) { case ST_RTP: @@ -376,6 +381,7 @@ void BaseChannel::OnNetworkRouteChanged( // work correctly. Intentionally leave it broken to simplify the code and // encourage the users to stop using non-muxing RTCP. invoker_.AsyncInvoke(RTC_FROM_HERE, worker_thread_, [=] { + RTC_DCHECK_RUN_ON(worker_thread()); media_channel_->OnNetworkRouteChanged(transport_name_, new_route); }); } @@ -393,8 +399,10 @@ sigslot::signal1& BaseChannel::SignalSentPacket() { } void BaseChannel::OnTransportReadyToSend(bool ready) { - invoker_.AsyncInvoke(RTC_FROM_HERE, worker_thread_, - [=] { media_channel_->OnReadyToSend(ready); }); + invoker_.AsyncInvoke(RTC_FROM_HERE, worker_thread_, [=] { + RTC_DCHECK_RUN_ON(worker_thread()); + media_channel_->OnReadyToSend(ready); + }); } bool BaseChannel::SendPacket(bool rtcp, @@ -418,6 +426,7 @@ bool BaseChannel::SendPacket(bool rtcp, network_thread_->Post(RTC_FROM_HERE, this, message_id, data); return true; } + RTC_DCHECK_RUN_ON(network_thread()); TRACE_EVENT0("webrtc", "BaseChannel::SendPacket"); @@ -506,25 +515,34 @@ void BaseChannel::OnRtpPacket(const webrtc::RtpPacketReceived& parsed_packet) { void BaseChannel::UpdateRtpHeaderExtensionMap( const RtpHeaderExtensions& header_extensions) { - RTC_DCHECK(rtp_transport_); // Update the header extension map on network thread in case there is data // race. - // TODO(zhihuang): Add an rtc::ThreadChecker make sure to RtpTransport won't - // be accessed from different threads. // // NOTE: This doesn't take the BUNDLE case in account meaning the RTP header // extension maps are not merged when BUNDLE is enabled. This is fine because // the ID for MID should be consistent among all the RTP transports. network_thread_->Invoke(RTC_FROM_HERE, [this, &header_extensions] { + RTC_DCHECK_RUN_ON(network_thread()); rtp_transport_->UpdateRtpHeaderExtensionMap(header_extensions); }); } -bool BaseChannel::RegisterRtpDemuxerSink() { +bool BaseChannel::RegisterRtpDemuxerSink_w() { + // Copy demuxer criteria, since they're a worker-thread variable + // and we want to pass them to the network thread + return network_thread_->Invoke( + RTC_FROM_HERE, [this, demuxer_criteria = demuxer_criteria_] { + RTC_DCHECK_RUN_ON(network_thread()); + RTC_DCHECK(rtp_transport_); + return rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria, this); + }); +} + +bool BaseChannel::RegisterRtpDemuxerSink_n() { RTC_DCHECK(rtp_transport_); - return network_thread_->Invoke(RTC_FROM_HERE, [this] { - return rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria_, this); - }); + // TODO(bugs.webrtc.org/12230): This accesses demuxer_criteria_ on the + // networking thread. + return rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria_, this); } void BaseChannel::EnableMedia_w() { @@ -557,7 +575,6 @@ void BaseChannel::UpdateWritableState_n() { } void BaseChannel::ChannelWritable_n() { - RTC_DCHECK_RUN_ON(network_thread()); if (writable_) { return; } @@ -571,7 +588,6 @@ void BaseChannel::ChannelWritable_n() { } void BaseChannel::ChannelNotWritable_n() { - RTC_DCHECK_RUN_ON(network_thread()); if (!writable_) return; @@ -581,12 +597,10 @@ void BaseChannel::ChannelNotWritable_n() { } bool BaseChannel::AddRecvStream_w(const StreamParams& sp) { - RTC_DCHECK(worker_thread() == rtc::Thread::Current()); return media_channel()->AddRecvStream(sp); } bool BaseChannel::RemoveRecvStream_w(uint32_t ssrc) { - RTC_DCHECK(worker_thread() == rtc::Thread::Current()); return media_channel()->RemoveRecvStream(ssrc); } @@ -596,7 +610,6 @@ void BaseChannel::ResetUnsignaledRecvStream_w() { } bool BaseChannel::SetPayloadTypeDemuxingEnabled_w(bool enabled) { - RTC_DCHECK_RUN_ON(worker_thread()); if (enabled == payload_type_demuxing_enabled_) { return true; } @@ -609,7 +622,7 @@ bool BaseChannel::SetPayloadTypeDemuxingEnabled_w(bool enabled) { // there is no straightforward way to identify those streams. media_channel()->ResetUnsignaledRecvStream(); demuxer_criteria_.payload_types.clear(); - if (!RegisterRtpDemuxerSink()) { + if (!RegisterRtpDemuxerSink_w()) { RTC_LOG(LS_ERROR) << "Failed to disable payload type demuxing for " << ToString(); return false; @@ -617,7 +630,7 @@ bool BaseChannel::SetPayloadTypeDemuxingEnabled_w(bool enabled) { } else if (!payload_types_.empty()) { demuxer_criteria_.payload_types.insert(payload_types_.begin(), payload_types_.end()); - if (!RegisterRtpDemuxerSink()) { + if (!RegisterRtpDemuxerSink_w()) { RTC_LOG(LS_ERROR) << "Failed to enable payload type demuxing for " << ToString(); return false; @@ -765,7 +778,7 @@ bool BaseChannel::UpdateRemoteStreams_w( new_stream.ssrcs.end()); } // Re-register the sink to update the receiving ssrcs. - if (!RegisterRtpDemuxerSink()) { + if (!RegisterRtpDemuxerSink_w()) { RTC_LOG(LS_ERROR) << "Failed to set up demuxing for " << ToString(); ret = false; } @@ -775,7 +788,6 @@ bool BaseChannel::UpdateRemoteStreams_w( RtpHeaderExtensions BaseChannel::GetFilteredRtpHeaderExtensions( const RtpHeaderExtensions& extensions) { - RTC_DCHECK(rtp_transport_); if (crypto_options_.srtp.enable_encrypted_rtp_header_extensions) { RtpHeaderExtensions filtered; absl::c_copy_if(extensions, std::back_inserter(filtered), @@ -826,7 +838,6 @@ void BaseChannel::ClearHandledPayloadTypes() { void BaseChannel::FlushRtcpMessages_n() { // Flush all remaining RTCP messages. This should only be called in // destructor. - RTC_DCHECK_RUN_ON(network_thread()); rtc::MessageList rtcp_messages; network_thread_->Clear(this, MSG_SEND_RTCP_PACKET, &rtcp_messages); for (const auto& message : rtcp_messages) { @@ -836,7 +847,6 @@ void BaseChannel::FlushRtcpMessages_n() { } void BaseChannel::SignalSentPacket_n(const rtc::SentPacket& sent_packet) { - RTC_DCHECK_RUN_ON(network_thread()); invoker_.AsyncInvoke(RTC_FROM_HERE, worker_thread_, [this, sent_packet] { RTC_DCHECK_RUN_ON(worker_thread()); @@ -881,6 +891,7 @@ void VoiceChannel::Init_w(webrtc::RtpTransportInternal* rtp_transport) { void VoiceChannel::UpdateMediaSendRecvState_w() { // Render incoming data if we're the active call, and we have the local // content. We receive data on the default channel and multiplexed streams. + RTC_DCHECK_RUN_ON(worker_thread()); bool recv = IsReadyToReceiveMedia_w(); media_channel()->SetPlayout(recv); @@ -931,7 +942,7 @@ bool VoiceChannel::SetLocalContent_w(const MediaContentDescription* content, MaybeAddHandledPayloadType(codec.id); } // Need to re-register the sink to update the handled payload. - if (!RegisterRtpDemuxerSink()) { + if (!RegisterRtpDemuxerSink_w()) { RTC_LOG(LS_ERROR) << "Failed to set up audio demuxing for " << ToString(); return false; } @@ -997,7 +1008,7 @@ bool VoiceChannel::SetRemoteContent_w(const MediaContentDescription* content, "disable payload type demuxing for " << ToString(); ClearHandledPayloadTypes(); - if (!RegisterRtpDemuxerSink()) { + if (!RegisterRtpDemuxerSink_w()) { RTC_LOG(LS_ERROR) << "Failed to update audio demuxing for " << ToString(); return false; } @@ -1048,6 +1059,7 @@ VideoChannel::~VideoChannel() { void VideoChannel::UpdateMediaSendRecvState_w() { // Send outgoing data if we're the active call, we have the remote content, // and we have had some form of connectivity. + RTC_DCHECK_RUN_ON(worker_thread()); bool send = IsReadyToSendMedia_w(); if (!media_channel()->SetSend(send)) { RTC_LOG(LS_ERROR) << "Failed to SetSend on video channel: " + ToString(); @@ -1124,7 +1136,7 @@ bool VideoChannel::SetLocalContent_w(const MediaContentDescription* content, MaybeAddHandledPayloadType(codec.id); } // Need to re-register the sink to update the handled payload. - if (!RegisterRtpDemuxerSink()) { + if (!RegisterRtpDemuxerSink_w()) { RTC_LOG(LS_ERROR) << "Failed to set up video demuxing for " << ToString(); return false; } @@ -1234,7 +1246,7 @@ bool VideoChannel::SetRemoteContent_w(const MediaContentDescription* content, "disable payload type demuxing for " << ToString(); ClearHandledPayloadTypes(); - if (!RegisterRtpDemuxerSink()) { + if (!RegisterRtpDemuxerSink_w()) { RTC_LOG(LS_ERROR) << "Failed to update video demuxing for " << ToString(); return false; } @@ -1349,7 +1361,7 @@ bool RtpDataChannel::SetLocalContent_w(const MediaContentDescription* content, MaybeAddHandledPayloadType(codec.id); } // Need to re-register the sink to update the handled payload. - if (!RegisterRtpDemuxerSink()) { + if (!RegisterRtpDemuxerSink_w()) { RTC_LOG(LS_ERROR) << "Failed to set up data demuxing for " << ToString(); return false; } @@ -1437,6 +1449,7 @@ bool RtpDataChannel::SetRemoteContent_w(const MediaContentDescription* content, void RtpDataChannel::UpdateMediaSendRecvState_w() { // Render incoming data if we're the active call, and we have the local // content. We receive data on the default channel and multiplexed streams. + RTC_DCHECK_RUN_ON(worker_thread()); bool recv = IsReadyToReceiveMedia_w(); if (!media_channel()->SetReceive(recv)) { RTC_LOG(LS_ERROR) << "Failed to SetReceive on data channel: " << ToString(); diff --git a/pc/channel.h b/pc/channel.h index 51cc40fc53..ad75070c86 100644 --- a/pc/channel.h +++ b/pc/channel.h @@ -106,6 +106,13 @@ class BaseChannel : public ChannelInterface, // This function returns true if using SRTP (DTLS-based keying or SDES). bool srtp_active() const { + // TODO(bugs.webrtc.org/12230): At least some tests call this function + // from other threads. + if (!network_thread_->IsCurrent()) { + return network_thread_->Invoke(RTC_FROM_HERE, + [this] { return srtp_active(); }); + } + RTC_DCHECK_RUN_ON(network_thread()); return rtp_transport_ && rtp_transport_->IsSrtpActive(); } @@ -117,7 +124,16 @@ class BaseChannel : public ChannelInterface, // internally. It would replace the |SetTransports| and its variants. bool SetRtpTransport(webrtc::RtpTransportInternal* rtp_transport) override; - webrtc::RtpTransportInternal* rtp_transport() const { return rtp_transport_; } + webrtc::RtpTransportInternal* rtp_transport() const { + // TODO(bugs.webrtc.org/12230): At least some tests call this function + // from other threads. + if (!network_thread_->IsCurrent()) { + return network_thread_->Invoke( + RTC_FROM_HERE, [this] { return rtp_transport(); }); + } + RTC_DCHECK_RUN_ON(network_thread()); + return rtp_transport_; + } // Channel control bool SetLocalContent(const MediaContentDescription* content, @@ -156,7 +172,8 @@ class BaseChannel : public ChannelInterface, // Only public for unit tests. Otherwise, consider protected. int SetOption(SocketType type, rtc::Socket::Option o, int val) override; - int SetOption_n(SocketType type, rtc::Socket::Option o, int val); + int SetOption_n(SocketType type, rtc::Socket::Option o, int val) + RTC_RUN_ON(network_thread()); // RtpPacketSinkInterface overrides. void OnRtpPacket(const webrtc::RtpPacketReceived& packet) override; @@ -167,14 +184,24 @@ class BaseChannel : public ChannelInterface, transport_name_ = transport_name; } - MediaChannel* media_channel() const override { return media_channel_.get(); } + MediaChannel* media_channel() const override { + // TODO(bugs.webrtc.org/12230): Called on multiple threads, + // including from StatsCollector::ExtractMediaInfo. + // RTC_DCHECK_RUN_ON(worker_thread()); + return media_channel_.get(); + } protected: - bool was_ever_writable() const { return was_ever_writable_; } + bool was_ever_writable() const { + RTC_DCHECK_RUN_ON(network_thread()); + return was_ever_writable_; + } void set_local_content_direction(webrtc::RtpTransceiverDirection direction) { + RTC_DCHECK_RUN_ON(worker_thread()); local_content_direction_ = direction; } void set_remote_content_direction(webrtc::RtpTransceiverDirection direction) { + RTC_DCHECK_RUN_ON(worker_thread()); remote_content_direction_ = direction; } // These methods verify that: @@ -187,11 +214,11 @@ class BaseChannel : public ChannelInterface, // // When any of these properties change, UpdateMediaSendRecvState_w should be // called. - bool IsReadyToReceiveMedia_w() const; - bool IsReadyToSendMedia_w() const; + bool IsReadyToReceiveMedia_w() const RTC_RUN_ON(worker_thread()); + bool IsReadyToSendMedia_w() const RTC_RUN_ON(worker_thread()); rtc::Thread* signaling_thread() { return signaling_thread_; } - void FlushRtcpMessages_n(); + void FlushRtcpMessages_n() RTC_RUN_ON(network_thread()); // NetworkInterface implementation, called by MediaEngine bool SendPacket(rtc::CopyOnWriteBuffer* packet, @@ -211,22 +238,23 @@ class BaseChannel : public ChannelInterface, rtc::CopyOnWriteBuffer* packet, const rtc::PacketOptions& options); - void EnableMedia_w(); - void DisableMedia_w(); + void EnableMedia_w() RTC_RUN_ON(worker_thread()); + void DisableMedia_w() RTC_RUN_ON(worker_thread()); // Performs actions if the RTP/RTCP writable state changed. This should // be called whenever a channel's writable state changes or when RTCP muxing // becomes active/inactive. - void UpdateWritableState_n(); - void ChannelWritable_n(); - void ChannelNotWritable_n(); + void UpdateWritableState_n() RTC_RUN_ON(network_thread()); + void ChannelWritable_n() RTC_RUN_ON(network_thread()); + void ChannelNotWritable_n() RTC_RUN_ON(network_thread()); - bool AddRecvStream_w(const StreamParams& sp); - bool RemoveRecvStream_w(uint32_t ssrc); - void ResetUnsignaledRecvStream_w(); - bool SetPayloadTypeDemuxingEnabled_w(bool enabled); - bool AddSendStream_w(const StreamParams& sp); - bool RemoveSendStream_w(uint32_t ssrc); + bool AddRecvStream_w(const StreamParams& sp) RTC_RUN_ON(worker_thread()); + bool RemoveRecvStream_w(uint32_t ssrc) RTC_RUN_ON(worker_thread()); + void ResetUnsignaledRecvStream_w() RTC_RUN_ON(worker_thread()); + bool SetPayloadTypeDemuxingEnabled_w(bool enabled) + RTC_RUN_ON(worker_thread()); + bool AddSendStream_w(const StreamParams& sp) RTC_RUN_ON(worker_thread()); + bool RemoveSendStream_w(uint32_t ssrc) RTC_RUN_ON(worker_thread()); // Should be called whenever the conditions for // IsReadyToReceiveMedia/IsReadyToSendMedia are satisfied (or unsatisfied). @@ -236,10 +264,12 @@ class BaseChannel : public ChannelInterface, bool UpdateLocalStreams_w(const std::vector& streams, webrtc::SdpType type, - std::string* error_desc); + std::string* error_desc) + RTC_RUN_ON(worker_thread()); bool UpdateRemoteStreams_w(const std::vector& streams, webrtc::SdpType type, - std::string* error_desc); + std::string* error_desc) + RTC_RUN_ON(worker_thread()); virtual bool SetLocalContent_w(const MediaContentDescription* content, webrtc::SdpType type, std::string* error_desc) = 0; @@ -271,7 +301,8 @@ class BaseChannel : public ChannelInterface, void UpdateRtpHeaderExtensionMap( const RtpHeaderExtensions& header_extensions); - bool RegisterRtpDemuxerSink(); + bool RegisterRtpDemuxerSink_w() RTC_RUN_ON(worker_thread()); + bool RegisterRtpDemuxerSink_n() RTC_RUN_ON(network_thread()); // Return description of media channel to facilitate logging std::string ToString() const; @@ -281,8 +312,9 @@ class BaseChannel : public ChannelInterface, private: bool ConnectToRtpTransport(); void DisconnectFromRtpTransport(); - void SignalSentPacket_n(const rtc::SentPacket& sent_packet); - bool IsReadyToSendMedia_n() const; + void SignalSentPacket_n(const rtc::SentPacket& sent_packet) + RTC_RUN_ON(network_thread()); + bool IsReadyToSendMedia_n() const RTC_RUN_ON(network_thread()); rtc::Thread* const worker_thread_; rtc::Thread* const network_thread_; @@ -296,27 +328,39 @@ class BaseChannel : public ChannelInterface, const std::string content_name_; // Won't be set when using raw packet transports. SDP-specific thing. + // TODO(bugs.webrtc.org/12230): Written on network thread, read on + // worker thread (at least). std::string transport_name_; - webrtc::RtpTransportInternal* rtp_transport_ = nullptr; + webrtc::RtpTransportInternal* rtp_transport_ + RTC_GUARDED_BY(network_thread()) = nullptr; - std::vector > socket_options_; - std::vector > rtcp_socket_options_; + std::vector > socket_options_ + RTC_GUARDED_BY(network_thread()); + std::vector > rtcp_socket_options_ + RTC_GUARDED_BY(network_thread()); + // TODO(bugs.webrtc.org/12230): writable_ is accessed in tests + // outside of the network thread. bool writable_ = false; - bool was_ever_writable_ = false; + bool was_ever_writable_ RTC_GUARDED_BY(network_thread()) = false; const bool srtp_required_ = true; - webrtc::CryptoOptions crypto_options_; + const webrtc::CryptoOptions crypto_options_; // MediaChannel related members that should be accessed from the worker // thread. + // TODO(bugs.webrtc.org/12230): written on worker thread, accessed by + // multiple threads. std::unique_ptr media_channel_; // Currently the |enabled_| flag is accessed from the signaling thread as // well, but it can be changed only when signaling thread does a synchronous // call to the worker thread, so it should be safe. bool enabled_ = false; bool payload_type_demuxing_enabled_ RTC_GUARDED_BY(worker_thread()) = true; - std::vector local_streams_; - std::vector remote_streams_; + std::vector local_streams_ RTC_GUARDED_BY(worker_thread()); + std::vector remote_streams_ RTC_GUARDED_BY(worker_thread()); + // TODO(bugs.webrtc.org/12230): local_content_direction and + // remote_content_direction are set on the worker thread, but accessed on the + // network thread. webrtc::RtpTransceiverDirection local_content_direction_ = webrtc::RtpTransceiverDirection::kInactive; webrtc::RtpTransceiverDirection remote_content_direction_ = @@ -324,6 +368,8 @@ class BaseChannel : public ChannelInterface, // Cached list of payload types, used if payload type demuxing is re-enabled. std::set payload_types_ RTC_GUARDED_BY(worker_thread()); + // TODO(bugs.webrtc.org/12239): Modified on worker thread, accessed + // on network thread in RegisterRtpDemuxerSink_n (called from Init_w) webrtc::RtpDemuxerCriteria demuxer_criteria_; // This generator is used to generate SSRCs for local streams. // This is needed in cases where SSRCs are not negotiated or set explicitly