From f2a083f262d86737893e774c696716742fcab3e3 Mon Sep 17 00:00:00 2001 From: Andrey Logvin Date: Wed, 25 Jan 2023 08:57:56 +0000 Subject: [PATCH] Revert "Delete PacketReceiver::DeliverPacket from all implementations" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 897ea04db5db2e591e28bd884191be58d9bcdc63. Reason for revert: Speculative revert as it could be the reason why perf tests started failing: https://ci.chromium.org/p/webrtc/g/perf/console?limit=200 Original change's description: > Delete PacketReceiver::DeliverPacket from all implementations > > And fix tests that still depend on extensions to be known by the receiver. > > Change-Id: I62227829af81af07769189e547f1cdb8ed4d06b3 > > Bug: webrtc:7135,webrtc:14795 > Change-Id: I62227829af81af07769189e547f1cdb8ed4d06b3 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/290996 > Commit-Queue: Per Kjellander > Reviewed-by: Danil Chapovalov > Reviewed-by: Erik Språng > Cr-Commit-Position: refs/heads/main@{#39184} Bug: webrtc:7135,webrtc:14795,b/266658815 Change-Id: I9d03f4952938d176ffee110a707acadc1846457c No-Presubmit: true No-Tree-Checks: true No-Try: true Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/291400 Commit-Queue: Andrey Logvin Bot-Commit: rubber-stamper@appspot.gserviceaccount.com Owners-Override: Andrey Logvin Reviewed-by: Jeremy Leconte Cr-Commit-Position: refs/heads/main@{#39189} --- call/bitrate_estimator_tests.cc | 5 -- call/call.cc | 73 ++++++++++++++++++++++++++++ call/degraded_call.cc | 19 ++++++++ call/degraded_call.h | 3 ++ call/fake_network_pipe.cc | 14 ++++++ call/fake_network_pipe.h | 6 +++ call/fake_network_pipe_unittest.cc | 4 ++ call/packet_receiver.h | 26 +++++++++- media/engine/fake_webrtc_call.cc | 15 ++++++ media/engine/fake_webrtc_call.h | 4 ++ test/direct_transport.cc | 69 ++++++++++++++++++-------- test/direct_transport.h | 9 ++++ video/end_to_end_tests/ssrc_tests.cc | 3 +- 13 files changed, 220 insertions(+), 30 deletions(-) diff --git a/call/bitrate_estimator_tests.cc b/call/bitrate_estimator_tests.cc index 6dedc59059..afa3136e0a 100644 --- a/call/bitrate_estimator_tests.cc +++ b/call/bitrate_estimator_tests.cc @@ -110,11 +110,6 @@ class BitrateEstimatorTest : public test::CallTest { virtual void SetUp() { SendTask(task_queue(), [this]() { - RegisterRtpExtension( - RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId)); - RegisterRtpExtension( - RtpExtension(RtpExtension::kAbsSendTimeUri, kASTExtensionId)); - CreateCalls(); CreateSendTransport(BuiltInNetworkBehaviorConfig(), /*observer=*/nullptr); diff --git a/call/call.cc b/call/call.cc index e676d7a30a..218505cdea 100644 --- a/call/call.cc +++ b/call/call.cc @@ -241,6 +241,11 @@ class Call final : public webrtc::Call, TaskQueueBase* network_thread() const override; TaskQueueBase* worker_thread() const override; + // Implements PacketReceiver. + DeliveryStatus DeliverPacket(MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us) override; + void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) override; void DeliverRtpPacket( @@ -334,6 +339,9 @@ class Call final : public webrtc::Call, void DeliverRtcp(MediaType media_type, rtc::CopyOnWriteBuffer packet) RTC_RUN_ON(network_thread_); + DeliveryStatus DeliverRtp(MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us) RTC_RUN_ON(worker_thread_); AudioReceiveStreamImpl* FindAudioStreamForSyncGroup( absl::string_view sync_group) RTC_RUN_ON(worker_thread_); @@ -343,6 +351,7 @@ class Call final : public webrtc::Call, MediaType media_type) RTC_RUN_ON(worker_thread_); + bool IdentifyReceivedPacket(RtpPacketReceived& packet); bool RegisterReceiveStream(uint32_t ssrc, ReceiveStreamInterface* stream); bool UnregisterReceiveStream(uint32_t ssrc); @@ -1466,6 +1475,57 @@ void Call::DeliverRtpPacket( } } +PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us) { + // TODO(perkj, https://bugs.webrtc.org/7135): Deprecate this method and + // direcly use DeliverRtpPacket. + TRACE_EVENT0("webrtc", "Call::DeliverRtp"); + RTC_DCHECK_NE(media_type, MediaType::ANY); + + RtpPacketReceived parsed_packet; + if (!parsed_packet.Parse(std::move(packet))) + return DELIVERY_PACKET_ERROR; + + if (packet_time_us != -1) { + parsed_packet.set_arrival_time(Timestamp::Micros(packet_time_us)); + } else { + parsed_packet.set_arrival_time(clock_->CurrentTime()); + } + + if (!IdentifyReceivedPacket(parsed_packet)) + return DELIVERY_UNKNOWN_SSRC; + if (media_type == MediaType::VIDEO) { + parsed_packet.set_payload_type_frequency(kVideoPayloadTypeFrequency); + } + DeliverRtpPacket(media_type, std::move(parsed_packet), + [](const webrtc::RtpPacketReceived& packet) { + // If IdentifyReceivedPacket returns true, a packet is + // expected to be demuxable. + RTC_DCHECK_NOTREACHED(); + return false; + }); + return DELIVERY_OK; +} + +PacketReceiver::DeliveryStatus Call::DeliverPacket( + MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us) { + if (IsRtcpPacket(packet)) { + RTC_DCHECK_RUN_ON(network_thread_); + worker_thread_->PostTask(SafeTask( + task_safety_.flag(), [this, packet = std::move(packet)]() mutable { + RTC_DCHECK_RUN_ON(worker_thread_); + DeliverRtcpPacket(std::move(packet)); + })); + return DELIVERY_OK; + } + + RTC_DCHECK_RUN_ON(worker_thread_); + return DeliverRtp(media_type, std::move(packet), packet_time_us); +} + void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet, MediaType media_type) { RTC_DCHECK_RUN_ON(worker_thread_); @@ -1489,6 +1549,19 @@ void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet, } } +bool Call::IdentifyReceivedPacket(RtpPacketReceived& packet) { + RTC_DCHECK_RUN_ON(&receive_11993_checker_); + auto it = receive_rtp_config_.find(packet.Ssrc()); + if (it == receive_rtp_config_.end()) { + RTC_DLOG(LS_WARNING) << "receive_rtp_config_ lookup failed for ssrc " + << packet.Ssrc(); + return false; + } + + packet.IdentifyExtensions(it->second->GetRtpExtensionMap()); + return true; +} + bool Call::RegisterReceiveStream(uint32_t ssrc, ReceiveStreamInterface* stream) { RTC_DCHECK_RUN_ON(&receive_11993_checker_); diff --git a/call/degraded_call.cc b/call/degraded_call.cc index fc76c7be5c..50349c1086 100644 --- a/call/degraded_call.cc +++ b/call/degraded_call.cc @@ -346,6 +346,25 @@ void DegradedCall::OnSentPacket(const rtc::SentPacket& sent_packet) { call_->OnSentPacket(sent_packet); } +PacketReceiver::DeliveryStatus DegradedCall::DeliverPacket( + MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us) { + RTC_DCHECK_RUN_ON(&received_packet_sequence_checker_); + PacketReceiver::DeliveryStatus status = receive_pipe_->DeliverPacket( + media_type, std::move(packet), packet_time_us); + // This is not optimal, but there are many places where there are thread + // checks that fail if we're not using the worker thread call into this + // method. If we want to fix this we probably need a task queue to do handover + // of all overriden methods, which feels like overkill for the current use + // case. + // By just having this thread call out via the Process() method we work around + // that, with the tradeoff that a non-zero delay may become a little larger + // than anticipated at very low packet rates. + receive_pipe_->Process(); + return status; +} + void DegradedCall::DeliverRtpPacket( MediaType media_type, RtpPacketReceived packet, diff --git a/call/degraded_call.h b/call/degraded_call.h index 98e7891d6a..6a22b69e4a 100644 --- a/call/degraded_call.h +++ b/call/degraded_call.h @@ -113,6 +113,9 @@ class DegradedCall : public Call, private PacketReceiver { protected: // Implements PacketReceiver. + DeliveryStatus DeliverPacket(MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us) override; void DeliverRtpPacket( MediaType media_type, RtpPacketReceived packet, diff --git a/call/fake_network_pipe.cc b/call/fake_network_pipe.cc index 8879927a5b..76adfe3cf0 100644 --- a/call/fake_network_pipe.cc +++ b/call/fake_network_pipe.cc @@ -191,6 +191,16 @@ bool FakeNetworkPipe::SendRtcp(const uint8_t* packet, return true; } +PacketReceiver::DeliveryStatus FakeNetworkPipe::DeliverPacket( + MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us) { + return EnqueuePacket(std::move(packet), absl::nullopt, false, media_type, + packet_time_us) + ? PacketReceiver::DELIVERY_OK + : PacketReceiver::DELIVERY_PACKET_ERROR; +} + void FakeNetworkPipe::DeliverRtpPacket( MediaType media_type, RtpPacketReceived packet, @@ -383,6 +393,10 @@ void FakeNetworkPipe::DeliverNetworkPacket(NetworkPacket* packet) { << packet.Ssrc() << " seq : " << packet.SequenceNumber(); return false; }); + } else { + receiver_->DeliverPacket(packet->media_type(), + std::move(*packet->raw_packet()), + packet_time_us); } } } diff --git a/call/fake_network_pipe.h b/call/fake_network_pipe.h index ba4c89e382..2649a00904 100644 --- a/call/fake_network_pipe.h +++ b/call/fake_network_pipe.h @@ -162,6 +162,12 @@ class FakeNetworkPipe : public SimulatedPacketReceiverInterface { OnUndemuxablePacketHandler undemuxable_packet_handler) override; void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) override; + // TODO(perkj, https://bugs.webrtc.org/7135): Remove once implementations + // dont use it. + PacketReceiver::DeliveryStatus DeliverPacket(MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us) override; + // Processes the network queues and trigger PacketReceiver::IncomingPacket for // packets ready to be delivered. void Process() override; diff --git a/call/fake_network_pipe_unittest.cc b/call/fake_network_pipe_unittest.cc index 31f97fc85c..d3f7734893 100644 --- a/call/fake_network_pipe_unittest.cc +++ b/call/fake_network_pipe_unittest.cc @@ -31,6 +31,10 @@ using ::testing::WithArg; namespace webrtc { class MockReceiver : public PacketReceiver { public: + MOCK_METHOD(DeliveryStatus, + DeliverPacket, + (MediaType, rtc::CopyOnWriteBuffer, int64_t), + (override)); MOCK_METHOD(void, DeliverRtcpPacket, (rtc::CopyOnWriteBuffer packet), diff --git a/call/packet_receiver.h b/call/packet_receiver.h index c7f55ac46c..a36ab44ea7 100644 --- a/call/packet_receiver.h +++ b/call/packet_receiver.h @@ -20,8 +20,26 @@ namespace webrtc { class PacketReceiver { public: + enum DeliveryStatus { + DELIVERY_OK, + DELIVERY_UNKNOWN_SSRC, + DELIVERY_PACKET_ERROR, + }; + + // TODO(perkj, https://bugs.webrtc.org/7135): Remove this method. This method + // is no longer used by PeerConnections. Some tests still use it. + virtual DeliveryStatus DeliverPacket(MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us) { + RTC_CHECK_NOTREACHED(); + } + // Demux RTCP packets. Must be called on the worker thread. - virtual void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) = 0; + virtual void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) { + // TODO(perkj, https://bugs.webrtc.org/7135): Implement in FakeCall and + // FakeNetworkPipe. + RTC_CHECK_NOTREACHED(); + } // Invoked once when a packet packet is received that can not be demuxed. // If the method returns true, a new attempt is made to demux the packet. @@ -32,7 +50,11 @@ class PacketReceiver { virtual void DeliverRtpPacket( MediaType media_type, RtpPacketReceived packet, - OnUndemuxablePacketHandler undemuxable_packet_handler) = 0; + OnUndemuxablePacketHandler undemuxable_packet_handler) { + // TODO(perkj, https://bugs.webrtc.org/7135): Implement in FakeCall and + // FakeNetworkPipe. + RTC_CHECK_NOTREACHED(); + } protected: virtual ~PacketReceiver() {} diff --git a/media/engine/fake_webrtc_call.cc b/media/engine/fake_webrtc_call.cc index 6408e4e951..a20b826b41 100644 --- a/media/engine/fake_webrtc_call.cc +++ b/media/engine/fake_webrtc_call.cc @@ -665,6 +665,21 @@ webrtc::PacketReceiver* FakeCall::Receiver() { return this; } +webrtc::PacketReceiver::DeliveryStatus FakeCall::DeliverPacket( + webrtc::MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us) { + RTC_DCHECK(webrtc::IsRtpPacket(packet)); + uint32_t ssrc = ParseRtpSsrc(packet); + webrtc::Timestamp arrival_time = + packet_time_us > -1 ? webrtc::Timestamp::Micros(packet_time_us) + : webrtc::Timestamp::Zero(); + if (DeliverPacketInternal(media_type, ssrc, packet, arrival_time)) { + return DELIVERY_OK; + } + return DELIVERY_UNKNOWN_SSRC; +} + void FakeCall::DeliverRtpPacket( webrtc::MediaType media_type, webrtc::RtpPacketReceived packet, diff --git a/media/engine/fake_webrtc_call.h b/media/engine/fake_webrtc_call.h index 954bd16254..f7e3de5efb 100644 --- a/media/engine/fake_webrtc_call.h +++ b/media/engine/fake_webrtc_call.h @@ -442,6 +442,10 @@ class FakeCall final : public webrtc::Call, public webrtc::PacketReceiver { webrtc::PacketReceiver* Receiver() override; + DeliveryStatus DeliverPacket(webrtc::MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us) override; + void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) override {} void DeliverRtpPacket( diff --git a/test/direct_transport.cc b/test/direct_transport.cc index 260497947c..3ae0216186 100644 --- a/test/direct_transport.cc +++ b/test/direct_transport.cc @@ -40,6 +40,18 @@ MediaType Demuxer::GetMediaType(const uint8_t* packet_data, return MediaType::ANY; } +DirectTransport::DirectTransport( + TaskQueueBase* task_queue, + std::unique_ptr pipe, + Call* send_call, + const std::map& payload_type_map) + : DirectTransport(task_queue, + std::move(pipe), + send_call, + payload_type_map, + {}, + {}) {} + DirectTransport::DirectTransport( TaskQueueBase* task_queue, std::unique_ptr pipe, @@ -51,6 +63,7 @@ DirectTransport::DirectTransport( task_queue_(task_queue), demuxer_(payload_type_map), fake_network_(std::move(pipe)), + use_legacy_send_(audio_extensions.empty() && video_extensions.empty()), audio_extensions_(audio_extensions), video_extensions_(video_extensions) { Start(); @@ -76,27 +89,30 @@ bool DirectTransport::SendRtp(const uint8_t* data, send_call_->OnSentPacket(sent_packet); } - const RtpHeaderExtensionMap* extensions = nullptr; - MediaType media_type = demuxer_.GetMediaType(data, length); - switch (demuxer_.GetMediaType(data, length)) { - case webrtc::MediaType::AUDIO: - extensions = &audio_extensions_; - break; - case webrtc::MediaType::VIDEO: - extensions = &video_extensions_; - break; - default: - RTC_CHECK_NOTREACHED(); + if (use_legacy_send_) { + LegacySendPacket(data, length); + } else { + const RtpHeaderExtensionMap* extensions = nullptr; + MediaType media_type = demuxer_.GetMediaType(data, length); + switch (demuxer_.GetMediaType(data, length)) { + case webrtc::MediaType::AUDIO: + extensions = &audio_extensions_; + break; + case webrtc::MediaType::VIDEO: + extensions = &video_extensions_; + break; + default: + RTC_CHECK_NOTREACHED(); + } + RtpPacketReceived packet(extensions, Timestamp::Micros(rtc::TimeMicros())); + if (media_type == MediaType::VIDEO) { + packet.set_payload_type_frequency(kVideoPayloadTypeFrequency); + } + RTC_CHECK(packet.Parse(rtc::CopyOnWriteBuffer(data, length))); + fake_network_->DeliverRtpPacket( + media_type, std::move(packet), + [](const RtpPacketReceived& packet) { return false; }); } - RtpPacketReceived packet(extensions, Timestamp::Micros(rtc::TimeMicros())); - if (media_type == MediaType::VIDEO) { - packet.set_payload_type_frequency(kVideoPayloadTypeFrequency); - } - RTC_CHECK(packet.Parse(rtc::CopyOnWriteBuffer(data, length))); - fake_network_->DeliverRtpPacket( - media_type, std::move(packet), - [](const RtpPacketReceived& packet) { return false; }); - MutexLock lock(&process_lock_); if (!next_process_task_.Running()) ProcessPackets(); @@ -104,13 +120,24 @@ bool DirectTransport::SendRtp(const uint8_t* data, } bool DirectTransport::SendRtcp(const uint8_t* data, size_t length) { - fake_network_->DeliverRtcpPacket(rtc::CopyOnWriteBuffer(data, length)); + if (use_legacy_send_) { + LegacySendPacket(data, length); + } else { + fake_network_->DeliverRtcpPacket(rtc::CopyOnWriteBuffer(data, length)); + } MutexLock lock(&process_lock_); if (!next_process_task_.Running()) ProcessPackets(); return true; } +void DirectTransport::LegacySendPacket(const uint8_t* data, size_t length) { + MediaType media_type = demuxer_.GetMediaType(data, length); + int64_t send_time_us = rtc::TimeMicros(); + fake_network_->DeliverPacket(media_type, rtc::CopyOnWriteBuffer(data, length), + send_time_us); +} + int DirectTransport::GetAverageDelayMs() { return fake_network_->AverageDelay(); } diff --git a/test/direct_transport.h b/test/direct_transport.h index 468e339c0a..4776084ae2 100644 --- a/test/direct_transport.h +++ b/test/direct_transport.h @@ -44,6 +44,14 @@ class Demuxer { // same task-queue - the one that's passed in via the constructor. class DirectTransport : public Transport { public: + // TODO(perkj, https://bugs.webrtc.org/7135): Remove header once downstream + // projects have been updated. + [[deprecated("Use ctor that provide header extensions.")]] DirectTransport( + TaskQueueBase* task_queue, + std::unique_ptr pipe, + Call* send_call, + const std::map& payload_type_map); + DirectTransport(TaskQueueBase* task_queue, std::unique_ptr pipe, Call* send_call, @@ -77,6 +85,7 @@ class DirectTransport : public Transport { const Demuxer demuxer_; const std::unique_ptr fake_network_; + const bool use_legacy_send_; const RtpHeaderExtensionMap audio_extensions_; const RtpHeaderExtensionMap video_extensions_; }; diff --git a/video/end_to_end_tests/ssrc_tests.cc b/video/end_to_end_tests/ssrc_tests.cc index edacde115a..a3bce40fd2 100644 --- a/video/end_to_end_tests/ssrc_tests.cc +++ b/video/end_to_end_tests/ssrc_tests.cc @@ -108,8 +108,7 @@ TEST_F(SsrcEndToEndTest, UnknownRtpPacketTriggersUndemuxablePacketHandler) { std::make_unique( Clock::GetRealTimeClock(), std::make_unique( BuiltInNetworkBehaviorConfig())), - receiver_call_.get(), payload_type_map_, GetRegisteredExtensions(), - GetRegisteredExtensions()); + receiver_call_.get(), payload_type_map_); input_observer = std::make_unique(receiver_call_->Receiver()); send_transport->SetReceiver(input_observer.get());