Add RtpSender OnFirstPacketSent callback.

It works in the same way as the first packet received callback and can be used for latency measurements.

One important detail is that RTCP and probe packets are excluded from triggering the callback.

Bug: b/375148360
Change-Id: I5f99b565f96b622e864669cf227be5534aab0fc7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/366644
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Reviewed-by: Per Kjellander <perkj@webrtc.org>
Commit-Queue: Jakob Ivarsson‎ <jakobi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#43309}
This commit is contained in:
Jakob Ivarsson 2024-10-25 14:58:29 +00:00 committed by WebRTC LUCI CQ
parent 99058d99b7
commit 68f4e27794
17 changed files with 174 additions and 2 deletions

View File

@ -36,6 +36,16 @@
namespace webrtc {
class RtpSenderObserverInterface {
public:
// The observer is called when the first media packet is sent for the observed
// sender. It is called immediately if the first packet was already sent.
virtual void OnFirstPacketSent(cricket::MediaType media_type) = 0;
protected:
virtual ~RtpSenderObserverInterface() {}
};
using SetParametersCallback = absl::AnyInvocable<void(RTCError) &&>;
class RTC_EXPORT RtpSenderInterface : public webrtc::RefCountInterface,
@ -88,6 +98,12 @@ class RTC_EXPORT RtpSenderInterface : public webrtc::RefCountInterface,
virtual void SetParametersAsync(const RtpParameters& parameters,
SetParametersCallback callback);
// Sets an observer which gets a callback when the first media packet is sent
// for this sender.
// Does not take ownership of observer.
// Must call SetObserver(nullptr) before the observer is destroyed.
virtual void SetObserver(RtpSenderObserverInterface* observer) {}
// Returns null for a video sender.
virtual rtc::scoped_refptr<DtmfSenderInterface> GetDtmfSender() const = 0;

View File

@ -82,6 +82,7 @@ class MockRtpSender : public RtpSenderInterface {
SetEncoderSelector,
(std::unique_ptr<VideoEncoderFactory::EncoderSelectorInterface>),
(override));
MOCK_METHOD(void, SetObserver, (RtpSenderObserverInterface*), (override));
};
static_assert(!std::is_abstract_v<rtc::RefCountedObject<MockRtpSender>>, "");

View File

@ -207,6 +207,7 @@ bool MediaChannelUtil::TransportForMediaChannels::SendRtp(
included_in_allocation = options.included_in_allocation,
batchable = options.batchable,
last_packet_in_batch = options.last_packet_in_batch,
is_retransmit = options.is_retransmit,
packet = rtc::CopyOnWriteBuffer(packet, kMaxRtpPacketLen)]() mutable {
rtc::PacketOptions rtc_options;
rtc_options.packet_id = packet_id;
@ -217,6 +218,7 @@ bool MediaChannelUtil::TransportForMediaChannels::SendRtp(
included_in_feedback;
rtc_options.info_signaled_after_sent.included_in_allocation =
included_in_allocation;
rtc_options.info_signaled_after_sent.is_media = !is_retransmit;
rtc_options.batchable = batchable;
rtc_options.last_packet_in_batch = last_packet_in_batch;
DoSendPacket(&packet, false, rtc_options);

View File

@ -382,6 +382,13 @@ void BaseChannel::SetFirstPacketReceivedCallback(
on_first_packet_received_ = std::move(callback);
}
void BaseChannel::SetFirstPacketSentCallback(std::function<void()> callback) {
RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(!on_first_packet_sent_ || !callback);
on_first_packet_sent_ = std::move(callback);
}
void BaseChannel::OnTransportReadyToSend(bool ready) {
RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(network_initialized());
@ -430,6 +437,11 @@ bool BaseChannel::SendPacket(bool rtcp,
<< ".";
}
if (on_first_packet_sent_ && options.info_signaled_after_sent.is_media) {
on_first_packet_sent_();
on_first_packet_sent_ = nullptr;
}
return rtcp ? rtp_transport_->SendRtcpPacket(packet, options, PF_SRTP_BYPASS)
: rtp_transport_->SendRtpPacket(packet, options, PF_SRTP_BYPASS);
}

View File

@ -149,6 +149,7 @@ class BaseChannel : public ChannelInterface,
// Used for latency measurements.
void SetFirstPacketReceivedCallback(std::function<void()> callback) override;
void SetFirstPacketSentCallback(std::function<void()> callback) override;
// From RtpTransport - public for testing only
void OnTransportReadyToSend(bool ready);
@ -316,8 +317,10 @@ class BaseChannel : public ChannelInterface,
webrtc::TaskQueueBase* const signaling_thread_;
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> alive_;
// The functions are deleted after they have been called.
std::function<void()> on_first_packet_received_
RTC_GUARDED_BY(network_thread());
std::function<void()> on_first_packet_sent_ RTC_GUARDED_BY(network_thread());
webrtc::RtpTransportInternal* rtp_transport_
RTC_GUARDED_BY(network_thread()) = nullptr;

View File

@ -11,6 +11,7 @@
#ifndef PC_CHANNEL_INTERFACE_H_
#define PC_CHANNEL_INTERFACE_H_
#include <functional>
#include <memory>
#include <string>
#include <vector>
@ -78,6 +79,7 @@ class ChannelInterface {
// Used for latency measurements.
virtual void SetFirstPacketReceivedCallback(
std::function<void()> callback) = 0;
virtual void SetFirstPacketSentCallback(std::function<void()> callback) = 0;
// Channel control
virtual bool SetLocalContent(const MediaContentDescription* content,

View File

@ -201,6 +201,48 @@ TEST_P(PeerConnectionIntegrationTest,
}));
}
TEST_P(PeerConnectionIntegrationTest, RtpSenderObserverOnFirstPacketSent) {
ASSERT_TRUE(CreatePeerConnectionWrappers());
ConnectFakeSignaling();
caller()->AddAudioVideoTracks();
callee()->AddAudioVideoTracks();
// Start offer/answer exchange and wait for it to complete.
caller()->CreateAndSetAndSignalOffer();
ASSERT_TRUE_WAIT(SignalingStateStable(), kDefaultTimeout);
// Should be one sender each for audio/video.
EXPECT_EQ(2U, caller()->rtp_sender_observers().size());
EXPECT_EQ(2U, callee()->rtp_sender_observers().size());
// Wait for all "first packet sent" callbacks to be fired.
EXPECT_TRUE_WAIT(
absl::c_all_of(caller()->rtp_sender_observers(),
[](const std::unique_ptr<MockRtpSenderObserver>& o) {
return o->first_packet_sent();
}),
kMaxWaitForFramesMs);
EXPECT_TRUE_WAIT(
absl::c_all_of(callee()->rtp_sender_observers(),
[](const std::unique_ptr<MockRtpSenderObserver>& o) {
return o->first_packet_sent();
}),
kMaxWaitForFramesMs);
// If new observers are set after the first packet was already sent, the
// callback should still be invoked.
caller()->ResetRtpSenderObservers();
callee()->ResetRtpSenderObservers();
EXPECT_EQ(2U, caller()->rtp_sender_observers().size());
EXPECT_EQ(2U, callee()->rtp_sender_observers().size());
EXPECT_TRUE(
absl::c_all_of(caller()->rtp_sender_observers(),
[](const std::unique_ptr<MockRtpSenderObserver>& o) {
return o->first_packet_sent();
}));
EXPECT_TRUE(
absl::c_all_of(callee()->rtp_sender_observers(),
[](const std::unique_ptr<MockRtpSenderObserver>& o) {
return o->first_packet_sent();
}));
}
class DummyDtmfObserver : public DtmfSenderObserverInterface {
public:
DummyDtmfObserver() : completed_(false) {}

View File

@ -407,6 +407,23 @@ void RtpSenderBase::SetParametersAsync(const RtpParameters& parameters,
false);
}
void RtpSenderBase::SetObserver(RtpSenderObserverInterface* observer) {
RTC_DCHECK_RUN_ON(signaling_thread_);
observer_ = observer;
// Deliver any notifications the observer may have missed by being set late.
if (sent_first_packet_ && observer_) {
observer_->OnFirstPacketSent(media_type());
}
}
void RtpSenderBase::NotifyFirstPacketSent() {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (observer_) {
observer_->OnFirstPacketSent(media_type());
}
sent_first_packet_ = true;
}
void RtpSenderBase::set_stream_ids(const std::vector<std::string>& stream_ids) {
stream_ids_.clear();
absl::c_copy_if(stream_ids, std::back_inserter(stream_ids_),

View File

@ -106,6 +106,8 @@ class RtpSenderInternal : public RtpSenderInterface {
// selected codecs.
virtual void SetSendCodecs(std::vector<cricket::Codec> send_codecs) = 0;
virtual std::vector<cricket::Codec> GetSendCodecs() const = 0;
virtual void NotifyFirstPacketSent() = 0;
};
// Shared implementation for RtpSenderInternal interface.
@ -230,6 +232,9 @@ class RtpSenderBase : public RtpSenderInternal, public ObserverInterface {
return send_codecs_;
}
void NotifyFirstPacketSent() override;
void SetObserver(RtpSenderObserverInterface* observer) override;
protected:
// If `set_streams_observer` is not null, it is invoked when SetStreams()
// is called. `set_streams_observer` is not owned by this object. If not
@ -288,6 +293,8 @@ class RtpSenderBase : public RtpSenderInternal, public ObserverInterface {
std::vector<std::string> disabled_rids_;
SetStreamsObserver* set_streams_observer_ = nullptr;
RtpSenderObserverInterface* observer_ = nullptr;
bool sent_first_packet_ = false;
rtc::scoped_refptr<FrameTransformerInterface> frame_transformer_;
std::unique_ptr<VideoEncoderFactory::EncoderSelectorInterface>

View File

@ -43,6 +43,7 @@ PROXY_CONSTMETHOD0(rtc::scoped_refptr<DtmfSenderInterface>, GetDtmfSender)
PROXY_METHOD1(void,
SetFrameEncryptor,
rtc::scoped_refptr<FrameEncryptorInterface>)
PROXY_METHOD1(void, SetObserver, RtpSenderObserverInterface*)
PROXY_CONSTMETHOD0(rtc::scoped_refptr<FrameEncryptorInterface>,
GetFrameEncryptor)
PROXY_METHOD1(void, SetStreams, const std::vector<std::string>&)

View File

@ -356,6 +356,7 @@ void RtpTransceiver::SetChannel(
context()->network_thread()->BlockingCall([&]() {
if (channel_) {
channel_->SetFirstPacketReceivedCallback(nullptr);
channel_->SetFirstPacketSentCallback(nullptr);
channel_->SetRtpTransport(nullptr);
channel_to_delete = std::move(channel_);
}
@ -368,6 +369,11 @@ void RtpTransceiver::SetChannel(
thread->PostTask(
SafeTask(std::move(flag), [this]() { OnFirstPacketReceived(); }));
});
channel_->SetFirstPacketSentCallback(
[thread = thread_, flag = signaling_thread_safety_, this]() mutable {
thread->PostTask(
SafeTask(std::move(flag), [this]() { OnFirstPacketSent(); }));
});
});
PushNewMediaChannelAndDeleteChannel(nullptr);
@ -392,6 +398,7 @@ void RtpTransceiver::ClearChannel() {
context()->network_thread()->BlockingCall([&]() {
if (channel_) {
channel_->SetFirstPacketReceivedCallback(nullptr);
channel_->SetFirstPacketSentCallback(nullptr);
channel_->SetRtpTransport(nullptr);
channel_to_delete = std::move(channel_);
}
@ -523,6 +530,12 @@ void RtpTransceiver::OnFirstPacketReceived() {
}
}
void RtpTransceiver::OnFirstPacketSent() {
for (const auto& sender : senders_) {
sender->internal()->NotifyFirstPacketSent();
}
}
rtc::scoped_refptr<RtpSenderInterface> RtpTransceiver::sender() const {
RTC_DCHECK(unified_plan_);
RTC_CHECK_EQ(1u, senders_.size());

View File

@ -302,6 +302,7 @@ class RtpTransceiver : public RtpTransceiverInterface {
}
ConnectionContext* context() const { return context_; }
void OnFirstPacketReceived();
void OnFirstPacketSent();
void StopSendingAndReceiving();
// Delete a channel, and ensure that references to its media channel
// are updated before deleting it.

View File

@ -230,6 +230,25 @@ class MockRtpReceiverObserver : public RtpReceiverObserverInterface {
cricket::MediaType expected_media_type_;
};
class MockRtpSenderObserver : public RtpSenderObserverInterface {
public:
explicit MockRtpSenderObserver(cricket::MediaType media_type)
: expected_media_type_(media_type) {}
void OnFirstPacketSent(cricket::MediaType media_type) override {
ASSERT_EQ(expected_media_type_, media_type);
first_packet_sent_ = true;
}
bool first_packet_sent() const { return first_packet_sent_; }
virtual ~MockRtpSenderObserver() {}
private:
bool first_packet_sent_ = false;
cricket::MediaType expected_media_type_;
};
// Helper class that wraps a peer connection, observes it, and can accept
// signaling messages from another wrapper.
//
@ -335,6 +354,7 @@ class PeerConnectionIntegrationWrapper : public PeerConnectionObserver,
void AddAudioVideoTracks() {
AddAudioTrack();
AddVideoTrack();
ResetRtpSenderObservers();
}
rtc::scoped_refptr<RtpSenderInterface> AddAudioTrack() {
@ -618,6 +638,22 @@ class PeerConnectionIntegrationWrapper : public PeerConnectionObserver,
}
}
const std::vector<std::unique_ptr<MockRtpSenderObserver>>&
rtp_sender_observers() {
return rtp_sender_observers_;
}
void ResetRtpSenderObservers() {
rtp_sender_observers_.clear();
for (const rtc::scoped_refptr<RtpSenderInterface>& sender :
pc()->GetSenders()) {
std::unique_ptr<MockRtpSenderObserver> observer(
new MockRtpSenderObserver(sender->media_type()));
sender->SetObserver(observer.get());
rtp_sender_observers_.push_back(std::move(observer));
}
}
rtc::FakeNetworkManager* network_manager() const {
return fake_network_manager_.get();
}
@ -1126,6 +1162,7 @@ class PeerConnectionIntegrationWrapper : public PeerConnectionObserver,
std::vector<std::unique_ptr<MockDataChannelObserver>> data_observers_;
std::vector<std::unique_ptr<MockRtpReceiverObserver>> rtp_receiver_observers_;
std::vector<std::unique_ptr<MockRtpSenderObserver>> rtp_sender_observers_;
std::vector<PeerConnectionInterface::IceConnectionState>
ice_connection_state_history_;

View File

@ -56,6 +56,10 @@ class MockChannelInterface : public cricket::ChannelInterface {
SetFirstPacketReceivedCallback,
(std::function<void()>),
(override));
MOCK_METHOD(void,
SetFirstPacketSentCallback,
(std::function<void()>),
(override));
MOCK_METHOD(bool,
SetLocalContent,
(const cricket::MediaContentDescription*,

View File

@ -93,6 +93,7 @@ class MockRtpSenderInternal : public RtpSenderInternal {
SetEncoderSelector,
(std::unique_ptr<VideoEncoderFactory::EncoderSelectorInterface>),
(override));
MOCK_METHOD(void, SetObserver, (RtpSenderObserverInterface*), (override));
// RtpSenderInternal methods.
MOCK_METHOD1(SetMediaChannel, void(cricket::MediaSendChannelInterface*));
@ -106,6 +107,7 @@ class MockRtpSenderInternal : public RtpSenderInternal {
MOCK_METHOD1(DisableEncodingLayers,
RTCError(const std::vector<std::string>&));
MOCK_METHOD0(SetTransceiverAsStopped, void());
MOCK_METHOD(void, NotifyFirstPacketSent, (), (override));
};
} // namespace webrtc

View File

@ -44,6 +44,9 @@ struct RTC_EXPORT PacketInfo {
bool included_in_feedback = false;
bool included_in_allocation = false;
// `is_media` is true if this is an audio or video packet, excluding
// retransmissions.
bool is_media = false;
PacketType packet_type = PacketType::kUnknown;
PacketInfoProtocolType protocol = PacketInfoProtocolType::kUnknown;
// A unique id assigned by the network manager, and std::nullopt if not set.

View File

@ -207,6 +207,11 @@ INSTANTIATE_TEST_SUITE_P(
.expected_bwe_min = webrtc::DataRate::KilobitsPerSec(400),
}}));
class MockRtpSenderObserver : public RtpSenderObserverInterface {
public:
MOCK_METHOD(void, OnFirstPacketSent, (cricket::MediaType));
};
// Test that caller and callee BWE rampup even if no media packets are sent.
// - BandWidthEstimationSettings.allow_probe_without_media must be set.
// - A Video RtpTransceiver with RTX support needs to be negotiated.
@ -217,8 +222,12 @@ TEST_P(BweRampupWithInitialProbeTest, BweRampUpBothDirectionsWithoutMedia) {
PeerScenarioClient* caller = s.CreateClient({});
PeerScenarioClient* callee = s.CreateClient({});
auto video_result = caller->pc()->AddTransceiver(cricket::MEDIA_TYPE_VIDEO);
ASSERT_EQ(video_result.error().type(), RTCErrorType::NONE);
auto transceiver = caller->pc()->AddTransceiver(cricket::MEDIA_TYPE_VIDEO);
ASSERT_TRUE(transceiver.error().ok());
MockRtpSenderObserver observer;
EXPECT_CALL(observer, OnFirstPacketSent).Times(0);
transceiver.value()->sender()->SetObserver(&observer);
caller->pc()->ReconfigureBandwidthEstimation(
{.allow_probe_without_media = true});