From 99c8a80b8ea9b204d69216d8f1e0405fa8bfb0fb Mon Sep 17 00:00:00 2001 From: Tommi Date: Tue, 27 Apr 2021 15:00:00 +0200 Subject: [PATCH] Change the first-packet-received notification in Channel. This changes the notification to a single std::function pointer instead of being a sigslot::signal1<> collection. Summary: * Remove SignalFirstPacketReceived_, the last sigslot member variable. (still inherits from sigslot::has_slots<>) * BaseChannel doesn't post to the signaling thread anymore. The only reason that remains for the signaling_thread_ variable, is for thread checking. * Remove BaseChannel's reliance on MessageHandlerAutoCleanup (still inherits from MessageHandler) RtpTransceiver is the consumer of this event. That class is also the class that sits between the PC classes and the channel object, holding a pointer to the channel and managing calls that come in on the signaling thread, such as SetChannel. The responsibility of delivering the first packet received on the signaling thread is now with RtpTransceiver: * RtpTransceiver always requires a ChannelManager instance. Previously this variable was sometimes set, but it's now required. * Updated tests in rtp_transceiver_unittest.cc to include a ChannelManager as well as fix them to include call expectations for mock sender and receivers. Bug: webrtc:11993, webrtc:11988 Change-Id: If49d6be157cd7599fa6fe3a42cd0a363464e3a74 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/215979 Commit-Queue: Tommi Reviewed-by: Niels Moller Cr-Commit-Position: refs/heads/master@{#33853} --- pc/BUILD.gn | 2 + pc/channel.cc | 23 ++-- pc/channel.h | 13 ++- pc/channel_interface.h | 3 +- pc/peer_connection.cc | 6 +- pc/rtp_transceiver.cc | 61 ++++++++--- pc/rtp_transceiver.h | 11 +- pc/rtp_transceiver_unittest.cc | 131 +++++++++++++++++++---- pc/sdp_offer_answer.cc | 13 ++- pc/test/fake_peer_connection_for_stats.h | 9 +- pc/test/mock_channel_interface.h | 6 +- 11 files changed, 210 insertions(+), 68 deletions(-) diff --git a/pc/BUILD.gn b/pc/BUILD.gn index 106ee55025..e4311ec952 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -400,6 +400,8 @@ rtc_library("rtp_transceiver") { "../rtc_base:macromagic", "../rtc_base:refcount", "../rtc_base:threading", + "../rtc_base/task_utils:pending_task_safety_flag", + "../rtc_base/task_utils:to_queued_task", "../rtc_base/third_party/sigslot", ] absl_deps = [ diff --git a/pc/channel.cc b/pc/channel.cc index 53933c33ed..0ed5665dae 100644 --- a/pc/channel.cc +++ b/pc/channel.cc @@ -89,7 +89,6 @@ enum { MSG_SEND_RTCP_PACKET, MSG_READYTOSENDDATA, MSG_DATARECEIVED, - MSG_FIRSTPACKETRECEIVED, }; static void SafeSetError(const std::string& message, std::string* error_desc) { @@ -156,7 +155,6 @@ BaseChannel::~BaseChannel() { // Eats any outstanding messages or packets. alive_->SetNotAlive(); - signaling_thread_->Clear(this); // The media channel is destroyed at the end of the destructor, since it // is a std::unique_ptr. The transport channel (rtp_transport) must outlive // the media channel. @@ -411,9 +409,11 @@ void BaseChannel::OnNetworkRouteChanged( media_channel_->OnNetworkRouteChanged(transport_name_, new_route); } -sigslot::signal1& BaseChannel::SignalFirstPacketReceived() { - RTC_DCHECK_RUN_ON(signaling_thread_); - return SignalFirstPacketReceived_; +void BaseChannel::SetFirstPacketReceivedCallback( + std::function callback) { + RTC_DCHECK_RUN_ON(network_thread()); + RTC_DCHECK(!on_first_packet_received_ || !callback); + on_first_packet_received_ = std::move(callback); } void BaseChannel::OnTransportReadyToSend(bool ready) { @@ -490,6 +490,8 @@ bool BaseChannel::SendPacket(bool rtcp, } void BaseChannel::OnRtpPacket(const webrtc::RtpPacketReceived& parsed_packet) { + RTC_DCHECK_RUN_ON(network_thread()); + // Take packet time from the |parsed_packet|. // RtpPacketReceived.arrival_time_ms = (timestamp_us + 500) / 1000; int64_t packet_time_us = -1; @@ -497,9 +499,9 @@ void BaseChannel::OnRtpPacket(const webrtc::RtpPacketReceived& parsed_packet) { packet_time_us = parsed_packet.arrival_time_ms() * 1000; } - if (!has_received_packet_) { - has_received_packet_ = true; - signaling_thread()->Post(RTC_FROM_HERE, this, MSG_FIRSTPACKETRECEIVED); + if (on_first_packet_received_) { + on_first_packet_received_(); + on_first_packet_received_ = nullptr; } if (!srtp_active() && srtp_required_) { @@ -830,11 +832,6 @@ void BaseChannel::OnMessage(rtc::Message* pmsg) { delete data; break; } - case MSG_FIRSTPACKETRECEIVED: { - RTC_DCHECK_RUN_ON(signaling_thread_); - SignalFirstPacketReceived_(this); - break; - } } } diff --git a/pc/channel.h b/pc/channel.h index 528e7d0ac6..5c2235e9ee 100644 --- a/pc/channel.h +++ b/pc/channel.h @@ -93,8 +93,12 @@ struct CryptoParams; // NetworkInterface. class BaseChannel : public ChannelInterface, - public rtc::MessageHandlerAutoCleanup, + // TODO(tommi): Remove MessageHandler inheritance. + public rtc::MessageHandler, + // TODO(tommi): Remove has_slots inheritance. public sigslot::has_slots<>, + // TODO(tommi): Consider implementing these interfaces + // via composition. public MediaChannel::NetworkInterface, public webrtc::RtpPacketSinkInterface { public: @@ -175,7 +179,7 @@ class BaseChannel : public ChannelInterface, } // Used for latency measurements. - sigslot::signal1& SignalFirstPacketReceived() override; + void SetFirstPacketReceivedCallback(std::function callback) override; // From RtpTransport - public for testing only void OnTransportReadyToSend(bool ready); @@ -319,12 +323,11 @@ class BaseChannel : public ChannelInterface, rtc::Thread* const network_thread_; rtc::Thread* const signaling_thread_; rtc::scoped_refptr alive_; - sigslot::signal1 SignalFirstPacketReceived_ - RTC_GUARDED_BY(signaling_thread_); const std::string content_name_; - bool has_received_packet_ = false; + std::function on_first_packet_received_ + RTC_GUARDED_BY(network_thread()); // Won't be set when using raw packet transports. SDP-specific thing. // TODO(bugs.webrtc.org/12230): Written on network thread, read on diff --git a/pc/channel_interface.h b/pc/channel_interface.h index 46170a721b..fced8cc267 100644 --- a/pc/channel_interface.h +++ b/pc/channel_interface.h @@ -41,7 +41,8 @@ class ChannelInterface { virtual void Enable(bool enable) = 0; // Used for latency measurements. - virtual sigslot::signal1& SignalFirstPacketReceived() = 0; + virtual void SetFirstPacketReceivedCallback( + std::function callback) = 0; // Channel control virtual bool SetLocalContent(const MediaContentDescription* content, diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index 1efb0ab20a..0323636f02 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -631,10 +631,12 @@ RTCError PeerConnection::Initialize( if (!IsUnifiedPlan()) { rtp_manager()->transceivers()->Add( RtpTransceiverProxyWithInternal::Create( - signaling_thread(), new RtpTransceiver(cricket::MEDIA_TYPE_AUDIO))); + signaling_thread(), + new RtpTransceiver(cricket::MEDIA_TYPE_AUDIO, channel_manager()))); rtp_manager()->transceivers()->Add( RtpTransceiverProxyWithInternal::Create( - signaling_thread(), new RtpTransceiver(cricket::MEDIA_TYPE_VIDEO))); + signaling_thread(), + new RtpTransceiver(cricket::MEDIA_TYPE_VIDEO, channel_manager()))); } int delay_ms = configuration.report_usage_pattern_delay_ms diff --git a/pc/rtp_transceiver.cc b/pc/rtp_transceiver.cc index d2d05bcedd..15c4c4c4d1 100644 --- a/pc/rtp_transceiver.cc +++ b/pc/rtp_transceiver.cc @@ -25,6 +25,7 @@ #include "pc/session_description.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/thread.h" namespace webrtc { @@ -112,12 +113,16 @@ TaskQueueBase* GetCurrentTaskQueueOrThread() { } // namespace -RtpTransceiver::RtpTransceiver(cricket::MediaType media_type) +RtpTransceiver::RtpTransceiver( + cricket::MediaType media_type, + cricket::ChannelManager* channel_manager /* = nullptr*/) : thread_(GetCurrentTaskQueueOrThread()), unified_plan_(false), - media_type_(media_type) { + media_type_(media_type), + channel_manager_(channel_manager) { RTC_DCHECK(media_type == cricket::MEDIA_TYPE_AUDIO || media_type == cricket::MEDIA_TYPE_VIDEO); + RTC_DCHECK(channel_manager_); } RtpTransceiver::RtpTransceiver( @@ -136,11 +141,15 @@ RtpTransceiver::RtpTransceiver( RTC_DCHECK(media_type_ == cricket::MEDIA_TYPE_AUDIO || media_type_ == cricket::MEDIA_TYPE_VIDEO); RTC_DCHECK_EQ(sender->media_type(), receiver->media_type()); + RTC_DCHECK(channel_manager_); senders_.push_back(sender); receivers_.push_back(receiver); } RtpTransceiver::~RtpTransceiver() { + // TODO(tommi): On Android, when running PeerConnectionClientTest (e.g. + // PeerConnectionClientTest#testCameraSwitch), the instance doesn't get + // deleted on `thread_`. See if we can fix that. if (!stopped_) { RTC_DCHECK_RUN_ON(thread_); StopInternal(); @@ -148,34 +157,57 @@ RtpTransceiver::~RtpTransceiver() { } void RtpTransceiver::SetChannel(cricket::ChannelInterface* channel) { + RTC_DCHECK_RUN_ON(thread_); // Cannot set a non-null channel on a stopped transceiver. if (stopped_ && channel) { return; } + RTC_DCHECK(channel || channel_); + RTC_LOG_THREAD_BLOCK_COUNT(); + if (channel_) { + signaling_thread_safety_->SetNotAlive(); + signaling_thread_safety_ = nullptr; + } + if (channel) { RTC_DCHECK_EQ(media_type(), channel->media_type()); + signaling_thread_safety_ = PendingTaskSafetyFlag::Create(); } - if (channel_) { - channel_->SignalFirstPacketReceived().disconnect(this); - } + // An alternative to this, could be to require SetChannel to be called + // on the network thread. The channel object operates for the most part + // on the network thread, as part of its initialization being on the network + // thread is required, so setting a channel object as part of the construction + // (without thread hopping) might be the more efficient thing to do than + // how SetChannel works today. + // Similarly, if the channel() accessor is limited to the network thread, that + // helps with keeping the channel implementation requirements being met and + // avoids synchronization for accessing the pointer or network related state. + channel_manager_->network_thread()->Invoke(RTC_FROM_HERE, [&]() { + if (channel_) { + channel_->SetFirstPacketReceivedCallback(nullptr); + } - channel_ = channel; + channel_ = channel; - if (channel_) { - channel_->SignalFirstPacketReceived().connect( - this, &RtpTransceiver::OnFirstPacketReceived); - } + if (channel_) { + channel_->SetFirstPacketReceivedCallback( + [thread = thread_, flag = signaling_thread_safety_, this]() mutable { + thread->PostTask(ToQueuedTask( + std::move(flag), [this]() { OnFirstPacketReceived(); })); + }); + } + }); for (const auto& sender : senders_) { sender->internal()->SetMediaChannel(channel_ ? channel_->media_channel() : nullptr); } - RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(0); + RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(1); for (const auto& receiver : receivers_) { if (!channel_) { @@ -188,12 +220,11 @@ void RtpTransceiver::SetChannel(cricket::ChannelInterface* channel) { receiver->internal()->SetMediaChannel(channel_ ? channel_->media_channel() : nullptr); } - - RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(receivers_.size() * 2); } void RtpTransceiver::AddSender( rtc::scoped_refptr> sender) { + RTC_DCHECK_RUN_ON(thread_); RTC_DCHECK(!stopped_); RTC_DCHECK(!unified_plan_); RTC_DCHECK(sender); @@ -219,6 +250,7 @@ bool RtpTransceiver::RemoveSender(RtpSenderInterface* sender) { void RtpTransceiver::AddReceiver( rtc::scoped_refptr> receiver) { + RTC_DCHECK_RUN_ON(thread_); RTC_DCHECK(!stopped_); RTC_DCHECK(!unified_plan_); RTC_DCHECK(receiver); @@ -267,7 +299,7 @@ absl::optional RtpTransceiver::mid() const { return mid_; } -void RtpTransceiver::OnFirstPacketReceived(cricket::ChannelInterface*) { +void RtpTransceiver::OnFirstPacketReceived() { for (const auto& receiver : receivers_) { receiver->internal()->NotifyFirstPacketReceived(); } @@ -304,6 +336,7 @@ void RtpTransceiver::set_fired_direction(RtpTransceiverDirection direction) { } bool RtpTransceiver::stopped() const { + RTC_DCHECK_RUN_ON(thread_); return stopped_; } diff --git a/pc/rtp_transceiver.h b/pc/rtp_transceiver.h index 8d2d72857d..32da9af601 100644 --- a/pc/rtp_transceiver.h +++ b/pc/rtp_transceiver.h @@ -35,6 +35,7 @@ #include "pc/rtp_receiver.h" #include "pc/rtp_sender.h" #include "rtc_base/ref_counted_object.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/third_party/sigslot/sigslot.h" #include "rtc_base/thread_annotations.h" @@ -78,7 +79,8 @@ class RtpTransceiver final // channel set. // |media_type| specifies the type of RtpTransceiver (and, by transitivity, // the type of senders, receivers, and channel). Can either by audio or video. - explicit RtpTransceiver(cricket::MediaType media_type); + RtpTransceiver(cricket::MediaType media_type, + cricket::ChannelManager* channel_manager); // Construct a Unified Plan-style RtpTransceiver with the given sender and // receiver. The media type will be derived from the media types of the sender // and receiver. The sender and receiver should have the same media type. @@ -232,20 +234,21 @@ class RtpTransceiver final header_extensions_to_offer) override; private: - void OnFirstPacketReceived(cricket::ChannelInterface* channel); + void OnFirstPacketReceived(); void StopSendingAndReceiving(); // Enforce that this object is created, used and destroyed on one thread. - const TaskQueueBase* thread_; + TaskQueueBase* const thread_; const bool unified_plan_; const cricket::MediaType media_type_; + rtc::scoped_refptr signaling_thread_safety_; std::vector>> senders_; std::vector< rtc::scoped_refptr>> receivers_; - bool stopped_ = false; + bool stopped_ RTC_GUARDED_BY(thread_) = false; bool stopping_ RTC_GUARDED_BY(thread_) = false; bool is_pc_closed_ = false; RtpTransceiverDirection direction_ = RtpTransceiverDirection::kInactive; diff --git a/pc/rtp_transceiver_unittest.cc b/pc/rtp_transceiver_unittest.cc index c518aae0c7..523f30737a 100644 --- a/pc/rtp_transceiver_unittest.cc +++ b/pc/rtp_transceiver_unittest.cc @@ -23,6 +23,7 @@ #include "test/gmock.h" #include "test/gtest.h" +using ::testing::_; using ::testing::ElementsAre; using ::testing::Optional; using ::testing::Property; @@ -33,13 +34,13 @@ namespace webrtc { // Checks that a channel cannot be set on a stopped |RtpTransceiver|. TEST(RtpTransceiverTest, CannotSetChannelOnStoppedTransceiver) { - RtpTransceiver transceiver(cricket::MediaType::MEDIA_TYPE_AUDIO); + auto cm = cricket::ChannelManager::Create( + nullptr, true, rtc::Thread::Current(), rtc::Thread::Current()); + RtpTransceiver transceiver(cricket::MediaType::MEDIA_TYPE_AUDIO, cm.get()); cricket::MockChannelInterface channel1; - sigslot::signal1 signal; EXPECT_CALL(channel1, media_type()) .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_AUDIO)); - EXPECT_CALL(channel1, SignalFirstPacketReceived()) - .WillRepeatedly(ReturnRef(signal)); + EXPECT_CALL(channel1, SetFirstPacketReceivedCallback(_)); transceiver.SetChannel(&channel1); EXPECT_EQ(&channel1, transceiver.channel()); @@ -59,13 +60,14 @@ TEST(RtpTransceiverTest, CannotSetChannelOnStoppedTransceiver) { // Checks that a channel can be unset on a stopped |RtpTransceiver| TEST(RtpTransceiverTest, CanUnsetChannelOnStoppedTransceiver) { - RtpTransceiver transceiver(cricket::MediaType::MEDIA_TYPE_VIDEO); + auto cm = cricket::ChannelManager::Create( + nullptr, true, rtc::Thread::Current(), rtc::Thread::Current()); + RtpTransceiver transceiver(cricket::MediaType::MEDIA_TYPE_VIDEO, cm.get()); cricket::MockChannelInterface channel; - sigslot::signal1 signal; EXPECT_CALL(channel, media_type()) .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_VIDEO)); - EXPECT_CALL(channel, SignalFirstPacketReceived()) - .WillRepeatedly(ReturnRef(signal)); + EXPECT_CALL(channel, SetFirstPacketReceivedCallback(_)) + .WillRepeatedly(testing::Return()); transceiver.SetChannel(&channel); EXPECT_EQ(&channel, transceiver.channel()); @@ -89,20 +91,40 @@ class RtpTransceiverUnifiedPlanTest : public ::testing::Test { rtc::Thread::Current())), transceiver_(RtpSenderProxyWithInternal::Create( rtc::Thread::Current(), - new rtc::RefCountedObject()), + sender_), RtpReceiverProxyWithInternal::Create( rtc::Thread::Current(), - new rtc::RefCountedObject()), + receiver_), channel_manager_.get(), channel_manager_->GetSupportedAudioRtpHeaderExtensions(), /* on_negotiation_needed= */ [] {}) {} + static rtc::scoped_refptr MockReceiver() { + auto receiver = rtc::make_ref_counted(); + EXPECT_CALL(*receiver.get(), media_type()) + .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_AUDIO)); + return receiver; + } + + static rtc::scoped_refptr MockSender() { + auto sender = rtc::make_ref_counted(); + EXPECT_CALL(*sender.get(), media_type()) + .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_AUDIO)); + return sender; + } + + rtc::scoped_refptr receiver_ = MockReceiver(); + rtc::scoped_refptr sender_ = MockSender(); std::unique_ptr channel_manager_; RtpTransceiver transceiver_; }; // Basic tests for Stop() TEST_F(RtpTransceiverUnifiedPlanTest, StopSetsDirection) { + EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); + EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); + EXPECT_CALL(*sender_.get(), Stop()); + EXPECT_EQ(RtpTransceiverDirection::kInactive, transceiver_.direction()); EXPECT_FALSE(transceiver_.current_direction()); transceiver_.StopStandard(); @@ -138,24 +160,49 @@ class RtpTransceiverTestForHeaderExtensions : public ::testing::Test { RtpTransceiverDirection::kSendRecv)}), transceiver_(RtpSenderProxyWithInternal::Create( rtc::Thread::Current(), - new rtc::RefCountedObject()), + sender_), RtpReceiverProxyWithInternal::Create( rtc::Thread::Current(), - new rtc::RefCountedObject()), + receiver_), channel_manager_.get(), extensions_, /* on_negotiation_needed= */ [] {}) {} + static rtc::scoped_refptr MockReceiver() { + auto receiver = rtc::make_ref_counted(); + EXPECT_CALL(*receiver.get(), media_type()) + .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_AUDIO)); + return receiver; + } + + static rtc::scoped_refptr MockSender() { + auto sender = rtc::make_ref_counted(); + EXPECT_CALL(*sender.get(), media_type()) + .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_AUDIO)); + return sender; + } + + rtc::scoped_refptr receiver_ = MockReceiver(); + rtc::scoped_refptr sender_ = MockSender(); + std::unique_ptr channel_manager_; std::vector extensions_; RtpTransceiver transceiver_; }; TEST_F(RtpTransceiverTestForHeaderExtensions, OffersChannelManagerList) { + EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); + EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); + EXPECT_CALL(*sender_.get(), Stop()); + EXPECT_EQ(transceiver_.HeaderExtensionsToOffer(), extensions_); } TEST_F(RtpTransceiverTestForHeaderExtensions, ModifiesDirection) { + EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); + EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); + EXPECT_CALL(*sender_.get(), Stop()); + auto modified_extensions = extensions_; modified_extensions[0].direction = RtpTransceiverDirection::kSendOnly; EXPECT_TRUE( @@ -176,6 +223,10 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, ModifiesDirection) { } TEST_F(RtpTransceiverTestForHeaderExtensions, AcceptsStoppedExtension) { + EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); + EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); + EXPECT_CALL(*sender_.get(), Stop()); + auto modified_extensions = extensions_; modified_extensions[0].direction = RtpTransceiverDirection::kStopped; EXPECT_TRUE( @@ -184,6 +235,10 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, AcceptsStoppedExtension) { } TEST_F(RtpTransceiverTestForHeaderExtensions, RejectsUnsupportedExtension) { + EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); + EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); + EXPECT_CALL(*sender_.get(), Stop()); + std::vector modified_extensions( {RtpHeaderExtensionCapability("uri3", 1, RtpTransceiverDirection::kSendRecv)}); @@ -194,6 +249,10 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, RejectsUnsupportedExtension) { TEST_F(RtpTransceiverTestForHeaderExtensions, RejectsStoppedMandatoryExtensions) { + EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); + EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); + EXPECT_CALL(*sender_.get(), Stop()); + std::vector modified_extensions = extensions_; // Attempting to stop the mandatory MID extension. modified_extensions[2].direction = RtpTransceiverDirection::kStopped; @@ -210,24 +269,43 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, TEST_F(RtpTransceiverTestForHeaderExtensions, NoNegotiatedHdrExtsWithoutChannel) { + EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); + EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); + EXPECT_CALL(*sender_.get(), Stop()); EXPECT_THAT(transceiver_.HeaderExtensionsNegotiated(), ElementsAre()); } TEST_F(RtpTransceiverTestForHeaderExtensions, NoNegotiatedHdrExtsWithChannelWithoutNegotiation) { + EXPECT_CALL(*receiver_.get(), SetMediaChannel(_)); + EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); + EXPECT_CALL(*sender_.get(), SetMediaChannel(_)); + EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); + EXPECT_CALL(*sender_.get(), Stop()); cricket::MockChannelInterface mock_channel; - sigslot::signal1 signal; - ON_CALL(mock_channel, SignalFirstPacketReceived) - .WillByDefault(ReturnRef(signal)); + EXPECT_CALL(mock_channel, SetFirstPacketReceivedCallback(_)); + EXPECT_CALL(mock_channel, media_type()) + .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_AUDIO)); + EXPECT_CALL(mock_channel, media_channel()).WillRepeatedly(Return(nullptr)); + cricket::RtpHeaderExtensions extensions; + EXPECT_CALL(mock_channel, GetNegotiatedRtpHeaderExtensions) + .WillOnce(Return(extensions)); transceiver_.SetChannel(&mock_channel); EXPECT_THAT(transceiver_.HeaderExtensionsNegotiated(), ElementsAre()); } TEST_F(RtpTransceiverTestForHeaderExtensions, ReturnsNegotiatedHdrExts) { + EXPECT_CALL(*receiver_.get(), SetMediaChannel(_)); + EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); + EXPECT_CALL(*sender_.get(), SetMediaChannel(_)); + EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); + EXPECT_CALL(*sender_.get(), Stop()); + cricket::MockChannelInterface mock_channel; - sigslot::signal1 signal; - ON_CALL(mock_channel, SignalFirstPacketReceived) - .WillByDefault(ReturnRef(signal)); + EXPECT_CALL(mock_channel, SetFirstPacketReceivedCallback(_)); + EXPECT_CALL(mock_channel, media_type()) + .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_AUDIO)); + EXPECT_CALL(mock_channel, media_channel()).WillRepeatedly(Return(nullptr)); cricket::RtpHeaderExtensions extensions = {webrtc::RtpExtension("uri1", 1), webrtc::RtpExtension("uri2", 2)}; EXPECT_CALL(mock_channel, GetNegotiatedRtpHeaderExtensions) @@ -242,10 +320,18 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, ReturnsNegotiatedHdrExts) { TEST_F(RtpTransceiverTestForHeaderExtensions, ReturnsNegotiatedHdrExtsSecondTime) { + EXPECT_CALL(*receiver_.get(), SetMediaChannel(_)); + EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); + EXPECT_CALL(*sender_.get(), SetMediaChannel(_)); + EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); + EXPECT_CALL(*sender_.get(), Stop()); + cricket::MockChannelInterface mock_channel; - sigslot::signal1 signal; - ON_CALL(mock_channel, SignalFirstPacketReceived) - .WillByDefault(ReturnRef(signal)); + EXPECT_CALL(mock_channel, SetFirstPacketReceivedCallback(_)); + EXPECT_CALL(mock_channel, media_type()) + .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_AUDIO)); + EXPECT_CALL(mock_channel, media_channel()).WillRepeatedly(Return(nullptr)); + cricket::RtpHeaderExtensions extensions = {webrtc::RtpExtension("uri1", 1), webrtc::RtpExtension("uri2", 2)}; @@ -254,6 +340,9 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, transceiver_.SetChannel(&mock_channel); transceiver_.HeaderExtensionsNegotiated(); testing::Mock::VerifyAndClearExpectations(&mock_channel); + EXPECT_CALL(mock_channel, media_type()) + .WillRepeatedly(Return(cricket::MediaType::MEDIA_TYPE_AUDIO)); + EXPECT_CALL(mock_channel, media_channel()).WillRepeatedly(Return(nullptr)); extensions = {webrtc::RtpExtension("uri3", 4), webrtc::RtpExtension("uri5", 6)}; diff --git a/pc/sdp_offer_answer.cc b/pc/sdp_offer_answer.cc index 91ea3794bf..e3fe002207 100644 --- a/pc/sdp_offer_answer.cc +++ b/pc/sdp_offer_answer.cc @@ -4586,14 +4586,19 @@ void SdpOfferAnswerHandler::DestroyTransceiverChannel( RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(0); if (channel) { // TODO(tommi): VideoRtpReceiver::SetMediaChannel blocks and jumps to the - // worker thread. When being set to nullptrpus, there are additional + // worker thread. When being set to nullptr, there are additional // blocking calls to e.g. ClearRecordableEncodedFrameCallback which triggers // another blocking call or Stop() for video channels. + // The channel object also needs to be de-initialized on the network thread + // so if ownership of the channel object lies with the transceiver, we could + // un-set the channel pointer and uninitialize/destruct the channel object + // at the same time, rather than in separate steps. transceiver->internal()->SetChannel(nullptr); - RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(2); + RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(3); // TODO(tommi): All channel objects end up getting deleted on the - // worker thread. Can DestroyTransceiverChannel be purely posted to the - // worker? + // worker thread (ideally should be on the network thread but the + // MediaChannel objects are tied to the worker. Can the teardown be done + // asynchronously across the threads rather than blocking? DestroyChannelInterface(channel); } } diff --git a/pc/test/fake_peer_connection_for_stats.h b/pc/test/fake_peer_connection_for_stats.h index 3f3e0a9ee0..df697b3155 100644 --- a/pc/test/fake_peer_connection_for_stats.h +++ b/pc/test/fake_peer_connection_for_stats.h @@ -388,7 +388,8 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase { } } auto transceiver = RtpTransceiverProxyWithInternal::Create( - signaling_thread_, new RtpTransceiver(media_type)); + signaling_thread_, + new RtpTransceiver(media_type, channel_manager_.get())); transceivers_.push_back(transceiver); return transceiver; } @@ -397,6 +398,12 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase { rtc::Thread* const worker_thread_; rtc::Thread* const signaling_thread_; + std::unique_ptr channel_manager_ = + cricket::ChannelManager::Create(nullptr /* MediaEngineInterface */, + true, + worker_thread_, + network_thread_); + rtc::scoped_refptr local_streams_; rtc::scoped_refptr remote_streams_; diff --git a/pc/test/mock_channel_interface.h b/pc/test/mock_channel_interface.h index 5d3c66d1ae..d376e423d5 100644 --- a/pc/test/mock_channel_interface.h +++ b/pc/test/mock_channel_interface.h @@ -29,9 +29,9 @@ class MockChannelInterface : public cricket::ChannelInterface { MOCK_METHOD(const std::string&, transport_name, (), (const, override)); MOCK_METHOD(const std::string&, content_name, (), (const, override)); MOCK_METHOD(void, Enable, (bool), (override)); - MOCK_METHOD(sigslot::signal1&, - SignalFirstPacketReceived, - (), + MOCK_METHOD(void, + SetFirstPacketReceivedCallback, + (std::function), (override)); MOCK_METHOD(bool, SetLocalContent,