New method for precise packet reception time measurement.

Bug: webrtc:9054
Change-Id: I43a32122e9af992b5e0ba8b187c9ad4f22aba80d
Reviewed-on: https://webrtc-review.googlesource.com/c/104503
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Christoffer Rodbro <crodbro@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#25167}
This commit is contained in:
Christoffer Rodbro 2018-10-12 11:15:09 +02:00 committed by Commit Bot
parent 2c7149bb23
commit 76ad154eef
7 changed files with 341 additions and 251 deletions

View File

@ -217,6 +217,7 @@ rtc_static_library("call") {
"../rtc_base:rtc_task_queue",
"../rtc_base:safe_minmax",
"../rtc_base:sequenced_task_checker",
"../rtc_base/experiments:field_trial_parser",
"../rtc_base/synchronization:rw_lock_wrapper",
"../system_wrappers",
"../system_wrappers:field_trial",

View File

@ -56,6 +56,7 @@
#include "rtc_base/synchronization/rw_lock_wrapper.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/thread_annotations.h"
#include "rtc_base/timeutils.h"
#include "rtc_base/trace_event.h"
#include "system_wrappers/include/clock.h"
#include "system_wrappers/include/cpu_info.h"
@ -1215,8 +1216,10 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
if (packet_time_us != -1) {
if (receive_time_calculator_) {
int64_t system_time_us =
rtc::SystemTimeNanos() / rtc::kNumNanosecsPerMicrosec;
packet_time_us = receive_time_calculator_->ReconcileReceiveTimes(
packet_time_us, clock_->TimeInMicroseconds());
packet_time_us, system_time_us, clock_->TimeInMicroseconds());
}
parsed_packet.set_arrival_time_ms((packet_time_us + 500) / 1000);
} else {

View File

@ -9,59 +9,112 @@
*/
#include "call/receive_time_calculator.h"
#include <string>
#include "absl/memory/memory.h"
#include "rtc_base/experiments/field_trial_parser.h"
#include "rtc_base/logging.h"
#include "rtc_base/numerics/safe_minmax.h"
#include "system_wrappers/include/field_trial.h"
namespace webrtc {
namespace {
using ::webrtc::field_trial::FindFullName;
using ::webrtc::field_trial::IsEnabled;
const char kBweReceiveTimeCorrection[] = "WebRTC-BweReceiveTimeCorrection";
const char kBweReceiveTimeCorrection[] = "WebRTC-Bwe-ReceiveTimeFix";
} // namespace
ReceiveTimeCalculator::ReceiveTimeCalculator(int64_t min_delta_ms,
int64_t max_delta_diff_ms)
: min_delta_us_(min_delta_ms * 1000),
max_delta_diff_us_(max_delta_diff_ms * 1000) {}
ReceiveTimeCalculatorConfig::ReceiveTimeCalculatorConfig()
: max_packet_time_repair("maxrep", TimeDelta::ms(2000)),
stall_threshold("stall", TimeDelta::ms(5)),
tolerance("tol", TimeDelta::ms(1)),
max_stall("maxstall", TimeDelta::seconds(5)) {
std::string trial_string =
field_trial::FindFullName(kBweReceiveTimeCorrection);
ParseFieldTrial(
{&max_packet_time_repair, &stall_threshold, &tolerance, &max_stall},
trial_string);
}
ReceiveTimeCalculatorConfig::ReceiveTimeCalculatorConfig(
const ReceiveTimeCalculatorConfig&) = default;
ReceiveTimeCalculatorConfig::~ReceiveTimeCalculatorConfig() = default;
ReceiveTimeCalculator::ReceiveTimeCalculator()
: config_(ReceiveTimeCalculatorConfig()) {}
std::unique_ptr<ReceiveTimeCalculator>
ReceiveTimeCalculator::CreateFromFieldTrial() {
if (!IsEnabled(kBweReceiveTimeCorrection))
return nullptr;
int min, max;
if (sscanf(FindFullName(kBweReceiveTimeCorrection).c_str(), "Enabled,%d,%d",
&min, &max) != 2) {
RTC_LOG(LS_WARNING) << "Invalid number of parameters provided.";
return nullptr;
}
return absl::make_unique<ReceiveTimeCalculator>(min, max);
return absl::make_unique<ReceiveTimeCalculator>();
}
int64_t ReceiveTimeCalculator::ReconcileReceiveTimes(int64_t packet_time_us_,
int64_t safe_time_us_) {
if (!receive_time_offset_us_) {
receive_time_offset_us_ = safe_time_us_ - packet_time_us_;
} else {
int64_t safe_delta_us = safe_time_us_ - last_safe_time_us_;
int64_t packet_delta_us_ = packet_time_us_ - last_packet_time_us_;
int64_t delta_diff = packet_delta_us_ - safe_delta_us;
// Packet time should not decrease significantly, a large decrease indicates
// a reset of the packet time clock and we should reset the offest
// parameter. The safe reference time can increase in large jumps if the
// thread measuring it is backgrounded for longer periods. But if the packet
// time increases significantly more than the safe time, it indicates a
// clock reset and we should reset the offset.
int64_t ReceiveTimeCalculator::ReconcileReceiveTimes(int64_t packet_time_us,
int64_t system_time_us,
int64_t safe_time_us) {
int64_t stall_time_us = system_time_us - packet_time_us;
if (total_system_time_passed_us_ < config_.stall_threshold->us()) {
stall_time_us = rtc::SafeMin(stall_time_us, config_.max_stall->us());
}
int64_t corrected_time_us = safe_time_us - stall_time_us;
if (packet_delta_us_ < min_delta_us_ || delta_diff > max_delta_diff_us_) {
RTC_LOG(LS_WARNING) << "Received a clock jump of " << delta_diff
<< " resetting offset";
receive_time_offset_us_ = safe_time_us_ - packet_time_us_;
if (last_packet_time_us_ == -1 && stall_time_us < 0) {
static_clock_offset_us_ = stall_time_us;
corrected_time_us += static_clock_offset_us_;
} else if (last_packet_time_us_ > 0) {
// All repairs depend on variables being intialized
int64_t packet_time_delta_us = packet_time_us - last_packet_time_us_;
int64_t system_time_delta_us = system_time_us - last_system_time_us_;
int64_t safe_time_delta_us = safe_time_us - last_safe_time_us_;
// Repair backwards clock resets during initial stall. In this case, the
// reset is observed only in packet time but never in system time.
if (system_time_delta_us < 0)
total_system_time_passed_us_ += config_.stall_threshold->us();
else
total_system_time_passed_us_ += system_time_delta_us;
if (packet_time_delta_us < 0 &&
total_system_time_passed_us_ < config_.stall_threshold->us()) {
static_clock_offset_us_ -= packet_time_delta_us;
}
corrected_time_us += static_clock_offset_us_;
// Detect resets inbetween clock readings in socket and app.
bool forward_clock_reset =
corrected_time_us + config_.tolerance->us() < last_corrected_time_us_;
bool obvious_backward_clock_reset = system_time_us < packet_time_us;
// Harder case with backward clock reset during stall, the reset being
// smaller than the stall. Compensate throughout the stall.
bool small_backward_clock_reset =
!obvious_backward_clock_reset &&
safe_time_delta_us > system_time_delta_us + config_.tolerance->us();
bool stall_start =
packet_time_delta_us >= 0 &&
system_time_delta_us > packet_time_delta_us + config_.tolerance->us();
bool stall_is_over = safe_time_delta_us > config_.stall_threshold->us();
bool packet_time_caught_up =
packet_time_delta_us < 0 && system_time_delta_us >= 0;
if (stall_start && small_backward_clock_reset)
small_reset_during_stall_ = true;
else if (stall_is_over || packet_time_caught_up)
small_reset_during_stall_ = false;
// If resets are detected, advance time by (capped) packet time increase.
if (forward_clock_reset || obvious_backward_clock_reset ||
small_reset_during_stall_) {
corrected_time_us = last_corrected_time_us_ +
rtc::SafeClamp(packet_time_delta_us, 0,
config_.max_packet_time_repair->us());
}
}
last_packet_time_us_ = packet_time_us_;
last_safe_time_us_ = safe_time_us_;
return packet_time_us_ + *receive_time_offset_us_;
last_corrected_time_us_ = corrected_time_us;
last_packet_time_us_ = packet_time_us;
last_system_time_us_ = system_time_us;
last_safe_time_us_ = safe_time_us;
return corrected_time_us;
}
} // namespace webrtc

View File

@ -10,13 +10,24 @@
#ifndef CALL_RECEIVE_TIME_CALCULATOR_H_
#define CALL_RECEIVE_TIME_CALCULATOR_H_
#include <stdint.h>
#include <memory>
#include "absl/types/optional.h"
#include "rtc_base/experiments/field_trial_units.h"
namespace webrtc {
struct ReceiveTimeCalculatorConfig {
ReceiveTimeCalculatorConfig();
ReceiveTimeCalculatorConfig(const ReceiveTimeCalculatorConfig&);
ReceiveTimeCalculatorConfig& operator=(const ReceiveTimeCalculatorConfig&) =
default;
~ReceiveTimeCalculatorConfig();
FieldTrialParameter<TimeDelta> max_packet_time_repair;
FieldTrialParameter<TimeDelta> stall_threshold;
FieldTrialParameter<TimeDelta> tolerance;
FieldTrialParameter<TimeDelta> max_stall;
};
// The receive time calculator serves the purpose of combining packet time
// stamps with a safely incremental clock. This assumes that the packet time
// stamps are based on lower layer timestamps that have more accurate time
@ -28,20 +39,20 @@ namespace webrtc {
class ReceiveTimeCalculator {
public:
static std::unique_ptr<ReceiveTimeCalculator> CreateFromFieldTrial();
// The min delta is used to correct for jumps backwards in time, to allow some
// packet reordering a small negative value is appropriate to use. The max
// delta difference is used as margin when detecting when packet time
// increases more than the safe clock. This should be larger than the largest
// expected sysmtem induced delay in the safe clock timestamp.
ReceiveTimeCalculator(int64_t min_delta_ms, int64_t max_delta_diff_ms);
int64_t ReconcileReceiveTimes(int64_t packet_time_us_, int64_t safe_time_us_);
ReceiveTimeCalculator();
int64_t ReconcileReceiveTimes(int64_t packet_time_us_,
int64_t system_time_us_,
int64_t safe_time_us_);
private:
const int64_t min_delta_us_;
const int64_t max_delta_diff_us_;
absl::optional<int64_t> receive_time_offset_us_;
int64_t last_packet_time_us_ = 0;
int64_t last_safe_time_us_ = 0;
int64_t last_corrected_time_us_ = -1;
int64_t last_packet_time_us_ = -1;
int64_t last_system_time_us_ = -1;
int64_t last_safe_time_us_ = -1;
int64_t total_system_time_passed_us_ = 0;
int64_t static_clock_offset_us_ = 0;
int64_t small_reset_during_stall_ = false;
ReceiveTimeCalculatorConfig config_;
};
} // namespace webrtc
#endif // CALL_RECEIVE_TIME_CALCULATOR_H_

View File

@ -10,65 +10,237 @@
#include "call/receive_time_calculator.h"
#include <algorithm>
#include <iostream>
#include <tuple>
#include <vector>
#include "absl/memory/memory.h"
#include "absl/types/optional.h"
#include "rtc_base/random.h"
#include "rtc_base/timeutils.h"
#include "test/gtest.h"
namespace webrtc {
namespace test {
namespace {
int64_t ReconcileMs(ReceiveTimeCalculator* calc,
int64_t packet_time_ms,
int64_t safe_time_ms) {
return calc->ReconcileReceiveTimes(packet_time_ms * 1000,
safe_time_ms * 1000) /
1000;
class EmulatedClock {
public:
explicit EmulatedClock(int seed, float drift = 0.0f)
: random_(seed), clock_us_(random_.Rand<uint32_t>()), drift_(drift) {}
virtual ~EmulatedClock() = default;
int64_t GetClockUs() const { return clock_us_; }
protected:
int64_t UpdateClock(int64_t time_us) {
if (!last_query_us_)
last_query_us_ = time_us;
int64_t skip_us = time_us - *last_query_us_;
accumulated_drift_us_ += skip_us * drift_;
int64_t drift_correction_us = static_cast<int64_t>(accumulated_drift_us_);
accumulated_drift_us_ -= drift_correction_us;
clock_us_ += skip_us + drift_correction_us;
last_query_us_ = time_us;
return skip_us;
}
Random random_;
private:
int64_t clock_us_;
absl::optional<int64_t> last_query_us_;
float drift_;
float accumulated_drift_us_ = 0;
};
class EmulatedMonotoneousClock : public EmulatedClock {
public:
explicit EmulatedMonotoneousClock(int seed) : EmulatedClock(seed) {}
~EmulatedMonotoneousClock() = default;
int64_t Query(int64_t time_us) {
int64_t skip_us = UpdateClock(time_us);
// In a stall
if (stall_recovery_time_us_ > 0) {
if (GetClockUs() > stall_recovery_time_us_) {
stall_recovery_time_us_ = 0;
return GetClockUs();
} else {
return stall_recovery_time_us_;
}
}
// Check if we enter a stall
for (int k = 0; k < skip_us; ++k) {
if (random_.Rand<double>() < kChanceOfStallPerUs) {
int64_t stall_duration_us =
static_cast<int64_t>(random_.Rand<float>() * kMaxStallDurationUs);
stall_recovery_time_us_ = GetClockUs() + stall_duration_us;
return stall_recovery_time_us_;
}
}
return GetClockUs();
}
void ForceStallUs() {
int64_t stall_duration_us =
static_cast<int64_t>(random_.Rand<float>() * kMaxStallDurationUs);
stall_recovery_time_us_ = GetClockUs() + stall_duration_us;
}
bool Stalled() const { return stall_recovery_time_us_ > 0; }
int64_t GetRemainingStall(int64_t time_us) const {
return stall_recovery_time_us_ > 0 ? stall_recovery_time_us_ - GetClockUs()
: 0;
}
const int64_t kMaxStallDurationUs = rtc::kNumMicrosecsPerSec;
private:
const float kChanceOfStallPerUs = 5e-6f;
int64_t stall_recovery_time_us_ = 0;
};
class EmulatedNonMonotoneousClock : public EmulatedClock {
public:
EmulatedNonMonotoneousClock(int seed, int64_t duration_us, float drift = 0)
: EmulatedClock(seed, drift) {
Pregenerate(duration_us);
}
~EmulatedNonMonotoneousClock() = default;
void Pregenerate(int64_t duration_us) {
int64_t time_since_reset_us = kMinTimeBetweenResetsUs;
int64_t clock_offset_us = 0;
for (int64_t time_us = 0; time_us < duration_us; time_us += kResolutionUs) {
int64_t skip_us = UpdateClock(time_us);
time_since_reset_us += skip_us;
int64_t reset_us = 0;
if (time_since_reset_us >= kMinTimeBetweenResetsUs) {
for (int k = 0; k < skip_us; ++k) {
if (random_.Rand<double>() < kChanceOfResetPerUs) {
reset_us = static_cast<int64_t>(2 * random_.Rand<float>() *
kMaxAbsResetUs) -
kMaxAbsResetUs;
clock_offset_us += reset_us;
time_since_reset_us = 0;
break;
}
}
}
pregenerated_clock_.emplace_back(GetClockUs() + clock_offset_us);
resets_us_.emplace_back(reset_us);
}
}
int64_t Query(int64_t time_us) {
size_t ixStart =
(last_reset_query_time_us_ + (kResolutionUs >> 1)) / kResolutionUs + 1;
size_t ixEnd = (time_us + (kResolutionUs >> 1)) / kResolutionUs;
if (ixEnd >= pregenerated_clock_.size())
return -1;
last_reset_size_us_ = 0;
for (size_t ix = ixStart; ix <= ixEnd; ++ix) {
if (resets_us_[ix] != 0) {
last_reset_size_us_ = resets_us_[ix];
}
}
last_reset_query_time_us_ = time_us;
return pregenerated_clock_[ixEnd];
}
bool WasReset() const { return last_reset_size_us_ != 0; }
bool WasNegativeReset() const { return last_reset_size_us_ < 0; }
int64_t GetLastResetUs() const { return last_reset_size_us_; }
private:
const float kChanceOfResetPerUs = 1e-6f;
const int64_t kMaxAbsResetUs = rtc::kNumMicrosecsPerSec;
const int64_t kMinTimeBetweenResetsUs = 3 * rtc::kNumMicrosecsPerSec;
const int64_t kResolutionUs = rtc::kNumMicrosecsPerMillisec;
int64_t last_reset_query_time_us_ = 0;
int64_t last_reset_size_us_ = 0;
std::vector<int64_t> pregenerated_clock_;
std::vector<int64_t> resets_us_;
};
TEST(ClockRepair, NoClockDrift) {
const int kSeeds = 10;
const int kFirstSeed = 1;
const int64_t kRuntimeUs = 10 * rtc::kNumMicrosecsPerSec;
const float kDrift = 0.0f;
const int64_t kMaxPacketInterarrivalUs = 50 * rtc::kNumMicrosecsPerMillisec;
for (int seed = kFirstSeed; seed < kSeeds + kFirstSeed; ++seed) {
EmulatedMonotoneousClock monotone_clock(seed);
EmulatedNonMonotoneousClock non_monotone_clock(
seed + 1, kRuntimeUs + rtc::kNumMicrosecsPerSec, kDrift);
ReceiveTimeCalculator reception_time_tracker;
int64_t corrected_clock_0 = 0;
int64_t reset_during_stall_tol_us = 0;
bool initial_clock_stall = true;
int64_t accumulated_upper_bound_tolerance_us = 0;
int64_t accumulated_lower_bound_tolerance_us = 0;
Random random(1);
monotone_clock.ForceStallUs();
int64_t last_time_us = 0;
bool add_tolerance_on_next_packet = false;
int64_t monotone_noise_us = 1000;
for (int64_t time_us = 0; time_us < kRuntimeUs;
time_us += static_cast<int64_t>(random.Rand<float>() *
kMaxPacketInterarrivalUs)) {
int64_t socket_time_us = non_monotone_clock.Query(time_us);
int64_t monotone_us = monotone_clock.Query(time_us) +
2 * random.Rand<float>() * monotone_noise_us -
monotone_noise_us;
int64_t system_time_us = non_monotone_clock.Query(
time_us + monotone_clock.GetRemainingStall(time_us));
int64_t corrected_clock_us = reception_time_tracker.ReconcileReceiveTimes(
socket_time_us, system_time_us, monotone_us);
if (time_us == 0)
corrected_clock_0 = corrected_clock_us;
if (add_tolerance_on_next_packet)
accumulated_lower_bound_tolerance_us -= (time_us - last_time_us);
// Perfect repair cannot be achiveved if non-monotone clock resets during
// a monotone clock stall.
add_tolerance_on_next_packet = false;
if (monotone_clock.Stalled() && non_monotone_clock.WasReset()) {
reset_during_stall_tol_us =
std::max(reset_during_stall_tol_us, time_us - last_time_us);
if (non_monotone_clock.WasNegativeReset()) {
add_tolerance_on_next_packet = true;
}
if (initial_clock_stall && !non_monotone_clock.WasNegativeReset()) {
// Positive resets during an initial clock stall cannot be repaired
// and error will propagate through rest of trace.
accumulated_upper_bound_tolerance_us +=
std::abs(non_monotone_clock.GetLastResetUs());
}
} else {
reset_during_stall_tol_us = 0;
initial_clock_stall = false;
}
int64_t err = corrected_clock_us - corrected_clock_0 - time_us;
// Resets during stalls may lead to small errors temporarily.
int64_t lower_tol_us = accumulated_lower_bound_tolerance_us -
reset_during_stall_tol_us - monotone_noise_us -
2 * rtc::kNumMicrosecsPerMillisec;
EXPECT_GE(err, lower_tol_us);
int64_t upper_tol_us = accumulated_upper_bound_tolerance_us +
monotone_noise_us +
2 * rtc::kNumMicrosecsPerMillisec;
EXPECT_LE(err, upper_tol_us);
last_time_us = time_us;
}
}
}
} // namespace
TEST(ReceiveTimeCalculatorTest, UsesSmallerIncrements) {
int64_t kMinDeltaMs = -20;
int64_t kMaxDeltaDiffMs = 100;
ReceiveTimeCalculator calc(kMinDeltaMs, kMaxDeltaDiffMs);
// Initialize offset.
ReconcileMs(&calc, 10000, 20000);
EXPECT_EQ(ReconcileMs(&calc, 10010, 20100), 20010);
EXPECT_EQ(ReconcileMs(&calc, 10020, 20100), 20020);
EXPECT_EQ(ReconcileMs(&calc, 10030, 20100), 20030);
EXPECT_EQ(ReconcileMs(&calc, 10110, 20200), 20110);
EXPECT_EQ(ReconcileMs(&calc, 10120, 20200), 20120);
EXPECT_EQ(ReconcileMs(&calc, 10130, 20200), 20130);
// Small jumps backwards are let trough, they might be due to reordering.
EXPECT_EQ(ReconcileMs(&calc, 10120, 20200), 20120);
// The safe clock might be smaller than the packet clock.
EXPECT_EQ(ReconcileMs(&calc, 10210, 20200), 20210);
EXPECT_EQ(ReconcileMs(&calc, 10240, 20200), 20240);
}
TEST(ReceiveTimeCalculatorTest, CorrectsJumps) {
int64_t kMinDeltaMs = -20;
int64_t kMaxDeltaDiffMs = 100;
ReceiveTimeCalculator calc(kMinDeltaMs, kMaxDeltaDiffMs);
// Initialize offset.
ReconcileMs(&calc, 10000, 20000);
EXPECT_EQ(ReconcileMs(&calc, 10010, 20100), 20010);
EXPECT_EQ(ReconcileMs(&calc, 10020, 20100), 20020);
EXPECT_EQ(ReconcileMs(&calc, 10030, 20100), 20030);
// Jump forward in time.
EXPECT_EQ(ReconcileMs(&calc, 10240, 20200), 20200);
EXPECT_EQ(ReconcileMs(&calc, 10250, 20200), 20210);
EXPECT_EQ(ReconcileMs(&calc, 10260, 20200), 20220);
// Jump backward in time.
EXPECT_EQ(ReconcileMs(&calc, 10230, 20300), 20300);
EXPECT_EQ(ReconcileMs(&calc, 10240, 20300), 20310);
EXPECT_EQ(ReconcileMs(&calc, 10250, 20300), 20320);
}
} // namespace test
} // namespace webrtc

View File

@ -399,7 +399,6 @@ if (rtc_include_tests) {
"end_to_end_tests/multi_stream_tests.cc",
"end_to_end_tests/network_state_tests.cc",
"end_to_end_tests/probing_tests.cc",
"end_to_end_tests/receive_time_tests.cc",
"end_to_end_tests/retransmission_tests.cc",
"end_to_end_tests/rtp_rtcp_tests.cc",
"end_to_end_tests/ssrc_tests.cc",

View File

@ -1,149 +0,0 @@
/*
* Copyright 2018 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "api/test/simulated_network.h"
#include "call/fake_network_pipe.h"
#include "call/simulated_network.h"
#include "rtc_base/criticalsection.h"
#include "rtc_base/timeutils.h"
#include "test/call_test.h"
#include "test/field_trial.h"
#include "test/rtcp_packet_parser.h"
namespace webrtc {
namespace {
// This tester simulates a series of clock reset events where different offsets
// are added to the receive time. It detects jumps in the resulting reported
// receive times of more than 200 ms.
class ReportedReceiveTimeTester : public test::EndToEndTest {
public:
struct TimeJump {
int64_t at_send_time_ms;
int64_t add_offset_ms;
static constexpr int64_t kStop = 0;
};
ReportedReceiveTimeTester()
: EndToEndTest(test::CallTest::kDefaultTimeoutMs) {
// These should be let trough without correction and filtered if correction
// is enabled.
jumps_.push({500, 2000});
jumps_.push({1000, -400});
jumps_.push({1500, 2000000});
jumps_.push({1700, TimeJump::kStop});
}
bool JumpInReportedTimes() { return jump_in_reported_times_; }
protected:
Action OnReceiveRtcp(const uint8_t* data, size_t length) override {
test::RtcpPacketParser parser;
EXPECT_TRUE(parser.Parse(data, length));
const auto& fb = parser.transport_feedback();
if (fb->num_packets() > 0) {
int64_t arrival_time_us = fb->GetBaseTimeUs();
for (const auto& pkt : fb->GetReceivedPackets()) {
arrival_time_us += pkt.delta_us();
if (last_arrival_time_us_ != 0) {
int64_t delta_us = arrival_time_us - last_arrival_time_us_;
rtc::CritScope crit(&send_times_crit_);
if (send_times_us_.size() >= 2) {
int64_t ground_truth_delta_us =
send_times_us_[1] - send_times_us_[0];
send_times_us_.pop_front();
int64_t delta_diff_ms = (delta_us - ground_truth_delta_us) / 1000;
if (std::abs(delta_diff_ms) > 200) {
jump_in_reported_times_ = true;
observation_complete_.Set();
}
}
}
last_arrival_time_us_ = arrival_time_us;
}
}
return SEND_PACKET;
}
Action OnSendRtp(const uint8_t* data, size_t length) override {
{
rtc::CritScope crit(&send_times_crit_);
send_times_us_.push_back(rtc::TimeMicros());
}
int64_t now_ms = rtc::TimeMillis();
if (!first_send_time_ms_)
first_send_time_ms_ = now_ms;
int64_t send_time_ms = now_ms - first_send_time_ms_;
if (send_time_ms >= jumps_.front().at_send_time_ms) {
if (jumps_.front().add_offset_ms == TimeJump::kStop) {
observation_complete_.Set();
jumps_.pop();
return SEND_PACKET;
}
clock_offset_ms_ += jumps_.front().add_offset_ms;
send_pipe_->SetClockOffset(clock_offset_ms_);
jumps_.pop();
}
return SEND_PACKET;
}
test::PacketTransport* CreateSendTransport(
test::SingleThreadedTaskQueueForTesting* task_queue,
Call* sender_call) override {
auto pipe = absl::make_unique<FakeNetworkPipe>(
Clock::GetRealTimeClock(),
absl::make_unique<SimulatedNetwork>(BuiltInNetworkBehaviorConfig()));
send_pipe_ = pipe.get();
return send_transport_ = new test::PacketTransport(
task_queue, sender_call, this, test::PacketTransport::kSender,
test::CallTest::payload_type_map_, std::move(pipe));
}
void PerformTest() override {
observation_complete_.Wait(test::CallTest::kDefaultTimeoutMs);
}
size_t GetNumVideoStreams() const override { return 1; }
size_t GetNumAudioStreams() const override { return 0; }
private:
int64_t last_arrival_time_us_ = 0;
int64_t first_send_time_ms_ = 0;
rtc::CriticalSection send_times_crit_;
std::deque<int64_t> send_times_us_ RTC_GUARDED_BY(send_times_crit_);
bool jump_in_reported_times_ = false;
FakeNetworkPipe* send_pipe_;
test::PacketTransport* send_transport_;
int64_t clock_offset_ms_ = 0;
std::queue<TimeJump> jumps_;
};
} // namespace
class ReceiveTimeEndToEndTest : public test::CallTest {
public:
ReceiveTimeEndToEndTest() {}
virtual ~ReceiveTimeEndToEndTest() {}
};
TEST_F(ReceiveTimeEndToEndTest, ReceiveTimeJumpsWithoutFieldTrial) {
// Without the field trial, the jumps in clock offset should be let trough and
// be detected.
ReportedReceiveTimeTester test;
RunBaseTest(&test);
EXPECT_TRUE(test.JumpInReportedTimes());
}
TEST_F(ReceiveTimeEndToEndTest, ReceiveTimeSteadyWithFieldTrial) {
// Since all the added jumps by the tester are outside the interval of -100 ms
// to 1000 ms, they should all be filtered by the field trial below, and no
// jumps should be detected.
test::ScopedFieldTrials field_trial(
"WebRTC-BweReceiveTimeCorrection/Enabled,-100,1000/");
ReportedReceiveTimeTester test;
RunBaseTest(&test);
EXPECT_FALSE(test.JumpInReportedTimes());
}
} // namespace webrtc