Protected streams report RTP messages directly to the FlexFec streams

In preparation of making RTP packet demuxing many-to-one (one SSRC goes to one sink, but one sink may have multiple SSRCs), we need to remove FlexFEC's dependence on being able to register itself with the demuxer. Instead, we register FlexFEC streams with the streams they protect; when those streams get packets, they'll forward them to their associated FlexFEC streams, too.

BUG=webrtc:7135

Review-Url: https://codereview.webrtc.org/2974453002
Cr-Commit-Position: refs/heads/master@{#19219}
This commit is contained in:
eladalon 2017-08-02 07:39:07 -07:00 committed by Commit Bot
parent a67bd9d641
commit c0d481a4a6
19 changed files with 256 additions and 6 deletions

View File

@ -17,13 +17,14 @@
#include <vector>
#include "webrtc/api/call/transport.h"
#include "webrtc/call/rtp_packet_sink_interface.h"
#include "webrtc/config.h"
namespace webrtc {
class FlexfecReceiveStream {
class FlexfecReceiveStream : public RtpPacketSinkInterface {
public:
virtual ~FlexfecReceiveStream() = default;
~FlexfecReceiveStream() override = default;
struct Stats {
std::string ToString(int64_t time_ms) const;

View File

@ -157,8 +157,6 @@ FlexfecReceiveStreamImpl::FlexfecReceiveStreamImpl(
// problem too.
rtp_stream_receiver_ =
receiver_controller->CreateReceiver(config_.remote_ssrc, this);
for (uint32_t ssrc : config.protected_media_ssrcs)
receiver_controller->AddSink(ssrc, this);
}
FlexfecReceiveStreamImpl::~FlexfecReceiveStreamImpl() {

View File

@ -30,8 +30,7 @@ class RtpRtcp;
class RtpStreamReceiverControllerInterface;
class RtpStreamReceiverInterface;
class FlexfecReceiveStreamImpl : public FlexfecReceiveStream,
public RtpPacketSinkInterface {
class FlexfecReceiveStreamImpl : public FlexfecReceiveStream {
public:
FlexfecReceiveStreamImpl(
RtpStreamReceiverControllerInterface* receiver_controller,

View File

@ -325,6 +325,7 @@ if (rtc_include_tests) {
"..:video_stream_api",
"../modules/audio_coding:rent_a_codec",
"../modules/audio_processing:audio_processing",
"../modules/rtp_rtcp:rtp_rtcp",
"../p2p:rtc_p2p",
]
sources = [

View File

@ -8,6 +8,7 @@ include_rules = [
"+webrtc/modules/audio_device",
"+webrtc/modules/audio_mixer",
"+webrtc/modules/audio_processing",
"+webrtc/modules/rtp_rtcp",
"+webrtc/modules/video_capture",
"+webrtc/modules/video_coding",
"+webrtc/p2p",

View File

@ -332,6 +332,12 @@ void FakeVideoReceiveStream::EnableEncodedFrameRecording(rtc::PlatformFile file,
rtc::ClosePlatformFile(file);
}
void FakeVideoReceiveStream::AddSecondarySink(
webrtc::RtpPacketSinkInterface* sink) {}
void FakeVideoReceiveStream::RemoveSecondarySink(
const webrtc::RtpPacketSinkInterface* sink) {}
FakeFlexfecReceiveStream::FakeFlexfecReceiveStream(
const webrtc::FlexfecReceiveStream::Config& config)
: config_(config), receiving_(false) {}
@ -354,6 +360,10 @@ webrtc::FlexfecReceiveStream::Stats FakeFlexfecReceiveStream::GetStats() const {
return webrtc::FlexfecReceiveStream::Stats();
}
void FakeFlexfecReceiveStream::OnRtpPacket(const webrtc::RtpPacketReceived&) {
RTC_NOTREACHED() << "Not implemented.";
}
FakeCall::FakeCall(const webrtc::Call::Config& config)
: config_(config),
audio_network_state_(webrtc::kNetworkUp),

View File

@ -29,6 +29,7 @@
#include "webrtc/call/audio_send_stream.h"
#include "webrtc/call/call.h"
#include "webrtc/call/flexfec_receive_stream.h"
#include "webrtc/modules/rtp_rtcp/source/rtp_packet_received.h"
#include "webrtc/rtc_base/buffer.h"
#include "webrtc/video_receive_stream.h"
#include "webrtc/video_send_stream.h"
@ -198,6 +199,9 @@ class FakeVideoReceiveStream final : public webrtc::VideoReceiveStream {
void EnableEncodedFrameRecording(rtc::PlatformFile file,
size_t byte_limit) override;
void AddSecondarySink(webrtc::RtpPacketSinkInterface* sink) override;
void RemoveSecondarySink(const webrtc::RtpPacketSinkInterface* sink) override;
private:
// webrtc::VideoReceiveStream implementation.
void Start() override;
@ -226,6 +230,8 @@ class FakeFlexfecReceiveStream final : public webrtc::FlexfecReceiveStream {
webrtc::FlexfecReceiveStream::Stats GetStats() const override;
void OnRtpPacket(const webrtc::RtpPacketReceived& packet) override;
webrtc::FlexfecReceiveStream::Config config_;
bool receiving_;
};

View File

@ -2154,6 +2154,7 @@ WebRtcVideoChannel::WebRtcVideoReceiveStream::AllocatedDecoder::
WebRtcVideoChannel::WebRtcVideoReceiveStream::~WebRtcVideoReceiveStream() {
if (flexfec_stream_) {
MaybeDissociateFlexfecFromVideo();
call_->DestroyFlexfecReceiveStream(flexfec_stream_);
}
call_->DestroyVideoReceiveStream(stream_);
@ -2334,24 +2335,42 @@ void WebRtcVideoChannel::WebRtcVideoReceiveStream::SetRecvParameters(
void WebRtcVideoChannel::WebRtcVideoReceiveStream::
RecreateWebRtcVideoStream() {
if (stream_) {
MaybeDissociateFlexfecFromVideo();
call_->DestroyVideoReceiveStream(stream_);
stream_ = nullptr;
}
webrtc::VideoReceiveStream::Config config = config_.Copy();
config.rtp.protected_by_flexfec = (flexfec_stream_ != nullptr);
stream_ = call_->CreateVideoReceiveStream(std::move(config));
MaybeAssociateFlexfecWithVideo();
stream_->Start();
}
void WebRtcVideoChannel::WebRtcVideoReceiveStream::
MaybeRecreateWebRtcFlexfecStream() {
if (flexfec_stream_) {
MaybeDissociateFlexfecFromVideo();
call_->DestroyFlexfecReceiveStream(flexfec_stream_);
flexfec_stream_ = nullptr;
}
if (flexfec_config_.IsCompleteAndEnabled()) {
flexfec_stream_ = call_->CreateFlexfecReceiveStream(flexfec_config_);
flexfec_stream_->Start();
MaybeAssociateFlexfecWithVideo();
}
}
void WebRtcVideoChannel::WebRtcVideoReceiveStream::
MaybeAssociateFlexfecWithVideo() {
if (stream_ && flexfec_stream_) {
stream_->AddSecondarySink(flexfec_stream_);
}
}
void WebRtcVideoChannel::WebRtcVideoReceiveStream::
MaybeDissociateFlexfecFromVideo() {
if (stream_ && flexfec_stream_) {
stream_->RemoveSecondarySink(flexfec_stream_);
}
}

View File

@ -424,6 +424,9 @@ class WebRtcVideoChannel : public VideoMediaChannel, public webrtc::Transport {
void RecreateWebRtcVideoStream();
void MaybeRecreateWebRtcFlexfecStream();
void MaybeAssociateFlexfecWithVideo();
void MaybeDissociateFlexfecFromVideo();
void ConfigureCodecs(const std::vector<VideoCodecSettings>& recv_codecs,
std::vector<AllocatedDecoder>* old_codecs);
void ConfigureFlexfecCodec(int flexfec_payload_type);

View File

@ -59,6 +59,8 @@ FecPacketCounter FlexfecReceiver::GetPacketCounter() const {
return packet_counter_;
}
// TODO(eladalon): Consider using packet.recovered() to avoid processing
// recovered packets here.
bool FlexfecReceiver::AddReceivedPacket(const RtpPacketReceived& packet) {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_);

View File

@ -336,6 +336,8 @@ void CallTest::CreateVideoStreams() {
video_receive_streams_.push_back(receiver_call_->CreateVideoReceiveStream(
video_receive_configs_[i].Copy()));
}
AssociateFlexfecStreamsWithVideoStreams();
}
void CallTest::SetFakeVideoCaptureRotation(VideoRotation rotation) {
@ -356,9 +358,30 @@ void CallTest::CreateFlexfecStreams() {
receiver_call_->CreateFlexfecReceiveStream(
flexfec_receive_configs_[i]));
}
AssociateFlexfecStreamsWithVideoStreams();
}
void CallTest::AssociateFlexfecStreamsWithVideoStreams() {
// All FlexFEC streams protect all of the video streams.
for (FlexfecReceiveStream* flexfec_recv_stream : flexfec_receive_streams_) {
for (VideoReceiveStream* video_recv_stream : video_receive_streams_) {
video_recv_stream->AddSecondarySink(flexfec_recv_stream);
}
}
}
void CallTest::DissociateFlexfecStreamsFromVideoStreams() {
for (FlexfecReceiveStream* flexfec_recv_stream : flexfec_receive_streams_) {
for (VideoReceiveStream* video_recv_stream : video_receive_streams_) {
video_recv_stream->RemoveSecondarySink(flexfec_recv_stream);
}
}
}
void CallTest::DestroyStreams() {
DissociateFlexfecStreamsFromVideoStreams();
if (audio_send_stream_)
sender_call_->DestroyAudioSendStream(audio_send_stream_);
audio_send_stream_ = nullptr;

View File

@ -95,6 +95,10 @@ class CallTest : public ::testing::Test {
void CreateVideoStreams();
void CreateAudioStreams();
void CreateFlexfecStreams();
void AssociateFlexfecStreamsWithVideoStreams();
void DissociateFlexfecStreamsFromVideoStreams();
void Start();
void Stop();
void DestroyStreams();

View File

@ -10,6 +10,7 @@
#include "webrtc/video/rtp_video_stream_receiver.h"
#include <algorithm>
#include <utility>
#include <vector>
@ -192,6 +193,8 @@ RtpVideoStreamReceiver::RtpVideoStreamReceiver(
}
RtpVideoStreamReceiver::~RtpVideoStreamReceiver() {
RTC_DCHECK(secondary_sinks_.empty());
if (nack_module_) {
process_thread_->DeRegisterModule(nack_module_.get());
}
@ -309,6 +312,9 @@ int32_t RtpVideoStreamReceiver::OnInitializeDecoder(
// This method handles both regular RTP packets and packets recovered
// via FlexFEC.
void RtpVideoStreamReceiver::OnRtpPacket(const RtpPacketReceived& packet) {
// TODO(eladalon): https://bugs.chromium.org/p/webrtc/issues/detail?id=8056
// RTC_DCHECK_RUN_ON(&worker_thread_checker_);
{
rtc::CritScope lock(&receive_cs_);
if (!receiving_) {
@ -362,6 +368,10 @@ void RtpVideoStreamReceiver::OnRtpPacket(const RtpPacketReceived& packet) {
rtp_receive_statistics_->IncomingPacket(
header, packet.size(), IsPacketRetransmitted(header, in_order));
}
for (RtpPacketSinkInterface* secondary_sink : secondary_sinks_) {
secondary_sink->OnRtpPacket(packet);
}
}
int32_t RtpVideoStreamReceiver::RequestKeyFrame() {
@ -429,6 +439,29 @@ rtc::Optional<int64_t> RtpVideoStreamReceiver::LastReceivedKeyframePacketMs()
return packet_buffer_->LastReceivedKeyframePacketMs();
}
void RtpVideoStreamReceiver::AddSecondarySink(RtpPacketSinkInterface* sink) {
// TODO(eladalon): https://bugs.chromium.org/p/webrtc/issues/detail?id=8056
// RTC_DCHECK_RUN_ON(&worker_thread_checker_);
RTC_DCHECK(std::find(secondary_sinks_.cbegin(), secondary_sinks_.cend(),
sink) == secondary_sinks_.cend());
secondary_sinks_.push_back(sink);
}
void RtpVideoStreamReceiver::RemoveSecondarySink(
const RtpPacketSinkInterface* sink) {
// TODO(eladalon): https://bugs.chromium.org/p/webrtc/issues/detail?id=8056
// RTC_DCHECK_RUN_ON(&worker_thread_checker_);
auto it = std::find(secondary_sinks_.begin(), secondary_sinks_.end(), sink);
if (it == secondary_sinks_.end()) {
// We might be rolling-back a call whose setup failed mid-way. In such a
// case, it's simpler to remove "everything" rather than remember what
// has already been added.
LOG(LS_WARNING) << "Removal of unknown sink.";
return;
}
secondary_sinks_.erase(it);
}
void RtpVideoStreamReceiver::ReceivePacket(const uint8_t* packet,
size_t packet_length,
const RTPHeader& header,

View File

@ -31,6 +31,7 @@
#include "webrtc/modules/video_coding/sequence_number_util.h"
#include "webrtc/rtc_base/constructormagic.h"
#include "webrtc/rtc_base/criticalsection.h"
#include "webrtc/rtc_base/thread_checker.h"
#include "webrtc/typedefs.h"
#include "webrtc/video_receive_stream.h"
@ -142,6 +143,13 @@ class RtpVideoStreamReceiver : public RtpData,
rtc::Optional<int64_t> LastReceivedPacketMs() const;
rtc::Optional<int64_t> LastReceivedKeyframePacketMs() const;
// RtpDemuxer only forwards a given RTP packet to one sink. However, some
// sinks, such as FlexFEC, might wish to be informed of all of the packets
// a given sink receives (or any set of sinks). They may do so by registering
// themselves as secondary sinks.
void AddSecondarySink(RtpPacketSinkInterface* sink);
void RemoveSecondarySink(const RtpPacketSinkInterface* sink);
private:
bool AddReceiveCodec(const VideoCodec& video_codec);
void ReceivePacket(const uint8_t* packet,
@ -201,6 +209,11 @@ class RtpVideoStreamReceiver : public RtpData,
int16_t last_payload_type_ = -1;
bool has_received_frame_;
// TODO(eladalon): https://bugs.chromium.org/p/webrtc/issues/detail?id=8056
// rtc::ThreadChecker worker_thread_checker_;
std::vector<RtpPacketSinkInterface*> secondary_sinks_; // This needs
// to be GUARDED_BY(worker_thread_checker_).
};
} // namespace webrtc

View File

@ -14,6 +14,7 @@
#include "webrtc/common_video/h264/h264_common.h"
#include "webrtc/media/base/mediaconstants.h"
#include "webrtc/modules/pacing/packet_router.h"
#include "webrtc/modules/rtp_rtcp/source/rtp_packet_received.h"
#include "webrtc/modules/utility/include/process_thread.h"
#include "webrtc/modules/video_coding/frame_object.h"
#include "webrtc/modules/video_coding/include/video_coding_defines.h"
@ -22,6 +23,7 @@
#include "webrtc/modules/video_coding/timing.h"
#include "webrtc/rtc_base/bytebuffer.h"
#include "webrtc/rtc_base/logging.h"
#include "webrtc/rtc_base/ptr_util.h"
#include "webrtc/system_wrappers/include/clock.h"
#include "webrtc/system_wrappers/include/field_trial_default.h"
#include "webrtc/test/field_trial.h"
@ -93,6 +95,27 @@ class MockOnCompleteFrameCallback
rtc::ByteBufferWriter buffer_;
};
class MockRtpPacketSink : public RtpPacketSinkInterface {
public:
MOCK_METHOD1(OnRtpPacket, void(const RtpPacketReceived&));
};
constexpr uint32_t kSsrc = 111;
constexpr uint16_t kSequenceNumber = 222;
std::unique_ptr<RtpPacketReceived> CreateRtpPacketReceived(
uint32_t ssrc = kSsrc,
uint16_t sequence_number = kSequenceNumber) {
auto packet = rtc::MakeUnique<RtpPacketReceived>();
packet->SetSsrc(ssrc);
packet->SetSequenceNumber(sequence_number);
return packet;
}
MATCHER_P(SamePacketAs, other, "") {
return arg.Ssrc() == other.Ssrc() &&
arg.SequenceNumber() == other.SequenceNumber();
}
} // namespace
class RtpVideoStreamReceiverTest : public testing::Test {
@ -344,4 +367,95 @@ TEST_F(RtpVideoStreamReceiverTest, RequestKeyframeIfFirstFrameIsDelta) {
&rtp_header);
}
TEST_F(RtpVideoStreamReceiverTest, SecondarySinksGetRtpNotifications) {
rtp_video_stream_receiver_->StartReceive();
MockRtpPacketSink secondary_sink_1;
MockRtpPacketSink secondary_sink_2;
rtp_video_stream_receiver_->AddSecondarySink(&secondary_sink_1);
rtp_video_stream_receiver_->AddSecondarySink(&secondary_sink_2);
auto rtp_packet = CreateRtpPacketReceived();
EXPECT_CALL(secondary_sink_1, OnRtpPacket(SamePacketAs(*rtp_packet)));
EXPECT_CALL(secondary_sink_2, OnRtpPacket(SamePacketAs(*rtp_packet)));
rtp_video_stream_receiver_->OnRtpPacket(*rtp_packet);
// Test tear-down.
rtp_video_stream_receiver_->StopReceive();
rtp_video_stream_receiver_->RemoveSecondarySink(&secondary_sink_1);
rtp_video_stream_receiver_->RemoveSecondarySink(&secondary_sink_2);
}
TEST_F(RtpVideoStreamReceiverTest, RemovedSecondarySinksGetNoRtpNotifications) {
rtp_video_stream_receiver_->StartReceive();
MockRtpPacketSink secondary_sink;
rtp_video_stream_receiver_->AddSecondarySink(&secondary_sink);
rtp_video_stream_receiver_->RemoveSecondarySink(&secondary_sink);
auto rtp_packet = CreateRtpPacketReceived();
EXPECT_CALL(secondary_sink, OnRtpPacket(_)).Times(0);
rtp_video_stream_receiver_->OnRtpPacket(*rtp_packet);
// Test tear-down.
rtp_video_stream_receiver_->StopReceive();
}
TEST_F(RtpVideoStreamReceiverTest,
OnlyRemovedSecondarySinksExcludedFromNotifications) {
rtp_video_stream_receiver_->StartReceive();
MockRtpPacketSink kept_secondary_sink;
MockRtpPacketSink removed_secondary_sink;
rtp_video_stream_receiver_->AddSecondarySink(&kept_secondary_sink);
rtp_video_stream_receiver_->AddSecondarySink(&removed_secondary_sink);
rtp_video_stream_receiver_->RemoveSecondarySink(&removed_secondary_sink);
auto rtp_packet = CreateRtpPacketReceived();
EXPECT_CALL(kept_secondary_sink, OnRtpPacket(SamePacketAs(*rtp_packet)));
rtp_video_stream_receiver_->OnRtpPacket(*rtp_packet);
// Test tear-down.
rtp_video_stream_receiver_->StopReceive();
rtp_video_stream_receiver_->RemoveSecondarySink(&kept_secondary_sink);
}
TEST_F(RtpVideoStreamReceiverTest,
SecondariesOfNonStartedStreamGetNoNotifications) {
// Explicitly showing that the stream is not in the |started| state,
// regardless of whether streams start out |started| or |stopped|.
rtp_video_stream_receiver_->StopReceive();
MockRtpPacketSink secondary_sink;
rtp_video_stream_receiver_->AddSecondarySink(&secondary_sink);
auto rtp_packet = CreateRtpPacketReceived();
EXPECT_CALL(secondary_sink, OnRtpPacket(_)).Times(0);
rtp_video_stream_receiver_->OnRtpPacket(*rtp_packet);
// Test tear-down.
rtp_video_stream_receiver_->RemoveSecondarySink(&secondary_sink);
}
#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
TEST_F(RtpVideoStreamReceiverTest, RepeatedSecondarySinkDisallowed) {
MockRtpPacketSink secondary_sink;
rtp_video_stream_receiver_->AddSecondarySink(&secondary_sink);
EXPECT_DEATH(rtp_video_stream_receiver_->AddSecondarySink(&secondary_sink),
"");
// Test tear-down.
rtp_video_stream_receiver_->RemoveSecondarySink(&secondary_sink);
}
#endif
} // namespace webrtc

View File

@ -2088,6 +2088,9 @@ void VideoQualityTest::RunWithRenderers(const Params& params) {
video_send_stream_->Stop();
for (FlexfecReceiveStream* flexfec_receive_stream :
flexfec_receive_streams_) {
for (VideoReceiveStream* video_receive_stream : video_receive_streams_) {
video_receive_stream->RemoveSecondarySink(flexfec_receive_stream);
}
flexfec_receive_stream->Stop();
receiver_call_->DestroyFlexfecReceiveStream(flexfec_receive_stream);
}

View File

@ -367,6 +367,15 @@ void VideoReceiveStream::EnableEncodedFrameRecording(rtc::PlatformFile file,
}
}
void VideoReceiveStream::AddSecondarySink(RtpPacketSinkInterface* sink) {
rtp_video_stream_receiver_.AddSecondarySink(sink);
}
void VideoReceiveStream::RemoveSecondarySink(
const RtpPacketSinkInterface* sink) {
rtp_video_stream_receiver_.RemoveSecondarySink(sink);
}
// TODO(tommi): This method grabs a lock 6 times.
void VideoReceiveStream::OnFrame(const VideoFrame& video_frame) {
int64_t sync_offset_ms;

View File

@ -82,6 +82,9 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream,
void EnableEncodedFrameRecording(rtc::PlatformFile file,
size_t byte_limit) override;
void AddSecondarySink(RtpPacketSinkInterface* sink) override;
void RemoveSecondarySink(const RtpPacketSinkInterface* sink) override;
// Implements rtc::VideoSinkInterface<VideoFrame>.
void OnFrame(const VideoFrame& video_frame) override;

View File

@ -25,6 +25,7 @@
namespace webrtc {
class RtpPacketSinkInterface;
class VideoDecoder;
class VideoReceiveStream {
@ -221,6 +222,13 @@ class VideoReceiveStream {
EnableEncodedFrameRecording(rtc::kInvalidPlatformFileValue, 0);
}
// RtpDemuxer only forwards a given RTP packet to one sink. However, some
// sinks, such as FlexFEC, might wish to be informed of all of the packets
// a given sink receives (or any set of sinks). They may do so by registering
// themselves as secondary sinks.
virtual void AddSecondarySink(RtpPacketSinkInterface* sink) = 0;
virtual void RemoveSecondarySink(const RtpPacketSinkInterface* sink) = 0;
protected:
virtual ~VideoReceiveStream() {}
};