diff --git a/pc/BUILD.gn b/pc/BUILD.gn index 106ee55025..e4311ec952 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -400,6 +400,8 @@ rtc_library("rtp_transceiver") { "../rtc_base:macromagic", "../rtc_base:refcount", "../rtc_base:threading", + "../rtc_base/task_utils:pending_task_safety_flag", + "../rtc_base/task_utils:to_queued_task", "../rtc_base/third_party/sigslot", ] absl_deps = [ diff --git a/pc/channel.cc b/pc/channel.cc index 53933c33ed..0ed5665dae 100644 --- a/pc/channel.cc +++ b/pc/channel.cc @@ -89,7 +89,6 @@ enum { MSG_SEND_RTCP_PACKET, MSG_READYTOSENDDATA, MSG_DATARECEIVED, - MSG_FIRSTPACKETRECEIVED, }; static void SafeSetError(const std::string& message, std::string* error_desc) { @@ -156,7 +155,6 @@ BaseChannel::~BaseChannel() { // Eats any outstanding messages or packets. alive_->SetNotAlive(); - signaling_thread_->Clear(this); // The media channel is destroyed at the end of the destructor, since it // is a std::unique_ptr. The transport channel (rtp_transport) must outlive // the media channel. @@ -411,9 +409,11 @@ void BaseChannel::OnNetworkRouteChanged( media_channel_->OnNetworkRouteChanged(transport_name_, new_route); } -sigslot::signal1& BaseChannel::SignalFirstPacketReceived() { - RTC_DCHECK_RUN_ON(signaling_thread_); - return SignalFirstPacketReceived_; +void BaseChannel::SetFirstPacketReceivedCallback( + std::function callback) { + RTC_DCHECK_RUN_ON(network_thread()); + RTC_DCHECK(!on_first_packet_received_ || !callback); + on_first_packet_received_ = std::move(callback); } void BaseChannel::OnTransportReadyToSend(bool ready) { @@ -490,6 +490,8 @@ bool BaseChannel::SendPacket(bool rtcp, } void BaseChannel::OnRtpPacket(const webrtc::RtpPacketReceived& parsed_packet) { + RTC_DCHECK_RUN_ON(network_thread()); + // Take packet time from the |parsed_packet|. // RtpPacketReceived.arrival_time_ms = (timestamp_us + 500) / 1000; int64_t packet_time_us = -1; @@ -497,9 +499,9 @@ void BaseChannel::OnRtpPacket(const webrtc::RtpPacketReceived& parsed_packet) { packet_time_us = parsed_packet.arrival_time_ms() * 1000; } - if (!has_received_packet_) { - has_received_packet_ = true; - signaling_thread()->Post(RTC_FROM_HERE, this, MSG_FIRSTPACKETRECEIVED); + if (on_first_packet_received_) { + on_first_packet_received_(); + on_first_packet_received_ = nullptr; } if (!srtp_active() && srtp_required_) { @@ -830,11 +832,6 @@ void BaseChannel::OnMessage(rtc::Message* pmsg) { delete data; break; } - case MSG_FIRSTPACKETRECEIVED: { - RTC_DCHECK_RUN_ON(signaling_thread_); - SignalFirstPacketReceived_(this); - break; - } } } diff --git a/pc/channel.h b/pc/channel.h index 528e7d0ac6..5c2235e9ee 100644 --- a/pc/channel.h +++ b/pc/channel.h @@ -93,8 +93,12 @@ struct CryptoParams; // NetworkInterface. class BaseChannel : public ChannelInterface, - public rtc::MessageHandlerAutoCleanup, + // TODO(tommi): Remove MessageHandler inheritance. + public rtc::MessageHandler, + // TODO(tommi): Remove has_slots inheritance. public sigslot::has_slots<>, + // TODO(tommi): Consider implementing these interfaces + // via composition. public MediaChannel::NetworkInterface, public webrtc::RtpPacketSinkInterface { public: @@ -175,7 +179,7 @@ class BaseChannel : public ChannelInterface, } // Used for latency measurements. - sigslot::signal1& SignalFirstPacketReceived() override; + void SetFirstPacketReceivedCallback(std::function callback) override; // From RtpTransport - public for testing only void OnTransportReadyToSend(bool ready); @@ -319,12 +323,11 @@ class BaseChannel : public ChannelInterface, rtc::Thread* const network_thread_; rtc::Thread* const signaling_thread_; rtc::scoped_refptr alive_; - sigslot::signal1 SignalFirstPacketReceived_ - RTC_GUARDED_BY(signaling_thread_); const std::string content_name_; - bool has_received_packet_ = false; + std::function on_first_packet_received_ + RTC_GUARDED_BY(network_thread()); // Won't be set when using raw packet transports. SDP-specific thing. // TODO(bugs.webrtc.org/12230): Written on network thread, read on diff --git a/pc/channel_interface.h b/pc/channel_interface.h index 46170a721b..fced8cc267 100644 --- a/pc/channel_interface.h +++ b/pc/channel_interface.h @@ -41,7 +41,8 @@ class ChannelInterface { virtual void Enable(bool enable) = 0; // Used for latency measurements. - virtual sigslot::signal1& SignalFirstPacketReceived() = 0; + virtual void SetFirstPacketReceivedCallback( + std::function callback) = 0; // Channel control virtual bool SetLocalContent(const MediaContentDescription* content, diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index 1efb0ab20a..0323636f02 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -631,10 +631,12 @@ RTCError PeerConnection::Initialize( if (!IsUnifiedPlan()) { rtp_manager()->transceivers()->Add( RtpTransceiverProxyWithInternal::Create( - signaling_thread(), new RtpTransceiver(cricket::MEDIA_TYPE_AUDIO))); + signaling_thread(), + new RtpTransceiver(cricket::MEDIA_TYPE_AUDIO, channel_manager()))); rtp_manager()->transceivers()->Add( RtpTransceiverProxyWithInternal::Create( - signaling_thread(), new RtpTransceiver(cricket::MEDIA_TYPE_VIDEO))); + signaling_thread(), + new RtpTransceiver(cricket::MEDIA_TYPE_VIDEO, channel_manager()))); } int delay_ms = configuration.report_usage_pattern_delay_ms diff --git a/pc/rtp_transceiver.cc b/pc/rtp_transceiver.cc index d2d05bcedd..15c4c4c4d1 100644 --- a/pc/rtp_transceiver.cc +++ b/pc/rtp_transceiver.cc @@ -25,6 +25,7 @@ #include "pc/session_description.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/thread.h" namespace webrtc { @@ -112,12 +113,16 @@ TaskQueueBase* GetCurrentTaskQueueOrThread() { } // namespace -RtpTransceiver::RtpTransceiver(cricket::MediaType media_type) +RtpTransceiver::RtpTransceiver( + cricket::MediaType media_type, + cricket::ChannelManager* channel_manager /* = nullptr*/) : thread_(GetCurrentTaskQueueOrThread()), unified_plan_(false), - media_type_(media_type) { + media_type_(media_type), + channel_manager_(channel_manager) { RTC_DCHECK(media_type == cricket::MEDIA_TYPE_AUDIO || media_type == cricket::MEDIA_TYPE_VIDEO); + RTC_DCHECK(channel_manager_); } RtpTransceiver::RtpTransceiver( @@ -136,11 +141,15 @@ RtpTransceiver::RtpTransceiver( RTC_DCHECK(media_type_ == cricket::MEDIA_TYPE_AUDIO || media_type_ == cricket::MEDIA_TYPE_VIDEO); RTC_DCHECK_EQ(sender->media_type(), receiver->media_type()); + RTC_DCHECK(channel_manager_); senders_.push_back(sender); receivers_.push_back(receiver); } RtpTransceiver::~RtpTransceiver() { + // TODO(tommi): On Android, when running PeerConnectionClientTest (e.g. + // PeerConnectionClientTest#testCameraSwitch), the instance doesn't get + // deleted on `thread_`. See if we can fix that. if (!stopped_) { RTC_DCHECK_RUN_ON(thread_); StopInternal(); @@ -148,34 +157,57 @@ RtpTransceiver::~RtpTransceiver() { } void RtpTransceiver::SetChannel(cricket::ChannelInterface* channel) { + RTC_DCHECK_RUN_ON(thread_); // Cannot set a non-null channel on a stopped transceiver. if (stopped_ && channel) { return; } + RTC_DCHECK(channel || channel_); + RTC_LOG_THREAD_BLOCK_COUNT(); + if (channel_) { + signaling_thread_safety_->SetNotAlive(); + signaling_thread_safety_ = nullptr; + } + if (channel) { RTC_DCHECK_EQ(media_type(), channel->media_type()); + signaling_thread_safety_ = PendingTaskSafetyFlag::Create(); } - if (channel_) { - channel_->SignalFirstPacketReceived().disconnect(this); - } + // An alternative to this, could be to require SetChannel to be called + // on the network thread. The channel object operates for the most part + // on the network thread, as part of its initialization being on the network + // thread is required, so setting a channel object as part of the construction + // (without thread hopping) might be the more efficient thing to do than + // how SetChannel works today. + // Similarly, if the channel() accessor is limited to the network thread, that + // helps with keeping the channel implementation requirements being met and + // avoids synchronization for accessing the pointer or network related state. + channel_manager_->network_thread()->Invoke(RTC_FROM_HERE, [&]() { + if (channel_) { + channel_->SetFirstPacketReceivedCallback(nullptr); + } - channel_ = channel; + channel_ = channel; - if (channel_) { - channel_->SignalFirstPacketReceived().connect( - this, &RtpTransceiver::OnFirstPacketReceived); - } + if (channel_) { + channel_->SetFirstPacketReceivedCallback( + [thread = thread_, flag = signaling_thread_safety_, this]() mutable { + thread->PostTask(ToQueuedTask( + std::move(flag), [this]() { OnFirstPacketReceived(); })); + }); + } + }); for (const auto& sender : senders_) { sender->internal()->SetMediaChannel(channel_ ? channel_->media_channel() : nullptr); } - RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(0); + RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(1); for (const auto& receiver : receivers_) { if (!channel_) { @@ -188,12 +220,11 @@ void RtpTransceiver::SetChannel(cricket::ChannelInterface* channel) { receiver->internal()->SetMediaChannel(channel_ ? channel_->media_channel() : nullptr); } - - RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(receivers_.size() * 2); } void RtpTransceiver::AddSender( rtc::scoped_refptr> sender) { + RTC_DCHECK_RUN_ON(thread_); RTC_DCHECK(!stopped_); RTC_DCHECK(!unified_plan_); RTC_DCHECK(sender); @@ -219,6 +250,7 @@ bool RtpTransceiver::RemoveSender(RtpSenderInterface* sender) { void RtpTransceiver::AddReceiver( rtc::scoped_refptr> receiver) { + RTC_DCHECK_RUN_ON(thread_); RTC_DCHECK(!stopped_); RTC_DCHECK(!unified_plan_); RTC_DCHECK(receiver); @@ -267,7 +299,7 @@ absl::optional RtpTransceiver::mid() const { return mid_; } -void RtpTransceiver::OnFirstPacketReceived(cricket::ChannelInterface*) { +void RtpTransceiver::OnFirstPacketReceived() { for (const auto& receiver : receivers_) { receiver->internal()->NotifyFirstPacketReceived(); } @@ -304,6 +336,7 @@ void RtpTransceiver::set_fired_direction(RtpTransceiverDirection direction) { } bool RtpTransceiver::stopped() const { + RTC_DCHECK_RUN_ON(thread_); return stopped_; } diff --git a/pc/rtp_transceiver.h b/pc/rtp_transceiver.h index 8d2d72857d..32da9af601 100644 --- a/pc/rtp_transceiver.h +++ b/pc/rtp_transceiver.h @@ -35,6 +35,7 @@ #include "pc/rtp_receiver.h" #include "pc/rtp_sender.h" #include "rtc_base/ref_counted_object.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/third_party/sigslot/sigslot.h" #include "rtc_base/thread_annotations.h" @@ -78,7 +79,8 @@ class RtpTransceiver final // channel set. // |media_type| specifies the type of RtpTransceiver (and, by transitivity, // the type of senders, receivers, and channel). Can either by audio or video. - explicit RtpTransceiver(cricket::MediaType media_type); + RtpTransceiver(cricket::MediaType media_type, + cricket::ChannelManager* channel_manager); // Construct a Unified Plan-style RtpTransceiver with the given sender and // receiver. The media type will be derived from the media types of the sender // and receiver. The sender and receiver should have the same media type. @@ -232,20 +234,21 @@ class RtpTransceiver final header_extensions_to_offer) override; private: - void OnFirstPacketReceived(cricket::ChannelInterface* channel); + void OnFirstPacketReceived(); void StopSendingAndReceiving(); // Enforce that this object is created, used and destroyed on one thread. - const TaskQueueBase* thread_; + TaskQueueBase* const thread_; const bool unified_plan_; const cricket::MediaType media_type_; + rtc::scoped_refptr signaling_thread_safety_; std::vector>> senders_; std::vector< rtc::scoped_refptr>> receivers_; - bool stopped_ = false; + bool stopped_ RTC_GUARDED_BY(thread_) = false; bool stopping_ RTC_GUARDED_BY(thread_) = false; bool is_pc_closed_ = false; RtpTransceiverDirection direction_ = RtpTransceiverDirection::kInactive; diff --git a/pc/rtp_transceiver_unittest.cc b/pc/rtp_transceiver_unittest.cc index c518aae0c7..523f30737a 100644 --- a/pc/rtp_transceiver_unittest.cc +++ b/pc/rtp_transceiver_unittest.cc @@ -23,6 +23,7 @@ #include "test/gmock.h" #include "test/gtest.h" +using ::testing::_; using ::testing::ElementsAre; using ::testing::Optional; using ::testing::Property; @@ -33,13 +34,13 @@ namespace webrtc { // Checks that a channel cannot be set on a stopped |RtpTransceiver|. TEST(RtpTransceiverTest, CannotSetChannelOnStoppedTransceiver) { - RtpTransceiver transceiver(cricket::MediaType::MEDIA_TYPE_AUDIO); + auto cm = cricket::ChannelManager::Create( + nullptr, true, rtc::Thread::Current(), rtc::Thread::Current()); + RtpTransceiver transceiver(cricket::MediaType::MEDIA_TYPE_AUDIO, cm.get()); cricket::MockChannelInterface channel1; - sigslot::signal1 signal; EXPECT_CALL(channel1, media_type()) .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_AUDIO)); - EXPECT_CALL(channel1, SignalFirstPacketReceived()) - .WillRepeatedly(ReturnRef(signal)); + EXPECT_CALL(channel1, SetFirstPacketReceivedCallback(_)); transceiver.SetChannel(&channel1); EXPECT_EQ(&channel1, transceiver.channel()); @@ -59,13 +60,14 @@ TEST(RtpTransceiverTest, CannotSetChannelOnStoppedTransceiver) { // Checks that a channel can be unset on a stopped |RtpTransceiver| TEST(RtpTransceiverTest, CanUnsetChannelOnStoppedTransceiver) { - RtpTransceiver transceiver(cricket::MediaType::MEDIA_TYPE_VIDEO); + auto cm = cricket::ChannelManager::Create( + nullptr, true, rtc::Thread::Current(), rtc::Thread::Current()); + RtpTransceiver transceiver(cricket::MediaType::MEDIA_TYPE_VIDEO, cm.get()); cricket::MockChannelInterface channel; - sigslot::signal1 signal; EXPECT_CALL(channel, media_type()) .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_VIDEO)); - EXPECT_CALL(channel, SignalFirstPacketReceived()) - .WillRepeatedly(ReturnRef(signal)); + EXPECT_CALL(channel, SetFirstPacketReceivedCallback(_)) + .WillRepeatedly(testing::Return()); transceiver.SetChannel(&channel); EXPECT_EQ(&channel, transceiver.channel()); @@ -89,20 +91,40 @@ class RtpTransceiverUnifiedPlanTest : public ::testing::Test { rtc::Thread::Current())), transceiver_(RtpSenderProxyWithInternal::Create( rtc::Thread::Current(), - new rtc::RefCountedObject()), + sender_), RtpReceiverProxyWithInternal::Create( rtc::Thread::Current(), - new rtc::RefCountedObject()), + receiver_), channel_manager_.get(), channel_manager_->GetSupportedAudioRtpHeaderExtensions(), /* on_negotiation_needed= */ [] {}) {} + static rtc::scoped_refptr MockReceiver() { + auto receiver = rtc::make_ref_counted(); + EXPECT_CALL(*receiver.get(), media_type()) + .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_AUDIO)); + return receiver; + } + + static rtc::scoped_refptr MockSender() { + auto sender = rtc::make_ref_counted(); + EXPECT_CALL(*sender.get(), media_type()) + .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_AUDIO)); + return sender; + } + + rtc::scoped_refptr receiver_ = MockReceiver(); + rtc::scoped_refptr sender_ = MockSender(); std::unique_ptr channel_manager_; RtpTransceiver transceiver_; }; // Basic tests for Stop() TEST_F(RtpTransceiverUnifiedPlanTest, StopSetsDirection) { + EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); + EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); + EXPECT_CALL(*sender_.get(), Stop()); + EXPECT_EQ(RtpTransceiverDirection::kInactive, transceiver_.direction()); EXPECT_FALSE(transceiver_.current_direction()); transceiver_.StopStandard(); @@ -138,24 +160,49 @@ class RtpTransceiverTestForHeaderExtensions : public ::testing::Test { RtpTransceiverDirection::kSendRecv)}), transceiver_(RtpSenderProxyWithInternal::Create( rtc::Thread::Current(), - new rtc::RefCountedObject()), + sender_), RtpReceiverProxyWithInternal::Create( rtc::Thread::Current(), - new rtc::RefCountedObject()), + receiver_), channel_manager_.get(), extensions_, /* on_negotiation_needed= */ [] {}) {} + static rtc::scoped_refptr MockReceiver() { + auto receiver = rtc::make_ref_counted(); + EXPECT_CALL(*receiver.get(), media_type()) + .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_AUDIO)); + return receiver; + } + + static rtc::scoped_refptr MockSender() { + auto sender = rtc::make_ref_counted(); + EXPECT_CALL(*sender.get(), media_type()) + .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_AUDIO)); + return sender; + } + + rtc::scoped_refptr receiver_ = MockReceiver(); + rtc::scoped_refptr sender_ = MockSender(); + std::unique_ptr channel_manager_; std::vector extensions_; RtpTransceiver transceiver_; }; TEST_F(RtpTransceiverTestForHeaderExtensions, OffersChannelManagerList) { + EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); + EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); + EXPECT_CALL(*sender_.get(), Stop()); + EXPECT_EQ(transceiver_.HeaderExtensionsToOffer(), extensions_); } TEST_F(RtpTransceiverTestForHeaderExtensions, ModifiesDirection) { + EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); + EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); + EXPECT_CALL(*sender_.get(), Stop()); + auto modified_extensions = extensions_; modified_extensions[0].direction = RtpTransceiverDirection::kSendOnly; EXPECT_TRUE( @@ -176,6 +223,10 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, ModifiesDirection) { } TEST_F(RtpTransceiverTestForHeaderExtensions, AcceptsStoppedExtension) { + EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); + EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); + EXPECT_CALL(*sender_.get(), Stop()); + auto modified_extensions = extensions_; modified_extensions[0].direction = RtpTransceiverDirection::kStopped; EXPECT_TRUE( @@ -184,6 +235,10 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, AcceptsStoppedExtension) { } TEST_F(RtpTransceiverTestForHeaderExtensions, RejectsUnsupportedExtension) { + EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); + EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); + EXPECT_CALL(*sender_.get(), Stop()); + std::vector modified_extensions( {RtpHeaderExtensionCapability("uri3", 1, RtpTransceiverDirection::kSendRecv)}); @@ -194,6 +249,10 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, RejectsUnsupportedExtension) { TEST_F(RtpTransceiverTestForHeaderExtensions, RejectsStoppedMandatoryExtensions) { + EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); + EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); + EXPECT_CALL(*sender_.get(), Stop()); + std::vector modified_extensions = extensions_; // Attempting to stop the mandatory MID extension. modified_extensions[2].direction = RtpTransceiverDirection::kStopped; @@ -210,24 +269,43 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, TEST_F(RtpTransceiverTestForHeaderExtensions, NoNegotiatedHdrExtsWithoutChannel) { + EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); + EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); + EXPECT_CALL(*sender_.get(), Stop()); EXPECT_THAT(transceiver_.HeaderExtensionsNegotiated(), ElementsAre()); } TEST_F(RtpTransceiverTestForHeaderExtensions, NoNegotiatedHdrExtsWithChannelWithoutNegotiation) { + EXPECT_CALL(*receiver_.get(), SetMediaChannel(_)); + EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); + EXPECT_CALL(*sender_.get(), SetMediaChannel(_)); + EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); + EXPECT_CALL(*sender_.get(), Stop()); cricket::MockChannelInterface mock_channel; - sigslot::signal1 signal; - ON_CALL(mock_channel, SignalFirstPacketReceived) - .WillByDefault(ReturnRef(signal)); + EXPECT_CALL(mock_channel, SetFirstPacketReceivedCallback(_)); + EXPECT_CALL(mock_channel, media_type()) + .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_AUDIO)); + EXPECT_CALL(mock_channel, media_channel()).WillRepeatedly(Return(nullptr)); + cricket::RtpHeaderExtensions extensions; + EXPECT_CALL(mock_channel, GetNegotiatedRtpHeaderExtensions) + .WillOnce(Return(extensions)); transceiver_.SetChannel(&mock_channel); EXPECT_THAT(transceiver_.HeaderExtensionsNegotiated(), ElementsAre()); } TEST_F(RtpTransceiverTestForHeaderExtensions, ReturnsNegotiatedHdrExts) { + EXPECT_CALL(*receiver_.get(), SetMediaChannel(_)); + EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); + EXPECT_CALL(*sender_.get(), SetMediaChannel(_)); + EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); + EXPECT_CALL(*sender_.get(), Stop()); + cricket::MockChannelInterface mock_channel; - sigslot::signal1 signal; - ON_CALL(mock_channel, SignalFirstPacketReceived) - .WillByDefault(ReturnRef(signal)); + EXPECT_CALL(mock_channel, SetFirstPacketReceivedCallback(_)); + EXPECT_CALL(mock_channel, media_type()) + .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_AUDIO)); + EXPECT_CALL(mock_channel, media_channel()).WillRepeatedly(Return(nullptr)); cricket::RtpHeaderExtensions extensions = {webrtc::RtpExtension("uri1", 1), webrtc::RtpExtension("uri2", 2)}; EXPECT_CALL(mock_channel, GetNegotiatedRtpHeaderExtensions) @@ -242,10 +320,18 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, ReturnsNegotiatedHdrExts) { TEST_F(RtpTransceiverTestForHeaderExtensions, ReturnsNegotiatedHdrExtsSecondTime) { + EXPECT_CALL(*receiver_.get(), SetMediaChannel(_)); + EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); + EXPECT_CALL(*sender_.get(), SetMediaChannel(_)); + EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); + EXPECT_CALL(*sender_.get(), Stop()); + cricket::MockChannelInterface mock_channel; - sigslot::signal1 signal; - ON_CALL(mock_channel, SignalFirstPacketReceived) - .WillByDefault(ReturnRef(signal)); + EXPECT_CALL(mock_channel, SetFirstPacketReceivedCallback(_)); + EXPECT_CALL(mock_channel, media_type()) + .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_AUDIO)); + EXPECT_CALL(mock_channel, media_channel()).WillRepeatedly(Return(nullptr)); + cricket::RtpHeaderExtensions extensions = {webrtc::RtpExtension("uri1", 1), webrtc::RtpExtension("uri2", 2)}; @@ -254,6 +340,9 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, transceiver_.SetChannel(&mock_channel); transceiver_.HeaderExtensionsNegotiated(); testing::Mock::VerifyAndClearExpectations(&mock_channel); + EXPECT_CALL(mock_channel, media_type()) + .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_AUDIO)); + EXPECT_CALL(mock_channel, media_channel()).WillRepeatedly(Return(nullptr)); extensions = {webrtc::RtpExtension("uri3", 4), webrtc::RtpExtension("uri5", 6)}; diff --git a/pc/sdp_offer_answer.cc b/pc/sdp_offer_answer.cc index 91ea3794bf..e3fe002207 100644 --- a/pc/sdp_offer_answer.cc +++ b/pc/sdp_offer_answer.cc @@ -4586,14 +4586,19 @@ void SdpOfferAnswerHandler::DestroyTransceiverChannel( RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(0); if (channel) { // TODO(tommi): VideoRtpReceiver::SetMediaChannel blocks and jumps to the - // worker thread. When being set to nullptrpus, there are additional + // worker thread. When being set to nullptr, there are additional // blocking calls to e.g. ClearRecordableEncodedFrameCallback which triggers // another blocking call or Stop() for video channels. + // The channel object also needs to be de-initialized on the network thread + // so if ownership of the channel object lies with the transceiver, we could + // un-set the channel pointer and uninitialize/destruct the channel object + // at the same time, rather than in separate steps. transceiver->internal()->SetChannel(nullptr); - RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(2); + RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(3); // TODO(tommi): All channel objects end up getting deleted on the - // worker thread. Can DestroyTransceiverChannel be purely posted to the - // worker? + // worker thread (ideally should be on the network thread but the + // MediaChannel objects are tied to the worker. Can the teardown be done + // asynchronously across the threads rather than blocking? DestroyChannelInterface(channel); } } diff --git a/pc/test/fake_peer_connection_for_stats.h b/pc/test/fake_peer_connection_for_stats.h index 3f3e0a9ee0..df697b3155 100644 --- a/pc/test/fake_peer_connection_for_stats.h +++ b/pc/test/fake_peer_connection_for_stats.h @@ -388,7 +388,8 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase { } } auto transceiver = RtpTransceiverProxyWithInternal::Create( - signaling_thread_, new RtpTransceiver(media_type)); + signaling_thread_, + new RtpTransceiver(media_type, channel_manager_.get())); transceivers_.push_back(transceiver); return transceiver; } @@ -397,6 +398,12 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase { rtc::Thread* const worker_thread_; rtc::Thread* const signaling_thread_; + std::unique_ptr channel_manager_ = + cricket::ChannelManager::Create(nullptr /* MediaEngineInterface */, + true, + worker_thread_, + network_thread_); + rtc::scoped_refptr local_streams_; rtc::scoped_refptr remote_streams_; diff --git a/pc/test/mock_channel_interface.h b/pc/test/mock_channel_interface.h index 5d3c66d1ae..d376e423d5 100644 --- a/pc/test/mock_channel_interface.h +++ b/pc/test/mock_channel_interface.h @@ -29,9 +29,9 @@ class MockChannelInterface : public cricket::ChannelInterface { MOCK_METHOD(const std::string&, transport_name, (), (const, override)); MOCK_METHOD(const std::string&, content_name, (), (const, override)); MOCK_METHOD(void, Enable, (bool), (override)); - MOCK_METHOD(sigslot::signal1&, - SignalFirstPacketReceived, - (), + MOCK_METHOD(void, + SetFirstPacketReceivedCallback, + (std::function), (override)); MOCK_METHOD(bool, SetLocalContent,