From a722d2ac1efb50cc0a84d8cb1809d76ef6a3fc1d Mon Sep 17 00:00:00 2001 From: Tomas Gunnarsson Date: Tue, 19 Jan 2021 09:00:18 +0100 Subject: [PATCH] Add DeliverPacketAsync method to PacketReceiver. This change includes a basic first implementation of the method in Call. The implementation assumes being called on the network thread and simply posts to the worker thread which is what currently happens inside the BaseChannel class (but I'm that moving out of there). Bug: webrtc:11993 Change-Id: Id94934808f655071ea9dc87ab20d2a4d8ca7e61e Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/202255 Reviewed-by: Niels Moller Commit-Queue: Tommi Cr-Commit-Position: refs/heads/master@{#33033} --- call/call.cc | 31 +++++++++++++++++++++++++++++++ call/packet_receiver.h | 28 ++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/call/call.cc b/call/call.cc index 6f407fc0f0..dd2a1261fd 100644 --- a/call/call.cc +++ b/call/call.cc @@ -268,6 +268,10 @@ class Call final : public webrtc::Call, DeliveryStatus DeliverPacket(MediaType media_type, rtc::CopyOnWriteBuffer packet, int64_t packet_time_us) override; + void DeliverPacketAsync(MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us, + PacketCallback callback) override; // Implements RecoveredPacketReceiver. void OnRecoveredPacket(const uint8_t* packet, size_t length) override; @@ -321,6 +325,7 @@ class Call final : public webrtc::Call, Clock* const clock_; TaskQueueFactory* const task_queue_factory_; TaskQueueBase* const worker_thread_; + RTC_NO_UNIQUE_ADDRESS SequenceChecker network_thread_; const int num_cpu_cores_; const rtc::scoped_refptr module_process_thread_; @@ -625,6 +630,8 @@ Call::Call(Clock* clock, RTC_DCHECK(config.trials != nullptr); RTC_DCHECK(worker_thread_->IsCurrent()); + network_thread_.Detach(); + // Do not remove this call; it is here to convince the compiler that the // WebRTC source timestamp string needs to be in the final binary. LoadWebRTCVersionInRegister(); @@ -1418,6 +1425,30 @@ PacketReceiver::DeliveryStatus Call::DeliverPacket( return DeliverRtp(media_type, std::move(packet), packet_time_us); } +void Call::DeliverPacketAsync(MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us, + PacketCallback callback) { + RTC_DCHECK_RUN_ON(&network_thread_); + + TaskQueueBase* network_thread = rtc::Thread::Current(); + RTC_DCHECK(network_thread); + + worker_thread_->PostTask(ToQueuedTask( + task_safety_, [this, network_thread, media_type, p = std::move(packet), + packet_time_us, cb = std::move(callback)] { + RTC_DCHECK_RUN_ON(worker_thread_); + DeliveryStatus status = DeliverPacket(media_type, p, packet_time_us); + if (cb) { + network_thread->PostTask( + ToQueuedTask([cb = std::move(cb), status, media_type, + p = std::move(p), packet_time_us]() { + cb(status, media_type, std::move(p), packet_time_us); + })); + } + })); +} + void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) { RTC_DCHECK_RUN_ON(worker_thread_); RtpPacketReceived parsed_packet; diff --git a/call/packet_receiver.h b/call/packet_receiver.h index df57d8f4f4..f18ee65c70 100644 --- a/call/packet_receiver.h +++ b/call/packet_receiver.h @@ -11,8 +11,10 @@ #define CALL_PACKET_RECEIVER_H_ #include +#include #include #include +#include #include #include "api/media_types.h" @@ -28,6 +30,32 @@ class PacketReceiver { DELIVERY_PACKET_ERROR, }; + // Definition of the callback to execute when packet delivery is complete. + // The callback will be issued on the same thread as called DeliverPacket. + typedef std::function< + void(DeliveryStatus, MediaType, rtc::CopyOnWriteBuffer, int64_t)> + PacketCallback; + + // Asynchronously handle packet delivery and report back to the caller when + // delivery of the packet has completed. + // Note that if the packet is invalid or can be processed without the need of + // asynchronous operations that the |callback| may have been called before + // the function returns. + // TODO(bugs.webrtc.org/11993): This function is meant to be called on the + // network thread exclusively but while the code is being updated to align + // with those goals, it may be called either on the worker or network threads. + // Update docs etc when the work has been completed. Once we're done with the + // updates, we might be able to go back to returning the status from this + // function instead of having to report it via a callback. + virtual void DeliverPacketAsync(MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us, + PacketCallback callback) { + DeliveryStatus status = DeliverPacket(media_type, packet, packet_time_us); + if (callback) + callback(status, media_type, std::move(packet), packet_time_us); + } + virtual DeliveryStatus DeliverPacket(MediaType media_type, rtc::CopyOnWriteBuffer packet, int64_t packet_time_us) = 0;