Migrate RtcpTransceiver to use webrtc::TaskQueueBase instead of rtc::TaskQueue
This changes removes an extra layer of indirection since RtcpTransceiver doesn't own TaskQueue it uses. Bug: None Change-Id: Ie1ef4cd8c3fb18a8e0b7ddaf0d6a319392b9e9f7 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/126040 Reviewed-by: Per Kjellander <perkj@webrtc.org> Commit-Queue: Danil Chapovalov <danilchap@webrtc.org> Cr-Commit-Position: refs/heads/master@{#30704}
This commit is contained in:
parent
8e9fd4857e
commit
f351cfffe2
@ -320,10 +320,10 @@ rtc_library("rtcp_transceiver") {
|
||||
"../../api:array_view",
|
||||
"../../api:rtp_headers",
|
||||
"../../api:transport_api",
|
||||
"../../api/task_queue",
|
||||
"../../api/video:video_bitrate_allocation",
|
||||
"../../rtc_base:checks",
|
||||
"../../rtc_base:rtc_base_approved",
|
||||
"../../rtc_base:rtc_task_queue",
|
||||
"../../rtc_base/task_utils:repeating_task",
|
||||
"../../rtc_base/task_utils:to_queued_task",
|
||||
"../../system_wrappers",
|
||||
|
||||
@ -32,9 +32,10 @@ RtcpTransceiver::~RtcpTransceiver() {
|
||||
if (!rtcp_transceiver_)
|
||||
return;
|
||||
auto rtcp_transceiver = std::move(rtcp_transceiver_);
|
||||
task_queue_->PostTask([rtcp_transceiver = std::move(rtcp_transceiver)] {
|
||||
rtcp_transceiver->StopPeriodicTask();
|
||||
});
|
||||
task_queue_->PostTask(
|
||||
ToQueuedTask([rtcp_transceiver = std::move(rtcp_transceiver)] {
|
||||
rtcp_transceiver->StopPeriodicTask();
|
||||
}));
|
||||
RTC_DCHECK(!rtcp_transceiver_);
|
||||
}
|
||||
|
||||
@ -54,9 +55,9 @@ void RtcpTransceiver::AddMediaReceiverRtcpObserver(
|
||||
MediaReceiverRtcpObserver* observer) {
|
||||
RTC_CHECK(rtcp_transceiver_);
|
||||
RtcpTransceiverImpl* ptr = rtcp_transceiver_.get();
|
||||
task_queue_->PostTask([ptr, remote_ssrc, observer] {
|
||||
task_queue_->PostTask(ToQueuedTask([ptr, remote_ssrc, observer] {
|
||||
ptr->AddMediaReceiverRtcpObserver(remote_ssrc, observer);
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
void RtcpTransceiver::RemoveMediaReceiverRtcpObserver(
|
||||
@ -74,36 +75,38 @@ void RtcpTransceiver::RemoveMediaReceiverRtcpObserver(
|
||||
void RtcpTransceiver::SetReadyToSend(bool ready) {
|
||||
RTC_CHECK(rtcp_transceiver_);
|
||||
RtcpTransceiverImpl* ptr = rtcp_transceiver_.get();
|
||||
task_queue_->PostTask([ptr, ready] { ptr->SetReadyToSend(ready); });
|
||||
task_queue_->PostTask(
|
||||
ToQueuedTask([ptr, ready] { ptr->SetReadyToSend(ready); }));
|
||||
}
|
||||
|
||||
void RtcpTransceiver::ReceivePacket(rtc::CopyOnWriteBuffer packet) {
|
||||
RTC_CHECK(rtcp_transceiver_);
|
||||
RtcpTransceiverImpl* ptr = rtcp_transceiver_.get();
|
||||
int64_t now_us = rtc::TimeMicros();
|
||||
task_queue_->PostTask(
|
||||
[ptr, packet, now_us] { ptr->ReceivePacket(packet, now_us); });
|
||||
task_queue_->PostTask(ToQueuedTask(
|
||||
[ptr, packet, now_us] { ptr->ReceivePacket(packet, now_us); }));
|
||||
}
|
||||
|
||||
void RtcpTransceiver::SendCompoundPacket() {
|
||||
RTC_CHECK(rtcp_transceiver_);
|
||||
RtcpTransceiverImpl* ptr = rtcp_transceiver_.get();
|
||||
task_queue_->PostTask([ptr] { ptr->SendCompoundPacket(); });
|
||||
task_queue_->PostTask(ToQueuedTask([ptr] { ptr->SendCompoundPacket(); }));
|
||||
}
|
||||
|
||||
void RtcpTransceiver::SetRemb(int64_t bitrate_bps,
|
||||
std::vector<uint32_t> ssrcs) {
|
||||
RTC_CHECK(rtcp_transceiver_);
|
||||
RtcpTransceiverImpl* ptr = rtcp_transceiver_.get();
|
||||
task_queue_->PostTask([ptr, bitrate_bps, ssrcs = std::move(ssrcs)]() mutable {
|
||||
ptr->SetRemb(bitrate_bps, std::move(ssrcs));
|
||||
});
|
||||
task_queue_->PostTask(
|
||||
ToQueuedTask([ptr, bitrate_bps, ssrcs = std::move(ssrcs)]() mutable {
|
||||
ptr->SetRemb(bitrate_bps, std::move(ssrcs));
|
||||
}));
|
||||
}
|
||||
|
||||
void RtcpTransceiver::UnsetRemb() {
|
||||
RTC_CHECK(rtcp_transceiver_);
|
||||
RtcpTransceiverImpl* ptr = rtcp_transceiver_.get();
|
||||
task_queue_->PostTask([ptr] { ptr->UnsetRemb(); });
|
||||
task_queue_->PostTask(ToQueuedTask([ptr] { ptr->UnsetRemb(); }));
|
||||
}
|
||||
|
||||
void RtcpTransceiver::SendCombinedRtcpPacket(
|
||||
@ -111,25 +114,26 @@ void RtcpTransceiver::SendCombinedRtcpPacket(
|
||||
RTC_CHECK(rtcp_transceiver_);
|
||||
RtcpTransceiverImpl* ptr = rtcp_transceiver_.get();
|
||||
task_queue_->PostTask(
|
||||
[ptr, rtcp_packets = std::move(rtcp_packets)]() mutable {
|
||||
ToQueuedTask([ptr, rtcp_packets = std::move(rtcp_packets)]() mutable {
|
||||
ptr->SendCombinedRtcpPacket(std::move(rtcp_packets));
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
void RtcpTransceiver::SendNack(uint32_t ssrc,
|
||||
std::vector<uint16_t> sequence_numbers) {
|
||||
RTC_CHECK(rtcp_transceiver_);
|
||||
RtcpTransceiverImpl* ptr = rtcp_transceiver_.get();
|
||||
task_queue_->PostTask(
|
||||
task_queue_->PostTask(ToQueuedTask(
|
||||
[ptr, ssrc, sequence_numbers = std::move(sequence_numbers)]() mutable {
|
||||
ptr->SendNack(ssrc, std::move(sequence_numbers));
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
void RtcpTransceiver::SendPictureLossIndication(uint32_t ssrc) {
|
||||
RTC_CHECK(rtcp_transceiver_);
|
||||
RtcpTransceiverImpl* ptr = rtcp_transceiver_.get();
|
||||
task_queue_->PostTask([ptr, ssrc] { ptr->SendPictureLossIndication(ssrc); });
|
||||
task_queue_->PostTask(
|
||||
ToQueuedTask([ptr, ssrc] { ptr->SendPictureLossIndication(ssrc); }));
|
||||
}
|
||||
|
||||
void RtcpTransceiver::SendFullIntraRequest(std::vector<uint32_t> ssrcs) {
|
||||
@ -140,9 +144,10 @@ void RtcpTransceiver::SendFullIntraRequest(std::vector<uint32_t> ssrcs,
|
||||
bool new_request) {
|
||||
RTC_CHECK(rtcp_transceiver_);
|
||||
RtcpTransceiverImpl* ptr = rtcp_transceiver_.get();
|
||||
task_queue_->PostTask([ptr, ssrcs = std::move(ssrcs), new_request] {
|
||||
ptr->SendFullIntraRequest(ssrcs, new_request);
|
||||
});
|
||||
task_queue_->PostTask(
|
||||
ToQueuedTask([ptr, ssrcs = std::move(ssrcs), new_request] {
|
||||
ptr->SendFullIntraRequest(ssrcs, new_request);
|
||||
}));
|
||||
}
|
||||
|
||||
} // namespace webrtc
|
||||
|
||||
@ -16,10 +16,10 @@
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "api/task_queue/task_queue_base.h"
|
||||
#include "modules/rtp_rtcp/source/rtcp_transceiver_config.h"
|
||||
#include "modules/rtp_rtcp/source/rtcp_transceiver_impl.h"
|
||||
#include "rtc_base/copy_on_write_buffer.h"
|
||||
#include "rtc_base/task_queue.h"
|
||||
|
||||
namespace webrtc {
|
||||
//
|
||||
@ -93,7 +93,7 @@ class RtcpTransceiver : public RtcpFeedbackSenderInterface {
|
||||
void SendFullIntraRequest(std::vector<uint32_t> ssrcs, bool new_request);
|
||||
|
||||
private:
|
||||
rtc::TaskQueue* const task_queue_;
|
||||
TaskQueueBase* const task_queue_;
|
||||
std::unique_ptr<RtcpTransceiverImpl> rtcp_transceiver_;
|
||||
};
|
||||
|
||||
|
||||
@ -58,7 +58,7 @@ bool RtcpTransceiverConfig::Validate() const {
|
||||
<< "ms between reports should be positive.";
|
||||
return false;
|
||||
}
|
||||
if (schedule_periodic_compound_packets && !task_queue) {
|
||||
if (schedule_periodic_compound_packets && task_queue == nullptr) {
|
||||
RTC_LOG(LS_ERROR) << debug_id
|
||||
<< "missing task queue for periodic compound packets";
|
||||
return false;
|
||||
|
||||
@ -14,9 +14,9 @@
|
||||
#include <string>
|
||||
|
||||
#include "api/rtp_headers.h"
|
||||
#include "api/task_queue/task_queue_base.h"
|
||||
#include "api/video/video_bitrate_allocation.h"
|
||||
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
|
||||
#include "rtc_base/task_queue.h"
|
||||
#include "system_wrappers/include/ntp_time.h"
|
||||
|
||||
namespace webrtc {
|
||||
@ -65,7 +65,7 @@ struct RtcpTransceiverConfig {
|
||||
Transport* outgoing_transport = nullptr;
|
||||
|
||||
// Queue for scheduling delayed tasks, e.g. sending periodic compound packets.
|
||||
rtc::TaskQueue* task_queue = nullptr;
|
||||
TaskQueueBase* task_queue = nullptr;
|
||||
|
||||
// Rtcp report block generator for outgoing receiver reports.
|
||||
ReceiveStatisticsProvider* receive_statistics = nullptr;
|
||||
|
||||
@ -32,8 +32,8 @@
|
||||
#include "modules/rtp_rtcp/source/time_util.h"
|
||||
#include "rtc_base/checks.h"
|
||||
#include "rtc_base/logging.h"
|
||||
#include "rtc_base/task_queue.h"
|
||||
#include "rtc_base/task_utils/repeating_task.h"
|
||||
#include "rtc_base/task_utils/to_queued_task.h"
|
||||
#include "rtc_base/time_utils.h"
|
||||
|
||||
namespace webrtc {
|
||||
@ -92,9 +92,9 @@ RtcpTransceiverImpl::RtcpTransceiverImpl(const RtcpTransceiverConfig& config)
|
||||
: config_(config), ready_to_send_(config.initial_ready_to_send) {
|
||||
RTC_CHECK(config_.Validate());
|
||||
if (ready_to_send_ && config_.schedule_periodic_compound_packets) {
|
||||
config_.task_queue->PostTask([this] {
|
||||
config_.task_queue->PostTask(ToQueuedTask([this] {
|
||||
SchedulePeriodicCompoundPackets(config_.initial_report_delay_ms);
|
||||
});
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
@ -342,7 +342,7 @@ void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets() {
|
||||
|
||||
void RtcpTransceiverImpl::SchedulePeriodicCompoundPackets(int64_t delay_ms) {
|
||||
periodic_task_handle_ = RepeatingTaskHandle::DelayedStart(
|
||||
config_.task_queue->Get(), TimeDelta::Millis(delay_ms), [this] {
|
||||
config_.task_queue, TimeDelta::Millis(delay_ms), [this] {
|
||||
RTC_DCHECK(config_.schedule_periodic_compound_packets);
|
||||
RTC_DCHECK(ready_to_send_);
|
||||
SendPeriodicCompoundPacket();
|
||||
|
||||
@ -141,7 +141,7 @@ TEST(RtcpTransceiverImplTest, NeedToStopPeriodicTaskToDestroyOnTaskQueue) {
|
||||
FakeRtcpTransport transport;
|
||||
TaskQueueForTest queue("rtcp");
|
||||
RtcpTransceiverConfig config = DefaultTestConfig();
|
||||
config.task_queue = &queue;
|
||||
config.task_queue = queue.Get();
|
||||
config.schedule_periodic_compound_packets = true;
|
||||
config.outgoing_transport = &transport;
|
||||
auto* rtcp_transceiver = new RtcpTransceiverImpl(config);
|
||||
@ -161,7 +161,7 @@ TEST(RtcpTransceiverImplTest, CanDestroyAfterTaskQueue) {
|
||||
FakeRtcpTransport transport;
|
||||
auto* queue = new TaskQueueForTest("rtcp");
|
||||
RtcpTransceiverConfig config = DefaultTestConfig();
|
||||
config.task_queue = queue;
|
||||
config.task_queue = queue->Get();
|
||||
config.schedule_periodic_compound_packets = true;
|
||||
config.outgoing_transport = &transport;
|
||||
auto* rtcp_transceiver = new RtcpTransceiverImpl(config);
|
||||
@ -178,7 +178,7 @@ TEST(RtcpTransceiverImplTest, DelaysSendingFirstCompondPacket) {
|
||||
RtcpTransceiverConfig config;
|
||||
config.outgoing_transport = &transport;
|
||||
config.initial_report_delay_ms = 10;
|
||||
config.task_queue = &queue;
|
||||
config.task_queue = queue.Get();
|
||||
absl::optional<RtcpTransceiverImpl> rtcp_transceiver;
|
||||
|
||||
int64_t started_ms = rtc::TimeMillis();
|
||||
@ -204,7 +204,7 @@ TEST(RtcpTransceiverImplTest, PeriodicallySendsPackets) {
|
||||
config.outgoing_transport = &transport;
|
||||
config.initial_report_delay_ms = 0;
|
||||
config.report_period_ms = kReportPeriodMs;
|
||||
config.task_queue = &queue;
|
||||
config.task_queue = queue.Get();
|
||||
absl::optional<RtcpTransceiverImpl> rtcp_transceiver;
|
||||
int64_t time_just_before_1st_packet_ms = 0;
|
||||
queue.PostTask([&] {
|
||||
@ -238,7 +238,7 @@ TEST(RtcpTransceiverImplTest, SendCompoundPacketDelaysPeriodicSendPackets) {
|
||||
config.outgoing_transport = &transport;
|
||||
config.initial_report_delay_ms = 0;
|
||||
config.report_period_ms = kReportPeriodMs;
|
||||
config.task_queue = &queue;
|
||||
config.task_queue = queue.Get();
|
||||
absl::optional<RtcpTransceiverImpl> rtcp_transceiver;
|
||||
queue.PostTask([&] { rtcp_transceiver.emplace(config); });
|
||||
|
||||
@ -324,7 +324,7 @@ TEST(RtcpTransceiverImplTest, SendsPeriodicRtcpWhenNetworkStateIsUp) {
|
||||
config.schedule_periodic_compound_packets = true;
|
||||
config.initial_ready_to_send = false;
|
||||
config.outgoing_transport = &transport;
|
||||
config.task_queue = &queue;
|
||||
config.task_queue = queue.Get();
|
||||
absl::optional<RtcpTransceiverImpl> rtcp_transceiver;
|
||||
rtcp_transceiver.emplace(config);
|
||||
|
||||
|
||||
@ -58,7 +58,7 @@ TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOffTaskQueue) {
|
||||
TaskQueueForTest queue("rtcp");
|
||||
RtcpTransceiverConfig config;
|
||||
config.outgoing_transport = &outgoing_transport;
|
||||
config.task_queue = &queue;
|
||||
config.task_queue = queue.Get();
|
||||
EXPECT_CALL(outgoing_transport, SendRtcp(_, _))
|
||||
.WillRepeatedly(InvokeWithoutArgs([&] {
|
||||
EXPECT_TRUE(queue.IsCurrent());
|
||||
@ -75,7 +75,7 @@ TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOnTaskQueue) {
|
||||
TaskQueueForTest queue("rtcp");
|
||||
RtcpTransceiverConfig config;
|
||||
config.outgoing_transport = &outgoing_transport;
|
||||
config.task_queue = &queue;
|
||||
config.task_queue = queue.Get();
|
||||
EXPECT_CALL(outgoing_transport, SendRtcp(_, _))
|
||||
.WillRepeatedly(InvokeWithoutArgs([&] {
|
||||
EXPECT_TRUE(queue.IsCurrent());
|
||||
@ -95,7 +95,7 @@ TEST(RtcpTransceiverTest, CanBeDestroyedOnTaskQueue) {
|
||||
TaskQueueForTest queue("rtcp");
|
||||
RtcpTransceiverConfig config;
|
||||
config.outgoing_transport = &outgoing_transport;
|
||||
config.task_queue = &queue;
|
||||
config.task_queue = queue.Get();
|
||||
auto rtcp_transceiver = std::make_unique<RtcpTransceiver>(config);
|
||||
|
||||
queue.PostTask([&] {
|
||||
@ -111,7 +111,7 @@ TEST(RtcpTransceiverTest, CanBeDestroyedWithoutBlocking) {
|
||||
NiceMock<MockTransport> outgoing_transport;
|
||||
RtcpTransceiverConfig config;
|
||||
config.outgoing_transport = &outgoing_transport;
|
||||
config.task_queue = &queue;
|
||||
config.task_queue = queue.Get();
|
||||
auto* rtcp_transceiver = new RtcpTransceiver(config);
|
||||
rtcp_transceiver->SendCompoundPacket();
|
||||
|
||||
@ -132,7 +132,7 @@ TEST(RtcpTransceiverTest, MaySendPacketsAfterDestructor) { // i.e. Be careful!
|
||||
TaskQueueForTest queue("rtcp");
|
||||
RtcpTransceiverConfig config;
|
||||
config.outgoing_transport = &outgoing_transport;
|
||||
config.task_queue = &queue;
|
||||
config.task_queue = queue.Get();
|
||||
auto* rtcp_transceiver = new RtcpTransceiver(config);
|
||||
|
||||
rtc::Event heavy_task;
|
||||
@ -163,7 +163,7 @@ TEST(RtcpTransceiverTest, DoesntPostToRtcpObserverAfterCallToRemove) {
|
||||
TaskQueueForTest queue("rtcp");
|
||||
RtcpTransceiverConfig config;
|
||||
config.outgoing_transport = &null_transport;
|
||||
config.task_queue = &queue;
|
||||
config.task_queue = queue.Get();
|
||||
RtcpTransceiver rtcp_transceiver(config);
|
||||
rtc::Event observer_deleted;
|
||||
|
||||
@ -190,7 +190,7 @@ TEST(RtcpTransceiverTest, RemoveMediaReceiverRtcpObserverIsNonBlocking) {
|
||||
TaskQueueForTest queue("rtcp");
|
||||
RtcpTransceiverConfig config;
|
||||
config.outgoing_transport = &null_transport;
|
||||
config.task_queue = &queue;
|
||||
config.task_queue = queue.Get();
|
||||
RtcpTransceiver rtcp_transceiver(config);
|
||||
auto observer = std::make_unique<MockMediaReceiverRtcpObserver>();
|
||||
rtcp_transceiver.AddMediaReceiverRtcpObserver(kRemoteSsrc, observer.get());
|
||||
@ -214,7 +214,7 @@ TEST(RtcpTransceiverTest, CanCallSendCompoundPacketFromAnyThread) {
|
||||
TaskQueueForTest queue("rtcp");
|
||||
RtcpTransceiverConfig config;
|
||||
config.outgoing_transport = &outgoing_transport;
|
||||
config.task_queue = &queue;
|
||||
config.task_queue = queue.Get();
|
||||
|
||||
EXPECT_CALL(outgoing_transport, SendRtcp(_, _))
|
||||
// If test is slow, a periodic task may send an extra packet.
|
||||
@ -243,7 +243,7 @@ TEST(RtcpTransceiverTest, DoesntSendPacketsAfterStopCallback) {
|
||||
TaskQueueForTest queue("rtcp");
|
||||
RtcpTransceiverConfig config;
|
||||
config.outgoing_transport = &outgoing_transport;
|
||||
config.task_queue = &queue;
|
||||
config.task_queue = queue.Get();
|
||||
config.schedule_periodic_compound_packets = true;
|
||||
|
||||
auto rtcp_transceiver = std::make_unique<RtcpTransceiver>(config);
|
||||
@ -265,7 +265,7 @@ TEST(RtcpTransceiverTest, SendsCombinedRtcpPacketOnTaskQueue) {
|
||||
RtcpTransceiverConfig config;
|
||||
config.feedback_ssrc = kSenderSsrc;
|
||||
config.outgoing_transport = &outgoing_transport;
|
||||
config.task_queue = &queue;
|
||||
config.task_queue = queue.Get();
|
||||
config.schedule_periodic_compound_packets = false;
|
||||
RtcpTransceiver rtcp_transceiver(config);
|
||||
|
||||
@ -302,7 +302,7 @@ TEST(RtcpTransceiverTest, SendFrameIntraRequestDefaultsToNewRequest) {
|
||||
RtcpTransceiverConfig config;
|
||||
config.feedback_ssrc = kSenderSsrc;
|
||||
config.outgoing_transport = &outgoing_transport;
|
||||
config.task_queue = &queue;
|
||||
config.task_queue = queue.Get();
|
||||
config.schedule_periodic_compound_packets = false;
|
||||
RtcpTransceiver rtcp_transceiver(config);
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user