Remove lock for process thread pointer from PacedSender.

Also adding code in preparation of hiding the Module
implementation in PacedSender. The implementation details of
how the PacedSender+ProcessThread interaction works, has now
been moved into PacedSender (and out of RtpTransportControllerSend).

Instead of adding a "GetModuleImplementationForTesting" method
to the PacedSender class (which would have been the lazy way
out), I incorporated MockedProcessThread in the PacedSender tests.
This means more boilerplate code but the Module functionality
can be tested separately from the PacedSender and down the line
I think it would be a good idea to start using a separate thread
in the test, which is how the class under test is really used
in production.

Bug: none
Change-Id: Iec1b7c97cb0b363b331143ca70545e6ebafe2cd4
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/149176
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29011}
This commit is contained in:
Tommi 2019-08-29 18:32:31 +02:00 committed by Commit Bot
parent 25eb47ccf1
commit 55dd72c54b
6 changed files with 63 additions and 27 deletions

View File

@ -22,7 +22,6 @@
#include "call/rtp_video_sender.h"
#include "logging/rtc_event_log/events/rtc_event_route_change.h"
#include "rtc_base/checks.h"
#include "rtc_base/location.h"
#include "rtc_base/logging.h"
#include "rtc_base/rate_limiter.h"
#include "system_wrappers/include/field_trial.h"
@ -67,9 +66,9 @@ RtpTransportControllerSend::RtpTransportControllerSend(
TaskQueueFactory* task_queue_factory)
: clock_(clock),
event_log_(event_log),
pacer_(clock, &packet_router_, event_log),
bitrate_configurator_(bitrate_config),
process_thread_(std::move(process_thread)),
pacer_(clock, &packet_router_, event_log, nullptr, process_thread_.get()),
observer_(nullptr),
controller_factory_override_(controller_factory),
controller_factory_fallback_(
@ -96,13 +95,11 @@ RtpTransportControllerSend::RtpTransportControllerSend(
pacer()->SetPacingRates(DataRate::bps(bitrate_config.start_bitrate_bps),
DataRate::Zero());
process_thread_->RegisterModule(&pacer_, RTC_FROM_HERE);
process_thread_->Start();
}
RtpTransportControllerSend::~RtpTransportControllerSend() {
process_thread_->Stop();
process_thread_->DeRegisterModule(&pacer_);
}
RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender(

View File

@ -144,10 +144,10 @@ class RtpTransportControllerSend final
const FieldTrialBasedConfig trial_based_config_;
PacketRouter packet_router_;
std::vector<std::unique_ptr<RtpVideoSenderInterface>> video_rtp_senders_;
PacedSender pacer_;
RtpBitrateConfigurator bitrate_configurator_;
std::map<std::string, rtc::NetworkRoute> network_routes_;
const std::unique_ptr<ProcessThread> process_thread_;
PacedSender pacer_;
TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_);

View File

@ -85,6 +85,7 @@ if (rtc_include_tests) {
":pacing",
"../../api/units:data_rate",
"../../api/units:time_delta",
"../../modules/utility:mock_process_thread",
"../../rtc_base:checks",
"../../rtc_base:rtc_base_approved",
"../../rtc_base:rtc_base_tests_utils",

View File

@ -18,6 +18,7 @@
#include "api/rtc_event_log/rtc_event_log.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/checks.h"
#include "rtc_base/location.h"
#include "rtc_base/logging.h"
#include "rtc_base/time_utils.h"
#include "system_wrappers/include/clock.h"
@ -29,15 +30,22 @@ const float PacedSender::kDefaultPaceMultiplier = 2.5f;
PacedSender::PacedSender(Clock* clock,
PacketRouter* packet_router,
RtcEventLog* event_log,
const WebRtcKeyValueConfig* field_trials)
const WebRtcKeyValueConfig* field_trials,
ProcessThread* process_thread)
: pacing_controller_(clock,
static_cast<PacingController::PacketSender*>(this),
event_log,
field_trials),
packet_router_(packet_router),
process_thread_(nullptr) {}
process_thread_(process_thread) {
if (process_thread_)
process_thread_->RegisterModule(&module_proxy_, RTC_FROM_HERE);
}
PacedSender::~PacedSender() = default;
PacedSender::~PacedSender() {
if (process_thread_)
process_thread_->DeRegisterModule(&module_proxy_);
}
void PacedSender::CreateProbeCluster(DataRate bitrate, int cluster_id) {
rtc::CritScope cs(&critsect_);
@ -49,11 +57,11 @@ void PacedSender::Pause() {
rtc::CritScope cs(&critsect_);
pacing_controller_.Pause();
}
rtc::CritScope cs(&process_thread_lock_);
// Tell the process thread to call our TimeUntilNextProcess() method to get
// a new (longer) estimate for when to call Process().
if (process_thread_)
process_thread_->WakeUp(this);
process_thread_->WakeUp(&module_proxy_);
}
void PacedSender::Resume() {
@ -61,11 +69,11 @@ void PacedSender::Resume() {
rtc::CritScope cs(&critsect_);
pacing_controller_.Resume();
}
rtc::CritScope cs(&process_thread_lock_);
// Tell the process thread to call our TimeUntilNextProcess() method to
// refresh the estimate for when to call Process().
if (process_thread_)
process_thread_->WakeUp(this);
process_thread_->WakeUp(&module_proxy_);
}
void PacedSender::SetCongestionWindow(DataSize congestion_window_size) {
@ -146,8 +154,7 @@ void PacedSender::Process() {
void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) {
RTC_LOG(LS_INFO) << "ProcessThreadAttached 0x" << process_thread;
rtc::CritScope cs(&process_thread_lock_);
process_thread_ = process_thread;
RTC_DCHECK(!process_thread || process_thread == process_thread_);
}
void PacedSender::SetQueueTimeLimit(TimeDelta limit) {

View File

@ -39,6 +39,8 @@ namespace webrtc {
class Clock;
class RtcEventLog;
// TODO(bugs.webrtc.org/10937): Remove the inheritance from Module after
// updating dependencies.
class PacedSender : public Module,
public RtpPacketPacer,
public RtpPacketSender,
@ -56,10 +58,13 @@ class PacedSender : public Module,
// overshoots from the encoder.
static const float kDefaultPaceMultiplier;
// TODO(bugs.webrtc.org/10937): Make the |process_thread| argument be non
// optional once all callers have been updated.
PacedSender(Clock* clock,
PacketRouter* packet_router,
RtcEventLog* event_log,
const WebRtcKeyValueConfig* field_trials = nullptr);
const WebRtcKeyValueConfig* field_trials = nullptr,
ProcessThread* process_thread = nullptr);
~PacedSender() override;
@ -109,15 +114,23 @@ class PacedSender : public Module,
// Below are methods specific to this implementation, such as things related
// to module processing thread specifics or methods exposed for test.
private:
// Methods implementing Module.
// TODO(bugs.webrtc.org/10937): Remove the inheritance from Module once all
// use of it has been cleared up.
// Returns the number of milliseconds until the module want a worker thread
// to call Process.
int64_t TimeUntilNextProcess() override;
// TODO(bugs.webrtc.org/10937): Make this private (and non virtual) once
// dependencies have been updated to not call this via the PacedSender
// interface.
public:
// Process any pending packets in the queue(s).
void Process() override;
private:
// Called when the prober is associated with a process thread.
void ProcessThreadAttached(ProcessThread* process_thread) override;
@ -131,18 +144,29 @@ class PacedSender : public Module,
std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
DataSize size) override RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
// Private implementation of Module to not expose those implementation details
// publicly and control when the class is registered/deregistered.
class ModuleProxy : public Module {
public:
explicit ModuleProxy(PacedSender* delegate) : delegate_(delegate) {}
private:
int64_t TimeUntilNextProcess() override {
return delegate_->TimeUntilNextProcess();
}
void Process() override { return delegate_->Process(); }
void ProcessThreadAttached(ProcessThread* process_thread) override {
return delegate_->ProcessThreadAttached(process_thread);
}
PacedSender* const delegate_;
} module_proxy_{this};
rtc::CriticalSection critsect_;
PacingController pacing_controller_ RTC_GUARDED_BY(critsect_);
PacketRouter* const packet_router_;
// Lock to avoid race when attaching process thread. This can happen due to
// the Call class setting network state on RtpTransportControllerSend, which
// in turn calls Pause/Resume on Pacedsender, before actually starting the
// pacer process thread. If RtpTransportControllerSend is running on a task
// queue separate from the thread used by Call, this causes a race.
rtc::CriticalSection process_thread_lock_;
ProcessThread* process_thread_ RTC_GUARDED_BY(process_thread_lock_);
ProcessThread* const process_thread_;
};
} // namespace webrtc
#endif // MODULES_PACING_PACED_SENDER_H_

View File

@ -18,6 +18,7 @@
#include "absl/memory/memory.h"
#include "modules/pacing/packet_router.h"
#include "modules/utility/include/mock/mock_process_thread.h"
#include "system_wrappers/include/clock.h"
#include "system_wrappers/include/field_trial.h"
#include "test/field_trial.h"
@ -26,6 +27,7 @@
using ::testing::_;
using ::testing::Return;
using ::testing::SaveArg;
namespace {
constexpr uint32_t kAudioSsrc = 12345;
@ -76,7 +78,12 @@ std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketToSend::Type type) {
TEST(PacedSenderTest, PacesPackets) {
SimulatedClock clock(0);
MockCallback callback;
PacedSender pacer(&clock, &callback, nullptr, nullptr);
MockProcessThread process_thread;
Module* paced_module = nullptr;
EXPECT_CALL(process_thread, RegisterModule(_, _))
.WillOnce(SaveArg<0>(&paced_module));
PacedSender pacer(&clock, &callback, nullptr, nullptr, &process_thread);
EXPECT_CALL(process_thread, DeRegisterModule(paced_module)).Times(1);
// Insert a number of packets, covering one second.
static constexpr size_t kPacketsToSend = 42;
@ -88,7 +95,7 @@ TEST(PacedSenderTest, PacesPackets) {
// Expect all of them to be sent.
size_t packets_sent = 0;
clock.AdvanceTimeMilliseconds(pacer.TimeUntilNextProcess());
clock.AdvanceTimeMilliseconds(paced_module->TimeUntilNextProcess());
EXPECT_CALL(callback, SendPacket)
.WillRepeatedly(
[&](std::unique_ptr<RtpPacketToSend> packet,
@ -97,8 +104,8 @@ TEST(PacedSenderTest, PacesPackets) {
const Timestamp start_time = clock.CurrentTime();
while (packets_sent < kPacketsToSend) {
clock.AdvanceTimeMilliseconds(pacer.TimeUntilNextProcess());
pacer.Process();
clock.AdvanceTimeMilliseconds(paced_module->TimeUntilNextProcess());
paced_module->Process();
}
// Packets should be sent over a period of close to 1s. Expect a little lower