diff --git a/modules/rtp_rtcp/source/rtcp_transceiver.cc b/modules/rtp_rtcp/source/rtcp_transceiver.cc index e5ad3bae95..77b10ca052 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver.cc +++ b/modules/rtp_rtcp/source/rtcp_transceiver.cc @@ -19,6 +19,12 @@ #include "rtc_base/timeutils.h" namespace webrtc { +namespace { +struct Destructor { + void operator()() { rtcp_transceiver = nullptr; } + std::unique_ptr rtcp_transceiver; +}; +} // namespace RtcpTransceiver::RtcpTransceiver(const RtcpTransceiverConfig& config) : task_queue_(config.task_queue), @@ -29,29 +35,12 @@ RtcpTransceiver::RtcpTransceiver(const RtcpTransceiverConfig& config) RtcpTransceiver::~RtcpTransceiver() { if (!rtcp_transceiver_) return; - RTC_CHECK(!task_queue_->IsCurrent()); - - rtc::Event done(false, false); - // TODO(danilchap): Merge cleanup into main closure when task queue does not - // silently drop tasks. - task_queue_->PostTask(rtc::NewClosure( - [this] { - // Destructor steps that better run on the task_queue_. - rtcp_transceiver_.reset(); - }, - /*cleanup=*/[&done] { done.Set(); })); - // Wait until destruction is complete to guarantee callbacks are not used - // after destructor returns. - done.Wait(rtc::Event::kForever); - RTC_CHECK(!rtcp_transceiver_) << "Task queue is too busy to handle rtcp"; + task_queue_->PostTask(Destructor{std::move(rtcp_transceiver_)}); + RTC_DCHECK(!rtcp_transceiver_); } void RtcpTransceiver::Stop(std::unique_ptr on_destroyed) { RTC_DCHECK(rtcp_transceiver_); - struct Destructor { - void operator()() { rtcp_transceiver = nullptr; } - std::unique_ptr rtcp_transceiver; - }; task_queue_->PostTaskAndReply(Destructor{std::move(rtcp_transceiver_)}, std::move(on_destroyed)); RTC_DCHECK(!rtcp_transceiver_); diff --git a/modules/rtp_rtcp/source/rtcp_transceiver.h b/modules/rtp_rtcp/source/rtcp_transceiver.h index e3e7cfa569..9f823cccf3 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver.h +++ b/modules/rtp_rtcp/source/rtcp_transceiver.h @@ -29,16 +29,18 @@ namespace webrtc { class RtcpTransceiver : public RtcpFeedbackSenderInterface { public: explicit RtcpTransceiver(const RtcpTransceiverConfig& config); - // Blocks unless Stop was called. - // TODO(danilchap): Change destructor to never block by breaking assumption - // callbacks are not used after destruction. + // Note that interfaces provided in constructor still might be used after the + // destructor. However they can only be used on the confic.task_queue. + // Use Stop function to get notified when they are no longer used or + // ensure those objects outlive the task queue. ~RtcpTransceiver() override; // Start asynchronious destruction of the RtcpTransceiver. // It is safe to call destructor right after Stop exits. // No other methods can be called. - // Note that observers provided in constructor or registered with AddObserver - // still might be used by the transceiver until |on_destroyed| runs. + // Note that interfaces provided in constructor or registered with AddObserver + // still might be used by the transceiver on the task queue + // until |on_destroyed| runs. void Stop(std::unique_ptr on_destroyed); // Registers observer to be notified about incoming rtcp packets. diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_unittest.cc b/modules/rtp_rtcp/source/rtcp_transceiver_unittest.cc index 1ef199849e..2ea5bc9b53 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_unittest.cc +++ b/modules/rtp_rtcp/source/rtcp_transceiver_unittest.cc @@ -47,8 +47,8 @@ void WaitPostedTasks(rtc::TaskQueue* queue) { } TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOffTaskQueue) { - rtc::TaskQueue queue("rtcp"); MockTransport outgoing_transport; + rtc::TaskQueue queue("rtcp"); RtcpTransceiverConfig config; config.outgoing_transport = &outgoing_transport; config.task_queue = &queue; @@ -64,8 +64,8 @@ TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOffTaskQueue) { } TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOnTaskQueue) { - rtc::TaskQueue queue("rtcp"); MockTransport outgoing_transport; + rtc::TaskQueue queue("rtcp"); RtcpTransceiverConfig config; config.outgoing_transport = &outgoing_transport; config.task_queue = &queue; @@ -83,39 +83,62 @@ TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOnTaskQueue) { WaitPostedTasks(&queue); } -TEST(RtcpTransceiverTest, CanBeDestroyedOnTaskQueueAfterStop) { - rtc::TaskQueue queue("rtcp"); +TEST(RtcpTransceiverTest, CanBeDestroyedOnTaskQueue) { NiceMock outgoing_transport; + rtc::TaskQueue queue("rtcp"); RtcpTransceiverConfig config; config.outgoing_transport = &outgoing_transport; config.task_queue = &queue; auto rtcp_transceiver = absl::make_unique(config); - rtcp_transceiver->Stop(rtc::NewClosure([] {})); - queue.PostTask([&] { rtcp_transceiver.reset(); }); + queue.PostTask([&] { + // Insert a packet just before destruction to test for races. + rtcp_transceiver->SendCompoundPacket(); + rtcp_transceiver.reset(); + }); WaitPostedTasks(&queue); } -TEST(RtcpTransceiverTest, CanBeDestroyedWithoutBlockingAfterStop) { +TEST(RtcpTransceiverTest, CanBeDestroyedWithoutBlocking) { rtc::TaskQueue queue("rtcp"); NiceMock outgoing_transport; RtcpTransceiverConfig config; config.outgoing_transport = &outgoing_transport; config.task_queue = &queue; - auto rtcp_transceiver = absl::make_unique(config); + auto* rtcp_transceiver = new RtcpTransceiver(config); rtcp_transceiver->SendCompoundPacket(); - rtc::Event heavy_task(false, false); - queue.PostTask( - rtc::NewClosure([&] { EXPECT_TRUE(heavy_task.Wait(kTimeoutMs)); })); rtc::Event done(false, false); - rtcp_transceiver->Stop(rtc::NewClosure([&done] { done.Set(); })); - rtcp_transceiver = nullptr; + rtc::Event heavy_task(false, false); + queue.PostTask([&] { + EXPECT_TRUE(heavy_task.Wait(kTimeoutMs)); + done.Set(); + }); + delete rtcp_transceiver; heavy_task.Set(); EXPECT_TRUE(done.Wait(kTimeoutMs)); } +TEST(RtcpTransceiverTest, MaySendPacketsAfterDestructor) { // i.e. Be careful! + NiceMock outgoing_transport; // Must outlive queue below. + rtc::TaskQueue queue("rtcp"); + RtcpTransceiverConfig config; + config.outgoing_transport = &outgoing_transport; + config.task_queue = &queue; + auto* rtcp_transceiver = new RtcpTransceiver(config); + + rtc::Event heavy_task(false, false); + queue.PostTask([&] { EXPECT_TRUE(heavy_task.Wait(kTimeoutMs)); }); + rtcp_transceiver->SendCompoundPacket(); + delete rtcp_transceiver; + + EXPECT_CALL(outgoing_transport, SendRtcp); + heavy_task.Set(); + + WaitPostedTasks(&queue); +} + // Use rtp timestamp to distinguish different incoming sender reports. rtc::CopyOnWriteBuffer CreateSenderReport(uint32_t ssrc, uint32_t rtp_time) { webrtc::rtcp::SenderReport sr;