Implement RTP keepalive in native stack.

BUG=webrtc:7907

Review-Url: https://codereview.webrtc.org/2960363002
Cr-Commit-Position: refs/heads/master@{#18912}
This commit is contained in:
sprang 2017-07-06 04:38:06 -07:00 committed by Commit Bot
parent 5c0d703382
commit 168794c43c
12 changed files with 269 additions and 35 deletions

View File

@ -912,6 +912,17 @@ enum NetworkState {
kNetworkDown,
};
struct RtpKeepAliveConfig {
// If no packet has been sent for |timeout_interval_ms|, send a keep-alive
// packet. The keep-alive packet is an empty (no payload) RTP packet with a
// payload type of 20 as long as the other end has not negotiated the use of
// this value. If this value has already been negotiated, then some other
// unused static payload type from table 5 of RFC 3551 shall be used and set
// in |payload_type|.
int64_t timeout_interval_ms = -1;
uint8_t payload_type = 20;
};
} // namespace webrtc
#endif // WEBRTC_COMMON_TYPES_H_

View File

@ -19,6 +19,7 @@
#include "webrtc/base/constructormagic.h"
#include "webrtc/base/deprecation.h"
#include "webrtc/base/optional.h"
#include "webrtc/common_types.h"
#include "webrtc/modules/include/module.h"
#include "webrtc/modules/rtp_rtcp/include/flexfec_sender.h"
#include "webrtc/modules/rtp_rtcp/include/rtp_rtcp_defines.h"
@ -92,6 +93,7 @@ class RtpRtcp : public Module {
SendPacketObserver* send_packet_observer = nullptr;
RateLimiter* retransmission_rate_limiter = nullptr;
OverheadObserver* overhead_observer = nullptr;
RtpKeepAliveConfig keepalive_config;
private:
RTC_DISALLOW_COPY_AND_ASSIGN(Configuration);

View File

@ -12,6 +12,7 @@
#include <string.h>
#include <algorithm>
#include <set>
#include <string>
@ -26,6 +27,11 @@
#endif
namespace webrtc {
namespace {
const int64_t kRtpRtcpMaxIdleTimeProcessMs = 5;
const int64_t kRtpRtcpRttProcessTimeMs = 1000;
const int64_t kRtpRtcpBitrateProcessTimeMs = 10;
} // namespace
RTPExtensionType StringToRtpExtensionType(const std::string& extension) {
if (extension == RtpExtension::kTimestampOffsetUri)
@ -89,9 +95,12 @@ ModuleRtpRtcpImpl::ModuleRtpRtcpImpl(const Configuration& configuration)
this),
clock_(configuration.clock),
audio_(configuration.audio),
last_process_time_(configuration.clock->TimeInMilliseconds()),
last_bitrate_process_time_(configuration.clock->TimeInMilliseconds()),
last_rtt_process_time_(configuration.clock->TimeInMilliseconds()),
keepalive_config_(configuration.keepalive_config),
last_bitrate_process_time_(clock_->TimeInMilliseconds()),
last_rtt_process_time_(clock_->TimeInMilliseconds()),
next_process_time_(clock_->TimeInMilliseconds() +
kRtpRtcpMaxIdleTimeProcessMs),
next_keepalive_time_(-1),
packet_overhead_(28), // IPV4 UDP.
nack_last_time_sent_full_(0),
nack_last_time_sent_full_prev_(0),
@ -118,6 +127,11 @@ ModuleRtpRtcpImpl::ModuleRtpRtcpImpl(const Configuration& configuration)
configuration.overhead_observer));
// Make sure rtcp sender use same timestamp offset as rtp sender.
rtcp_sender_.SetTimestampOffset(rtp_sender_->TimestampOffset());
if (keepalive_config_.timeout_interval_ms != -1) {
next_keepalive_time_ =
clock_->TimeInMilliseconds() + keepalive_config_.timeout_interval_ms;
}
}
// Set default packet size limit.
@ -130,24 +144,38 @@ ModuleRtpRtcpImpl::ModuleRtpRtcpImpl(const Configuration& configuration)
// Returns the number of milliseconds until the module want a worker thread
// to call Process.
int64_t ModuleRtpRtcpImpl::TimeUntilNextProcess() {
const int64_t now = clock_->TimeInMilliseconds();
const int64_t kRtpRtcpMaxIdleTimeProcessMs = 5;
return kRtpRtcpMaxIdleTimeProcessMs - (now - last_process_time_);
return std::max<int64_t>(0,
next_process_time_ - clock_->TimeInMilliseconds());
}
// Process any pending tasks such as timeouts (non time critical events).
void ModuleRtpRtcpImpl::Process() {
const int64_t now = clock_->TimeInMilliseconds();
last_process_time_ = now;
next_process_time_ = now + kRtpRtcpMaxIdleTimeProcessMs;
if (rtp_sender_) {
const int64_t kRtpRtcpBitrateProcessTimeMs = 10;
if (now >= last_bitrate_process_time_ + kRtpRtcpBitrateProcessTimeMs) {
rtp_sender_->ProcessBitrate();
last_bitrate_process_time_ = now;
next_process_time_ =
std::min(next_process_time_, now + kRtpRtcpBitrateProcessTimeMs);
}
if (keepalive_config_.timeout_interval_ms > 0 &&
now >= next_keepalive_time_) {
int64_t last_send_time_ms = rtp_sender_->LastTimestampTimeMs();
// If no packet has been sent, |last_send_time_ms| will be 0, and so the
// keep-alive will be triggered as expected.
if (now >= last_send_time_ms + keepalive_config_.timeout_interval_ms) {
rtp_sender_->SendKeepAlive(keepalive_config_.payload_type);
next_keepalive_time_ = now + keepalive_config_.timeout_interval_ms;
} else {
next_keepalive_time_ =
last_send_time_ms + keepalive_config_.timeout_interval_ms;
}
next_process_time_ = std::min(next_process_time_, next_keepalive_time_);
}
}
const int64_t kRtpRtcpRttProcessTimeMs = 1000;
bool process_rtt = now >= last_rtt_process_time_ + kRtpRtcpRttProcessTimeMs;
if (rtcp_sender_.Sending()) {
// Process RTT if we have received a receiver report and we haven't
@ -201,6 +229,8 @@ void ModuleRtpRtcpImpl::Process() {
// Get processed rtt.
if (process_rtt) {
last_rtt_process_time_ = now;
next_process_time_ = std::min(
next_process_time_, last_rtt_process_time_ + kRtpRtcpRttProcessTimeMs);
if (rtt_stats_) {
// Make sure we have a valid RTT before setting.
int64_t last_rtt = rtt_stats_->LastProcessedRtt();

View File

@ -335,9 +335,12 @@ class ModuleRtpRtcpImpl : public RtpRtcp, public RTCPReceiver::ModuleRtpRtcp {
const Clock* const clock_;
const bool audio_;
int64_t last_process_time_;
const RtpKeepAliveConfig keepalive_config_;
int64_t last_bitrate_process_time_;
int64_t last_rtt_process_time_;
int64_t next_process_time_;
int64_t next_keepalive_time_;
uint16_t packet_overhead_;
// Send side

View File

@ -53,11 +53,12 @@ class SendTransport : public Transport,
public RtpData {
public:
SendTransport()
: receiver_(NULL),
clock_(NULL),
: receiver_(nullptr),
clock_(nullptr),
delay_ms_(0),
rtp_packets_sent_(0) {
}
rtp_packets_sent_(0),
keepalive_payload_type_(0),
num_keepalive_sent_(0) {}
void SetRtpRtcpModule(ModuleRtpRtcpImpl* receiver) {
receiver_ = receiver;
@ -73,6 +74,8 @@ class SendTransport : public Transport,
std::unique_ptr<RtpHeaderParser> parser(RtpHeaderParser::Create());
EXPECT_TRUE(parser->Parse(static_cast<const uint8_t*>(data), len, &header));
++rtp_packets_sent_;
if (header.payloadType == keepalive_payload_type_)
++num_keepalive_sent_;
last_rtp_header_ = header;
return true;
}
@ -93,12 +96,18 @@ class SendTransport : public Transport,
const WebRtcRTPHeader* rtp_header) override {
return 0;
}
void SetKeepalivePayloadType(uint8_t payload_type) {
keepalive_payload_type_ = payload_type;
}
size_t NumKeepaliveSent() { return num_keepalive_sent_; }
ModuleRtpRtcpImpl* receiver_;
SimulatedClock* clock_;
int64_t delay_ms_;
int rtp_packets_sent_;
RTPHeader last_rtp_header_;
std::vector<uint16_t> last_nack_list_;
uint8_t keepalive_payload_type_;
size_t num_keepalive_sent_;
};
class RtpRtcpModule : public RtcpPacketTypeCounterObserver {
@ -106,19 +115,9 @@ class RtpRtcpModule : public RtcpPacketTypeCounterObserver {
explicit RtpRtcpModule(SimulatedClock* clock)
: receive_statistics_(ReceiveStatistics::Create(clock)),
remote_ssrc_(0),
retransmission_rate_limiter_(clock, kMaxRttMs) {
RtpRtcp::Configuration config;
config.audio = false;
config.clock = clock;
config.outgoing_transport = &transport_;
config.receive_statistics = receive_statistics_.get();
config.rtcp_packet_type_counter_observer = this;
config.rtt_stats = &rtt_stats_;
config.retransmission_rate_limiter = &retransmission_rate_limiter_;
impl_.reset(new ModuleRtpRtcpImpl(config));
impl_->SetRTCPStatus(RtcpMode::kCompound);
retransmission_rate_limiter_(clock, kMaxRttMs),
clock_(clock) {
CreateModuleImpl();
transport_.SimulateNetworkDelay(kOneWayNetworkDelayMs, clock);
}
@ -130,6 +129,7 @@ class RtpRtcpModule : public RtcpPacketTypeCounterObserver {
std::unique_ptr<ModuleRtpRtcpImpl> impl_;
uint32_t remote_ssrc_;
RateLimiter retransmission_rate_limiter_;
RtpKeepAliveConfig keepalive_config_;
void SetRemoteSsrc(uint32_t ssrc) {
remote_ssrc_ = ssrc;
@ -160,8 +160,30 @@ class RtpRtcpModule : public RtcpPacketTypeCounterObserver {
std::vector<uint16_t> LastNackListSent() {
return transport_.last_nack_list_;
}
void SetKeepaliveConfigAndReset(const RtpKeepAliveConfig& config) {
keepalive_config_ = config;
// Need to create a new module impl, since it's configured at creation.
CreateModuleImpl();
transport_.SetKeepalivePayloadType(config.payload_type);
}
private:
void CreateModuleImpl() {
RtpRtcp::Configuration config;
config.audio = false;
config.clock = clock_;
config.outgoing_transport = &transport_;
config.receive_statistics = receive_statistics_.get();
config.rtcp_packet_type_counter_observer = this;
config.rtt_stats = &rtt_stats_;
config.retransmission_rate_limiter = &retransmission_rate_limiter_;
config.keepalive_config = keepalive_config_;
impl_.reset(new ModuleRtpRtcpImpl(config));
impl_->SetRTCPStatus(RtcpMode::kCompound);
}
SimulatedClock* const clock_;
std::map<uint32_t, RtcpPacketTypeCounter> counter_map_;
};
} // namespace
@ -169,9 +191,9 @@ class RtpRtcpModule : public RtcpPacketTypeCounterObserver {
class RtpRtcpImplTest : public ::testing::Test {
protected:
RtpRtcpImplTest()
: clock_(133590000000000),
sender_(&clock_),
receiver_(&clock_) {
: clock_(133590000000000), sender_(&clock_), receiver_(&clock_) {}
void SetUp() override {
// Send module.
sender_.impl_->SetSSRC(kSenderSsrc);
EXPECT_EQ(0, sender_.impl_->SetSendingStatus(true));
@ -196,6 +218,7 @@ class RtpRtcpImplTest : public ::testing::Test {
sender_.transport_.SetRtpRtcpModule(receiver_.impl_.get());
receiver_.transport_.SetRtpRtcpModule(sender_.impl_.get());
}
SimulatedClock clock_;
RtpRtcpModule sender_;
RtpRtcpModule receiver_;
@ -567,4 +590,58 @@ TEST_F(RtpRtcpImplTest, UniqueNackRequests) {
EXPECT_EQ(6U, sender_.RtcpReceived().unique_nack_requests);
EXPECT_EQ(75, sender_.RtcpReceived().UniqueNackRequestsInPercent());
}
TEST_F(RtpRtcpImplTest, SendsKeepaliveAfterTimout) {
const int kTimeoutMs = 1500;
RtpKeepAliveConfig config;
config.timeout_interval_ms = kTimeoutMs;
// Recreate sender impl with new configuration, and redo setup.
sender_.SetKeepaliveConfigAndReset(config);
SetUp();
// Initial process call.
sender_.impl_->Process();
EXPECT_EQ(0U, sender_.transport_.NumKeepaliveSent());
// After one time, a single keep-alive packet should be sent.
clock_.AdvanceTimeMilliseconds(kTimeoutMs);
sender_.impl_->Process();
EXPECT_EQ(1U, sender_.transport_.NumKeepaliveSent());
// Process for the same timestamp again, no new packet should be sent.
sender_.impl_->Process();
EXPECT_EQ(1U, sender_.transport_.NumKeepaliveSent());
// Move ahead to the last ms before a keep-alive is expected, no action.
clock_.AdvanceTimeMilliseconds(kTimeoutMs - 1);
sender_.impl_->Process();
EXPECT_EQ(1U, sender_.transport_.NumKeepaliveSent());
// Move the final ms, timeout relative last KA. Should create new keep-alive.
clock_.AdvanceTimeMilliseconds(1);
sender_.impl_->Process();
EXPECT_EQ(2U, sender_.transport_.NumKeepaliveSent());
// Move ahead to the last ms before Christmas.
clock_.AdvanceTimeMilliseconds(kTimeoutMs - 1);
sender_.impl_->Process();
EXPECT_EQ(2U, sender_.transport_.NumKeepaliveSent());
// Send actual payload data, no keep-alive expected.
SendFrame(&sender_, 0);
sender_.impl_->Process();
EXPECT_EQ(2U, sender_.transport_.NumKeepaliveSent());
// Move ahead as far as possible again, timeout now relative payload. No KA.
clock_.AdvanceTimeMilliseconds(kTimeoutMs - 1);
sender_.impl_->Process();
EXPECT_EQ(2U, sender_.transport_.NumKeepaliveSent());
// Timeout relative payload, send new keep-alive.
clock_.AdvanceTimeMilliseconds(1);
sender_.impl_->Process();
EXPECT_EQ(3U, sender_.transport_.NumKeepaliveSent());
}
} // namespace webrtc

View File

@ -1289,4 +1289,24 @@ void RTPSender::UpdateRtpOverhead(const RtpPacketToSend& packet) {
overhead_observer_->OnOverheadChanged(overhead_bytes_per_packet);
}
int64_t RTPSender::LastTimestampTimeMs() const {
rtc::CritScope lock(&send_critsect_);
return last_timestamp_time_ms_;
}
void RTPSender::SendKeepAlive(uint8_t payload_type) {
std::unique_ptr<RtpPacketToSend> packet = AllocatePacket();
packet->SetPayloadType(payload_type);
// Set marker bit and timestamps in the same manner as plain padding packets.
packet->SetMarker(false);
{
rtc::CritScope lock(&send_critsect_);
packet->SetTimestamp(last_rtp_timestamp_);
packet->set_capture_time_ms(capture_time_ms_);
}
AssignSequenceNumber(packet.get());
SendToNetwork(std::move(packet), StorageType::kDontRetransmit,
RtpPacketSender::Priority::kLowPriority);
}
} // namespace webrtc

View File

@ -205,6 +205,9 @@ class RTPSender {
void SetRtxRtpState(const RtpState& rtp_state);
RtpState GetRtxRtpState() const;
int64_t LastTimestampTimeMs() const;
void SendKeepAlive(uint8_t payload_type);
protected:
int32_t CheckPayloadType(int8_t payload_type, RtpVideoCodecTypes* video_type);

View File

@ -57,6 +57,7 @@ const uint8_t kPayloadData[] = {47, 11, 32, 93, 89};
using ::testing::_;
using ::testing::ElementsAreArray;
using ::testing::Invoke;
uint64_t ConvertMsToAbsSendTime(int64_t time_ms) {
return (((time_ms << 18) + 500) / 1000) & 0x00ffffff;
@ -1711,6 +1712,40 @@ TEST_P(RtpSenderTest, SendAudioPadding) {
rtp_sender_->TimeToSendPadding(kMinPaddingSize - 5, PacedPacketInfo()));
}
TEST_P(RtpSenderTest, SendsKeepAlive) {
MockTransport transport;
rtp_sender_.reset(new RTPSender(false, &fake_clock_, &transport, nullptr,
nullptr, nullptr, nullptr, nullptr, nullptr,
nullptr, &mock_rtc_event_log_, nullptr,
&retransmission_rate_limiter_, nullptr));
rtp_sender_->SetSendPayloadType(kPayload);
rtp_sender_->SetSequenceNumber(kSeqNum);
rtp_sender_->SetTimestampOffset(0);
rtp_sender_->SetSSRC(kSsrc);
const uint8_t kKeepalivePayloadType = 20;
RTC_CHECK_NE(kKeepalivePayloadType, kPayload);
EXPECT_CALL(transport, SendRtp(_, _, _))
.WillOnce(
Invoke([&kKeepalivePayloadType](const uint8_t* packet, size_t len,
const PacketOptions& options) {
webrtc::RTPHeader rtp_header;
RtpUtility::RtpHeaderParser parser(packet, len);
EXPECT_TRUE(parser.Parse(&rtp_header, nullptr));
EXPECT_FALSE(rtp_header.markerBit);
EXPECT_EQ(0U, rtp_header.paddingLength);
EXPECT_EQ(kKeepalivePayloadType, rtp_header.payloadType);
EXPECT_EQ(kSeqNum, rtp_header.sequenceNumber);
EXPECT_EQ(kSsrc, rtp_header.ssrc);
EXPECT_EQ(0u, len - rtp_header.headerLength);
return true;
}));
rtp_sender_->SendKeepAlive(kKeepalivePayloadType);
EXPECT_EQ(kSeqNum + 1, rtp_sender_->SequenceNumber());
}
INSTANTIATE_TEST_CASE_P(WithAndWithoutOverhead,
RtpSenderTest,
::testing::Bool());

View File

@ -302,8 +302,8 @@ void RtpReplay() {
std::unique_ptr<RtpHeaderParser> parser(RtpHeaderParser::Create());
parser->Parse(packet.data, packet.length, &header);
fprintf(stderr, "Packet len=%zu pt=%u seq=%u ts=%u ssrc=0x%8x\n",
packet.length, header.payloadType, header.sequenceNumber,
header.timestamp, header.ssrc);
packet.length, header.payloadType, header.sequenceNumber,
header.timestamp, header.ssrc);
break;
}
}

View File

@ -58,7 +58,8 @@ std::vector<RtpRtcp*> CreateRtpRtcpModules(
RtcEventLog* event_log,
RateLimiter* retransmission_rate_limiter,
OverheadObserver* overhead_observer,
size_t num_modules) {
size_t num_modules,
RtpKeepAliveConfig keepalive_config) {
RTC_DCHECK_GT(num_modules, 0);
RtpRtcp::Configuration configuration;
ReceiveStatistics* null_receive_statistics = configuration.receive_statistics;
@ -83,6 +84,7 @@ std::vector<RtpRtcp*> CreateRtpRtcpModules(
configuration.event_log = event_log;
configuration.retransmission_rate_limiter = retransmission_rate_limiter;
configuration.overhead_observer = overhead_observer;
configuration.keepalive_config = keepalive_config;
std::vector<RtpRtcp*> modules;
for (size_t i = 0; i < num_modules; ++i) {
RtpRtcp* rtp_rtcp = RtpRtcp::CreateRtpRtcp(configuration);
@ -802,7 +804,8 @@ VideoSendStreamImpl::VideoSendStreamImpl(
event_log,
transport->send_side_cc()->GetRetransmissionRateLimiter(),
this,
config_->rtp.ssrcs.size())),
config_->rtp.ssrcs.size(),
config_->rtp.keep_alive)),
payload_router_(rtp_rtcp_modules_,
config_->encoder_settings.payload_type),
weak_ptr_factory_(this),

View File

@ -32,13 +32,14 @@
#include "webrtc/test/call_test.h"
#include "webrtc/test/configurable_frame_size_encoder.h"
#include "webrtc/test/fake_texture_frame.h"
#include "webrtc/test/field_trial.h"
#include "webrtc/test/frame_generator.h"
#include "webrtc/test/frame_generator_capturer.h"
#include "webrtc/test/frame_utils.h"
#include "webrtc/test/gtest.h"
#include "webrtc/test/null_transport.h"
#include "webrtc/test/rtcp_packet_parser.h"
#include "webrtc/test/testsupport/perf_test.h"
#include "webrtc/test/field_trial.h"
#include "webrtc/video/send_statistics_proxy.h"
#include "webrtc/video/transport_adapter.h"
@ -3407,4 +3408,51 @@ TEST_F(VideoSendStreamTest, RemoveOverheadFromBandwidth) {
RunBaseTest(&test);
}
TEST_F(VideoSendStreamTest, SendsKeepAlive) {
const int kTimeoutMs = 50; // Really short timeout for testing.
const int kPayloadType = 20;
class KeepaliveObserver : public test::SendTest {
public:
KeepaliveObserver() : SendTest(kDefaultTimeoutMs) {}
private:
Action OnSendRtp(const uint8_t* packet, size_t length) override {
RTPHeader header;
EXPECT_TRUE(parser_->Parse(packet, length, &header));
if (header.payloadType != kPayloadType) {
// The video stream has started. Stop it now.
if (capturer_)
capturer_->Stop();
} else {
observation_complete_.Set();
}
return SEND_PACKET;
}
void ModifyVideoConfigs(
VideoSendStream::Config* send_config,
std::vector<VideoReceiveStream::Config>* receive_configs,
VideoEncoderConfig* encoder_config) override {
send_config->rtp.keep_alive.timeout_interval_ms = kTimeoutMs;
send_config->rtp.keep_alive.payload_type = kPayloadType;
}
void PerformTest() override {
EXPECT_TRUE(Wait()) << "Timed out while waiting for keep-alive packet.";
}
void OnFrameGeneratorCapturerCreated(
test::FrameGeneratorCapturer* frame_generator_capturer) override {
capturer_ = frame_generator_capturer;
}
test::FrameGeneratorCapturer* capturer_ = nullptr;
} test;
RunBaseTest(&test);
}
} // namespace webrtc

View File

@ -168,6 +168,8 @@ class VideoSendStream {
int payload_type = -1;
} rtx;
RtpKeepAliveConfig keep_alive;
// RTCP CNAME, see RFC 3550.
std::string c_name;
} rtp;