From c0fd5f97a8fbda3ddc97d8fd0f4e88a1501dffba Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Thu, 16 Nov 2017 14:35:32 +0100 Subject: [PATCH] Add Thread-safe wrapper for RtcpTransceiver Bug: webrtc:8239 Change-Id: I4cc2f7f2b27c764e1aae734f933902102b345614 Reviewed-on: https://webrtc-review.googlesource.com/21680 Reviewed-by: Niels Moller Commit-Queue: Danil Chapovalov Cr-Commit-Position: refs/heads/master@{#20714} --- modules/rtp_rtcp/BUILD.gn | 3 + modules/rtp_rtcp/source/rtcp_transceiver.cc | 93 ++++++++++++ modules/rtp_rtcp/source/rtcp_transceiver.h | 62 ++++++++ .../rtp_rtcp/source/rtcp_transceiver_impl.cc | 25 ++-- .../rtp_rtcp/source/rtcp_transceiver_impl.h | 7 +- .../source/rtcp_transceiver_impl_unittest.cc | 4 +- .../source/rtcp_transceiver_unittest.cc | 135 ++++++++++++++++++ 7 files changed, 313 insertions(+), 16 deletions(-) create mode 100644 modules/rtp_rtcp/source/rtcp_transceiver.cc create mode 100644 modules/rtp_rtcp/source/rtcp_transceiver.h create mode 100644 modules/rtp_rtcp/source/rtcp_transceiver_unittest.cc diff --git a/modules/rtp_rtcp/BUILD.gn b/modules/rtp_rtcp/BUILD.gn index c391d31690..0174394910 100644 --- a/modules/rtp_rtcp/BUILD.gn +++ b/modules/rtp_rtcp/BUILD.gn @@ -221,10 +221,12 @@ rtc_static_library("rtp_rtcp") { rtc_source_set("rtcp_transceiver") { public = [ + "source/rtcp_transceiver.h", "source/rtcp_transceiver_config.h", "source/rtcp_transceiver_impl.h", ] sources = [ + "source/rtcp_transceiver.cc", "source/rtcp_transceiver_config.cc", "source/rtcp_transceiver_impl.cc", ] @@ -349,6 +351,7 @@ if (rtc_include_tests) { "source/rtcp_receiver_unittest.cc", "source/rtcp_sender_unittest.cc", "source/rtcp_transceiver_impl_unittest.cc", + "source/rtcp_transceiver_unittest.cc", "source/rtp_fec_unittest.cc", "source/rtp_format_h264_unittest.cc", "source/rtp_format_video_generic_unittest.cc", diff --git a/modules/rtp_rtcp/source/rtcp_transceiver.cc b/modules/rtp_rtcp/source/rtcp_transceiver.cc new file mode 100644 index 0000000000..fa104f1bd3 --- /dev/null +++ b/modules/rtp_rtcp/source/rtcp_transceiver.cc @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/rtp_rtcp/source/rtcp_transceiver.h" + +#include + +#include "rtc_base/checks.h" +#include "rtc_base/event.h" +#include "rtc_base/ptr_util.h" +#include "rtc_base/timeutils.h" + +namespace webrtc { + +RtcpTransceiver::RtcpTransceiver(const RtcpTransceiverConfig& config) + : task_queue_(config.task_queue), + rtcp_transceiver_(rtc::MakeUnique(config)), + ptr_factory_(rtcp_transceiver_.get()), + // Creating first weak ptr can be done on any thread, but is not + // thread-safe, thus do it at construction. Creating second (e.g. making a + // copy) is thread-safe. + ptr_(ptr_factory_.GetWeakPtr()) { + RTC_DCHECK(task_queue_); +} + +RtcpTransceiver::~RtcpTransceiver() { + if (task_queue_->IsCurrent()) + return; + + rtc::Event done(false, false); + // TODO(danilchap): Merge cleanup into main closure when task queue does not + // silently drop tasks. + task_queue_->PostTask(rtc::NewClosure( + [this] { + // Destructor steps that has to run on the task_queue_. + ptr_factory_.InvalidateWeakPtrs(); + rtcp_transceiver_.reset(); + }, + /*cleanup=*/[&done] { done.Set(); })); + // Wait until destruction is complete to be sure weak pointers invalidated and + // rtcp_transceiver destroyed on the queue while |this| still valid. + done.Wait(rtc::Event::kForever); + RTC_CHECK(!rtcp_transceiver_) << "Task queue is too busy to handle rtcp"; +} + +void RtcpTransceiver::ReceivePacket(rtc::CopyOnWriteBuffer packet) { + rtc::WeakPtr ptr = ptr_; + int64_t now_us = rtc::TimeMicros(); + task_queue_->PostTask([ptr, packet, now_us] { + if (ptr) + ptr->ReceivePacket(packet, now_us); + }); +} + +void RtcpTransceiver::SendCompoundPacket() { + rtc::WeakPtr ptr = ptr_; + task_queue_->PostTask([ptr] { + if (ptr) + ptr->SendCompoundPacket(); + }); +} + +void RtcpTransceiver::SetRemb(int bitrate_bps, std::vector ssrcs) { + // TODO(danilchap): Replace with lambda with move capture when available. + struct SetRembClosure { + void operator()() { + if (ptr) + ptr->SetRemb(bitrate_bps, std::move(ssrcs)); + } + + rtc::WeakPtr ptr; + int bitrate_bps; + std::vector ssrcs; + }; + task_queue_->PostTask(SetRembClosure{ptr_, bitrate_bps, std::move(ssrcs)}); +} + +void RtcpTransceiver::UnsetRemb() { + rtc::WeakPtr ptr = ptr_; + task_queue_->PostTask([ptr] { + if (ptr) + ptr->UnsetRemb(); + }); +} + +} // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtcp_transceiver.h b/modules/rtp_rtcp/source/rtcp_transceiver.h new file mode 100644 index 0000000000..5a9998b0b0 --- /dev/null +++ b/modules/rtp_rtcp/source/rtcp_transceiver.h @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_RTP_RTCP_SOURCE_RTCP_TRANSCEIVER_H_ +#define MODULES_RTP_RTCP_SOURCE_RTCP_TRANSCEIVER_H_ + +#include +#include +#include + +#include "modules/rtp_rtcp/source/rtcp_transceiver_config.h" +#include "modules/rtp_rtcp/source/rtcp_transceiver_impl.h" +#include "rtc_base/constructormagic.h" +#include "rtc_base/copyonwritebuffer.h" +#include "rtc_base/task_queue.h" +#include "rtc_base/weak_ptr.h" + +namespace webrtc { +// +// Manage incoming and outgoing rtcp messages for multiple BUNDLED streams. +// +// This class is thread-safe wrapper of RtcpTransceiverImpl +class RtcpTransceiver { + public: + explicit RtcpTransceiver(const RtcpTransceiverConfig& config); + ~RtcpTransceiver(); + + // Handles incoming rtcp packets. + void ReceivePacket(rtc::CopyOnWriteBuffer packet); + + // Sends RTCP packets starting with a sender or receiver report. + void SendCompoundPacket(); + + // (REMB) Receiver Estimated Max Bitrate. + // Includes REMB in following compound packets. + void SetRemb(int bitrate_bps, std::vector ssrcs); + // Stops sending REMB in following compound packets. + void UnsetRemb(); + + private: + rtc::TaskQueue* const task_queue_; + std::unique_ptr rtcp_transceiver_; + rtc::WeakPtrFactory ptr_factory_; + // TaskQueue, and thus tasks posted to it, may outlive this. + // Thus when Posting task class always pass copy of the weak_ptr to access + // the RtcpTransceiver and never guarantee it still will be alive when task + // runs. + rtc::WeakPtr ptr_; + + RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RtcpTransceiver); +}; + +} // namespace webrtc + +#endif // MODULES_RTP_RTCP_SOURCE_RTCP_TRANSCEIVER_H_ diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc b/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc index d6127a9592..2c6eb8d48b 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc +++ b/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc @@ -25,6 +25,7 @@ #include "rtc_base/checks.h" #include "rtc_base/ptr_util.h" #include "rtc_base/task_queue.h" +#include "rtc_base/timeutils.h" namespace webrtc { namespace { @@ -73,18 +74,19 @@ RtcpTransceiverImpl::RtcpTransceiverImpl(const RtcpTransceiverConfig& config) : config_(config), ptr_factory_(this) { RTC_CHECK(config_.Validate()); if (config_.schedule_periodic_compound_packets) - ReschedulePeriodicCompoundPackets(config_.initial_report_delay_ms); + SchedulePeriodicCompoundPackets(config_.initial_report_delay_ms); } RtcpTransceiverImpl::~RtcpTransceiverImpl() = default; -void RtcpTransceiverImpl::ReceivePacket(rtc::ArrayView packet) { +void RtcpTransceiverImpl::ReceivePacket(rtc::ArrayView packet, + int64_t now_us) { while (!packet.empty()) { rtcp::CommonHeader rtcp_block; if (!rtcp_block.Parse(packet.data(), packet.size())) return; - HandleReceivedPacket(rtcp_block); + HandleReceivedPacket(rtcp_block, now_us); // TODO(danilchap): Use packet.remove_prefix() when that function exists. packet = packet.subview(rtcp_block.packet_size()); @@ -93,8 +95,11 @@ void RtcpTransceiverImpl::ReceivePacket(rtc::ArrayView packet) { void RtcpTransceiverImpl::SendCompoundPacket() { SendPacket(); - if (config_.schedule_periodic_compound_packets) - ReschedulePeriodicCompoundPackets(config_.report_period_ms); + if (config_.schedule_periodic_compound_packets) { + // Stop existent send task. + ptr_factory_.InvalidateWeakPtrs(); + SchedulePeriodicCompoundPackets(config_.report_period_ms); + } } void RtcpTransceiverImpl::SetRemb(int bitrate_bps, @@ -113,7 +118,8 @@ void RtcpTransceiverImpl::UnsetRemb() { } void RtcpTransceiverImpl::HandleReceivedPacket( - const rtcp::CommonHeader& rtcp_packet_header) { + const rtcp::CommonHeader& rtcp_packet_header, + int64_t now_us) { switch (rtcp_packet_header.type()) { case rtcp::SenderReport::kPacketType: { rtcp::SenderReport sender_report; @@ -121,14 +127,14 @@ void RtcpTransceiverImpl::HandleReceivedPacket( return; SenderReportTimes& last = last_received_sender_reports_[sender_report.sender_ssrc()]; - last.local_received_time_us = rtc::TimeMicros(); + last.local_received_time_us = now_us; last.remote_sent_time = sender_report.ntp(); break; } } } -void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets(int64_t delay_ms) { +void RtcpTransceiverImpl::SchedulePeriodicCompoundPackets(int64_t delay_ms) { class SendPeriodicCompoundPacket : public rtc::QueuedTask { public: SendPeriodicCompoundPacket(rtc::TaskQueue* task_queue, @@ -150,10 +156,7 @@ void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets(int64_t delay_ms) { }; RTC_DCHECK(config_.schedule_periodic_compound_packets); - RTC_DCHECK(config_.task_queue->IsCurrent()); - // Stop existent send task if there is one. - ptr_factory_.InvalidateWeakPtrs(); auto task = rtc::MakeUnique( config_.task_queue, ptr_factory_.GetWeakPtr()); if (delay_ms > 0) diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_impl.h b/modules/rtp_rtcp/source/rtcp_transceiver_impl.h index 81f0cace9c..689ed1e9d1 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_impl.h +++ b/modules/rtp_rtcp/source/rtcp_transceiver_impl.h @@ -37,7 +37,7 @@ class RtcpTransceiverImpl { ~RtcpTransceiverImpl(); // Handles incoming rtcp packets. - void ReceivePacket(rtc::ArrayView packet); + void ReceivePacket(rtc::ArrayView packet, int64_t now_us); // Sends RTCP packets starting with a sender or receiver report. void SendCompoundPacket(); @@ -54,9 +54,10 @@ class RtcpTransceiverImpl { NtpTime remote_sent_time; }; - void HandleReceivedPacket(const rtcp::CommonHeader& rtcp_packet_header); + void HandleReceivedPacket(const rtcp::CommonHeader& rtcp_packet_header, + int64_t now_us); - void ReschedulePeriodicCompoundPackets(int64_t delay_ms); + void SchedulePeriodicCompoundPackets(int64_t delay_ms); // Sends RTCP packets. void SendPacket(); // Generate Report Blocks to be send in Sender or Receiver Report. diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_impl_unittest.cc b/modules/rtp_rtcp/source/rtcp_transceiver_impl_unittest.cc index e38d706199..b144ec4c02 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_impl_unittest.cc +++ b/modules/rtp_rtcp/source/rtcp_transceiver_impl_unittest.cc @@ -374,7 +374,7 @@ TEST(RtcpTransceiverImplTest, sr.SetSenderSsrc(kRemoteSsrc2); sr.SetNtp(kRemoteNtp); auto raw_packet = sr.Build(); - rtcp_transceiver.ReceivePacket(raw_packet); + rtcp_transceiver.ReceivePacket(raw_packet, /*now_us=*/0); // Trigger sending ReceiverReport. RtcpPacketParser rtcp_parser; @@ -419,7 +419,7 @@ TEST(RtcpTransceiverImplTest, SenderReport sr; sr.SetSenderSsrc(remote_ssrc); auto raw_packet = sr.Build(); - rtcp_transceiver.ReceivePacket(raw_packet); + rtcp_transceiver.ReceivePacket(raw_packet, rtc::TimeMicros()); }; receive_sender_report(kRemoteSsrc1); diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_unittest.cc b/modules/rtp_rtcp/source/rtcp_transceiver_unittest.cc new file mode 100644 index 0000000000..dea91dba86 --- /dev/null +++ b/modules/rtp_rtcp/source/rtcp_transceiver_unittest.cc @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/rtp_rtcp/source/rtcp_transceiver.h" + +#include "rtc_base/event.h" +#include "rtc_base/ptr_util.h" +#include "test/gmock.h" +#include "test/gtest.h" +#include "test/mock_transport.h" + +namespace { + +using ::testing::AtLeast; +using ::testing::InvokeWithoutArgs; +using ::testing::NiceMock; +using ::testing::_; +using ::webrtc::MockTransport; +using ::webrtc::RtcpTransceiver; +using ::webrtc::RtcpTransceiverConfig; + +void WaitPostedTasks(rtc::TaskQueue* queue) { + rtc::Event done(false, false); + queue->PostTask([&done] { done.Set(); }); + ASSERT_TRUE(done.Wait(1000)); +} + +TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOffTaskQueue) { + rtc::TaskQueue queue("rtcp"); + MockTransport outgoing_transport; + RtcpTransceiverConfig config; + config.outgoing_transport = &outgoing_transport; + config.task_queue = &queue; + EXPECT_CALL(outgoing_transport, SendRtcp(_, _)) + .WillRepeatedly(InvokeWithoutArgs([&] { + EXPECT_TRUE(queue.IsCurrent()); + return true; + })); + + RtcpTransceiver rtcp_transceiver(config); + rtcp_transceiver.SendCompoundPacket(); + WaitPostedTasks(&queue); +} + +TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOnTaskQueue) { + rtc::TaskQueue queue("rtcp"); + MockTransport outgoing_transport; + RtcpTransceiverConfig config; + config.outgoing_transport = &outgoing_transport; + config.task_queue = &queue; + EXPECT_CALL(outgoing_transport, SendRtcp(_, _)) + .WillRepeatedly(InvokeWithoutArgs([&] { + EXPECT_TRUE(queue.IsCurrent()); + return true; + })); + + std::unique_ptr rtcp_transceiver; + queue.PostTask([&] { + rtcp_transceiver = rtc::MakeUnique(config); + rtcp_transceiver->SendCompoundPacket(); + }); + WaitPostedTasks(&queue); +} + +TEST(RtcpTransceiverTest, CanBeDestoryedOnTaskQueue) { + rtc::TaskQueue queue("rtcp"); + NiceMock outgoing_transport; + RtcpTransceiverConfig config; + config.outgoing_transport = &outgoing_transport; + config.task_queue = &queue; + auto rtcp_transceiver = rtc::MakeUnique(config); + + queue.PostTask([&] { rtcp_transceiver.reset(); }); + WaitPostedTasks(&queue); +} + +TEST(RtcpTransceiverTest, CanCallSendCompoundPacketFromAnyThread) { + MockTransport outgoing_transport; + rtc::TaskQueue queue("rtcp"); + RtcpTransceiverConfig config; + config.outgoing_transport = &outgoing_transport; + config.task_queue = &queue; + + EXPECT_CALL(outgoing_transport, SendRtcp(_, _)) + // If test is slow, a periodic task may send an extra packet. + .Times(AtLeast(3)) + .WillRepeatedly(InvokeWithoutArgs([&] { + EXPECT_TRUE(queue.IsCurrent()); + return true; + })); + + RtcpTransceiver rtcp_transceiver(config); + + // Call from the construction thread. + rtcp_transceiver.SendCompoundPacket(); + // Call from the same queue transceiver use for processing. + queue.PostTask([&] { rtcp_transceiver.SendCompoundPacket(); }); + // Call from unrelated task queue. + rtc::TaskQueue queue_send("send_packet"); + queue_send.PostTask([&] { rtcp_transceiver.SendCompoundPacket(); }); + + WaitPostedTasks(&queue_send); + WaitPostedTasks(&queue); +} + +TEST(RtcpTransceiverTest, DoesntSendPacketsAfterDestruction) { + MockTransport outgoing_transport; + rtc::TaskQueue queue("rtcp"); + RtcpTransceiverConfig config; + config.outgoing_transport = &outgoing_transport; + config.task_queue = &queue; + config.schedule_periodic_compound_packets = false; + + EXPECT_CALL(outgoing_transport, SendRtcp(_, _)).Times(0); + + auto rtcp_transceiver = rtc::MakeUnique(config); + rtc::Event pause(false, false); + queue.PostTask([&] { + pause.Wait(rtc::Event::kForever); + rtcp_transceiver.reset(); + }); + rtcp_transceiver->SendCompoundPacket(); + pause.Set(); + WaitPostedTasks(&queue); + EXPECT_FALSE(rtcp_transceiver); +} + +} // namespace