Only handle each RTCP once.

Previously, each RTCP packet was handled several times in a row, once
per m-section. This caused various weirdness and log warning spam, in
particular when using unified plan.

The cause was that the packets were wired trough each BaseChannel
instance up to the Call class. With this fix, the RTCP packets are wired
once per RtpTransportInternal via the common peer connection class.

Bug: chromium:1002875
Change-Id: I41c4eb3b68e215ebe0f2c6fb93ae0ee73335b89a
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/152668
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Steve Anton <steveanton@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29226}
This commit is contained in:
Sebastian Jansson 2019-09-18 18:22:12 +02:00 committed by Commit Bot
parent 4bad650ba7
commit 1b83a9e400
18 changed files with 111 additions and 122 deletions

View File

@ -230,6 +230,11 @@ class RtpHelper : public Base {
num_network_route_changes_ = changes;
}
void OnRtcpPacketReceived(rtc::CopyOnWriteBuffer* packet,
int64_t packet_time_us) {
rtcp_packets_.push_back(std::string(packet->cdata<char>(), packet->size()));
}
protected:
bool MuteStream(uint32_t ssrc, bool mute) {
if (!HasSendStream(ssrc) && ssrc != 0) {
@ -271,10 +276,6 @@ class RtpHelper : public Base {
int64_t packet_time_us) {
rtp_packets_.push_back(std::string(packet.cdata<char>(), packet.size()));
}
virtual void OnRtcpReceived(rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) {
rtcp_packets_.push_back(std::string(packet.cdata<char>(), packet.size()));
}
virtual void OnReadyToSend(bool ready) { ready_to_send_ = ready; }
virtual void OnNetworkRouteChanged(const std::string& transport_name,

View File

@ -171,7 +171,8 @@ class FakeNetworkInterface : public MediaChannel::NetworkInterface,
if (msg->message_id == ST_RTP) {
dest_->OnPacketReceived(msg_data->data(), rtc::TimeMicros());
} else {
dest_->OnRtcpReceived(msg_data->data(), rtc::TimeMicros());
RTC_LOG(LS_VERBOSE) << "Dropping RTCP packet, they not handled by "
"MediaChannel anymore.";
}
}
delete msg_data;

View File

@ -202,9 +202,6 @@ class MediaChannel : public sigslot::has_slots<> {
// Called when a RTP packet is received.
virtual void OnPacketReceived(rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) = 0;
// Called when a RTCP packet is received.
virtual void OnRtcpReceived(rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) = 0;
// Called when the socket's ability to send has changed.
virtual void OnReadyToSend(bool ready) = 0;
// Called when the network route used for sending packets changed.

View File

@ -82,8 +82,6 @@ class RtpDataMediaChannel : public DataMediaChannel {
}
virtual void OnPacketReceived(rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us);
virtual void OnRtcpReceived(rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) {}
virtual void OnReadyToSend(bool ready) {}
virtual bool SendData(const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,

View File

@ -1644,17 +1644,6 @@ void WebRtcVideoChannel::BackfillBufferedPackets(
<< " unknown: " << delivery_unknown_ssrc_cnt;
}
void WebRtcVideoChannel::OnRtcpReceived(rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) {
RTC_DCHECK_RUN_ON(&thread_checker_);
// TODO(pbos): Check webrtc::PacketReceiver::DELIVERY_OK once we deliver
// for both audio and video on the same path. Since BundleFilter doesn't
// filter RTCP anymore incoming RTCP packets could've been going to audio (so
// logging failures spam the log).
call_->Receiver()->DeliverPacket(webrtc::MediaType::VIDEO, packet,
packet_time_us);
}
void WebRtcVideoChannel::OnReadyToSend(bool ready) {
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_LOG(LS_VERBOSE) << "OnReadyToSend: " << (ready ? "Ready." : "Not ready.");

View File

@ -163,8 +163,6 @@ class WebRtcVideoChannel : public VideoMediaChannel,
void OnPacketReceived(rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) override;
void OnRtcpReceived(rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) override;
void OnReadyToSend(bool ready) override;
void OnNetworkRouteChanged(const std::string& transport_name,
const rtc::NetworkRoute& network_route) override;

View File

@ -2099,15 +2099,6 @@ void WebRtcVoiceMediaChannel::OnPacketReceived(rtc::CopyOnWriteBuffer packet,
RTC_DCHECK_NE(webrtc::PacketReceiver::DELIVERY_UNKNOWN_SSRC, delivery_result);
}
void WebRtcVoiceMediaChannel::OnRtcpReceived(rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) {
RTC_DCHECK(worker_thread_checker_.IsCurrent());
// Forward packet to Call as well.
call_->Receiver()->DeliverPacket(webrtc::MediaType::AUDIO, packet,
packet_time_us);
}
void WebRtcVoiceMediaChannel::OnNetworkRouteChanged(
const std::string& transport_name,
const rtc::NetworkRoute& network_route) {

View File

@ -193,8 +193,6 @@ class WebRtcVoiceMediaChannel final : public VoiceMediaChannel,
void OnPacketReceived(rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) override;
void OnRtcpReceived(rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) override;
void OnNetworkRouteChanged(const std::string& transport_name,
const rtc::NetworkRoute& network_route) override;
void OnReadyToSend(bool ready) override;

View File

@ -3275,8 +3275,6 @@ TEST_F(WebRtcVoiceEngineTestFake, DeliverAudioPacket_Call) {
EXPECT_EQ(0, s->received_packets());
channel_->OnPacketReceived(kPcmuPacket, /* packet_time_us */ -1);
EXPECT_EQ(1, s->received_packets());
channel_->OnRtcpReceived(kRtcpPacket, /* packet_time_us */ -1);
EXPECT_EQ(2, s->received_packets());
}
// All receive channels should be associated with the first send channel,

View File

@ -171,8 +171,6 @@ bool BaseChannel::ConnectToRtpTransport() {
}
rtp_transport_->SignalReadyToSend.connect(
this, &BaseChannel::OnTransportReadyToSend);
rtp_transport_->SignalRtcpPacketReceived.connect(
this, &BaseChannel::OnRtcpPacketReceived);
// If media transport is used, it's responsible for providing network
// route changed callbacks.
@ -193,7 +191,6 @@ void BaseChannel::DisconnectFromRtpTransport() {
RTC_DCHECK(rtp_transport_);
rtp_transport_->UnregisterRtpDemuxerSink(this);
rtp_transport_->SignalReadyToSend.disconnect(this);
rtp_transport_->SignalRtcpPacketReceived.disconnect(this);
rtp_transport_->SignalNetworkRouteChanged.disconnect(this);
rtp_transport_->SignalWritableState.disconnect(this);
rtp_transport_->SignalSentPacket.disconnect(this);
@ -461,12 +458,40 @@ bool BaseChannel::SendPacket(bool rtcp,
void BaseChannel::OnRtpPacket(const webrtc::RtpPacketReceived& parsed_packet) {
// Take packet time from the |parsed_packet|.
// RtpPacketReceived.arrival_time_ms = (timestamp_us + 500) / 1000;
int64_t timestamp_us = -1;
int64_t packet_time_us = -1;
if (parsed_packet.arrival_time_ms() > 0) {
timestamp_us = parsed_packet.arrival_time_ms() * 1000;
packet_time_us = parsed_packet.arrival_time_ms() * 1000;
}
OnPacketReceived(/*rtcp=*/false, parsed_packet.Buffer(), timestamp_us);
if (!has_received_packet_) {
has_received_packet_ = true;
signaling_thread()->Post(RTC_FROM_HERE, this, MSG_FIRSTPACKETRECEIVED);
}
if (!srtp_active() && srtp_required_) {
// Our session description indicates that SRTP is required, but we got a
// packet before our SRTP filter is active. This means either that
// a) we got SRTP packets before we received the SDES keys, in which case
// we can't decrypt it anyway, or
// b) we got SRTP packets before DTLS completed on both the RTP and RTCP
// transports, so we haven't yet extracted keys, even if DTLS did
// complete on the transport that the packets are being sent on. It's
// really good practice to wait for both RTP and RTCP to be good to go
// before sending media, to prevent weird failure modes, so it's fine
// for us to just eat packets here. This is all sidestepped if RTCP mux
// is used anyway.
RTC_LOG(LS_WARNING) << "Can't process incoming RTP packet when "
"SRTP is inactive and crypto is required";
return;
}
auto packet_buffer = parsed_packet.Buffer();
invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, worker_thread_, [this, packet_buffer, packet_time_us] {
RTC_DCHECK(worker_thread_->IsCurrent());
media_channel_->OnPacketReceived(packet_buffer, packet_time_us);
});
}
void BaseChannel::UpdateRtpHeaderExtensionMap(
@ -492,50 +517,6 @@ bool BaseChannel::RegisterRtpDemuxerSink() {
});
}
void BaseChannel::OnRtcpPacketReceived(rtc::CopyOnWriteBuffer* packet,
int64_t packet_time_us) {
OnPacketReceived(/*rtcp=*/true, *packet, packet_time_us);
}
void BaseChannel::OnPacketReceived(bool rtcp,
const rtc::CopyOnWriteBuffer& packet,
int64_t packet_time_us) {
if (!has_received_packet_ && !rtcp) {
has_received_packet_ = true;
signaling_thread()->Post(RTC_FROM_HERE, this, MSG_FIRSTPACKETRECEIVED);
}
if (!srtp_active() && srtp_required_) {
// Our session description indicates that SRTP is required, but we got a
// packet before our SRTP filter is active. This means either that
// a) we got SRTP packets before we received the SDES keys, in which case
// we can't decrypt it anyway, or
// b) we got SRTP packets before DTLS completed on both the RTP and RTCP
// transports, so we haven't yet extracted keys, even if DTLS did
// complete on the transport that the packets are being sent on. It's
// really good practice to wait for both RTP and RTCP to be good to go
// before sending media, to prevent weird failure modes, so it's fine
// for us to just eat packets here. This is all sidestepped if RTCP mux
// is used anyway.
RTC_LOG(LS_WARNING)
<< "Can't process incoming "
<< RtpPacketTypeToString(rtcp ? RtpPacketType::kRtcp
: RtpPacketType::kRtp)
<< " packet when SRTP is inactive and crypto is required";
return;
}
invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, worker_thread_, [this, rtcp, packet, packet_time_us] {
RTC_DCHECK(worker_thread_->IsCurrent());
if (rtcp) {
media_channel_->OnRtcpReceived(packet, packet_time_us);
} else {
media_channel_->OnPacketReceived(packet, packet_time_us);
}
});
}
void BaseChannel::EnableMedia_w() {
RTC_DCHECK(worker_thread_ == rtc::Thread::Current());
if (enabled_)

View File

@ -221,13 +221,6 @@ class BaseChannel : public ChannelInterface,
rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options);
void OnRtcpPacketReceived(rtc::CopyOnWriteBuffer* packet,
int64_t packet_time_us);
void OnPacketReceived(bool rtcp,
const rtc::CopyOnWriteBuffer& packet,
int64_t packet_time_us);
void EnableMedia_w();
void DisableMedia_w();

View File

@ -259,15 +259,7 @@ class ChannelTest : public ::testing::Test, public sigslot::has_slots<> {
rtc::Thread* network_thread,
std::unique_ptr<typename T::MediaChannel> ch,
webrtc::RtpTransportInternal* rtp_transport,
int flags) {
rtc::Thread* signaling_thread = rtc::Thread::Current();
auto channel = std::make_unique<typename T::Channel>(
worker_thread, network_thread, signaling_thread, std::move(ch),
cricket::CN_AUDIO, (flags & DTLS) != 0, webrtc::CryptoOptions(),
&ssrc_generator_);
channel->Init_w(rtp_transport, webrtc::MediaTransportConfig());
return channel;
}
int flags);
std::unique_ptr<webrtc::RtpTransportInternal> CreateRtpTransportBasedOnFlags(
rtc::PacketTransportInternal* rtp_packet_transport,
@ -1545,6 +1537,25 @@ class ChannelTest : public ::testing::Test, public sigslot::has_slots<> {
rtc::UniqueRandomIdGenerator ssrc_generator_;
};
template <>
std::unique_ptr<cricket::VoiceChannel> ChannelTest<VoiceTraits>::CreateChannel(
rtc::Thread* worker_thread,
rtc::Thread* network_thread,
std::unique_ptr<cricket::FakeVoiceMediaChannel> ch,
webrtc::RtpTransportInternal* rtp_transport,
int flags) {
rtp_transport->SignalRtcpPacketReceived.connect(
static_cast<cricket::RtpHelper<cricket::VoiceMediaChannel>*>(ch.get()),
&cricket::RtpHelper<cricket::VoiceMediaChannel>::OnRtcpPacketReceived);
rtc::Thread* signaling_thread = rtc::Thread::Current();
auto channel = std::make_unique<cricket::VoiceChannel>(
worker_thread, network_thread, signaling_thread, std::move(ch),
cricket::CN_AUDIO, (flags & DTLS) != 0, webrtc::CryptoOptions(),
&ssrc_generator_);
channel->Init_w(rtp_transport, webrtc::MediaTransportConfig());
return channel;
}
template <>
void ChannelTest<VoiceTraits>::CreateContent(
int flags,
@ -1619,6 +1630,9 @@ std::unique_ptr<cricket::VideoChannel> ChannelTest<VideoTraits>::CreateChannel(
std::unique_ptr<cricket::FakeVideoMediaChannel> ch,
webrtc::RtpTransportInternal* rtp_transport,
int flags) {
rtp_transport->SignalRtcpPacketReceived.connect(
static_cast<cricket::RtpHelper<cricket::VideoMediaChannel>*>(ch.get()),
&cricket::RtpHelper<cricket::VideoMediaChannel>::OnRtcpPacketReceived);
rtc::Thread* signaling_thread = rtc::Thread::Current();
auto channel = std::make_unique<cricket::VideoChannel>(
worker_thread, network_thread, signaling_thread, std::move(ch),
@ -1938,10 +1952,6 @@ TEST_F(VoiceChannelDoubleThreadTest, TestReceivePrAnswer) {
Base::TestReceivePrAnswer();
}
TEST_F(VoiceChannelDoubleThreadTest, TestFlushRtcp) {
Base::TestFlushRtcp();
}
TEST_F(VoiceChannelDoubleThreadTest, TestOnTransportReadyToSend) {
Base::TestOnTransportReadyToSend();
}
@ -2382,10 +2392,6 @@ TEST_F(VideoChannelDoubleThreadTest, TestReceivePrAnswer) {
Base::TestReceivePrAnswer();
}
TEST_F(VideoChannelDoubleThreadTest, TestFlushRtcp) {
Base::TestFlushRtcp();
}
TEST_F(VideoChannelDoubleThreadTest, SendBundleToBundle) {
Base::SendBundleToBundle(kVideoPts, arraysize(kVideoPts), false, false);
}
@ -2438,6 +2444,9 @@ std::unique_ptr<cricket::RtpDataChannel> ChannelTest<DataTraits>::CreateChannel(
std::unique_ptr<cricket::FakeDataMediaChannel> ch,
webrtc::RtpTransportInternal* rtp_transport,
int flags) {
rtp_transport->SignalRtcpPacketReceived.connect(
static_cast<cricket::RtpHelper<cricket::DataMediaChannel>*>(ch.get()),
&cricket::RtpHelper<cricket::DataMediaChannel>::OnRtcpPacketReceived);
rtc::Thread* signaling_thread = rtc::Thread::Current();
auto channel = std::make_unique<cricket::RtpDataChannel>(
worker_thread, network_thread, signaling_thread, std::move(ch),

View File

@ -89,6 +89,7 @@ JsepTransportController::JsepTransportController(
config_(config) {
// The |transport_observer| is assumed to be non-null.
RTC_DCHECK(config_.transport_observer);
RTC_DCHECK(config_.rtcp_handler);
}
JsepTransportController::~JsepTransportController() {
@ -1236,6 +1237,9 @@ RTCError JsepTransportController::MaybeCreateJsepTransport(
std::move(rtp_dtls_transport), std::move(rtcp_dtls_transport),
std::move(media_transport), std::move(datagram_transport));
jsep_transport->rtp_transport()->SignalRtcpPacketReceived.connect(
this, &JsepTransportController::OnRtcpPacketReceived_n);
jsep_transport->SignalRtcpMuxActive.connect(
this, &JsepTransportController::UpdateAggregateStates_n);
jsep_transport->SignalMediaTransportStateChanged.connect(
@ -1687,6 +1691,13 @@ void JsepTransportController::UpdateAggregateStates_n() {
}
}
void JsepTransportController::OnRtcpPacketReceived_n(
rtc::CopyOnWriteBuffer* packet,
int64_t packet_time_us) {
RTC_DCHECK(config_.rtcp_handler);
config_.rtcp_handler(*packet, packet_time_us);
}
void JsepTransportController::OnDtlsHandshakeError(
rtc::SSLHandshakeError error) {
SignalDtlsHandshakeError(error);

View File

@ -106,6 +106,11 @@ class JsepTransportController : public sigslot::has_slots<> {
// Used to inject the ICE/DTLS transports created externally.
cricket::TransportFactoryInterface* external_transport_factory = nullptr;
Observer* transport_observer = nullptr;
// Must be provided and valid for the lifetime of the
// JsepTransportController instance.
std::function<void(const rtc::CopyOnWriteBuffer& packet,
int64_t packet_time_us)>
rtcp_handler;
bool active_reset_srtp_params = false;
RtcEventLog* event_log = nullptr;
@ -437,6 +442,9 @@ class JsepTransportController : public sigslot::has_slots<> {
void UpdateAggregateStates_n();
void OnRtcpPacketReceived_n(rtc::CopyOnWriteBuffer* packet,
int64_t packet_time_us);
void OnDtlsHandshakeError(rtc::SSLHandshakeError error);
rtc::Thread* const signaling_thread_ = nullptr;

View File

@ -90,6 +90,8 @@ class JsepTransportControllerTest : public JsepTransportController::Observer,
rtc::Thread* network_thread = rtc::Thread::Current(),
cricket::PortAllocator* port_allocator = nullptr) {
config.transport_observer = this;
config.rtcp_handler = [](const rtc::CopyOnWriteBuffer& packet,
int64_t packet_time_us) { RTC_NOTREACHED(); };
// The tests only works with |fake_transport_factory|;
config.external_transport_factory = fake_transport_factory_.get();
// TODO(zstein): Provide an AsyncResolverFactory once it is required.

View File

@ -1077,6 +1077,25 @@ bool PeerConnection::Initialize(
? *configuration.crypto_options
: options.crypto_options;
config.transport_observer = this;
// It's safe to pass |this| and using |rtcp_invoker_| and the |call_| pointer
// since the JsepTransportController instance is owned by this PeerConnection
// instance and is destroyed before both |rtcp_invoker_| and the |call_|
// pointer.
config.rtcp_handler = [this](const rtc::CopyOnWriteBuffer& packet,
int64_t packet_time_us) {
RTC_DCHECK_RUN_ON(network_thread());
rtcp_invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, worker_thread(), [this, packet, packet_time_us] {
RTC_DCHECK_RUN_ON(worker_thread());
// |call_| is reset on the worker thread in the PeerConnection
// destructor, so we check that it's still valid before propagating
// the packet.
if (call_) {
call_->Receiver()->DeliverPacket(MediaType::ANY, packet,
packet_time_us);
}
});
};
config.event_log = event_log_ptr_;
#if defined(ENABLE_EXTERNAL_AUTH)
config.enable_external_auth = true;

View File

@ -1288,6 +1288,8 @@ class PeerConnection : public PeerConnectionInternal,
// its own thread safety.
std::unique_ptr<Call> call_ RTC_GUARDED_BY(worker_thread());
rtc::AsyncInvoker rtcp_invoker_ RTC_GUARDED_BY(network_thread());
// Points to the same thing as `call_`. Since it's const, we may read the
// pointer from any thread.
Call* const call_ptr_;

View File

@ -48,8 +48,6 @@ class ScenarioIceConnectionImpl : public ScenarioIceConnection,
DataChannelTransportInterface* data_channel_transport,
JsepTransportController::NegotiationState negotiation_state) override;
void OnRtcpPacketReceived(rtc::CopyOnWriteBuffer* packet_ptr,
int64_t packet_time_us);
void OnRtpPacket(const RtpPacketReceived& packet) override;
void OnCandidates(const std::string& mid,
const std::vector<cricket::Candidate>& candidates);
@ -131,6 +129,11 @@ JsepTransportController::Config ScenarioIceConnectionImpl::CreateJsepConfig() {
config.transport_observer = this;
config.bundle_policy =
PeerConnectionInterface::BundlePolicy::kBundlePolicyMaxBundle;
config.rtcp_handler = [this](const rtc::CopyOnWriteBuffer& packet,
int64_t packet_time_us) {
RTC_DCHECK_RUN_ON(network_thread_);
observer_->OnPacketReceived(packet);
};
return config;
}
@ -210,14 +213,11 @@ bool ScenarioIceConnectionImpl::OnTransportChanged(
JsepTransportController::NegotiationState negotiation_state) {
RTC_DCHECK_RUN_ON(network_thread_);
if (rtp_transport == nullptr) {
rtp_transport_->SignalRtcpPacketReceived.disconnect(this);
rtp_transport_->UnregisterRtpDemuxerSink(this);
} else {
RTC_DCHECK(rtp_transport_ == nullptr || rtp_transport_ == rtp_transport);
if (rtp_transport_ != rtp_transport) {
rtp_transport_ = rtp_transport;
rtp_transport_->SignalRtcpPacketReceived.connect(
this, &ScenarioIceConnectionImpl::OnRtcpPacketReceived);
}
RtpDemuxerCriteria criteria;
criteria.mid = mid;
@ -226,13 +226,6 @@ bool ScenarioIceConnectionImpl::OnTransportChanged(
return true;
}
void ScenarioIceConnectionImpl::OnRtcpPacketReceived(
rtc::CopyOnWriteBuffer* packet,
int64_t packet_time_us) {
RTC_DCHECK_RUN_ON(network_thread_);
observer_->OnPacketReceived(*packet);
}
void ScenarioIceConnectionImpl::OnRtpPacket(const RtpPacketReceived& packet) {
RTC_DCHECK_RUN_ON(network_thread_);
observer_->OnPacketReceived(packet.Buffer());