DegradedCall: fake network using TaskQueue instead of ProcessThread

Tested: Manually in Chrome webrtc calls.
Bug: webrtc:10851
Change-Id: I6bc6b7625101b39e4dd8b0efa5db213ab57980a8
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/148077
Reviewed-by: Niels Moller <nisse@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#28776}
This commit is contained in:
Erik Språng 2019-08-06 15:54:23 +02:00 committed by Commit Bot
parent bb1f2454cd
commit c64881925f
3 changed files with 79 additions and 97 deletions

View File

@ -78,8 +78,8 @@ Call* CallFactory::CreateCall(const Call::Config& config) {
if (send_degradation_config || receive_degradation_config) {
return new DegradedCall(std::unique_ptr<Call>(Call::Create(config)),
send_degradation_config,
receive_degradation_config);
send_degradation_config, receive_degradation_config,
config.task_queue_factory);
}
return Call::Create(config);

View File

@ -17,64 +17,68 @@
namespace webrtc {
namespace {
constexpr int64_t kDoNothingProcessIntervalMs = 5000;
} // namespace
FakeNetworkPipeModule::~FakeNetworkPipeModule() = default;
FakeNetworkPipeModule::FakeNetworkPipeModule(
DegradedCall::FakeNetworkPipeOnTaskQueue::FakeNetworkPipeOnTaskQueue(
TaskQueueFactory* task_queue_factory,
Clock* clock,
std::unique_ptr<NetworkBehaviorInterface> network_behavior,
Transport* transport)
: pipe_(clock, std::move(network_behavior), transport) {}
: clock_(clock),
task_queue_(task_queue_factory->CreateTaskQueue(
"DegradedSendQueue",
TaskQueueFactory::Priority::NORMAL)),
pipe_(clock, std::move(network_behavior), transport) {}
void FakeNetworkPipeModule::SendRtp(const uint8_t* packet,
size_t length,
const PacketOptions& options) {
void DegradedCall::FakeNetworkPipeOnTaskQueue::SendRtp(
const uint8_t* packet,
size_t length,
const PacketOptions& options) {
pipe_.SendRtp(packet, length, options);
MaybeResumeProcess();
Process();
}
void FakeNetworkPipeModule::SendRtcp(const uint8_t* packet, size_t length) {
void DegradedCall::FakeNetworkPipeOnTaskQueue::SendRtcp(const uint8_t* packet,
size_t length) {
pipe_.SendRtcp(packet, length);
MaybeResumeProcess();
Process();
}
void FakeNetworkPipeModule::MaybeResumeProcess() {
rtc::CritScope cs(&process_thread_lock_);
if (!pending_process_ && pipe_.TimeUntilNextProcess() && process_thread_) {
process_thread_->WakeUp(nullptr);
}
}
int64_t FakeNetworkPipeModule::TimeUntilNextProcess() {
auto delay = pipe_.TimeUntilNextProcess();
rtc::CritScope cs(&process_thread_lock_);
pending_process_ = delay.has_value();
return delay.value_or(kDoNothingProcessIntervalMs);
}
void FakeNetworkPipeModule::ProcessThreadAttached(
ProcessThread* process_thread) {
rtc::CritScope cs(&process_thread_lock_);
process_thread_ = process_thread;
}
void FakeNetworkPipeModule::Process() {
bool DegradedCall::FakeNetworkPipeOnTaskQueue::Process() {
pipe_.Process();
auto time_to_next = pipe_.TimeUntilNextProcess();
if (!time_to_next) {
// Packet was probably sent immediately.
return false;
}
task_queue_.PostTask([this, time_to_next]() {
RTC_DCHECK_RUN_ON(&task_queue_);
int64_t next_process_time = *time_to_next + clock_->TimeInMilliseconds();
if (!next_process_ms_ || next_process_time < *next_process_ms_) {
next_process_ms_ = next_process_time;
task_queue_.PostDelayedTask(
[this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
if (!Process()) {
next_process_ms_.reset();
}
},
*time_to_next);
}
});
return true;
}
DegradedCall::DegradedCall(
std::unique_ptr<Call> call,
absl::optional<BuiltInNetworkBehaviorConfig> send_config,
absl::optional<BuiltInNetworkBehaviorConfig> receive_config)
absl::optional<BuiltInNetworkBehaviorConfig> receive_config,
TaskQueueFactory* task_queue_factory)
: clock_(Clock::GetRealTimeClock()),
call_(std::move(call)),
task_queue_factory_(task_queue_factory),
send_config_(send_config),
send_process_thread_(
send_config_ ? ProcessThread::Create("DegradedSendThread") : nullptr),
num_send_streams_(0),
send_simulated_network_(nullptr),
receive_config_(receive_config) {
if (receive_config_) {
auto network = absl::make_unique<SimulatedNetwork>(*receive_config_);
@ -83,19 +87,9 @@ DegradedCall::DegradedCall(
absl::make_unique<webrtc::FakeNetworkPipe>(clock_, std::move(network));
receive_pipe_->SetReceiver(call_->Receiver());
}
if (send_process_thread_) {
send_process_thread_->Start();
}
}
DegradedCall::~DegradedCall() {
if (send_pipe_) {
send_process_thread_->DeRegisterModule(send_pipe_.get());
}
if (send_process_thread_) {
send_process_thread_->Stop();
}
}
DegradedCall::~DegradedCall() = default;
AudioSendStream* DegradedCall::CreateAudioSendStream(
const AudioSendStream::Config& config) {
@ -122,12 +116,10 @@ VideoSendStream* DegradedCall::CreateVideoSendStream(
if (send_config_ && !send_pipe_) {
auto network = absl::make_unique<SimulatedNetwork>(*send_config_);
send_simulated_network_ = network.get();
send_pipe_ = absl::make_unique<FakeNetworkPipeModule>(
clock_, std::move(network), config.send_transport);
send_pipe_ = absl::make_unique<FakeNetworkPipeOnTaskQueue>(
task_queue_factory_, clock_, std::move(network), config.send_transport);
config.send_transport = this;
send_process_thread_->RegisterModule(send_pipe_.get(), RTC_FROM_HERE);
}
++num_send_streams_;
return call_->CreateVideoSendStream(std::move(config),
std::move(encoder_config));
}
@ -139,25 +131,16 @@ VideoSendStream* DegradedCall::CreateVideoSendStream(
if (send_config_ && !send_pipe_) {
auto network = absl::make_unique<SimulatedNetwork>(*send_config_);
send_simulated_network_ = network.get();
send_pipe_ = absl::make_unique<FakeNetworkPipeModule>(
clock_, std::move(network), config.send_transport);
send_pipe_ = absl::make_unique<FakeNetworkPipeOnTaskQueue>(
task_queue_factory_, clock_, std::move(network), config.send_transport);
config.send_transport = this;
send_process_thread_->RegisterModule(send_pipe_.get(), RTC_FROM_HERE);
}
++num_send_streams_;
return call_->CreateVideoSendStream(
std::move(config), std::move(encoder_config), std::move(fec_controller));
}
void DegradedCall::DestroyVideoSendStream(VideoSendStream* send_stream) {
call_->DestroyVideoSendStream(send_stream);
if (send_pipe_ && num_send_streams_ > 0) {
--num_send_streams_;
if (num_send_streams_ == 0) {
send_process_thread_->DeRegisterModule(send_pipe_.get());
send_pipe_.reset();
}
}
}
VideoReceiveStream* DegradedCall::CreateVideoReceiveStream(

View File

@ -33,44 +33,20 @@
#include "call/simulated_network.h"
#include "call/video_receive_stream.h"
#include "call/video_send_stream.h"
#include "modules/include/module.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/copy_on_write_buffer.h"
#include "rtc_base/network/sent_packet.h"
#include "rtc_base/task_queue.h"
#include "system_wrappers/include/clock.h"
namespace webrtc {
class FakeNetworkPipeModule : public Module {
public:
FakeNetworkPipeModule(
Clock* clock,
std::unique_ptr<NetworkBehaviorInterface> network_behavior,
Transport* transport);
~FakeNetworkPipeModule() override;
void SendRtp(const uint8_t* packet,
size_t length,
const PacketOptions& options);
void SendRtcp(const uint8_t* packet, size_t length);
// Implements Module interface
int64_t TimeUntilNextProcess() override;
void ProcessThreadAttached(ProcessThread* process_thread) override;
void Process() override;
private:
void MaybeResumeProcess();
FakeNetworkPipe pipe_;
rtc::CriticalSection process_thread_lock_;
ProcessThread* process_thread_ RTC_GUARDED_BY(process_thread_lock_) = nullptr;
bool pending_process_ RTC_GUARDED_BY(process_thread_lock_) = false;
};
class DegradedCall : public Call, private Transport, private PacketReceiver {
public:
explicit DegradedCall(
std::unique_ptr<Call> call,
absl::optional<BuiltInNetworkBehaviorConfig> send_config,
absl::optional<BuiltInNetworkBehaviorConfig> receive_config);
absl::optional<BuiltInNetworkBehaviorConfig> receive_config,
TaskQueueFactory* task_queue_factory);
~DegradedCall() override;
// Implements Call.
@ -125,17 +101,40 @@ class DegradedCall : public Call, private Transport, private PacketReceiver {
int64_t packet_time_us) override;
private:
class FakeNetworkPipeOnTaskQueue {
public:
FakeNetworkPipeOnTaskQueue(
TaskQueueFactory* task_queue_factory,
Clock* clock,
std::unique_ptr<NetworkBehaviorInterface> network_behavior,
Transport* transport);
void SendRtp(const uint8_t* packet,
size_t length,
const PacketOptions& options);
void SendRtcp(const uint8_t* packet, size_t length);
private:
// Try to process packets on the fake network queue.
// Returns true if call resulted in a delayed process, false if queue empty.
bool Process();
Clock* const clock_;
rtc::TaskQueue task_queue_;
FakeNetworkPipe pipe_;
absl::optional<int64_t> next_process_ms_ RTC_GUARDED_BY(&task_queue_);
};
Clock* const clock_;
const std::unique_ptr<Call> call_;
TaskQueueFactory* const task_queue_factory_;
void MediaTransportChange(MediaTransportInterface* media_transport) override;
void SetClientBitratePreferences(
const webrtc::BitrateSettings& preferences) override {}
const absl::optional<BuiltInNetworkBehaviorConfig> send_config_;
const std::unique_ptr<ProcessThread> send_process_thread_;
SimulatedNetwork* send_simulated_network_;
std::unique_ptr<FakeNetworkPipeModule> send_pipe_;
size_t num_send_streams_;
std::unique_ptr<FakeNetworkPipeOnTaskQueue> send_pipe_;
const absl::optional<BuiltInNetworkBehaviorConfig> receive_config_;
SimulatedNetwork* receive_simulated_network_;