Integrate FlexfecReceiveStream with Call.

Call demultiplexes received RTP packets and delivers these to the
appropriate {Video,Flexfec}ReceiveStreams. A single video stream
could conceivably be protected by multiple FlexFEC streams.

BUG=webrtc:5654

Review-Url: https://codereview.webrtc.org/2388303009
Cr-Commit-Position: refs/heads/master@{#14727}
This commit is contained in:
brandtr 2016-10-23 23:37:14 -07:00 committed by Commit bot
parent 764e364933
commit 25445d3d4b
5 changed files with 180 additions and 1 deletions

View File

@ -16,6 +16,7 @@
#include "webrtc/api/call/audio_receive_stream.h"
#include "webrtc/api/call/audio_send_stream.h"
#include "webrtc/api/call/audio_state.h"
#include "webrtc/api/call/flexfec_receive_stream.h"
#include "webrtc/base/networkroute.h"
#include "webrtc/base/platform_file.h"
#include "webrtc/base/socket.h"
@ -131,6 +132,11 @@ class Call {
virtual void DestroyVideoReceiveStream(
VideoReceiveStream* receive_stream) = 0;
virtual FlexfecReceiveStream* CreateFlexfecReceiveStream(
FlexfecReceiveStream::Config configuration) = 0;
virtual void DestroyFlexfecReceiveStream(
FlexfecReceiveStream* receive_stream) = 0;
// All received RTP and RTCP packets for the call should be inserted to this
// PacketReceiver. The PacketReceiver pointer is valid as long as the
// Call instance exists.

View File

@ -12,6 +12,7 @@
#include <algorithm>
#include <map>
#include <memory>
#include <utility>
#include <vector>
#include "webrtc/audio/audio_receive_stream.h"
@ -28,6 +29,7 @@
#include "webrtc/base/trace_event.h"
#include "webrtc/call.h"
#include "webrtc/call/bitrate_allocator.h"
#include "webrtc/call/flexfec_receive_stream.h"
#include "webrtc/config.h"
#include "webrtc/logging/rtc_event_log/rtc_event_log.h"
#include "webrtc/modules/bitrate_controller/include/bitrate_controller.h"
@ -66,6 +68,7 @@ class Call : public webrtc::Call,
explicit Call(const Call::Config& config);
virtual ~Call();
// Implements webrtc::Call.
PacketReceiver* Receiver() override;
webrtc::AudioSendStream* CreateAudioSendStream(
@ -87,8 +90,14 @@ class Call : public webrtc::Call,
void DestroyVideoReceiveStream(
webrtc::VideoReceiveStream* receive_stream) override;
webrtc::FlexfecReceiveStream* CreateFlexfecReceiveStream(
webrtc::FlexfecReceiveStream::Config configuration) override;
void DestroyFlexfecReceiveStream(
webrtc::FlexfecReceiveStream* receive_stream) override;
Stats GetStats() const override;
// Implements PacketReceiver.
DeliveryStatus DeliverPacket(MediaType media_type,
const uint8_t* packet,
size_t length,
@ -153,13 +162,22 @@ class Call : public webrtc::Call,
NetworkState video_network_state_;
std::unique_ptr<RWLockWrapper> receive_crit_;
// Audio and Video receive streams are owned by the client that creates them.
// Audio, Video, and FlexFEC receive streams are owned by the client that
// creates them.
std::map<uint32_t, AudioReceiveStream*> audio_receive_ssrcs_
GUARDED_BY(receive_crit_);
std::map<uint32_t, VideoReceiveStream*> video_receive_ssrcs_
GUARDED_BY(receive_crit_);
std::set<VideoReceiveStream*> video_receive_streams_
GUARDED_BY(receive_crit_);
// Each media stream could conceivably be protected by multiple FlexFEC
// streams.
std::multimap<uint32_t, FlexfecReceiveStream*> flexfec_receive_ssrcs_media_
GUARDED_BY(receive_crit_);
std::map<uint32_t, FlexfecReceiveStream*> flexfec_receive_ssrcs_protection_
GUARDED_BY(receive_crit_);
std::set<FlexfecReceiveStream*> flexfec_receive_streams_
GUARDED_BY(receive_crit_);
std::map<std::string, AudioReceiveStream*> sync_stream_mapping_
GUARDED_BY(receive_crit_);
@ -578,6 +596,58 @@ void Call::DestroyVideoReceiveStream(
delete receive_stream_impl;
}
webrtc::FlexfecReceiveStream* Call::CreateFlexfecReceiveStream(
webrtc::FlexfecReceiveStream::Config configuration) {
TRACE_EVENT0("webrtc", "Call::CreateFlexfecReceiveStream");
RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
FlexfecReceiveStream* receive_stream =
new FlexfecReceiveStream(std::move(configuration), this);
const webrtc::FlexfecReceiveStream::Config& config = receive_stream->config();
{
WriteLockScoped write_lock(*receive_crit_);
for (auto ssrc : config.protected_media_ssrcs)
flexfec_receive_ssrcs_media_.insert(std::make_pair(ssrc, receive_stream));
RTC_DCHECK(flexfec_receive_ssrcs_protection_.find(config.flexfec_ssrc) ==
flexfec_receive_ssrcs_protection_.end());
flexfec_receive_ssrcs_protection_[config.flexfec_ssrc] = receive_stream;
flexfec_receive_streams_.insert(receive_stream);
}
// TODO(brandtr): Store config in RtcEventLog here.
return receive_stream;
}
void Call::DestroyFlexfecReceiveStream(
webrtc::FlexfecReceiveStream* receive_stream) {
TRACE_EVENT0("webrtc", "Call::DestroyFlexfecReceiveStream");
RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
RTC_DCHECK(receive_stream != nullptr);
// There exist no other derived classes of webrtc::FlexfecReceiveStream,
// so this downcast is safe.
FlexfecReceiveStream* receive_stream_impl =
static_cast<FlexfecReceiveStream*>(receive_stream);
{
WriteLockScoped write_lock(*receive_crit_);
// Remove all SSRCs pointing to the FlexfecReceiveStream to be destroyed.
auto media_it = flexfec_receive_ssrcs_media_.begin();
while (media_it != flexfec_receive_ssrcs_media_.end()) {
if (media_it->second == receive_stream_impl)
media_it = flexfec_receive_ssrcs_media_.erase(media_it);
else
++media_it;
}
auto prot_it = flexfec_receive_ssrcs_protection_.begin();
while (prot_it != flexfec_receive_ssrcs_protection_.end()) {
if (prot_it->second == receive_stream_impl)
prot_it = flexfec_receive_ssrcs_protection_.erase(prot_it);
else
++prot_it;
}
flexfec_receive_streams_.erase(receive_stream_impl);
}
delete receive_stream_impl;
}
Call::Stats Call::GetStats() const {
// TODO(solenberg): Some test cases in EndToEndTest use this from a different
// thread. Re-enable once that is fixed.
@ -923,6 +993,21 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
auto status = it->second->DeliverRtp(packet, length, packet_time)
? DELIVERY_OK
: DELIVERY_PACKET_ERROR;
// Deliver media packets to FlexFEC subsystem.
auto it_bounds = flexfec_receive_ssrcs_media_.equal_range(ssrc);
for (auto it = it_bounds.first; it != it_bounds.second; ++it)
it->second->AddAndProcessReceivedPacket(packet, length);
if (status == DELIVERY_OK)
event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
return status;
}
}
if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
auto it = flexfec_receive_ssrcs_protection_.find(ssrc);
if (it != flexfec_receive_ssrcs_protection_.end()) {
auto status = it->second->AddAndProcessReceivedPacket(packet, length)
? DELIVERY_OK
: DELIVERY_PACKET_ERROR;
if (status == DELIVERY_OK)
event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
return status;

View File

@ -117,4 +117,75 @@ TEST(CallTest, CreateDestroy_AudioReceiveStreams) {
streams.clear();
}
}
TEST(CallTest, CreateDestroy_FlexfecReceiveStream) {
CallHelper call;
FlexfecReceiveStream::Config config;
config.flexfec_payload_type = 118;
config.flexfec_ssrc = 38837212;
config.protected_media_ssrcs = {27273};
FlexfecReceiveStream* stream = call->CreateFlexfecReceiveStream(config);
EXPECT_NE(stream, nullptr);
call->DestroyFlexfecReceiveStream(stream);
}
TEST(CallTest, CreateDestroy_FlexfecReceiveStreams) {
CallHelper call;
FlexfecReceiveStream::Config config;
config.flexfec_payload_type = 118;
std::list<FlexfecReceiveStream*> streams;
for (int i = 0; i < 2; ++i) {
for (uint32_t ssrc = 0; ssrc < 1234567; ssrc += 34567) {
config.flexfec_ssrc = ssrc;
config.protected_media_ssrcs = {ssrc + 1};
FlexfecReceiveStream* stream = call->CreateFlexfecReceiveStream(config);
EXPECT_NE(stream, nullptr);
if (ssrc & 1) {
streams.push_back(stream);
} else {
streams.push_front(stream);
}
}
for (auto s : streams) {
call->DestroyFlexfecReceiveStream(s);
}
streams.clear();
}
}
TEST(CallTest, MultipleFlexfecReceiveStreamsProtectingSingleVideoStream) {
CallHelper call;
FlexfecReceiveStream::Config config;
config.flexfec_payload_type = 118;
config.protected_media_ssrcs = {1324234};
FlexfecReceiveStream* stream;
std::list<FlexfecReceiveStream*> streams;
config.flexfec_ssrc = 838383;
stream = call->CreateFlexfecReceiveStream(config);
EXPECT_NE(stream, nullptr);
streams.push_back(stream);
config.flexfec_ssrc = 424993;
stream = call->CreateFlexfecReceiveStream(config);
EXPECT_NE(stream, nullptr);
streams.push_back(stream);
config.flexfec_ssrc = 99383;
stream = call->CreateFlexfecReceiveStream(config);
EXPECT_NE(stream, nullptr);
streams.push_back(stream);
config.flexfec_ssrc = 5548;
stream = call->CreateFlexfecReceiveStream(config);
EXPECT_NE(stream, nullptr);
streams.push_back(stream);
for (auto s : streams) {
call->DestroyFlexfecReceiveStream(s);
}
}
} // namespace webrtc

View File

@ -433,6 +433,17 @@ void FakeCall::DestroyVideoReceiveStream(
}
}
webrtc::FlexfecReceiveStream* FakeCall::CreateFlexfecReceiveStream(
webrtc::FlexfecReceiveStream::Config config) {
// TODO(brandtr): Implement when adding integration with WebRtcVideoEngine2.
return nullptr;
}
void FakeCall::DestroyFlexfecReceiveStream(
webrtc::FlexfecReceiveStream* receive_stream) {
// TODO(brandtr): Implement when adding integration with WebRtcVideoEngine2.
}
webrtc::PacketReceiver* FakeCall::Receiver() {
return this;
}

View File

@ -228,6 +228,12 @@ class FakeCall final : public webrtc::Call, public webrtc::PacketReceiver {
webrtc::VideoReceiveStream::Config config) override;
void DestroyVideoReceiveStream(
webrtc::VideoReceiveStream* receive_stream) override;
webrtc::FlexfecReceiveStream* CreateFlexfecReceiveStream(
webrtc::FlexfecReceiveStream::Config config) override;
void DestroyFlexfecReceiveStream(
webrtc::FlexfecReceiveStream* receive_stream) override;
webrtc::PacketReceiver* Receiver() override;
DeliveryStatus DeliverPacket(webrtc::MediaType media_type,