From f351cfffe2d36c29176082187d5a889593f26353 Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Thu, 5 Mar 2020 15:43:24 +0100 Subject: [PATCH] Migrate RtcpTransceiver to use webrtc::TaskQueueBase instead of rtc::TaskQueue This changes removes an extra layer of indirection since RtcpTransceiver doesn't own TaskQueue it uses. Bug: None Change-Id: Ie1ef4cd8c3fb18a8e0b7ddaf0d6a319392b9e9f7 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/126040 Reviewed-by: Per Kjellander Commit-Queue: Danil Chapovalov Cr-Commit-Position: refs/heads/master@{#30704} --- modules/rtp_rtcp/BUILD.gn | 2 +- modules/rtp_rtcp/source/rtcp_transceiver.cc | 47 ++++++++++--------- modules/rtp_rtcp/source/rtcp_transceiver.h | 4 +- .../source/rtcp_transceiver_config.cc | 2 +- .../rtp_rtcp/source/rtcp_transceiver_config.h | 4 +- .../rtp_rtcp/source/rtcp_transceiver_impl.cc | 8 ++-- .../source/rtcp_transceiver_impl_unittest.cc | 12 ++--- .../source/rtcp_transceiver_unittest.cc | 22 ++++----- 8 files changed, 53 insertions(+), 48 deletions(-) diff --git a/modules/rtp_rtcp/BUILD.gn b/modules/rtp_rtcp/BUILD.gn index 9e6221e4a0..19a2c137a9 100644 --- a/modules/rtp_rtcp/BUILD.gn +++ b/modules/rtp_rtcp/BUILD.gn @@ -320,10 +320,10 @@ rtc_library("rtcp_transceiver") { "../../api:array_view", "../../api:rtp_headers", "../../api:transport_api", + "../../api/task_queue", "../../api/video:video_bitrate_allocation", "../../rtc_base:checks", "../../rtc_base:rtc_base_approved", - "../../rtc_base:rtc_task_queue", "../../rtc_base/task_utils:repeating_task", "../../rtc_base/task_utils:to_queued_task", "../../system_wrappers", diff --git a/modules/rtp_rtcp/source/rtcp_transceiver.cc b/modules/rtp_rtcp/source/rtcp_transceiver.cc index 2060b0b5e0..1de581849b 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver.cc +++ b/modules/rtp_rtcp/source/rtcp_transceiver.cc @@ -32,9 +32,10 @@ RtcpTransceiver::~RtcpTransceiver() { if (!rtcp_transceiver_) return; auto rtcp_transceiver = std::move(rtcp_transceiver_); - task_queue_->PostTask([rtcp_transceiver = std::move(rtcp_transceiver)] { - rtcp_transceiver->StopPeriodicTask(); - }); + task_queue_->PostTask( + ToQueuedTask([rtcp_transceiver = std::move(rtcp_transceiver)] { + rtcp_transceiver->StopPeriodicTask(); + })); RTC_DCHECK(!rtcp_transceiver_); } @@ -54,9 +55,9 @@ void RtcpTransceiver::AddMediaReceiverRtcpObserver( MediaReceiverRtcpObserver* observer) { RTC_CHECK(rtcp_transceiver_); RtcpTransceiverImpl* ptr = rtcp_transceiver_.get(); - task_queue_->PostTask([ptr, remote_ssrc, observer] { + task_queue_->PostTask(ToQueuedTask([ptr, remote_ssrc, observer] { ptr->AddMediaReceiverRtcpObserver(remote_ssrc, observer); - }); + })); } void RtcpTransceiver::RemoveMediaReceiverRtcpObserver( @@ -74,36 +75,38 @@ void RtcpTransceiver::RemoveMediaReceiverRtcpObserver( void RtcpTransceiver::SetReadyToSend(bool ready) { RTC_CHECK(rtcp_transceiver_); RtcpTransceiverImpl* ptr = rtcp_transceiver_.get(); - task_queue_->PostTask([ptr, ready] { ptr->SetReadyToSend(ready); }); + task_queue_->PostTask( + ToQueuedTask([ptr, ready] { ptr->SetReadyToSend(ready); })); } void RtcpTransceiver::ReceivePacket(rtc::CopyOnWriteBuffer packet) { RTC_CHECK(rtcp_transceiver_); RtcpTransceiverImpl* ptr = rtcp_transceiver_.get(); int64_t now_us = rtc::TimeMicros(); - task_queue_->PostTask( - [ptr, packet, now_us] { ptr->ReceivePacket(packet, now_us); }); + task_queue_->PostTask(ToQueuedTask( + [ptr, packet, now_us] { ptr->ReceivePacket(packet, now_us); })); } void RtcpTransceiver::SendCompoundPacket() { RTC_CHECK(rtcp_transceiver_); RtcpTransceiverImpl* ptr = rtcp_transceiver_.get(); - task_queue_->PostTask([ptr] { ptr->SendCompoundPacket(); }); + task_queue_->PostTask(ToQueuedTask([ptr] { ptr->SendCompoundPacket(); })); } void RtcpTransceiver::SetRemb(int64_t bitrate_bps, std::vector ssrcs) { RTC_CHECK(rtcp_transceiver_); RtcpTransceiverImpl* ptr = rtcp_transceiver_.get(); - task_queue_->PostTask([ptr, bitrate_bps, ssrcs = std::move(ssrcs)]() mutable { - ptr->SetRemb(bitrate_bps, std::move(ssrcs)); - }); + task_queue_->PostTask( + ToQueuedTask([ptr, bitrate_bps, ssrcs = std::move(ssrcs)]() mutable { + ptr->SetRemb(bitrate_bps, std::move(ssrcs)); + })); } void RtcpTransceiver::UnsetRemb() { RTC_CHECK(rtcp_transceiver_); RtcpTransceiverImpl* ptr = rtcp_transceiver_.get(); - task_queue_->PostTask([ptr] { ptr->UnsetRemb(); }); + task_queue_->PostTask(ToQueuedTask([ptr] { ptr->UnsetRemb(); })); } void RtcpTransceiver::SendCombinedRtcpPacket( @@ -111,25 +114,26 @@ void RtcpTransceiver::SendCombinedRtcpPacket( RTC_CHECK(rtcp_transceiver_); RtcpTransceiverImpl* ptr = rtcp_transceiver_.get(); task_queue_->PostTask( - [ptr, rtcp_packets = std::move(rtcp_packets)]() mutable { + ToQueuedTask([ptr, rtcp_packets = std::move(rtcp_packets)]() mutable { ptr->SendCombinedRtcpPacket(std::move(rtcp_packets)); - }); + })); } void RtcpTransceiver::SendNack(uint32_t ssrc, std::vector sequence_numbers) { RTC_CHECK(rtcp_transceiver_); RtcpTransceiverImpl* ptr = rtcp_transceiver_.get(); - task_queue_->PostTask( + task_queue_->PostTask(ToQueuedTask( [ptr, ssrc, sequence_numbers = std::move(sequence_numbers)]() mutable { ptr->SendNack(ssrc, std::move(sequence_numbers)); - }); + })); } void RtcpTransceiver::SendPictureLossIndication(uint32_t ssrc) { RTC_CHECK(rtcp_transceiver_); RtcpTransceiverImpl* ptr = rtcp_transceiver_.get(); - task_queue_->PostTask([ptr, ssrc] { ptr->SendPictureLossIndication(ssrc); }); + task_queue_->PostTask( + ToQueuedTask([ptr, ssrc] { ptr->SendPictureLossIndication(ssrc); })); } void RtcpTransceiver::SendFullIntraRequest(std::vector ssrcs) { @@ -140,9 +144,10 @@ void RtcpTransceiver::SendFullIntraRequest(std::vector ssrcs, bool new_request) { RTC_CHECK(rtcp_transceiver_); RtcpTransceiverImpl* ptr = rtcp_transceiver_.get(); - task_queue_->PostTask([ptr, ssrcs = std::move(ssrcs), new_request] { - ptr->SendFullIntraRequest(ssrcs, new_request); - }); + task_queue_->PostTask( + ToQueuedTask([ptr, ssrcs = std::move(ssrcs), new_request] { + ptr->SendFullIntraRequest(ssrcs, new_request); + })); } } // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtcp_transceiver.h b/modules/rtp_rtcp/source/rtcp_transceiver.h index fe5c9695c3..2d1f37cd44 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver.h +++ b/modules/rtp_rtcp/source/rtcp_transceiver.h @@ -16,10 +16,10 @@ #include #include +#include "api/task_queue/task_queue_base.h" #include "modules/rtp_rtcp/source/rtcp_transceiver_config.h" #include "modules/rtp_rtcp/source/rtcp_transceiver_impl.h" #include "rtc_base/copy_on_write_buffer.h" -#include "rtc_base/task_queue.h" namespace webrtc { // @@ -93,7 +93,7 @@ class RtcpTransceiver : public RtcpFeedbackSenderInterface { void SendFullIntraRequest(std::vector ssrcs, bool new_request); private: - rtc::TaskQueue* const task_queue_; + TaskQueueBase* const task_queue_; std::unique_ptr rtcp_transceiver_; }; diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_config.cc b/modules/rtp_rtcp/source/rtcp_transceiver_config.cc index 64e034a2e5..214d8fd409 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_config.cc +++ b/modules/rtp_rtcp/source/rtcp_transceiver_config.cc @@ -58,7 +58,7 @@ bool RtcpTransceiverConfig::Validate() const { << "ms between reports should be positive."; return false; } - if (schedule_periodic_compound_packets && !task_queue) { + if (schedule_periodic_compound_packets && task_queue == nullptr) { RTC_LOG(LS_ERROR) << debug_id << "missing task queue for periodic compound packets"; return false; diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_config.h b/modules/rtp_rtcp/source/rtcp_transceiver_config.h index 8a77e709d3..2cbd1045d2 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_config.h +++ b/modules/rtp_rtcp/source/rtcp_transceiver_config.h @@ -14,9 +14,9 @@ #include #include "api/rtp_headers.h" +#include "api/task_queue/task_queue_base.h" #include "api/video/video_bitrate_allocation.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" -#include "rtc_base/task_queue.h" #include "system_wrappers/include/ntp_time.h" namespace webrtc { @@ -65,7 +65,7 @@ struct RtcpTransceiverConfig { Transport* outgoing_transport = nullptr; // Queue for scheduling delayed tasks, e.g. sending periodic compound packets. - rtc::TaskQueue* task_queue = nullptr; + TaskQueueBase* task_queue = nullptr; // Rtcp report block generator for outgoing receiver reports. ReceiveStatisticsProvider* receive_statistics = nullptr; diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc b/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc index 5f2f2e02c3..0102616d59 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc +++ b/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc @@ -32,8 +32,8 @@ #include "modules/rtp_rtcp/source/time_util.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" -#include "rtc_base/task_queue.h" #include "rtc_base/task_utils/repeating_task.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/time_utils.h" namespace webrtc { @@ -92,9 +92,9 @@ RtcpTransceiverImpl::RtcpTransceiverImpl(const RtcpTransceiverConfig& config) : config_(config), ready_to_send_(config.initial_ready_to_send) { RTC_CHECK(config_.Validate()); if (ready_to_send_ && config_.schedule_periodic_compound_packets) { - config_.task_queue->PostTask([this] { + config_.task_queue->PostTask(ToQueuedTask([this] { SchedulePeriodicCompoundPackets(config_.initial_report_delay_ms); - }); + })); } } @@ -342,7 +342,7 @@ void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets() { void RtcpTransceiverImpl::SchedulePeriodicCompoundPackets(int64_t delay_ms) { periodic_task_handle_ = RepeatingTaskHandle::DelayedStart( - config_.task_queue->Get(), TimeDelta::Millis(delay_ms), [this] { + config_.task_queue, TimeDelta::Millis(delay_ms), [this] { RTC_DCHECK(config_.schedule_periodic_compound_packets); RTC_DCHECK(ready_to_send_); SendPeriodicCompoundPacket(); diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_impl_unittest.cc b/modules/rtp_rtcp/source/rtcp_transceiver_impl_unittest.cc index 47ce4a825d..727a9bca23 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_impl_unittest.cc +++ b/modules/rtp_rtcp/source/rtcp_transceiver_impl_unittest.cc @@ -141,7 +141,7 @@ TEST(RtcpTransceiverImplTest, NeedToStopPeriodicTaskToDestroyOnTaskQueue) { FakeRtcpTransport transport; TaskQueueForTest queue("rtcp"); RtcpTransceiverConfig config = DefaultTestConfig(); - config.task_queue = &queue; + config.task_queue = queue.Get(); config.schedule_periodic_compound_packets = true; config.outgoing_transport = &transport; auto* rtcp_transceiver = new RtcpTransceiverImpl(config); @@ -161,7 +161,7 @@ TEST(RtcpTransceiverImplTest, CanDestroyAfterTaskQueue) { FakeRtcpTransport transport; auto* queue = new TaskQueueForTest("rtcp"); RtcpTransceiverConfig config = DefaultTestConfig(); - config.task_queue = queue; + config.task_queue = queue->Get(); config.schedule_periodic_compound_packets = true; config.outgoing_transport = &transport; auto* rtcp_transceiver = new RtcpTransceiverImpl(config); @@ -178,7 +178,7 @@ TEST(RtcpTransceiverImplTest, DelaysSendingFirstCompondPacket) { RtcpTransceiverConfig config; config.outgoing_transport = &transport; config.initial_report_delay_ms = 10; - config.task_queue = &queue; + config.task_queue = queue.Get(); absl::optional rtcp_transceiver; int64_t started_ms = rtc::TimeMillis(); @@ -204,7 +204,7 @@ TEST(RtcpTransceiverImplTest, PeriodicallySendsPackets) { config.outgoing_transport = &transport; config.initial_report_delay_ms = 0; config.report_period_ms = kReportPeriodMs; - config.task_queue = &queue; + config.task_queue = queue.Get(); absl::optional rtcp_transceiver; int64_t time_just_before_1st_packet_ms = 0; queue.PostTask([&] { @@ -238,7 +238,7 @@ TEST(RtcpTransceiverImplTest, SendCompoundPacketDelaysPeriodicSendPackets) { config.outgoing_transport = &transport; config.initial_report_delay_ms = 0; config.report_period_ms = kReportPeriodMs; - config.task_queue = &queue; + config.task_queue = queue.Get(); absl::optional rtcp_transceiver; queue.PostTask([&] { rtcp_transceiver.emplace(config); }); @@ -324,7 +324,7 @@ TEST(RtcpTransceiverImplTest, SendsPeriodicRtcpWhenNetworkStateIsUp) { config.schedule_periodic_compound_packets = true; config.initial_ready_to_send = false; config.outgoing_transport = &transport; - config.task_queue = &queue; + config.task_queue = queue.Get(); absl::optional rtcp_transceiver; rtcp_transceiver.emplace(config); diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_unittest.cc b/modules/rtp_rtcp/source/rtcp_transceiver_unittest.cc index cd35cfb1da..5fb2aa55eb 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_unittest.cc +++ b/modules/rtp_rtcp/source/rtcp_transceiver_unittest.cc @@ -58,7 +58,7 @@ TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOffTaskQueue) { TaskQueueForTest queue("rtcp"); RtcpTransceiverConfig config; config.outgoing_transport = &outgoing_transport; - config.task_queue = &queue; + config.task_queue = queue.Get(); EXPECT_CALL(outgoing_transport, SendRtcp(_, _)) .WillRepeatedly(InvokeWithoutArgs([&] { EXPECT_TRUE(queue.IsCurrent()); @@ -75,7 +75,7 @@ TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOnTaskQueue) { TaskQueueForTest queue("rtcp"); RtcpTransceiverConfig config; config.outgoing_transport = &outgoing_transport; - config.task_queue = &queue; + config.task_queue = queue.Get(); EXPECT_CALL(outgoing_transport, SendRtcp(_, _)) .WillRepeatedly(InvokeWithoutArgs([&] { EXPECT_TRUE(queue.IsCurrent()); @@ -95,7 +95,7 @@ TEST(RtcpTransceiverTest, CanBeDestroyedOnTaskQueue) { TaskQueueForTest queue("rtcp"); RtcpTransceiverConfig config; config.outgoing_transport = &outgoing_transport; - config.task_queue = &queue; + config.task_queue = queue.Get(); auto rtcp_transceiver = std::make_unique(config); queue.PostTask([&] { @@ -111,7 +111,7 @@ TEST(RtcpTransceiverTest, CanBeDestroyedWithoutBlocking) { NiceMock outgoing_transport; RtcpTransceiverConfig config; config.outgoing_transport = &outgoing_transport; - config.task_queue = &queue; + config.task_queue = queue.Get(); auto* rtcp_transceiver = new RtcpTransceiver(config); rtcp_transceiver->SendCompoundPacket(); @@ -132,7 +132,7 @@ TEST(RtcpTransceiverTest, MaySendPacketsAfterDestructor) { // i.e. Be careful! TaskQueueForTest queue("rtcp"); RtcpTransceiverConfig config; config.outgoing_transport = &outgoing_transport; - config.task_queue = &queue; + config.task_queue = queue.Get(); auto* rtcp_transceiver = new RtcpTransceiver(config); rtc::Event heavy_task; @@ -163,7 +163,7 @@ TEST(RtcpTransceiverTest, DoesntPostToRtcpObserverAfterCallToRemove) { TaskQueueForTest queue("rtcp"); RtcpTransceiverConfig config; config.outgoing_transport = &null_transport; - config.task_queue = &queue; + config.task_queue = queue.Get(); RtcpTransceiver rtcp_transceiver(config); rtc::Event observer_deleted; @@ -190,7 +190,7 @@ TEST(RtcpTransceiverTest, RemoveMediaReceiverRtcpObserverIsNonBlocking) { TaskQueueForTest queue("rtcp"); RtcpTransceiverConfig config; config.outgoing_transport = &null_transport; - config.task_queue = &queue; + config.task_queue = queue.Get(); RtcpTransceiver rtcp_transceiver(config); auto observer = std::make_unique(); rtcp_transceiver.AddMediaReceiverRtcpObserver(kRemoteSsrc, observer.get()); @@ -214,7 +214,7 @@ TEST(RtcpTransceiverTest, CanCallSendCompoundPacketFromAnyThread) { TaskQueueForTest queue("rtcp"); RtcpTransceiverConfig config; config.outgoing_transport = &outgoing_transport; - config.task_queue = &queue; + config.task_queue = queue.Get(); EXPECT_CALL(outgoing_transport, SendRtcp(_, _)) // If test is slow, a periodic task may send an extra packet. @@ -243,7 +243,7 @@ TEST(RtcpTransceiverTest, DoesntSendPacketsAfterStopCallback) { TaskQueueForTest queue("rtcp"); RtcpTransceiverConfig config; config.outgoing_transport = &outgoing_transport; - config.task_queue = &queue; + config.task_queue = queue.Get(); config.schedule_periodic_compound_packets = true; auto rtcp_transceiver = std::make_unique(config); @@ -265,7 +265,7 @@ TEST(RtcpTransceiverTest, SendsCombinedRtcpPacketOnTaskQueue) { RtcpTransceiverConfig config; config.feedback_ssrc = kSenderSsrc; config.outgoing_transport = &outgoing_transport; - config.task_queue = &queue; + config.task_queue = queue.Get(); config.schedule_periodic_compound_packets = false; RtcpTransceiver rtcp_transceiver(config); @@ -302,7 +302,7 @@ TEST(RtcpTransceiverTest, SendFrameIntraRequestDefaultsToNewRequest) { RtcpTransceiverConfig config; config.feedback_ssrc = kSenderSsrc; config.outgoing_transport = &outgoing_transport; - config.task_queue = &queue; + config.task_queue = queue.Get(); config.schedule_periodic_compound_packets = false; RtcpTransceiver rtcp_transceiver(config);