Reland "Lets PacingController call PacketRouter directly."

This reverts commit 980cadd02c7384397a41c0e334e9f329f3cc5c65.

Reason for revert: Problematic code now fix.

Original change's description:
> Revert "Lets PacingController call PacketRouter directly."
> 
> This reverts commit 848ea9f0d3678118cb8926a2898454e5a4df58ae.
> 
> Reason for revert: Part of changes that may cause deadlock
> 
> Original change's description:
> > Lets PacingController call PacketRouter directly.
> > 
> > Since locking model has been cleaned up, PacingController can now call
> > PacketRouter directly - without having to go via PacedSender or
> > TaskQueuePacedSender.
> > 
> > Bug: webrtc:10809
> > Change-Id: I181f04167d677c35395286f8b246aefb4c3e7ec7
> > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/175909
> > Reviewed-by: Sebastian Jansson <srte@webrtc.org>
> > Commit-Queue: Erik Språng <sprang@webrtc.org>
> > Cr-Commit-Position: refs/heads/master@{#31342}
> 
> TBR=sprang@webrtc.org,srte@webrtc.org
> 
> # Not skipping CQ checks because original CL landed > 1 day ago.
> 
> Bug: webrtc:10809
> Change-Id: I1d7d5217a03a51555b130ec5c2dd6a992b6e489e
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/178021
> Reviewed-by: Erik Språng <sprang@webrtc.org>
> Reviewed-by: Sebastian Jansson <srte@webrtc.org>
> Commit-Queue: Erik Språng <sprang@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#31563}

TBR=sprang@webrtc.org,srte@webrtc.org

# Not skipping CQ checks because original CL landed > 1 day ago.

Bug: webrtc:10809
Change-Id: I8bea1a5b1b1f618b697e4b09d83c9aac08099593
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/178389
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31600}
This commit is contained in:
Erik Språng 2020-06-30 11:53:37 +00:00 committed by Commit Bot
parent f14d2a500d
commit ed1fb19be2
12 changed files with 54 additions and 96 deletions

View File

@ -41,12 +41,11 @@ PacedSender::PacedSender(Clock* clock,
? PacingController::ProcessMode::kDynamic
: PacingController::ProcessMode::kPeriodic),
pacing_controller_(clock,
static_cast<PacingController::PacketSender*>(this),
packet_router,
event_log,
field_trials,
process_mode_),
clock_(clock),
packet_router_(packet_router),
process_thread_(process_thread) {
if (process_thread_)
process_thread_->RegisterModule(&module_proxy_, RTC_FROM_HERE);
@ -205,13 +204,4 @@ void PacedSender::SetQueueTimeLimit(TimeDelta limit) {
MaybeWakupProcessThread();
}
void PacedSender::SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) {
packet_router_->SendPacket(std::move(packet), cluster_info);
}
std::vector<std::unique_ptr<RtpPacketToSend>> PacedSender::GeneratePadding(
DataSize size) {
return packet_router_->GeneratePadding(size.bytes());
}
} // namespace webrtc

View File

@ -43,8 +43,7 @@ class RtcEventLog;
// updating dependencies.
class PacedSender : public Module,
public RtpPacketPacer,
public RtpPacketSender,
private PacingController::PacketSender {
public RtpPacketSender {
public:
// Expected max pacer delay in ms. If ExpectedQueueTime() is higher than
// this value, the packet producers should wait (eg drop frames rather than
@ -140,14 +139,6 @@ class PacedSender : public Module,
// In dynamic process mode, refreshes the next process time.
void MaybeWakupProcessThread();
// Methods implementing PacedSenderController:PacketSender.
void SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) override
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
DataSize size) override RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
// Private implementation of Module to not expose those implementation details
// publicly and control when the class is registered/deregistered.
class ModuleProxy : public Module {
@ -171,7 +162,6 @@ class PacedSender : public Module,
PacingController pacing_controller_ RTC_GUARDED_BY(critsect_);
Clock* const clock_;
PacketRouter* const packet_router_;
ProcessThread* const process_thread_;
};
} // namespace webrtc

View File

@ -46,7 +46,7 @@ class MockCallback : public PacketRouter {
(override));
MOCK_METHOD(std::vector<std::unique_ptr<RtpPacketToSend>>,
GeneratePadding,
(size_t target_size_bytes),
(DataSize target_size),
(override));
};

View File

@ -440,7 +440,7 @@ void PacingController::ProcessPackets() {
for (auto& packet : keepalive_packets) {
keepalive_data_sent +=
DataSize::Bytes(packet->payload_size() + packet->padding_size());
packet_sender_->SendRtpPacket(std::move(packet), PacedPacketInfo());
packet_sender_->SendPacket(std::move(packet), PacedPacketInfo());
}
OnPaddingSent(keepalive_data_sent);
}
@ -559,7 +559,7 @@ void PacingController::ProcessPackets() {
packet_size += DataSize::Bytes(rtp_packet->headers_size()) +
transport_overhead_per_packet_;
}
packet_sender_->SendRtpPacket(std::move(rtp_packet), pacing_info);
packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);
data_sent += packet_size;

View File

@ -55,7 +55,7 @@ class PacingController {
class PacketSender {
public:
virtual ~PacketSender() = default;
virtual void SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
virtual void SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) = 0;
virtual std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
DataSize size) = 0;

View File

@ -69,7 +69,7 @@ std::unique_ptr<RtpPacketToSend> BuildPacket(RtpPacketMediaType type,
// methods that focus on core aspects.
class MockPacingControllerCallback : public PacingController::PacketSender {
public:
void SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
void SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) override {
SendPacket(packet->Ssrc(), packet->SequenceNumber(),
packet->capture_time_ms(),
@ -104,7 +104,7 @@ class MockPacingControllerCallback : public PacingController::PacketSender {
class MockPacketSender : public PacingController::PacketSender {
public:
MOCK_METHOD(void,
SendRtpPacket,
SendPacket,
(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info),
(override));
@ -120,7 +120,7 @@ class PacingControllerPadding : public PacingController::PacketSender {
PacingControllerPadding() : padding_sent_(0), total_bytes_sent_(0) {}
void SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
void SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& pacing_info) override {
total_bytes_sent_ += packet->payload_size();
}
@ -151,7 +151,7 @@ class PacingControllerProbing : public PacingController::PacketSender {
public:
PacingControllerProbing() : packets_sent_(0), padding_sent_(0) {}
void SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
void SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& pacing_info) override {
if (packet->packet_type() != RtpPacketMediaType::kPadding) {
++packets_sent_;
@ -1575,7 +1575,7 @@ TEST_P(PacingControllerTest, ProbeClusterId) {
// First probing cluster.
EXPECT_CALL(callback,
SendRtpPacket(_, Field(&PacedPacketInfo::probe_cluster_id, 0)))
SendPacket(_, Field(&PacedPacketInfo::probe_cluster_id, 0)))
.Times(5);
for (int i = 0; i < 5; ++i) {
@ -1584,7 +1584,7 @@ TEST_P(PacingControllerTest, ProbeClusterId) {
// Second probing cluster.
EXPECT_CALL(callback,
SendRtpPacket(_, Field(&PacedPacketInfo::probe_cluster_id, 1)))
SendPacket(_, Field(&PacedPacketInfo::probe_cluster_id, 1)))
.Times(5);
for (int i = 0; i < 5; ++i) {
@ -1602,7 +1602,7 @@ TEST_P(PacingControllerTest, ProbeClusterId) {
return padding_packets;
});
bool non_probe_packet_seen = false;
EXPECT_CALL(callback, SendRtpPacket)
EXPECT_CALL(callback, SendPacket)
.WillOnce([&](std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) {
EXPECT_EQ(cluster_info.probe_cluster_id, kNotAProbe);
@ -1632,23 +1632,23 @@ TEST_P(PacingControllerTest, OwnedPacketPrioritizedOnType) {
::testing::InSequence seq;
EXPECT_CALL(
callback,
SendRtpPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kAudioSsrc)), _));
EXPECT_CALL(callback,
SendRtpPacket(
Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _));
SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kAudioSsrc)), _));
EXPECT_CALL(
callback,
SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _));
// FEC and video actually have the same priority, so will come out in
// insertion order.
EXPECT_CALL(callback,
SendRtpPacket(
Pointee(Property(&RtpPacketToSend::Ssrc, kFlexFecSsrc)), _));
EXPECT_CALL(
callback,
SendRtpPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoSsrc)), _));
SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kFlexFecSsrc)), _));
EXPECT_CALL(
callback,
SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoSsrc)), _));
EXPECT_CALL(callback,
SendRtpPacket(
Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _));
EXPECT_CALL(
callback,
SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _));
while (pacer_->QueueSizePackets() > 0) {
if (PeriodicProcess()) {
@ -1683,7 +1683,7 @@ TEST_P(PacingControllerTest, SmallFirstProbePacket) {
size_t packets_sent = 0;
bool media_seen = false;
EXPECT_CALL(callback, SendRtpPacket)
EXPECT_CALL(callback, SendPacket)
.Times(::testing::AnyNumber())
.WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) {
@ -1821,7 +1821,7 @@ TEST_P(PacingControllerTest,
for (bool account_for_audio : {false, true}) {
uint16_t sequence_number = 1234;
MockPacketSender callback;
EXPECT_CALL(callback, SendRtpPacket).Times(::testing::AnyNumber());
EXPECT_CALL(callback, SendPacket).Times(::testing::AnyNumber());
pacer_ = std::make_unique<PacingController>(&clock_, &callback, nullptr,
nullptr, GetParam());
pacer_->SetAccountForAudioPackets(account_for_audio);

View File

@ -174,9 +174,9 @@ void PacketRouter::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
}
std::vector<std::unique_ptr<RtpPacketToSend>> PacketRouter::GeneratePadding(
size_t target_size_bytes) {
DataSize size) {
TRACE_EVENT1(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"PacketRouter::GeneratePadding", "bytes", target_size_bytes);
"PacketRouter::GeneratePadding", "bytes", size.bytes());
MutexLock lock(&modules_mutex_);
// First try on the last rtp module to have sent media. This increases the
@ -188,7 +188,7 @@ std::vector<std::unique_ptr<RtpPacketToSend>> PacketRouter::GeneratePadding(
std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets;
if (last_send_module_ != nullptr &&
last_send_module_->SupportsRtxPayloadPadding()) {
padding_packets = last_send_module_->GeneratePadding(target_size_bytes);
padding_packets = last_send_module_->GeneratePadding(size.bytes());
}
if (padding_packets.empty()) {
@ -197,7 +197,7 @@ std::vector<std::unique_ptr<RtpPacketToSend>> PacketRouter::GeneratePadding(
// be taken into account by the bandwidth estimator, e.g. in FF.
for (RtpRtcpInterface* rtp_module : send_modules_list_) {
if (rtp_module->SupportsPadding()) {
padding_packets = rtp_module->GeneratePadding(target_size_bytes);
padding_packets = rtp_module->GeneratePadding(size.bytes());
if (!padding_packets.empty()) {
last_send_module_ = rtp_module;
break;

View File

@ -21,6 +21,7 @@
#include <vector>
#include "api/transport/network_types.h"
#include "modules/pacing/pacing_controller.h"
#include "modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/rtcp_packet.h"
@ -40,7 +41,8 @@ class RtpRtcpInterface;
// (receiver report). For the latter case, we also keep track of the
// receive modules.
class PacketRouter : public RemoteBitrateObserver,
public TransportFeedbackSenderInterface {
public TransportFeedbackSenderInterface,
public PacingController::PacketSender {
public:
PacketRouter();
explicit PacketRouter(uint16_t start_transport_seq);
@ -53,11 +55,10 @@ class PacketRouter : public RemoteBitrateObserver,
bool remb_candidate);
void RemoveReceiveRtpModule(RtcpFeedbackSenderInterface* rtcp_sender);
virtual void SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info);
virtual std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
size_t target_size_bytes);
void SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) override;
std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
DataSize size) override;
uint16_t CurrentTransportSequenceNumber() const;

View File

@ -68,7 +68,7 @@ class PacketRouterTest : public ::testing::Test {
};
TEST_F(PacketRouterTest, Sanity_NoModuleRegistered_GeneratePadding) {
constexpr size_t bytes = 300;
constexpr DataSize bytes = DataSize::Bytes(300);
const PacedPacketInfo paced_info(1, kProbeMinProbes, kProbeMinBytes);
EXPECT_TRUE(packet_router_.GeneratePadding(bytes).empty());
@ -122,7 +122,8 @@ TEST_F(PacketRouterTest, GeneratePaddingPrioritizesRtx) {
return std::vector<std::unique_ptr<RtpPacketToSend>>(
kExpectedPaddingPackets);
});
auto generated_padding = packet_router_.GeneratePadding(kPaddingSize);
auto generated_padding =
packet_router_.GeneratePadding(DataSize::Bytes(kPaddingSize));
EXPECT_EQ(generated_padding.size(), kExpectedPaddingPackets);
packet_router_.RemoveSendRtpModule(&rtp_1);
@ -159,7 +160,7 @@ TEST_F(PacketRouterTest, GeneratePaddingPrioritizesVideo) {
packet_router_.AddSendRtpModule(&audio_module, false);
EXPECT_CALL(audio_module, GeneratePadding(kPaddingSize))
.WillOnce(generate_padding);
packet_router_.GeneratePadding(kPaddingSize);
packet_router_.GeneratePadding(DataSize::Bytes(kPaddingSize));
// Add the video module, this should now be prioritized since we cannot
// guarantee that audio packets will be included in the BWE.
@ -167,7 +168,7 @@ TEST_F(PacketRouterTest, GeneratePaddingPrioritizesVideo) {
EXPECT_CALL(audio_module, GeneratePadding).Times(0);
EXPECT_CALL(video_module, GeneratePadding(kPaddingSize))
.WillOnce(generate_padding);
packet_router_.GeneratePadding(kPaddingSize);
packet_router_.GeneratePadding(DataSize::Bytes(kPaddingSize));
// Remove and the add audio module again. Module order shouldn't matter;
// video should still be prioritized.
@ -176,14 +177,14 @@ TEST_F(PacketRouterTest, GeneratePaddingPrioritizesVideo) {
EXPECT_CALL(audio_module, GeneratePadding).Times(0);
EXPECT_CALL(video_module, GeneratePadding(kPaddingSize))
.WillOnce(generate_padding);
packet_router_.GeneratePadding(kPaddingSize);
packet_router_.GeneratePadding(DataSize::Bytes(kPaddingSize));
// Remove and the video module, we should fall back to padding on the
// audio module again.
packet_router_.RemoveSendRtpModule(&video_module);
EXPECT_CALL(audio_module, GeneratePadding(kPaddingSize))
.WillOnce(generate_padding);
packet_router_.GeneratePadding(kPaddingSize);
packet_router_.GeneratePadding(DataSize::Bytes(kPaddingSize));
packet_router_.RemoveSendRtpModule(&audio_module);
}
@ -243,7 +244,7 @@ TEST_F(PacketRouterTest, PadsOnLastActiveMediaStream) {
packets.push_back(BuildRtpPacket(kSsrc2));
return packets;
});
packet_router_.GeneratePadding(kPaddingBytes);
packet_router_.GeneratePadding(DataSize::Bytes(kPaddingBytes));
// Send media on first module. Padding should be sent on that module.
packet_router_.SendPacket(BuildRtpPacket(kSsrc1), PacedPacketInfo());
@ -255,7 +256,7 @@ TEST_F(PacketRouterTest, PadsOnLastActiveMediaStream) {
packets.push_back(BuildRtpPacket(kSsrc1));
return packets;
});
packet_router_.GeneratePadding(kPaddingBytes);
packet_router_.GeneratePadding(DataSize::Bytes(kPaddingBytes));
// Send media on second module. Padding should be sent there.
packet_router_.SendPacket(BuildRtpPacket(kSsrc2), PacedPacketInfo());
@ -285,7 +286,7 @@ TEST_F(PacketRouterTest, PadsOnLastActiveMediaStream) {
for (int i = 0; i < 2; ++i) {
last_send_module = nullptr;
packet_router_.GeneratePadding(kPaddingBytes);
packet_router_.GeneratePadding(DataSize::Bytes(kPaddingBytes));
EXPECT_NE(last_send_module, nullptr);
packet_router_.RemoveSendRtpModule(last_send_module);
}

View File

@ -39,9 +39,8 @@ TaskQueuePacedSender::TaskQueuePacedSender(
TimeDelta hold_back_window)
: clock_(clock),
hold_back_window_(hold_back_window),
packet_router_(packet_router),
pacing_controller_(clock,
static_cast<PacingController::PacketSender*>(this),
packet_router,
event_log,
field_trials,
PacingController::ProcessMode::kDynamic),
@ -238,17 +237,6 @@ void TaskQueuePacedSender::MaybeProcessPackets(
MaybeUpdateStats(false);
}
std::vector<std::unique_ptr<RtpPacketToSend>>
TaskQueuePacedSender::GeneratePadding(DataSize size) {
return packet_router_->GeneratePadding(size.bytes());
}
void TaskQueuePacedSender::SendRtpPacket(
std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) {
packet_router_->SendPacket(std::move(packet), cluster_info);
}
void TaskQueuePacedSender::MaybeUpdateStats(bool is_scheduled_call) {
if (is_shutdown_) {
if (is_scheduled_call) {

View File

@ -39,9 +39,7 @@ namespace webrtc {
class Clock;
class RtcEventLog;
class TaskQueuePacedSender : public RtpPacketPacer,
public RtpPacketSender,
private PacingController::PacketSender {
class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
public:
// The |hold_back_window| parameter sets a lower bound on time to sleep if
// there is currently a pacer queue and packets can't immediately be
@ -129,21 +127,11 @@ class TaskQueuePacedSender : public RtpPacketPacer,
// method again with desired (finite) scheduled process time.
void MaybeProcessPackets(Timestamp scheduled_process_time);
// Methods implementing PacedSenderController:PacketSender.
void SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) override
RTC_RUN_ON(task_queue_);
std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
DataSize size) override RTC_RUN_ON(task_queue_);
void MaybeUpdateStats(bool is_scheduled_call) RTC_RUN_ON(task_queue_);
Stats GetStats() const;
Clock* const clock_;
const TimeDelta hold_back_window_;
PacketRouter* const packet_router_ RTC_GUARDED_BY(task_queue_);
PacingController pacing_controller_ RTC_GUARDED_BY(task_queue_);
// We want only one (valid) delayed process task in flight at a time.

View File

@ -45,7 +45,7 @@ class MockPacketRouter : public PacketRouter {
(override));
MOCK_METHOD(std::vector<std::unique_ptr<RtpPacketToSend>>,
GeneratePadding,
(size_t target_size_bytes),
(DataSize target_size),
(override));
};