Make Call::OnRecoveredPacket parse RTP header and call OnRtpPacket.

To make the distinction for stats, add a |recovered| flag to
RtpPacketReceived.

BUG=webrtc:7135

Review-Url: https://codereview.webrtc.org/2693123002
Cr-Commit-Position: refs/heads/master@{#18103}
This commit is contained in:
nisse 2017-05-11 08:00:58 -07:00 committed by Commit bot
parent 198c80d0af
commit d2ef314292
11 changed files with 74 additions and 74 deletions

View File

@ -137,7 +137,7 @@ class Call : public webrtc::Call,
const PacketTime& packet_time) override;
// Implements RecoveredPacketReceiver.
bool OnRecoveredPacket(const uint8_t* packet, size_t length) override;
void OnRecoveredPacket(const uint8_t* packet, size_t length) override;
void SetBitrateConfig(
const webrtc::Call::Config::BitrateConfig& bitrate_config) override;
@ -179,7 +179,7 @@ class Call : public webrtc::Call,
rtc::Optional<RtpPacketReceived> ParseRtpPacket(const uint8_t* packet,
size_t length,
const PacketTime& packet_time)
const PacketTime* packet_time)
SHARED_LOCKS_REQUIRED(receive_crit_);
void UpdateSendHistograms(int64_t first_sent_packet_ms)
@ -409,7 +409,7 @@ Call::~Call() {
rtc::Optional<RtpPacketReceived> Call::ParseRtpPacket(
const uint8_t* packet,
size_t length,
const PacketTime& packet_time) {
const PacketTime* packet_time) {
RtpPacketReceived parsed_packet;
if (!parsed_packet.Parse(packet, length))
return rtc::Optional<RtpPacketReceived>();
@ -419,8 +419,8 @@ rtc::Optional<RtpPacketReceived> Call::ParseRtpPacket(
parsed_packet.IdentifyExtensions(it->second.extensions);
int64_t arrival_time_ms;
if (packet_time.timestamp != -1) {
arrival_time_ms = (packet_time.timestamp + 500) / 1000;
if (packet_time && packet_time->timestamp != -1) {
arrival_time_ms = (packet_time->timestamp + 500) / 1000;
} else {
arrival_time_ms = clock_->TimeInMilliseconds();
}
@ -1189,7 +1189,7 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
// TODO(nisse): We should parse the RTP header only here, and pass
// on parsed_packet to the receive streams.
rtc::Optional<RtpPacketReceived> parsed_packet =
ParseRtpPacket(packet, length, packet_time);
ParseRtpPacket(packet, length, &packet_time);
if (!parsed_packet)
return DELIVERY_PACKET_ERROR;
@ -1255,13 +1255,20 @@ PacketReceiver::DeliveryStatus Call::DeliverPacket(
// TODO(brandtr): Update this member function when we support protecting
// audio packets with FlexFEC.
bool Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
uint32_t ssrc = ByteReader<uint32_t>::ReadBigEndian(&packet[8]);
void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
ReadLockScoped read_lock(*receive_crit_);
auto it = video_receive_ssrcs_.find(ssrc);
rtc::Optional<RtpPacketReceived> parsed_packet =
ParseRtpPacket(packet, length, nullptr);
if (!parsed_packet)
return;
parsed_packet->set_recovered(true);
auto it = video_receive_ssrcs_.find(parsed_packet->Ssrc());
if (it == video_receive_ssrcs_.end())
return false;
return it->second->OnRecoveredPacket(packet, length);
return;
it->second->OnRtpPacket(*parsed_packet);
}
void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet,

View File

@ -27,7 +27,7 @@ namespace webrtc {
// should be able to demultiplex the recovered RTP packets based on SSRC.
class RecoveredPacketReceiver {
public:
virtual bool OnRecoveredPacket(const uint8_t* packet, size_t length) = 0;
virtual void OnRecoveredPacket(const uint8_t* packet, size_t length) = 0;
protected:
virtual ~RecoveredPacketReceiver() = default;

View File

@ -19,7 +19,7 @@ namespace webrtc {
class MockRecoveredPacketReceiver : public RecoveredPacketReceiver {
public:
MOCK_METHOD2(OnRecoveredPacket, bool(const uint8_t* packet, size_t length));
MOCK_METHOD2(OnRecoveredPacket, void(const uint8_t* packet, size_t length));
};
} // namespace webrtc

View File

@ -132,10 +132,8 @@ bool FlexfecReceiver::ProcessReceivedPackets() {
continue;
}
++packet_counter_.num_recovered_packets;
if (!recovered_packet_receiver_->OnRecoveredPacket(
recovered_packet->pkt->data, recovered_packet->pkt->length)) {
return false;
}
recovered_packet_receiver_->OnRecoveredPacket(
recovered_packet->pkt->data, recovered_packet->pkt->length);
recovered_packet->returned = true;
// Periodically log the incoming packets.
int64_t now_ms = clock_->TimeInMilliseconds();

View File

@ -227,8 +227,7 @@ TEST_F(FlexfecReceiverTest, RecoversFromSingleMediaLoss) {
EXPECT_CALL(recovered_packet_receiver_,
OnRecoveredPacket(_, (*media_it)->length))
.With(
Args<0, 1>(ElementsAreArray((*media_it)->data, (*media_it)->length)))
.WillOnce(Return(true));
Args<0, 1>(ElementsAreArray((*media_it)->data, (*media_it)->length)));
receiver_.OnRtpPacket(ParsePacket(*packet_with_rtp_header));
}
@ -250,8 +249,7 @@ TEST_F(FlexfecReceiverTest, RecoversFromDoubleMediaLoss) {
EXPECT_CALL(recovered_packet_receiver_,
OnRecoveredPacket(_, (*media_it)->length))
.With(
Args<0, 1>(ElementsAreArray((*media_it)->data, (*media_it)->length)))
.WillOnce(Return(true));
Args<0, 1>(ElementsAreArray((*media_it)->data, (*media_it)->length)));
receiver_.OnRtpPacket(ParsePacket(*packet_with_rtp_header));
// Receive second FEC packet and recover second lost media packet.
@ -261,8 +259,7 @@ TEST_F(FlexfecReceiverTest, RecoversFromDoubleMediaLoss) {
EXPECT_CALL(recovered_packet_receiver_,
OnRecoveredPacket(_, (*media_it)->length))
.With(
Args<0, 1>(ElementsAreArray((*media_it)->data, (*media_it)->length)))
.WillOnce(Return(true));
Args<0, 1>(ElementsAreArray((*media_it)->data, (*media_it)->length)));
receiver_.OnRtpPacket(ParsePacket(*packet_with_rtp_header));
}
@ -301,8 +298,7 @@ TEST_F(FlexfecReceiverTest, DoesNotCallbackTwice) {
EXPECT_CALL(recovered_packet_receiver_,
OnRecoveredPacket(_, (*media_it)->length))
.With(
Args<0, 1>(ElementsAreArray((*media_it)->data, (*media_it)->length)))
.WillOnce(Return(true));
Args<0, 1>(ElementsAreArray((*media_it)->data, (*media_it)->length)));
receiver_.OnRtpPacket(ParsePacket(*packet_with_rtp_header));
// Receive FEC packet again.
@ -348,8 +344,7 @@ TEST_F(FlexfecReceiverTest, RecoversFrom50PercentLoss) {
EXPECT_CALL(recovered_packet_receiver_,
OnRecoveredPacket(_, (*media_it)->length))
.With(Args<0, 1>(
ElementsAreArray((*media_it)->data, (*media_it)->length)))
.WillOnce(Return(true));
ElementsAreArray((*media_it)->data, (*media_it)->length)));
receiver_.OnRtpPacket(ParsePacket(*fec_packet_with_rtp_header));
++media_it;
}
@ -389,8 +384,7 @@ TEST_F(FlexfecReceiverTest, DelayedFecPacketDoesHelp) {
EXPECT_CALL(recovered_packet_receiver_,
OnRecoveredPacket(_, (*media_it)->length))
.With(
Args<0, 1>(ElementsAreArray((*media_it)->data, (*media_it)->length)))
.WillOnce(Return(true));
Args<0, 1>(ElementsAreArray((*media_it)->data, (*media_it)->length)));
receiver_.OnRtpPacket(ParsePacket(*packet_with_rtp_header));
}
@ -454,13 +448,11 @@ TEST_F(FlexfecReceiverTest, RecoversWithMediaPacketsOutOfOrder) {
EXPECT_CALL(recovered_packet_receiver_,
OnRecoveredPacket(_, (*media_packet1)->length))
.With(Args<0, 1>(
ElementsAreArray((*media_packet1)->data, (*media_packet1)->length)))
.WillOnce(Return(true));
ElementsAreArray((*media_packet1)->data, (*media_packet1)->length)));
EXPECT_CALL(recovered_packet_receiver_,
OnRecoveredPacket(_, (*media_packet4)->length))
.With(Args<0, 1>(
ElementsAreArray((*media_packet4)->data, (*media_packet4)->length)))
.WillOnce(Return(true));
ElementsAreArray((*media_packet4)->data, (*media_packet4)->length)));
// Add FEC packets.
auto fec_it = fec_packets.begin();
@ -492,8 +484,7 @@ TEST_F(FlexfecReceiverTest, CalculatesNumberOfPackets) {
EXPECT_CALL(recovered_packet_receiver_,
OnRecoveredPacket(_, (*media_it)->length))
.With(
Args<0, 1>(ElementsAreArray((*media_it)->data, (*media_it)->length)))
.WillOnce(Return(true));
Args<0, 1>(ElementsAreArray((*media_it)->data, (*media_it)->length)));
receiver_.OnRtpPacket(ParsePacket(*packet_with_rtp_header));
// Check stats calculations.

View File

@ -36,9 +36,9 @@ class RtpPacketReceived : public rtp::Packet {
NtpTime capture_ntp_time() const { return capture_time_; }
void set_capture_ntp_time(NtpTime time) { capture_time_ = time; }
// Flag if packet arrived via rtx.
bool retransmit() const { return retransmit_; }
void set_retransmit(bool value) { retransmit_ = value; }
// Flag if packet was recovered via RTX or FEC.
bool recovered() const { return recovered_; }
void set_recovered(bool value) { recovered_ = value; }
int payload_type_frequency() const { return payload_type_frequency_; }
void set_payload_type_frequency(int value) {
@ -49,7 +49,7 @@ class RtpPacketReceived : public rtp::Packet {
NtpTime capture_time_;
int64_t arrival_time_ms_ = 0;
int payload_type_frequency_ = 0;
bool retransmit_ = false;
bool recovered_ = false;
};
} // namespace webrtc

View File

@ -20,7 +20,7 @@ namespace webrtc {
namespace {
class DummyCallback : public RecoveredPacketReceiver {
bool OnRecoveredPacket(const uint8_t* packet, size_t length) { return true; }
void OnRecoveredPacket(const uint8_t* packet, size_t length) override {}
};
} // namespace

View File

@ -275,6 +275,9 @@ int32_t RtpStreamReceiver::OnReceivedPayloadData(
return 0;
}
// TODO(nisse): Try to delete this method. Obstacles: It is used by
// ParseAndHandleEncapsulatingHeader, for handling Rtx packets. And
// it's part of the RtpData interface which we implement.
bool RtpStreamReceiver::OnRecoveredPacket(const uint8_t* rtp_packet,
size_t rtp_packet_length) {
RTPHeader header;
@ -302,36 +305,37 @@ void RtpStreamReceiver::OnIncomingSSRCChanged(const uint32_t ssrc) {
rtp_rtcp_->SetRemoteSSRC(ssrc);
}
// This method handles both regular RTP packets and packets recovered
// via FlexFEC.
void RtpStreamReceiver::OnRtpPacket(const RtpPacketReceived& packet) {
{
rtc::CritScope lock(&receive_cs_);
if (!receiving_) {
return;
}
}
int64_t now_ms = clock_->TimeInMilliseconds();
if (!packet.recovered()) {
int64_t now_ms = clock_->TimeInMilliseconds();
{
// Periodically log the RTP header of incoming packets.
rtc::CritScope lock(&receive_cs_);
if (now_ms - last_packet_log_ms_ > kPacketLogIntervalMs) {
std::stringstream ss;
ss << "Packet received on SSRC: " << packet.Ssrc()
<< " with payload type: " << static_cast<int>(packet.PayloadType())
<< ", timestamp: " << packet.Timestamp()
<< ", sequence number: " << packet.SequenceNumber()
<< ", arrival time: " << packet.arrival_time_ms();
int32_t time_offset;
if (packet.GetExtension<TransmissionOffset>(&time_offset)) {
ss << ", toffset: " << time_offset;
// Periodically log the RTP header of incoming packets.
if (now_ms - last_packet_log_ms_ > kPacketLogIntervalMs) {
std::stringstream ss;
ss << "Packet received on SSRC: " << packet.Ssrc()
<< " with payload type: " << static_cast<int>(packet.PayloadType())
<< ", timestamp: " << packet.Timestamp()
<< ", sequence number: " << packet.SequenceNumber()
<< ", arrival time: " << packet.arrival_time_ms();
int32_t time_offset;
if (packet.GetExtension<TransmissionOffset>(&time_offset)) {
ss << ", toffset: " << time_offset;
}
uint32_t send_time;
if (packet.GetExtension<AbsoluteSendTime>(&send_time)) {
ss << ", abs send time: " << send_time;
}
LOG(LS_INFO) << ss.str();
last_packet_log_ms_ = now_ms;
}
uint32_t send_time;
if (packet.GetExtension<AbsoluteSendTime>(&send_time)) {
ss << ", abs send time: " << send_time;
}
LOG(LS_INFO) << ss.str();
last_packet_log_ms_ = now_ms;
}
}
@ -343,13 +347,20 @@ void RtpStreamReceiver::OnRtpPacket(const RtpPacketReceived& packet) {
header.payload_type_frequency = kVideoPayloadTypeFrequency;
bool in_order = IsPacketInOrder(header);
rtp_payload_registry_.SetIncomingPayloadType(header);
if (!packet.recovered()) {
// TODO(nisse): Why isn't this done for recovered packets?
rtp_payload_registry_.SetIncomingPayloadType(header);
}
ReceivePacket(packet.data(), packet.size(), header, in_order);
// Update receive statistics after ReceivePacket.
// Receive statistics will be reset if the payload type changes (make sure
// that the first packet is included in the stats).
rtp_receive_statistics_->IncomingPacket(
header, packet.size(), IsPacketRetransmitted(header, in_order));
if (!packet.recovered()) {
// TODO(nisse): We should pass a recovered flag to stats, to aid
// fixing bug bugs.webrtc.org/6339.
rtp_receive_statistics_->IncomingPacket(
header, packet.size(), IsPacketRetransmitted(header, in_order));
}
}
int32_t RtpStreamReceiver::RequestKeyFrame() {

View File

@ -246,11 +246,6 @@ void VideoReceiveStream::OnRtpPacket(const RtpPacketReceived& packet) {
rtp_stream_receiver_.OnRtpPacket(packet);
}
bool VideoReceiveStream::OnRecoveredPacket(const uint8_t* packet,
size_t length) {
return rtp_stream_receiver_.OnRecoveredPacket(packet, length);
}
void VideoReceiveStream::SetSync(Syncable* audio_syncable) {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
rtp_stream_sync_.ConfigureSync(audio_syncable);

View File

@ -60,8 +60,6 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream,
void SignalNetworkState(NetworkState state);
bool DeliverRtcp(const uint8_t* packet, size_t length);
bool OnRecoveredPacket(const uint8_t* packet, size_t length);
void SetSync(Syncable* audio_syncable);
// Implements webrtc::VideoReceiveStream.

View File

@ -129,9 +129,9 @@ TEST_F(VideoReceiveStreamTest, CreateFrameFromH264FmtpSpropAndIdr) {
EXPECT_CALL(mock_h264_video_decoder_, RegisterDecodeCompleteCallback(_));
video_receive_stream_->Start();
EXPECT_CALL(mock_h264_video_decoder_, Decode(_, false, _, _, _));
EXPECT_EQ(true,
video_receive_stream_->OnRecoveredPacket(rtppacket.data(),
rtppacket.size()));
RtpPacketReceived parsed_packet;
ASSERT_TRUE(parsed_packet.Parse(rtppacket.data(), rtppacket.size()));
video_receive_stream_->OnRtpPacket(parsed_packet);
EXPECT_CALL(mock_h264_video_decoder_, Release());
// Make sure the decoder thread had a chance to run.
init_decode_event_.Wait(kDefaultTimeOutMs);