diff --git a/modules/rtp_rtcp/BUILD.gn b/modules/rtp_rtcp/BUILD.gn index f78b4efc5e..50204d396c 100644 --- a/modules/rtp_rtcp/BUILD.gn +++ b/modules/rtp_rtcp/BUILD.gn @@ -255,7 +255,6 @@ rtc_source_set("rtcp_transceiver") { "../../rtc_base:rtc_base_approved", "../../rtc_base:rtc_cancelable_task", "../../rtc_base:rtc_task_queue", - "../../rtc_base:weak_ptr", "../../system_wrappers", "//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/types:optional", diff --git a/modules/rtp_rtcp/source/rtcp_transceiver.cc b/modules/rtp_rtcp/source/rtcp_transceiver.cc index 9a179295b2..e5ad3bae95 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver.cc +++ b/modules/rtp_rtcp/source/rtcp_transceiver.cc @@ -22,42 +22,48 @@ namespace webrtc { RtcpTransceiver::RtcpTransceiver(const RtcpTransceiverConfig& config) : task_queue_(config.task_queue), - rtcp_transceiver_(absl::make_unique(config)), - ptr_factory_(rtcp_transceiver_.get()), - // Creating first weak ptr can be done on any thread, but is not - // thread-safe, thus do it at construction. Creating second (e.g. making a - // copy) is thread-safe. - ptr_(ptr_factory_.GetWeakPtr()) { + rtcp_transceiver_(absl::make_unique(config)) { RTC_DCHECK(task_queue_); } RtcpTransceiver::~RtcpTransceiver() { - if (task_queue_->IsCurrent()) + if (!rtcp_transceiver_) return; + RTC_CHECK(!task_queue_->IsCurrent()); rtc::Event done(false, false); // TODO(danilchap): Merge cleanup into main closure when task queue does not // silently drop tasks. task_queue_->PostTask(rtc::NewClosure( [this] { - // Destructor steps that has to run on the task_queue_. - ptr_factory_.InvalidateWeakPtrs(); + // Destructor steps that better run on the task_queue_. rtcp_transceiver_.reset(); }, /*cleanup=*/[&done] { done.Set(); })); - // Wait until destruction is complete to be sure weak pointers invalidated and - // rtcp_transceiver destroyed on the queue while |this| still valid. + // Wait until destruction is complete to guarantee callbacks are not used + // after destructor returns. done.Wait(rtc::Event::kForever); RTC_CHECK(!rtcp_transceiver_) << "Task queue is too busy to handle rtcp"; } +void RtcpTransceiver::Stop(std::unique_ptr on_destroyed) { + RTC_DCHECK(rtcp_transceiver_); + struct Destructor { + void operator()() { rtcp_transceiver = nullptr; } + std::unique_ptr rtcp_transceiver; + }; + task_queue_->PostTaskAndReply(Destructor{std::move(rtcp_transceiver_)}, + std::move(on_destroyed)); + RTC_DCHECK(!rtcp_transceiver_); +} + void RtcpTransceiver::AddMediaReceiverRtcpObserver( uint32_t remote_ssrc, MediaReceiverRtcpObserver* observer) { - rtc::WeakPtr ptr = ptr_; + RTC_CHECK(rtcp_transceiver_); + RtcpTransceiverImpl* ptr = rtcp_transceiver_.get(); task_queue_->PostTask([ptr, remote_ssrc, observer] { - if (ptr) - ptr->AddMediaReceiverRtcpObserver(remote_ssrc, observer); + ptr->AddMediaReceiverRtcpObserver(remote_ssrc, observer); }); } @@ -65,59 +71,60 @@ void RtcpTransceiver::RemoveMediaReceiverRtcpObserver( uint32_t remote_ssrc, MediaReceiverRtcpObserver* observer, std::unique_ptr on_removed) { - rtc::WeakPtr ptr = ptr_; + RTC_CHECK(rtcp_transceiver_); + RtcpTransceiverImpl* ptr = rtcp_transceiver_.get(); auto remove = [ptr, remote_ssrc, observer] { - if (ptr) - ptr->RemoveMediaReceiverRtcpObserver(remote_ssrc, observer); + ptr->RemoveMediaReceiverRtcpObserver(remote_ssrc, observer); }; task_queue_->PostTaskAndReply(std::move(remove), std::move(on_removed)); } void RtcpTransceiver::SetReadyToSend(bool ready) { - rtc::WeakPtr ptr = ptr_; + RTC_CHECK(rtcp_transceiver_); + RtcpTransceiverImpl* ptr = rtcp_transceiver_.get(); task_queue_->PostTask([ptr, ready] { - if (ptr) ptr->SetReadyToSend(ready); }); } void RtcpTransceiver::ReceivePacket(rtc::CopyOnWriteBuffer packet) { - rtc::WeakPtr ptr = ptr_; + RTC_CHECK(rtcp_transceiver_); + RtcpTransceiverImpl* ptr = rtcp_transceiver_.get(); int64_t now_us = rtc::TimeMicros(); task_queue_->PostTask([ptr, packet, now_us] { - if (ptr) ptr->ReceivePacket(packet, now_us); }); } void RtcpTransceiver::SendCompoundPacket() { - rtc::WeakPtr ptr = ptr_; + RTC_CHECK(rtcp_transceiver_); + RtcpTransceiverImpl* ptr = rtcp_transceiver_.get(); task_queue_->PostTask([ptr] { - if (ptr) ptr->SendCompoundPacket(); }); } void RtcpTransceiver::SetRemb(int64_t bitrate_bps, std::vector ssrcs) { + RTC_CHECK(rtcp_transceiver_); // TODO(danilchap): Replace with lambda with move capture when available. struct SetRembClosure { void operator()() { - if (ptr) ptr->SetRemb(bitrate_bps, std::move(ssrcs)); } - rtc::WeakPtr ptr; + RtcpTransceiverImpl* ptr; int64_t bitrate_bps; std::vector ssrcs; }; - task_queue_->PostTask(SetRembClosure{ptr_, bitrate_bps, std::move(ssrcs)}); + task_queue_->PostTask( + SetRembClosure{rtcp_transceiver_.get(), bitrate_bps, std::move(ssrcs)}); } void RtcpTransceiver::UnsetRemb() { - rtc::WeakPtr ptr = ptr_; + RTC_CHECK(rtcp_transceiver_); + RtcpTransceiverImpl* ptr = rtcp_transceiver_.get(); task_queue_->PostTask([ptr] { - if (ptr) ptr->UnsetRemb(); }); } @@ -128,54 +135,53 @@ uint32_t RtcpTransceiver::SSRC() const { bool RtcpTransceiver::SendFeedbackPacket( const rtcp::TransportFeedback& packet) { + RTC_CHECK(rtcp_transceiver_); struct Closure { void operator()() { - if (ptr) ptr->SendRawPacket(raw_packet); } - rtc::WeakPtr ptr; + RtcpTransceiverImpl* ptr; rtc::Buffer raw_packet; }; - task_queue_->PostTask(Closure{ptr_, packet.Build()}); + task_queue_->PostTask(Closure{rtcp_transceiver_.get(), packet.Build()}); return true; } void RtcpTransceiver::SendNack(uint32_t ssrc, std::vector sequence_numbers) { + RTC_CHECK(rtcp_transceiver_); // TODO(danilchap): Replace with lambda with move capture when available. struct Closure { void operator()() { - if (ptr) ptr->SendNack(ssrc, std::move(sequence_numbers)); } - rtc::WeakPtr ptr; + RtcpTransceiverImpl* ptr; uint32_t ssrc; std::vector sequence_numbers; }; - task_queue_->PostTask(Closure{ptr_, ssrc, std::move(sequence_numbers)}); + task_queue_->PostTask( + Closure{rtcp_transceiver_.get(), ssrc, std::move(sequence_numbers)}); } void RtcpTransceiver::SendPictureLossIndication(uint32_t ssrc) { - rtc::WeakPtr ptr = ptr_; - task_queue_->PostTask([ptr, ssrc] { - if (ptr) - ptr->SendPictureLossIndication(ssrc); - }); + RTC_CHECK(rtcp_transceiver_); + RtcpTransceiverImpl* ptr = rtcp_transceiver_.get(); + task_queue_->PostTask([ptr, ssrc] { ptr->SendPictureLossIndication(ssrc); }); } void RtcpTransceiver::SendFullIntraRequest(std::vector ssrcs) { + RTC_CHECK(rtcp_transceiver_); // TODO(danilchap): Replace with lambda with move capture when available. struct Closure { void operator()() { - if (ptr) ptr->SendFullIntraRequest(ssrcs); } - rtc::WeakPtr ptr; + RtcpTransceiverImpl* ptr; std::vector ssrcs; }; - task_queue_->PostTask(Closure{ptr_, std::move(ssrcs)}); + task_queue_->PostTask(Closure{rtcp_transceiver_.get(), std::move(ssrcs)}); } } // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtcp_transceiver.h b/modules/rtp_rtcp/source/rtcp_transceiver.h index 9125781ea5..e3e7cfa569 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver.h +++ b/modules/rtp_rtcp/source/rtcp_transceiver.h @@ -20,7 +20,6 @@ #include "rtc_base/constructormagic.h" #include "rtc_base/copyonwritebuffer.h" #include "rtc_base/task_queue.h" -#include "rtc_base/weak_ptr.h" namespace webrtc { // @@ -30,8 +29,18 @@ namespace webrtc { class RtcpTransceiver : public RtcpFeedbackSenderInterface { public: explicit RtcpTransceiver(const RtcpTransceiverConfig& config); + // Blocks unless Stop was called. + // TODO(danilchap): Change destructor to never block by breaking assumption + // callbacks are not used after destruction. ~RtcpTransceiver() override; + // Start asynchronious destruction of the RtcpTransceiver. + // It is safe to call destructor right after Stop exits. + // No other methods can be called. + // Note that observers provided in constructor or registered with AddObserver + // still might be used by the transceiver until |on_destroyed| runs. + void Stop(std::unique_ptr on_destroyed); + // Registers observer to be notified about incoming rtcp packets. // Calls to observer will be done on the |config.task_queue|. void AddMediaReceiverRtcpObserver(uint32_t remote_ssrc, @@ -80,12 +89,6 @@ class RtcpTransceiver : public RtcpFeedbackSenderInterface { private: rtc::TaskQueue* const task_queue_; std::unique_ptr rtcp_transceiver_; - rtc::WeakPtrFactory ptr_factory_; - // TaskQueue, and thus tasks posted to it, may outlive this. - // Thus when Posting task class always pass copy of the weak_ptr to access - // the RtcpTransceiver and never guarantee it still will be alive when task - // runs. - rtc::WeakPtr ptr_; RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RtcpTransceiver); }; diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_unittest.cc b/modules/rtp_rtcp/source/rtcp_transceiver_unittest.cc index 1305d9a47b..1ef199849e 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_unittest.cc +++ b/modules/rtp_rtcp/source/rtcp_transceiver_unittest.cc @@ -83,18 +83,39 @@ TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOnTaskQueue) { WaitPostedTasks(&queue); } -TEST(RtcpTransceiverTest, CanBeDestoryedOnTaskQueue) { +TEST(RtcpTransceiverTest, CanBeDestroyedOnTaskQueueAfterStop) { rtc::TaskQueue queue("rtcp"); NiceMock outgoing_transport; RtcpTransceiverConfig config; config.outgoing_transport = &outgoing_transport; config.task_queue = &queue; auto rtcp_transceiver = absl::make_unique(config); + rtcp_transceiver->Stop(rtc::NewClosure([] {})); queue.PostTask([&] { rtcp_transceiver.reset(); }); WaitPostedTasks(&queue); } +TEST(RtcpTransceiverTest, CanBeDestroyedWithoutBlockingAfterStop) { + rtc::TaskQueue queue("rtcp"); + NiceMock outgoing_transport; + RtcpTransceiverConfig config; + config.outgoing_transport = &outgoing_transport; + config.task_queue = &queue; + auto rtcp_transceiver = absl::make_unique(config); + rtcp_transceiver->SendCompoundPacket(); + + rtc::Event heavy_task(false, false); + queue.PostTask( + rtc::NewClosure([&] { EXPECT_TRUE(heavy_task.Wait(kTimeoutMs)); })); + rtc::Event done(false, false); + rtcp_transceiver->Stop(rtc::NewClosure([&done] { done.Set(); })); + rtcp_transceiver = nullptr; + + heavy_task.Set(); + EXPECT_TRUE(done.Wait(kTimeoutMs)); +} + // Use rtp timestamp to distinguish different incoming sender reports. rtc::CopyOnWriteBuffer CreateSenderReport(uint32_t ssrc, uint32_t rtp_time) { webrtc::rtcp::SenderReport sr; @@ -189,26 +210,23 @@ TEST(RtcpTransceiverTest, CanCallSendCompoundPacketFromAnyThread) { WaitPostedTasks(&queue); } -TEST(RtcpTransceiverTest, DoesntSendPacketsAfterDestruction) { - MockTransport outgoing_transport; +TEST(RtcpTransceiverTest, DoesntSendPacketsAfterStopCallback) { + NiceMock outgoing_transport; rtc::TaskQueue queue("rtcp"); RtcpTransceiverConfig config; config.outgoing_transport = &outgoing_transport; config.task_queue = &queue; - config.schedule_periodic_compound_packets = false; - - EXPECT_CALL(outgoing_transport, SendRtcp(_, _)).Times(0); + config.schedule_periodic_compound_packets = true; auto rtcp_transceiver = absl::make_unique(config); - rtc::Event pause(false, false); - queue.PostTask([&] { - pause.Wait(rtc::Event::kForever); - rtcp_transceiver.reset(); - }); + rtc::Event done(false, false); rtcp_transceiver->SendCompoundPacket(); - pause.Set(); - WaitPostedTasks(&queue); - EXPECT_FALSE(rtcp_transceiver); + rtcp_transceiver->Stop(rtc::NewClosure([&] { + EXPECT_CALL(outgoing_transport, SendRtcp).Times(0); + done.Set(); + })); + rtcp_transceiver = nullptr; + EXPECT_TRUE(done.Wait(kTimeoutMs)); } TEST(RtcpTransceiverTest, SendsTransportFeedbackOnTaskQueue) {