Move RTP/RTCP demuxing logic from BaseChannel to RtpTransport.

BUG=webrtc:7013

Review-Url: https://codereview.webrtc.org/2890263003
Cr-Commit-Position: refs/heads/master@{#18391}
This commit is contained in:
zstein 2017-06-01 13:22:42 -07:00 committed by Commit Bot
parent 7d9a55b92d
commit 3dcf0e93fa
11 changed files with 277 additions and 215 deletions

View File

@ -279,6 +279,15 @@ bool IsValidRtpPayloadType(int payload_type) {
return payload_type >= 0 && payload_type <= 127;
}
bool IsValidRtpRtcpPacketSize(bool rtcp, size_t size) {
return (rtcp ? size >= kMinRtcpPacketLen : size >= kMinRtpPacketLen) &&
size <= kMaxRtpPacketLen;
}
const char* RtpRtcpStringLiteral(bool rtcp) {
return rtcp ? "RTCP" : "RTP";
}
bool ValidateRtpHeader(const uint8_t* rtp,
size_t length,
size_t* header_length) {

View File

@ -58,6 +58,14 @@ bool IsRtpPacket(const void* data, size_t len);
// True if |payload type| is 0-127.
bool IsValidRtpPayloadType(int payload_type);
// True if |size| is appropriate for the indicated packet type.
bool IsValidRtpRtcpPacketSize(bool rtcp, size_t size);
// TODO(zstein): Consider using an enum instead of a bool to differentiate
// between RTP and RTCP.
// Returns "RTCP" or "RTP" according to |rtcp|.
const char* RtpRtcpStringLiteral(bool rtcp);
// Verifies that a packet has a valid RTP header.
bool ValidateRtpHeader(const uint8_t* rtp,
size_t length,

View File

@ -105,15 +105,9 @@ struct DataChannelErrorMessageData : public rtc::MessageData {
DataMediaChannel::Error error;
};
static const char* PacketType(bool rtcp) {
return (!rtcp) ? "RTP" : "RTCP";
}
static bool ValidPacket(bool rtcp, const rtc::CopyOnWriteBuffer* packet) {
// Check the packet size. We could check the header too if needed.
return (packet &&
packet->size() >= (!rtcp ? kMinRtpPacketLen : kMinRtcpPacketLen) &&
packet->size() <= kMaxRtpPacketLen);
return packet && IsValidRtpRtcpPacketSize(rtcp, packet->size());
}
static bool IsReceiveContentDirection(MediaContentDirection direction) {
@ -179,6 +173,11 @@ BaseChannel::BaseChannel(rtc::Thread* worker_thread,
#endif
rtp_transport_.SignalReadyToSend.connect(
this, &BaseChannel::OnTransportReadyToSend);
// TODO(zstein): RtpTransport::SignalPacketReceived will probably be replaced
// with a callback interface later so that the demuxer can select which
// channel to signal.
rtp_transport_.SignalPacketReceived.connect(this,
&BaseChannel::OnPacketReceived);
LOG(LS_INFO) << "Created channel for " << content_name;
}
@ -214,6 +213,9 @@ void BaseChannel::DisconnectTransportChannels_n() {
DisconnectFromPacketTransport(rtp_transport_.rtcp_packet_transport());
}
rtp_transport_.SetRtpPacketTransport(nullptr);
rtp_transport_.SetRtcpPacketTransport(nullptr);
// Clear pending read packets/messages.
network_thread_->Clear(&invoker_);
network_thread_->Clear(this);
@ -397,7 +399,6 @@ void BaseChannel::ConnectToDtlsTransport(DtlsTransportInternal* transport) {
// TODO(zstein): de-dup with ConnectToPacketTransport
transport->SignalWritableState.connect(this, &BaseChannel::OnWritableState);
transport->SignalReadPacket.connect(this, &BaseChannel::OnPacketRead);
transport->SignalDtlsState.connect(this, &BaseChannel::OnDtlsState);
transport->SignalSentPacket.connect(this, &BaseChannel::SignalSentPacket_n);
transport->ice_transport()->SignalSelectedCandidatePairChanged.connect(
@ -411,7 +412,6 @@ void BaseChannel::DisconnectFromDtlsTransport(
false);
transport->SignalWritableState.disconnect(this);
transport->SignalReadPacket.disconnect(this);
transport->SignalDtlsState.disconnect(this);
transport->SignalSentPacket.disconnect(this);
transport->ice_transport()->SignalSelectedCandidatePairChanged.disconnect(
@ -422,7 +422,6 @@ void BaseChannel::ConnectToPacketTransport(
rtc::PacketTransportInternal* transport) {
RTC_DCHECK_RUN_ON(network_thread_);
transport->SignalWritableState.connect(this, &BaseChannel::OnWritableState);
transport->SignalReadPacket.connect(this, &BaseChannel::OnPacketRead);
transport->SignalSentPacket.connect(this, &BaseChannel::SignalSentPacket_n);
}
@ -430,7 +429,6 @@ void BaseChannel::DisconnectFromPacketTransport(
rtc::PacketTransportInternal* transport) {
RTC_DCHECK_RUN_ON(network_thread_);
transport->SignalWritableState.disconnect(this);
transport->SignalReadPacket.disconnect(this);
transport->SignalSentPacket.disconnect(this);
}
@ -576,22 +574,6 @@ void BaseChannel::OnWritableState(rtc::PacketTransportInternal* transport) {
UpdateWritableState_n();
}
void BaseChannel::OnPacketRead(rtc::PacketTransportInternal* transport,
const char* data,
size_t len,
const rtc::PacketTime& packet_time,
int flags) {
TRACE_EVENT0("webrtc", "BaseChannel::OnPacketRead");
// OnPacketRead gets called from P2PSocket; now pass data to MediaEngine
RTC_DCHECK(network_thread_->IsCurrent());
// When using RTCP multiplexing we might get RTCP packets on the RTP
// transport. We feed RTP traffic into the demuxer to determine if it is RTCP.
bool rtcp = PacketIsRtcp(transport, data, len);
rtc::CopyOnWriteBuffer packet(data, len);
HandlePacket(rtcp, &packet, packet_time);
}
void BaseChannel::OnDtlsState(DtlsTransportInternal* transport,
DtlsTransportState state) {
if (!ShouldSetupDtlsSrtp_n()) {
@ -641,13 +623,6 @@ void BaseChannel::OnTransportReadyToSend(bool ready) {
Bind(&MediaChannel::OnReadyToSend, media_channel_, ready));
}
bool BaseChannel::PacketIsRtcp(const rtc::PacketTransportInternal* transport,
const char* data,
size_t len) {
return (transport == rtp_transport_.rtcp_packet_transport() ||
rtcp_mux_filter_.DemuxRtcp(data, static_cast<int>(len)));
}
bool BaseChannel::SendPacket(bool rtcp,
rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options) {
@ -680,7 +655,7 @@ bool BaseChannel::SendPacket(bool rtcp,
// Protect ourselves against crazy data.
if (!ValidPacket(rtcp, packet)) {
LOG(LS_ERROR) << "Dropping outgoing " << content_name_ << " "
<< PacketType(rtcp)
<< RtpRtcpStringLiteral(rtcp)
<< " packet: wrong size=" << packet->size();
return false;
}
@ -772,31 +747,13 @@ bool BaseChannel::SendPacket(bool rtcp,
return rtp_transport_.SendPacket(rtcp, packet, updated_options, flags);
}
bool BaseChannel::WantsPacket(bool rtcp, const rtc::CopyOnWriteBuffer* packet) {
// Protect ourselves against crazy data.
if (!ValidPacket(rtcp, packet)) {
LOG(LS_ERROR) << "Dropping incoming " << content_name_ << " "
<< PacketType(rtcp)
<< " packet: wrong size=" << packet->size();
return false;
}
if (rtcp) {
// Permit all (seemingly valid) RTCP packets.
return true;
}
// Check whether we handle this payload.
return bundle_filter_.DemuxPacket(packet->data(), packet->size());
bool BaseChannel::HandlesPayloadType(int packet_type) const {
return rtp_transport_.HandlesPayloadType(packet_type);
}
void BaseChannel::HandlePacket(bool rtcp, rtc::CopyOnWriteBuffer* packet,
const rtc::PacketTime& packet_time) {
RTC_DCHECK(network_thread_->IsCurrent());
if (!WantsPacket(rtcp, packet)) {
return;
}
// We are only interested in the first rtp packet because that
// indicates the media has started flowing.
void BaseChannel::OnPacketReceived(bool rtcp,
rtc::CopyOnWriteBuffer& packet,
const rtc::PacketTime& packet_time) {
if (!has_received_packet_ && !rtcp) {
has_received_packet_ = true;
signaling_thread()->Post(RTC_FROM_HERE, this, MSG_FIRSTPACKETRECEIVED);
@ -805,8 +762,8 @@ void BaseChannel::HandlePacket(bool rtcp, rtc::CopyOnWriteBuffer* packet,
// Unprotect the packet, if needed.
if (srtp_filter_.IsActive()) {
TRACE_EVENT0("webrtc", "SRTP Decode");
char* data = packet->data<char>();
int len = static_cast<int>(packet->size());
char* data = packet.data<char>();
int len = static_cast<int>(packet.size());
bool res;
if (!rtcp) {
res = srtp_filter_.UnprotectRtp(data, len, &len);
@ -816,8 +773,8 @@ void BaseChannel::HandlePacket(bool rtcp, rtc::CopyOnWriteBuffer* packet,
GetRtpSeqNum(data, len, &seq_num);
GetRtpSsrc(data, len, &ssrc);
LOG(LS_ERROR) << "Failed to unprotect " << content_name_
<< " RTP packet: size=" << len
<< ", seqnum=" << seq_num << ", SSRC=" << ssrc;
<< " RTP packet: size=" << len << ", seqnum=" << seq_num
<< ", SSRC=" << ssrc;
return;
}
} else {
@ -831,7 +788,7 @@ void BaseChannel::HandlePacket(bool rtcp, rtc::CopyOnWriteBuffer* packet,
}
}
packet->SetSize(len);
packet.SetSize(len);
} else if (srtp_required_) {
// Our session description indicates that SRTP is required, but we got a
// packet before our SRTP filter is active. This means either that
@ -844,20 +801,21 @@ void BaseChannel::HandlePacket(bool rtcp, rtc::CopyOnWriteBuffer* packet,
// before sending media, to prevent weird failure modes, so it's fine
// for us to just eat packets here. This is all sidestepped if RTCP mux
// is used anyway.
LOG(LS_WARNING) << "Can't process incoming " << PacketType(rtcp)
LOG(LS_WARNING) << "Can't process incoming " << RtpRtcpStringLiteral(rtcp)
<< " packet when SRTP is inactive and crypto is required";
return;
}
invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, worker_thread_,
Bind(&BaseChannel::OnPacketReceived, this, rtcp, *packet, packet_time));
Bind(&BaseChannel::ProcessPacket, this, rtcp, packet, packet_time));
}
void BaseChannel::OnPacketReceived(bool rtcp,
const rtc::CopyOnWriteBuffer& packet,
const rtc::PacketTime& packet_time) {
void BaseChannel::ProcessPacket(bool rtcp,
const rtc::CopyOnWriteBuffer& packet,
const rtc::PacketTime& packet_time) {
RTC_DCHECK(worker_thread_->IsCurrent());
// Need to copy variable because OnRtcpReceived/OnPacketReceived
// requires non-const pointer to buffer. This doesn't memcpy the actual data.
rtc::CopyOnWriteBuffer data(packet);
@ -987,7 +945,7 @@ bool BaseChannel::SetupDtlsSrtp_n(bool rtcp) {
}
LOG(LS_INFO) << "Installing keys from DTLS-SRTP on " << content_name() << " "
<< PacketType(rtcp);
<< RtpRtcpStringLiteral(rtcp);
int key_len;
int salt_len;
@ -1448,6 +1406,10 @@ void BaseChannel::OnMessage(rtc::Message *pmsg) {
}
}
void BaseChannel::AddHandledPayloadType(int payload_type) {
rtp_transport_.AddHandledPayloadType(payload_type);
}
void BaseChannel::FlushRtcpMessages_n() {
// Flush all remaining RTCP messages. This should only be called in
// destructor.
@ -1659,15 +1621,13 @@ void VoiceChannel::GetActiveStreams_w(AudioInfo::StreamList* actives) {
media_channel()->GetActiveStreams(actives);
}
void VoiceChannel::OnPacketRead(rtc::PacketTransportInternal* transport,
const char* data,
size_t len,
const rtc::PacketTime& packet_time,
int flags) {
BaseChannel::OnPacketRead(transport, data, len, packet_time, flags);
void VoiceChannel::OnPacketReceived(bool rtcp,
rtc::CopyOnWriteBuffer& packet,
const rtc::PacketTime& packet_time) {
BaseChannel::OnPacketReceived(rtcp, packet, packet_time);
// Set a flag when we've received an RTP packet. If we're waiting for early
// media, this will disable the timeout.
if (!received_media_ && !PacketIsRtcp(transport, data, len)) {
if (!received_media_ && !rtcp) {
received_media_ = true;
}
}
@ -1766,7 +1726,7 @@ bool VoiceChannel::SetLocalContent_w(const MediaContentDescription* content,
return false;
}
for (const AudioCodec& codec : audio->codecs()) {
bundle_filter()->AddPayloadType(codec.id);
AddHandledPayloadType(codec.id);
}
last_recv_params_ = recv_params;
@ -2039,7 +1999,7 @@ bool VideoChannel::SetLocalContent_w(const MediaContentDescription* content,
return false;
}
for (const VideoCodec& codec : video->codecs()) {
bundle_filter()->AddPayloadType(codec.id);
AddHandledPayloadType(codec.id);
}
last_recv_params_ = recv_params;
@ -2234,7 +2194,7 @@ bool RtpDataChannel::SetLocalContent_w(const MediaContentDescription* content,
return false;
}
for (const DataCodec& codec : data->codecs()) {
bundle_filter()->AddPayloadType(codec.id);
AddHandledPayloadType(codec.id);
}
last_recv_params_ = recv_params;

View File

@ -36,7 +36,6 @@
#include "webrtc/p2p/base/transportcontroller.h"
#include "webrtc/p2p/client/socketmonitor.h"
#include "webrtc/pc/audiomonitor.h"
#include "webrtc/pc/bundlefilter.h"
#include "webrtc/pc/mediamonitor.h"
#include "webrtc/pc/mediasession.h"
#include "webrtc/pc/rtcpmuxfilter.h"
@ -149,8 +148,6 @@ class BaseChannel
// For ConnectionStatsGetter, used by ConnectionMonitor
bool GetConnectionStats(ConnectionInfos* infos) override;
BundleFilter* bundle_filter() { return &bundle_filter_; }
const std::vector<StreamParams>& local_streams() const {
return local_streams_;
}
@ -198,6 +195,11 @@ class BaseChannel
// This function returns true if we require SRTP for call setup.
bool srtp_required_for_testing() const { return srtp_required_; }
// Public for testing.
// TODO(zstein): Remove this once channels register themselves with
// an RtpTransport in a more explicit way.
bool HandlesPayloadType(int payload_type) const;
protected:
virtual MediaChannel* media_channel() const { return media_channel_; }
@ -248,11 +250,6 @@ class BaseChannel
// From TransportChannel
void OnWritableState(rtc::PacketTransportInternal* transport);
virtual void OnPacketRead(rtc::PacketTransportInternal* transport,
const char* data,
size_t len,
const rtc::PacketTime& packet_time,
int flags);
void OnDtlsState(DtlsTransportInternal* transport, DtlsTransportState state);
@ -272,9 +269,13 @@ class BaseChannel
bool WantsPacket(bool rtcp, const rtc::CopyOnWriteBuffer* packet);
void HandlePacket(bool rtcp, rtc::CopyOnWriteBuffer* packet,
const rtc::PacketTime& packet_time);
void OnPacketReceived(bool rtcp,
const rtc::CopyOnWriteBuffer& packet,
const rtc::PacketTime& packet_time);
// TODO(zstein): packet can be const once the RtpTransport handles protection.
virtual void OnPacketReceived(bool rtcp,
rtc::CopyOnWriteBuffer& packet,
const rtc::PacketTime& packet_time);
void ProcessPacket(bool rtcp,
const rtc::CopyOnWriteBuffer& packet,
const rtc::PacketTime& packet_time);
void EnableMedia_w();
void DisableMedia_w();
@ -357,6 +358,8 @@ class BaseChannel
return worker_thread_->Invoke<bool>(posted_from, functor);
}
void AddHandledPayloadType(int payload_type);
private:
bool InitNetwork_n(DtlsTransportInternal* rtp_dtls_transport,
DtlsTransportInternal* rtcp_dtls_transport,
@ -394,7 +397,6 @@ class BaseChannel
std::vector<std::pair<rtc::Socket::Option, int> > rtcp_socket_options_;
SrtpFilter srtp_filter_;
RtcpMuxFilter rtcp_mux_filter_;
BundleFilter bundle_filter_;
bool writable_ = false;
bool was_ever_writable_ = false;
bool has_received_packet_ = false;
@ -496,11 +498,9 @@ class VoiceChannel : public BaseChannel {
private:
// overrides from BaseChannel
void OnPacketRead(rtc::PacketTransportInternal* transport,
const char* data,
size_t len,
const rtc::PacketTime& packet_time,
int flags) override;
void OnPacketReceived(bool rtcp,
rtc::CopyOnWriteBuffer& packet,
const rtc::PacketTime& packet_time) override;
void UpdateMediaSendRecvState_w() override;
const ContentInfo* GetFirstContent(const SessionDescription* sdesc) override;
bool SetLocalContent_w(const MediaContentDescription* content,

View File

@ -1560,10 +1560,10 @@ class ChannelTest : public testing::Test, public sigslot::has_slots<> {
EXPECT_TRUE(SendAccept());
EXPECT_EQ(rtcp_mux, !channel1_->NeedsRtcpTransport());
EXPECT_EQ(rtcp_mux, !channel2_->NeedsRtcpTransport());
EXPECT_TRUE(channel1_->bundle_filter()->FindPayloadType(pl_type1));
EXPECT_TRUE(channel2_->bundle_filter()->FindPayloadType(pl_type1));
EXPECT_FALSE(channel1_->bundle_filter()->FindPayloadType(pl_type2));
EXPECT_FALSE(channel2_->bundle_filter()->FindPayloadType(pl_type2));
EXPECT_TRUE(channel1_->HandlesPayloadType(pl_type1));
EXPECT_TRUE(channel2_->HandlesPayloadType(pl_type1));
EXPECT_FALSE(channel1_->HandlesPayloadType(pl_type2));
EXPECT_FALSE(channel2_->HandlesPayloadType(pl_type2));
// Both channels can receive pl_type1 only.
SendCustomRtp1(kSsrc1, ++sequence_number1_1, pl_type1);

View File

@ -108,26 +108,6 @@ bool RtcpMuxFilter::SetAnswer(bool answer_enable, ContentSource src) {
return true;
}
// Check the RTP payload type. If 63 < payload type < 96, it's RTCP.
// For additional details, see http://tools.ietf.org/html/rfc5761.
bool IsRtcp(const char* data, int len) {
if (len < 2) {
return false;
}
char pt = data[1] & 0x7F;
return (63 < pt) && (pt < 96);
}
bool RtcpMuxFilter::DemuxRtcp(const char* data, int len) {
// If we're muxing RTP/RTCP, we must inspect each packet delivered
// and determine whether it is RTP or RTCP. We do so by looking at
// the RTP payload type (see IsRtcp). Note that if we offer RTCP
// mux, we may receive muxed RTCP before we receive the answer, so
// we operate in that state too.
bool offered_mux = ((state_ == ST_SENTOFFER) && offer_enable_);
return (IsActive() || offered_mux) && IsRtcp(data, len);
}
bool RtcpMuxFilter::ExpectOffer(bool offer_enable, ContentSource source) {
return ((state_ == ST_INIT) ||
(state_ == ST_ACTIVE && offer_enable == offer_enable_) ||

View File

@ -45,9 +45,6 @@ class RtcpMuxFilter {
// Specifies whether the answer indicates the use of RTCP mux.
bool SetAnswer(bool answer_enable, ContentSource src);
// Determines whether the specified packet is RTCP.
bool DemuxRtcp(const char* data, int len);
private:
bool ExpectOffer(bool offer_enable, ContentSource source);
bool ExpectAnswer(ContentSource source);

View File

@ -12,72 +12,6 @@
#include "webrtc/media/base/testutils.h"
#include "webrtc/pc/rtcpmuxfilter.h"
TEST(RtcpMuxFilterTest, DemuxRtcpSender) {
cricket::RtcpMuxFilter filter;
const char data[] = { 0, 73, 0, 0 };
const int len = 4;
// Init state - refuse to demux
EXPECT_FALSE(filter.DemuxRtcp(data, len));
// After sent offer, demux should be enabled
filter.SetOffer(true, cricket::CS_LOCAL);
EXPECT_TRUE(filter.DemuxRtcp(data, len));
// Remote accepted, demux should be enabled
filter.SetAnswer(true, cricket::CS_REMOTE);
EXPECT_TRUE(filter.DemuxRtcp(data, len));
}
TEST(RtcpMuxFilterTest, DemuxRtcpReceiver) {
cricket::RtcpMuxFilter filter;
const char data[] = { 0, 73, 0, 0 };
const int len = 4;
// Init state - refuse to demux
EXPECT_FALSE(filter.DemuxRtcp(data, len));
// After received offer, demux should not be enabled
filter.SetOffer(true, cricket::CS_REMOTE);
EXPECT_FALSE(filter.DemuxRtcp(data, len));
// We accept, demux is now enabled
filter.SetAnswer(true, cricket::CS_LOCAL);
EXPECT_TRUE(filter.DemuxRtcp(data, len));
}
TEST(RtcpMuxFilterTest, DemuxRtcpSenderProvisionalAnswer) {
cricket::RtcpMuxFilter filter;
const char data[] = { 0, 73, 0, 0 };
const int len = 4;
filter.SetOffer(true, cricket::CS_REMOTE);
// Received provisional answer without mux enabled.
filter.SetProvisionalAnswer(false, cricket::CS_LOCAL);
EXPECT_FALSE(filter.DemuxRtcp(data, len));
// Received provisional answer with mux enabled.
filter.SetProvisionalAnswer(true, cricket::CS_LOCAL);
EXPECT_TRUE(filter.DemuxRtcp(data, len));
// Remote accepted, demux should be enabled.
filter.SetAnswer(true, cricket::CS_LOCAL);
EXPECT_TRUE(filter.DemuxRtcp(data, len));
}
TEST(RtcpMuxFilterTest, DemuxRtcpReceiverProvisionalAnswer) {
cricket::RtcpMuxFilter filter;
const char data[] = { 0, 73, 0, 0 };
const int len = 4;
filter.SetOffer(true, cricket::CS_LOCAL);
// Received provisional answer without mux enabled.
filter.SetProvisionalAnswer(false, cricket::CS_REMOTE);
// After sent offer, demux should be enabled until we have received a
// final answer.
EXPECT_TRUE(filter.DemuxRtcp(data, len));
// Received provisional answer with mux enabled.
filter.SetProvisionalAnswer(true, cricket::CS_REMOTE);
EXPECT_TRUE(filter.DemuxRtcp(data, len));
// Remote accepted, demux should be enabled.
filter.SetAnswer(true, cricket::CS_REMOTE);
EXPECT_TRUE(filter.DemuxRtcp(data, len));
}
TEST(RtcpMuxFilterTest, IsActiveSender) {
cricket::RtcpMuxFilter filter;
// Init state - not active
@ -222,12 +156,9 @@ TEST(RtcpMuxFilterTest, KeepFilterDisabledDuringUpdate) {
// Test that we can SetActive and then can't deactivate.
TEST(RtcpMuxFilterTest, SetActiveCantDeactivate) {
cricket::RtcpMuxFilter filter;
const char data[] = { 0, 73, 0, 0 };
const int len = 4;
filter.SetActive();
EXPECT_TRUE(filter.IsActive());
EXPECT_TRUE(filter.DemuxRtcp(data, len));
EXPECT_FALSE(filter.SetOffer(false, cricket::CS_LOCAL));
EXPECT_TRUE(filter.IsActive());

View File

@ -12,6 +12,8 @@
#include "webrtc/base/checks.h"
#include "webrtc/base/copyonwritebuffer.h"
#include "webrtc/base/trace_event.h"
#include "webrtc/media/base/rtputils.h"
#include "webrtc/p2p/base/packettransportinterface.h"
namespace webrtc {
@ -28,10 +30,13 @@ void RtpTransport::SetRtpPacketTransport(
}
if (rtp_packet_transport_) {
rtp_packet_transport_->SignalReadyToSend.disconnect(this);
rtp_packet_transport_->SignalReadPacket.disconnect(this);
}
if (new_packet_transport) {
new_packet_transport->SignalReadyToSend.connect(
this, &RtpTransport::OnReadyToSend);
new_packet_transport->SignalReadPacket.connect(this,
&RtpTransport::OnReadPacket);
}
rtp_packet_transport_ = new_packet_transport;
@ -48,10 +53,13 @@ void RtpTransport::SetRtcpPacketTransport(
}
if (rtcp_packet_transport_) {
rtcp_packet_transport_->SignalReadyToSend.disconnect(this);
rtcp_packet_transport_->SignalReadPacket.disconnect(this);
}
if (new_packet_transport) {
new_packet_transport->SignalReadyToSend.connect(
this, &RtpTransport::OnReadyToSend);
new_packet_transport->SignalReadPacket.connect(this,
&RtpTransport::OnReadPacket);
}
rtcp_packet_transport_ = new_packet_transport;
@ -87,6 +95,18 @@ bool RtpTransport::SendPacket(bool rtcp,
return true;
}
bool RtpTransport::HandlesPacket(const uint8_t* data, size_t len) {
return bundle_filter_.DemuxPacket(data, len);
}
bool RtpTransport::HandlesPayloadType(int payload_type) const {
return bundle_filter_.FindPayloadType(payload_type);
}
void RtpTransport::AddHandledPayloadType(int payload_type) {
bundle_filter_.AddPayloadType(payload_type);
}
PacketTransportInterface* RtpTransport::GetRtpPacketTransport() const {
return rtp_packet_transport_;
}
@ -142,4 +162,51 @@ void RtpTransport::MaybeSignalReadyToSend() {
}
}
// Check the RTP payload type. If 63 < payload type < 96, it's RTCP.
// For additional details, see http://tools.ietf.org/html/rfc5761.
bool IsRtcp(const char* data, int len) {
if (len < 2) {
return false;
}
char pt = data[1] & 0x7F;
return (63 < pt) && (pt < 96);
}
void RtpTransport::OnReadPacket(rtc::PacketTransportInternal* transport,
const char* data,
size_t len,
const rtc::PacketTime& packet_time,
int flags) {
TRACE_EVENT0("webrtc", "RtpTransport::OnReadPacket");
// When using RTCP multiplexing we might get RTCP packets on the RTP
// transport. We check the RTP payload type to determine if it is RTCP.
bool rtcp = transport == rtcp_packet_transport() ||
IsRtcp(data, static_cast<int>(len));
rtc::CopyOnWriteBuffer packet(data, len);
if (!WantsPacket(rtcp, &packet)) {
return;
}
// This mutates |packet| if it is protected.
SignalPacketReceived(rtcp, packet, packet_time);
}
bool RtpTransport::WantsPacket(bool rtcp,
const rtc::CopyOnWriteBuffer* packet) {
// Protect ourselves against crazy data.
if (!packet || !cricket::IsValidRtpRtcpPacketSize(rtcp, packet->size())) {
LOG(LS_ERROR) << "Dropping incoming " << cricket::RtpRtcpStringLiteral(rtcp)
<< " packet: wrong size=" << packet->size();
return false;
}
if (rtcp) {
// Permit all (seemingly valid) RTCP packets.
return true;
}
// Check whether we handle this payload.
return HandlesPacket(packet->data(), packet->size());
}
} // namespace webrtc

View File

@ -13,11 +13,13 @@
#include "webrtc/api/ortc/rtptransportinterface.h"
#include "webrtc/base/sigslot.h"
#include "webrtc/pc/bundlefilter.h"
namespace rtc {
class CopyOnWriteBuffer;
struct PacketOptions;
struct PacketTime;
class PacketTransportInternal;
} // namespace rtc
@ -64,11 +66,23 @@ class RtpTransport : public RtpTransportInterface, public sigslot::has_slots<> {
const rtc::PacketOptions& options,
int flags);
bool HandlesPayloadType(int payload_type) const;
void AddHandledPayloadType(int payload_type);
// TODO(zstein): Consider having two signals - RtcPacketReceived and
// RtcpPacketReceived.
// The first argument is true for RTCP packets and false for RTP packets.
sigslot::signal3<bool, rtc::CopyOnWriteBuffer&, const rtc::PacketTime&>
SignalPacketReceived;
protected:
// TODO(zstein): Remove this when we remove RtpTransportAdapter.
RtpTransportAdapter* GetInternal() override;
private:
bool HandlesPacket(const uint8_t* data, size_t len);
void OnReadyToSend(rtc::PacketTransportInternal* transport);
// Updates "ready to send" for an individual channel and fires
@ -77,6 +91,14 @@ class RtpTransport : public RtpTransportInterface, public sigslot::has_slots<> {
void MaybeSignalReadyToSend();
void OnReadPacket(rtc::PacketTransportInternal* transport,
const char* data,
size_t len,
const rtc::PacketTime& packet_time,
int flags);
bool WantsPacket(bool rtcp, const rtc::CopyOnWriteBuffer* packet);
bool rtcp_mux_enabled_;
rtc::PacketTransportInternal* rtp_packet_transport_ = nullptr;
@ -87,6 +109,8 @@ class RtpTransport : public RtpTransportInterface, public sigslot::has_slots<> {
bool rtcp_ready_to_send_ = false;
RtcpParameters rtcp_parameters_;
cricket::BundleFilter bundle_filter_;
};
} // namespace webrtc

View File

@ -16,12 +16,10 @@
namespace webrtc {
class RtpTransportTest : public testing::Test {};
constexpr bool kMuxDisabled = false;
constexpr bool kMuxEnabled = true;
TEST_F(RtpTransportTest, SetRtcpParametersCantDisableRtcpMux) {
TEST(RtpTransportTest, SetRtcpParametersCantDisableRtcpMux) {
RtpTransport transport(kMuxDisabled);
RtcpParameters params;
transport.SetRtcpParameters(params);
@ -29,7 +27,7 @@ TEST_F(RtpTransportTest, SetRtcpParametersCantDisableRtcpMux) {
EXPECT_FALSE(transport.SetRtcpParameters(params).ok());
}
TEST_F(RtpTransportTest, SetRtcpParametersEmptyCnameUsesExisting) {
TEST(RtpTransportTest, SetRtcpParametersEmptyCnameUsesExisting) {
static const char kName[] = "name";
RtpTransport transport(kMuxDisabled);
RtcpParameters params_with_name;
@ -47,11 +45,14 @@ class SignalObserver : public sigslot::has_slots<> {
explicit SignalObserver(RtpTransport* transport) {
transport->SignalReadyToSend.connect(this, &SignalObserver::OnReadyToSend);
}
bool ready() const { return ready_; }
void OnReadyToSend(bool ready) { ready_ = ready; }
private:
bool ready_ = false;
};
TEST_F(RtpTransportTest, SettingRtcpAndRtpSignalsReady) {
TEST(RtpTransportTest, SettingRtcpAndRtpSignalsReady) {
RtpTransport transport(kMuxDisabled);
SignalObserver observer(&transport);
rtc::FakePacketTransport fake_rtcp("fake_rtcp");
@ -60,12 +61,12 @@ TEST_F(RtpTransportTest, SettingRtcpAndRtpSignalsReady) {
fake_rtp.SetWritable(true);
transport.SetRtcpPacketTransport(&fake_rtcp); // rtcp ready
EXPECT_FALSE(observer.ready_);
EXPECT_FALSE(observer.ready());
transport.SetRtpPacketTransport(&fake_rtp); // rtp ready
EXPECT_TRUE(observer.ready_);
EXPECT_TRUE(observer.ready());
}
TEST_F(RtpTransportTest, SettingRtpAndRtcpSignalsReady) {
TEST(RtpTransportTest, SettingRtpAndRtcpSignalsReady) {
RtpTransport transport(kMuxDisabled);
SignalObserver observer(&transport);
rtc::FakePacketTransport fake_rtcp("fake_rtcp");
@ -74,45 +75,45 @@ TEST_F(RtpTransportTest, SettingRtpAndRtcpSignalsReady) {
fake_rtp.SetWritable(true);
transport.SetRtpPacketTransport(&fake_rtp); // rtp ready
EXPECT_FALSE(observer.ready_);
EXPECT_FALSE(observer.ready());
transport.SetRtcpPacketTransport(&fake_rtcp); // rtcp ready
EXPECT_TRUE(observer.ready_);
EXPECT_TRUE(observer.ready());
}
TEST_F(RtpTransportTest, SettingRtpWithRtcpMuxEnabledSignalsReady) {
TEST(RtpTransportTest, SettingRtpWithRtcpMuxEnabledSignalsReady) {
RtpTransport transport(kMuxEnabled);
SignalObserver observer(&transport);
rtc::FakePacketTransport fake_rtp("fake_rtp");
fake_rtp.SetWritable(true);
transport.SetRtpPacketTransport(&fake_rtp); // rtp ready
EXPECT_TRUE(observer.ready_);
EXPECT_TRUE(observer.ready());
}
TEST_F(RtpTransportTest, DisablingRtcpMuxSignalsNotReady) {
TEST(RtpTransportTest, DisablingRtcpMuxSignalsNotReady) {
RtpTransport transport(kMuxEnabled);
SignalObserver observer(&transport);
rtc::FakePacketTransport fake_rtp("fake_rtp");
fake_rtp.SetWritable(true);
transport.SetRtpPacketTransport(&fake_rtp); // rtp ready
EXPECT_TRUE(observer.ready_);
EXPECT_TRUE(observer.ready());
transport.SetRtcpMuxEnabled(false);
EXPECT_FALSE(observer.ready_);
EXPECT_FALSE(observer.ready());
}
TEST_F(RtpTransportTest, EnablingRtcpMuxSignalsReady) {
TEST(RtpTransportTest, EnablingRtcpMuxSignalsReady) {
RtpTransport transport(kMuxDisabled);
SignalObserver observer(&transport);
rtc::FakePacketTransport fake_rtp("fake_rtp");
fake_rtp.SetWritable(true);
transport.SetRtpPacketTransport(&fake_rtp); // rtp ready
EXPECT_FALSE(observer.ready_);
EXPECT_FALSE(observer.ready());
transport.SetRtcpMuxEnabled(true);
EXPECT_TRUE(observer.ready_);
EXPECT_TRUE(observer.ready());
}
class SignalCounter : public sigslot::has_slots<> {
@ -120,11 +121,14 @@ class SignalCounter : public sigslot::has_slots<> {
explicit SignalCounter(RtpTransport* transport) {
transport->SignalReadyToSend.connect(this, &SignalCounter::OnReadyToSend);
}
int count() const { return count_; }
void OnReadyToSend(bool ready) { ++count_; }
private:
int count_ = 0;
};
TEST_F(RtpTransportTest, ChangingReadyToSendStateOnlySignalsWhenChanged) {
TEST(RtpTransportTest, ChangingReadyToSendStateOnlySignalsWhenChanged) {
RtpTransport transport(kMuxEnabled);
SignalCounter observer(&transport);
rtc::FakePacketTransport fake_rtp("fake_rtp");
@ -132,19 +136,101 @@ TEST_F(RtpTransportTest, ChangingReadyToSendStateOnlySignalsWhenChanged) {
// State changes, so we should signal.
transport.SetRtpPacketTransport(&fake_rtp);
EXPECT_EQ(observer.count_, 1);
EXPECT_EQ(observer.count(), 1);
// State does not change, so we should not signal.
transport.SetRtpPacketTransport(&fake_rtp);
EXPECT_EQ(observer.count_, 1);
EXPECT_EQ(observer.count(), 1);
// State does not change, so we should not signal.
transport.SetRtcpMuxEnabled(true);
EXPECT_EQ(observer.count_, 1);
EXPECT_EQ(observer.count(), 1);
// State changes, so we should signal.
transport.SetRtcpMuxEnabled(false);
EXPECT_EQ(observer.count_, 2);
EXPECT_EQ(observer.count(), 2);
}
class SignalPacketReceivedCounter : public sigslot::has_slots<> {
public:
explicit SignalPacketReceivedCounter(RtpTransport* transport) {
transport->SignalPacketReceived.connect(
this, &SignalPacketReceivedCounter::OnPacketReceived);
}
int rtcp_count() const { return rtcp_count_; }
int rtp_count() const { return rtp_count_; }
private:
void OnPacketReceived(bool rtcp,
rtc::CopyOnWriteBuffer&,
const rtc::PacketTime&) {
if (rtcp) {
++rtcp_count_;
} else {
++rtp_count_;
}
}
int rtcp_count_ = 0;
int rtp_count_ = 0;
};
// Test that SignalPacketReceived fires with rtcp=true when a RTCP packet is
// received.
TEST(RtpTransportTest, SignalDemuxedRtcp) {
RtpTransport transport(kMuxDisabled);
SignalPacketReceivedCounter observer(&transport);
rtc::FakePacketTransport fake_rtp("fake_rtp");
fake_rtp.SetDestination(&fake_rtp, true);
transport.SetRtpPacketTransport(&fake_rtp);
// An rtcp packet.
const char data[] = {0, 73, 0, 0};
const int len = 4;
const rtc::PacketOptions options;
const int flags = 0;
fake_rtp.SendPacket(data, len, options, flags);
EXPECT_EQ(0, observer.rtp_count());
EXPECT_EQ(1, observer.rtcp_count());
}
static const unsigned char kRtpData[] = {0x80, 0x11, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0};
static const int kRtpLen = 12;
// Test that SignalPacketReceived fires with rtcp=false when a RTP packet with a
// handled payload type is received.
TEST(RtpTransportTest, SignalHandledRtpPayloadType) {
RtpTransport transport(kMuxDisabled);
SignalPacketReceivedCounter observer(&transport);
rtc::FakePacketTransport fake_rtp("fake_rtp");
fake_rtp.SetDestination(&fake_rtp, true);
transport.SetRtpPacketTransport(&fake_rtp);
transport.AddHandledPayloadType(0x11);
// An rtp packet.
const rtc::PacketOptions options;
const int flags = 0;
rtc::Buffer rtp_data(kRtpData, kRtpLen);
fake_rtp.SendPacket(rtp_data.data<char>(), kRtpLen, options, flags);
EXPECT_EQ(1, observer.rtp_count());
EXPECT_EQ(0, observer.rtcp_count());
}
// Test that SignalPacketReceived does not fire when a RTP packet with an
// unhandled payload type is received.
TEST(RtpTransportTest, DontSignalUnhandledRtpPayloadType) {
RtpTransport transport(kMuxDisabled);
SignalPacketReceivedCounter observer(&transport);
rtc::FakePacketTransport fake_rtp("fake_rtp");
fake_rtp.SetDestination(&fake_rtp, true);
transport.SetRtpPacketTransport(&fake_rtp);
const rtc::PacketOptions options;
const int flags = 0;
rtc::Buffer rtp_data(kRtpData, kRtpLen);
fake_rtp.SendPacket(rtp_data.data<char>(), kRtpLen, options, flags);
EXPECT_EQ(0, observer.rtp_count());
EXPECT_EQ(0, observer.rtcp_count());
}
} // namespace webrtc