Data Channel Benchmarking tool

Create a server using:
./data_channel_benchmark --server --port 12345
Start the flow of data from the server to a client using:
./data_channel_benchmark --port 12345 --transfer_size 100
The throughput is reported on the server console.

The negotiation does not require a 3rd party server and is done over a
gRPC transport. No TURN server is configured, so both peers need to be
reachable using STUN only.

Bug: webrtc:13288
Change-Id: Iac9a96cf390ab465ea45a46bf0b40950c56dfceb
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/235661
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Florent Castelli <orphis@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#36206}
This commit is contained in:
Florent Castelli 2022-03-15 16:01:52 +01:00 committed by WebRTC LUCI CQ
parent e486a7bdf7
commit 023be3c977
12 changed files with 1208 additions and 0 deletions

3
DEPS
View File

@ -156,6 +156,9 @@ deps = {
'url': 'https://chromium.googlesource.com/chromium/deps/findbugs.git@4275d9ac8610db6b1bc9a5e887f97e41b33fac67', 'url': 'https://chromium.googlesource.com/chromium/deps/findbugs.git@4275d9ac8610db6b1bc9a5e887f97e41b33fac67',
'condition': 'checkout_android', 'condition': 'checkout_android',
}, },
'src/third_party/grpc/src': {
'url': 'https://chromium.googlesource.com/external/github.com/grpc/grpc.git@f8a909e76fcd947949502832a7ab8e2cba2b8e27',
},
# Used for embedded builds. CrOS & Linux use the system version. # Used for embedded builds. CrOS & Linux use the system version.
'src/third_party/fontconfig/src': { 'src/third_party/fontconfig/src': {
'url': 'https://chromium.googlesource.com/external/fontconfig.git@452be8125f0e2a18a7dfef469e05d19374d36307', 'url': 'https://chromium.googlesource.com/external/fontconfig.git@452be8125f0e2a18a7dfef469e05d19374d36307',

View File

@ -47,6 +47,9 @@ group("rtc_tools") {
":unpack_aecdump", ":unpack_aecdump",
] ]
} }
if (!build_with_chromium && rtc_enable_grpc) {
deps += [ "data_channel_benchmark" ]
}
} }
rtc_library("video_file_reader") { rtc_library("video_file_reader") {

View File

@ -0,0 +1,62 @@
# Copyright 2021 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import("//third_party/grpc/grpc_library.gni")
import("../../webrtc.gni")
grpc_library("signaling_grpc_proto") {
testonly = true
sources = [ "peer_connection_signaling.proto" ]
}
rtc_library("signaling_interface") {
sources = [ "signaling_interface.h" ]
deps = [ "../../api:libjingle_peerconnection_api" ]
}
rtc_library("grpc_signaling") {
testonly = true
sources = [
"grpc_signaling.cc",
"grpc_signaling.h",
]
deps = [
":signaling_grpc_proto",
":signaling_interface",
"../../api:libjingle_peerconnection_api",
"../../rtc_base:threading",
"//third_party/grpc:grpc++",
]
defines = [ "GPR_FORBID_UNREACHABLE_CODE=0" ]
}
rtc_executable("data_channel_benchmark") {
testonly = true
sources = [
"data_channel_benchmark.cc",
"peer_connection_client.cc",
"peer_connection_client.h",
]
deps = [
":grpc_signaling",
":signaling_interface",
"../../api:create_peerconnection_factory",
"../../api:libjingle_peerconnection_api",
"../../api:rtc_error",
"../../api:scoped_refptr",
"../../api/audio_codecs:builtin_audio_decoder_factory",
"../../api/audio_codecs:builtin_audio_encoder_factory",
"../../api/video_codecs:builtin_video_decoder_factory",
"../../api/video_codecs:builtin_video_encoder_factory",
"../../rtc_base",
"../../rtc_base:logging",
"../../rtc_base:refcount",
"../../rtc_base:threading",
"../../system_wrappers:field_trial",
"//third_party/abseil-cpp/absl/cleanup:cleanup",
"//third_party/abseil-cpp/absl/flags:flag",
"//third_party/abseil-cpp/absl/flags:parse",
]
}

View File

@ -0,0 +1,323 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*
* Data Channel Benchmarking tool.
*
* Create a server using: ./data_channel_benchmark --server --port 12345
* Start the flow of data from the server to a client using:
* ./data_channel_benchmark --port 12345 --transfer_size 100 --packet_size 8196
* The throughput is reported on the server console.
*
* The negotiation does not require a 3rd party server and is done over a gRPC
* transport. No TURN server is configured, so both peers need to be reachable
* using STUN only.
*/
#include <inttypes.h>
#include <charconv>
#include "absl/cleanup/cleanup.h"
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "rtc_base/event.h"
#include "rtc_base/ssl_adapter.h"
#include "rtc_base/thread.h"
#include "rtc_tools/data_channel_benchmark/grpc_signaling.h"
#include "rtc_tools/data_channel_benchmark/peer_connection_client.h"
#include "system_wrappers/include/field_trial.h"
ABSL_FLAG(int, verbose, 0, "verbosity level (0-5)");
ABSL_FLAG(bool, server, false, "Server mode");
ABSL_FLAG(bool, oneshot, true, "Terminate after serving a client");
ABSL_FLAG(std::string, address, "localhost", "Connect to server address");
ABSL_FLAG(uint16_t, port, 0, "Connect to port (0 for random)");
ABSL_FLAG(uint64_t, transfer_size, 2, "Transfer size (MiB)");
ABSL_FLAG(uint64_t, packet_size, 256 * 1024, "Packet size");
ABSL_FLAG(std::string,
force_fieldtrials,
"",
"Field trials control experimental feature code which can be forced. "
"E.g. running with --force_fieldtrials=WebRTC-FooFeature/Enable/"
" will assign the group Enable to field trial WebRTC-FooFeature.");
struct SetupMessage {
size_t packet_size;
size_t transfer_size;
std::string ToString() {
char buffer[64];
rtc::SimpleStringBuilder sb(buffer);
sb << packet_size << "," << transfer_size;
return sb.str();
}
static SetupMessage FromString(absl::string_view sv) {
SetupMessage result;
auto parameters = rtc::split(sv, ',');
std::from_chars(parameters[0].data(),
parameters[0].data() + parameters[0].size(),
result.packet_size, 10);
std::from_chars(parameters[1].data(),
parameters[1].data() + parameters[1].size(),
result.transfer_size, 10);
return result;
}
};
class DataChannelObserverImpl : public webrtc::DataChannelObserver {
public:
explicit DataChannelObserverImpl(webrtc::DataChannelInterface* dc)
: dc_(dc), bytes_received_(0) {}
void OnStateChange() override {
RTC_LOG(LS_INFO) << "State changed to " << dc_->state();
switch (dc_->state()) {
case webrtc::DataChannelInterface::DataState::kOpen:
open_event_.Set();
break;
case webrtc::DataChannelInterface::DataState::kClosed:
closed_event_.Set();
break;
default:
break;
}
}
void OnMessage(const webrtc::DataBuffer& buffer) override {
bytes_received_ += buffer.data.size();
if (bytes_received_threshold_ &&
bytes_received_ >= bytes_received_threshold_) {
bytes_received_event_.Set();
}
if (setup_message_.empty() && !buffer.binary) {
setup_message_.assign(buffer.data.cdata<char>(), buffer.data.size());
setup_message_event_.Set();
}
}
void OnBufferedAmountChange(uint64_t sent_data_size) override {
if (dc_->buffered_amount() <
webrtc::DataChannelInterface::MaxSendQueueSize() / 2)
low_buffered_threshold_event_.Set();
else
low_buffered_threshold_event_.Reset();
}
bool WaitForOpenState(int duration_ms) {
return dc_->state() == webrtc::DataChannelInterface::DataState::kOpen ||
open_event_.Wait(duration_ms);
}
bool WaitForClosedState(int duration_ms) {
return dc_->state() == webrtc::DataChannelInterface::DataState::kClosed ||
closed_event_.Wait(duration_ms);
}
// Set how many received bytes are required until
// WaitForBytesReceivedThreshold return true.
void SetBytesReceivedThreshold(uint64_t bytes_received_threshold) {
bytes_received_threshold_ = bytes_received_threshold;
if (bytes_received_ >= bytes_received_threshold_)
bytes_received_event_.Set();
}
// Wait until the received byte count reaches the desired value.
bool WaitForBytesReceivedThreshold(int duration_ms) {
return (bytes_received_threshold_ &&
bytes_received_ >= bytes_received_threshold_) ||
bytes_received_event_.Wait(duration_ms);
}
bool WaitForLowbufferedThreshold(int duration_ms) {
return low_buffered_threshold_event_.Wait(duration_ms);
}
std::string SetupMessage() { return setup_message_; }
bool WaitForSetupMessage(int duration_ms) {
return setup_message_event_.Wait(duration_ms);
}
private:
webrtc::DataChannelInterface* dc_;
rtc::Event open_event_;
rtc::Event closed_event_;
rtc::Event bytes_received_event_;
absl::optional<uint64_t> bytes_received_threshold_;
uint64_t bytes_received_;
rtc::Event low_buffered_threshold_event_;
std::string setup_message_;
rtc::Event setup_message_event_;
};
int RunServer() {
bool oneshot = absl::GetFlag(FLAGS_oneshot);
uint16_t port = absl::GetFlag(FLAGS_port);
auto signaling_thread = rtc::Thread::Create();
signaling_thread->Start();
{
auto factory = webrtc::PeerConnectionClient::CreateDefaultFactory(
signaling_thread.get());
auto grpc_server = webrtc::GrpcSignalingServerInterface::Create(
[factory = rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface>(
factory)](webrtc::SignalingInterface* signaling) {
webrtc::PeerConnectionClient client(factory.get(), signaling);
client.StartPeerConnection();
auto peer_connection = client.peerConnection();
// Set up the data channel
auto dc_or_error =
peer_connection->CreateDataChannelOrError("benchmark", nullptr);
auto data_channel = dc_or_error.MoveValue();
auto data_channel_observer =
std::make_unique<DataChannelObserverImpl>(data_channel);
data_channel->RegisterObserver(data_channel_observer.get());
absl::Cleanup unregister_observer(
[data_channel] { data_channel->UnregisterObserver(); });
// Wait for a first message from the remote peer.
// It configures how much data should be sent and how big the packets
// should be.
// First message is "packet_size,transfer_size".
data_channel_observer->WaitForSetupMessage(rtc::Event::kForever);
auto parameters =
SetupMessage::FromString(data_channel_observer->SetupMessage());
// Wait for the sender and receiver peers to stabilize (send all ACKs)
// This makes it easier to isolate the sending part when profiling.
absl::SleepFor(absl::Seconds(1));
std::string data(parameters.packet_size, '0');
size_t remaining_data = parameters.transfer_size;
auto begin_time = webrtc::Clock::GetRealTimeClock()->CurrentTime();
while (remaining_data) {
if (remaining_data < data.size())
data.resize(remaining_data);
rtc::CopyOnWriteBuffer buffer(data);
webrtc::DataBuffer data_buffer(buffer, true);
if (!data_channel->Send(data_buffer)) {
// If the send() call failed, the buffers are full.
// We wait until there's more room.
data_channel_observer->WaitForLowbufferedThreshold(
rtc::Event::kForever);
continue;
}
remaining_data -= buffer.size();
fprintf(stderr, "Progress: %zu / %zu (%zu%%)\n",
(parameters.transfer_size - remaining_data),
parameters.transfer_size,
(100 - remaining_data * 100 / parameters.transfer_size));
}
// Receiver signals the data channel close event when it has received
// all the data it requested.
data_channel_observer->WaitForClosedState(rtc::Event::kForever);
auto end_time = webrtc::Clock::GetRealTimeClock()->CurrentTime();
auto duration_ms = (end_time - begin_time).ms<size_t>();
double throughput = (parameters.transfer_size / 1024. / 1024.) /
(duration_ms / 1000.);
printf("Elapsed time: %zums %gMiB/s\n", duration_ms, throughput);
},
port, oneshot);
grpc_server->Start();
printf("Server listening on port %d\n", grpc_server->SelectedPort());
grpc_server->Wait();
}
signaling_thread->Quit();
return 0;
}
int RunClient() {
uint16_t port = absl::GetFlag(FLAGS_port);
std::string server_address = absl::GetFlag(FLAGS_address);
size_t transfer_size = absl::GetFlag(FLAGS_transfer_size) * 1024 * 1024;
size_t packet_size = absl::GetFlag(FLAGS_packet_size);
auto signaling_thread = rtc::Thread::Create();
signaling_thread->Start();
{
auto factory = webrtc::PeerConnectionClient::CreateDefaultFactory(
signaling_thread.get());
auto grpc_client = webrtc::GrpcSignalingClientInterface::Create(
server_address + ":" + std::to_string(port));
webrtc::PeerConnectionClient client(factory.get(),
grpc_client->signaling_client());
// Set up the callback to receive the data channel from the sender.
rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel;
rtc::Event got_data_channel;
client.SetOnDataChannel(
[&data_channel, &got_data_channel](
rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
data_channel = channel;
got_data_channel.Set();
});
// Connect to the server.
if (!grpc_client->Start()) {
fprintf(stderr, "Failed to connect to server\n");
return 1;
}
// Wait for the data channel to be received
got_data_channel.Wait(rtc::Event::kForever);
// DataChannel needs an observer to start draining the read queue
DataChannelObserverImpl observer(data_channel.get());
observer.SetBytesReceivedThreshold(transfer_size);
data_channel->RegisterObserver(&observer);
absl::Cleanup unregister_observer(
[data_channel] { data_channel->UnregisterObserver(); });
// Send a configuration string to the server to tell it to send
// 'packet_size' bytes packets and send a total of 'transfer_size' MB.
observer.WaitForOpenState(rtc::Event::kForever);
SetupMessage setup_message = {
.packet_size = packet_size,
.transfer_size = transfer_size,
};
if (!data_channel->Send(webrtc::DataBuffer(setup_message.ToString()))) {
fprintf(stderr, "Failed to send parameter string\n");
return 1;
}
// Wait until we have received all the data
observer.WaitForBytesReceivedThreshold(rtc::Event::kForever);
// Close the data channel, signaling to the server we have received
// all the requested data.
data_channel->Close();
}
signaling_thread->Quit();
return 0;
}
int main(int argc, char** argv) {
rtc::InitializeSSL();
absl::ParseCommandLine(argc, argv);
// Make sure that higher severity number means more logs by reversing the
// rtc::LoggingSeverity values.
auto logging_severity =
std::max(0, rtc::LS_NONE - absl::GetFlag(FLAGS_verbose));
rtc::LogMessage::LogToDebug(
static_cast<rtc::LoggingSeverity>(logging_severity));
bool is_server = absl::GetFlag(FLAGS_server);
std::string field_trials = absl::GetFlag(FLAGS_force_fieldtrials);
webrtc::field_trial::InitFieldTrialsFromString(field_trials.c_str());
return is_server ? RunServer() : RunClient();
}

View File

@ -0,0 +1,267 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "rtc_tools/data_channel_benchmark/grpc_signaling.h"
#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>
#include <string>
#include <utility>
#include "api/jsep.h"
#include "api/jsep_ice_candidate.h"
#include "rtc_base/thread.h"
#include "rtc_tools/data_channel_benchmark/peer_connection_signaling.grpc.pb.h"
namespace webrtc {
namespace {
using GrpcSignaling::IceCandidate;
using GrpcSignaling::PeerConnectionSignaling;
using GrpcSignaling::SessionDescription;
using GrpcSignaling::SignalingMessage;
template <class T>
class SessionData : public webrtc::SignalingInterface {
public:
SessionData() {}
explicit SessionData(T* stream) : stream_(stream) {}
void SetStream(T* stream) { stream_ = stream; }
void SendIceCandidate(const IceCandidateInterface* candidate) override {
RTC_LOG(LS_INFO) << "SendIceCandidate";
std::string serialized_candidate;
if (!candidate->ToString(&serialized_candidate)) {
RTC_LOG(LS_ERROR) << "Failed to serialize ICE candidate";
return;
}
SignalingMessage message;
IceCandidate* proto_candidate = message.mutable_candidate();
proto_candidate->set_description(serialized_candidate);
proto_candidate->set_mid(candidate->sdp_mid());
proto_candidate->set_mline_index(candidate->sdp_mline_index());
stream_->Write(message);
}
void SendDescription(const SessionDescriptionInterface* sdp) override {
RTC_LOG(LS_INFO) << "SendDescription";
std::string serialized_sdp;
sdp->ToString(&serialized_sdp);
SignalingMessage message;
if (sdp->GetType() == SdpType::kOffer)
message.mutable_description()->set_type(SessionDescription::OFFER);
else if (sdp->GetType() == SdpType::kAnswer)
message.mutable_description()->set_type(SessionDescription::ANSWER);
message.mutable_description()->set_content(serialized_sdp);
stream_->Write(message);
}
void OnRemoteDescription(
std::function<void(std::unique_ptr<SessionDescriptionInterface> sdp)>
callback) override {
RTC_LOG(LS_INFO) << "OnRemoteDescription";
remote_description_callback_ = callback;
}
void OnIceCandidate(
std::function<void(std::unique_ptr<IceCandidateInterface> candidate)>
callback) override {
RTC_LOG(LS_INFO) << "OnIceCandidate";
ice_candidate_callback_ = callback;
}
T* stream_;
std::function<void(std::unique_ptr<webrtc::IceCandidateInterface>)>
ice_candidate_callback_;
std::function<void(std::unique_ptr<webrtc::SessionDescriptionInterface>)>
remote_description_callback_;
};
using ServerSessionData =
SessionData<grpc::ServerReaderWriter<SignalingMessage, SignalingMessage>>;
using ClientSessionData =
SessionData<grpc::ClientReaderWriter<SignalingMessage, SignalingMessage>>;
template <class MessageType, class StreamReader, class SessionData>
void ProcessMessages(StreamReader* stream, SessionData* session) {
MessageType message;
while (stream->Read(&message)) {
switch (message.Content_case()) {
case SignalingMessage::ContentCase::kCandidate: {
webrtc::SdpParseError error;
auto jsep_candidate = std::make_unique<webrtc::JsepIceCandidate>(
message.candidate().mid(), message.candidate().mline_index());
if (!jsep_candidate->Initialize(message.candidate().description(),
&error)) {
RTC_LOG(LS_ERROR) << "Failed to deserialize ICE candidate '"
<< message.candidate().description() << "'";
RTC_LOG(LS_ERROR)
<< "Error at line " << error.line << ":" << error.description;
continue;
}
session->ice_candidate_callback_(std::move(jsep_candidate));
break;
}
case SignalingMessage::ContentCase::kDescription: {
auto& description = message.description();
auto content = description.content();
auto sdp = webrtc::CreateSessionDescription(
description.type() == SessionDescription::OFFER
? webrtc::SdpType::kOffer
: webrtc::SdpType::kAnswer,
description.content());
session->remote_description_callback_(std::move(sdp));
break;
}
default:
RTC_DCHECK_NOTREACHED();
}
}
}
class GrpcNegotiationServer : public GrpcSignalingServerInterface,
public PeerConnectionSignaling::Service {
public:
GrpcNegotiationServer(
std::function<void(webrtc::SignalingInterface*)> callback,
int port,
bool oneshot)
: connect_callback_(std::move(callback)),
requested_port_(port),
oneshot_(oneshot) {}
~GrpcNegotiationServer() override {
Stop();
if (server_stop_thread_)
server_stop_thread_->Stop();
}
void Start() override {
std::string server_address = "[::]";
grpc::ServerBuilder builder;
builder.AddListeningPort(
server_address + ":" + std::to_string(requested_port_),
grpc::InsecureServerCredentials(), &selected_port_);
builder.RegisterService(this);
server_ = builder.BuildAndStart();
}
void Wait() override { server_->Wait(); }
void Stop() override { server_->Shutdown(); }
int SelectedPort() override { return selected_port_; }
grpc::Status Connect(
grpc::ServerContext* context,
grpc::ServerReaderWriter<SignalingMessage, SignalingMessage>* stream)
override {
if (oneshot_) {
// Request the termination of the server early so we don't serve another
// client in parallel.
server_stop_thread_ = rtc::Thread::Create();
server_stop_thread_->Start();
server_stop_thread_->PostTask([this] { Stop(); });
}
ServerSessionData session(stream);
auto reading_thread = rtc::Thread::Create();
reading_thread->Start();
reading_thread->PostTask([&session, &stream] {
ProcessMessages<SignalingMessage>(stream, &session);
});
connect_callback_(&session);
reading_thread->Stop();
return grpc::Status::OK;
}
private:
std::function<void(webrtc::SignalingInterface*)> connect_callback_;
int requested_port_;
int selected_port_;
bool oneshot_;
std::unique_ptr<grpc::Server> server_;
std::unique_ptr<rtc::Thread> server_stop_thread_;
};
class GrpcNegotiationClient : public GrpcSignalingClientInterface {
public:
explicit GrpcNegotiationClient(const std::string& server) {
channel_ = grpc::CreateChannel(server, grpc::InsecureChannelCredentials());
stub_ = PeerConnectionSignaling::NewStub(channel_);
}
~GrpcNegotiationClient() override {
context_.TryCancel();
if (reading_thread_)
reading_thread_->Stop();
}
bool Start() override {
if (!channel_->WaitForConnected(
absl::ToChronoTime(absl::Now() + absl::Seconds(3)))) {
return false;
}
stream_ = stub_->Connect(&context_);
session_.SetStream(stream_.get());
reading_thread_ = rtc::Thread::Create();
reading_thread_->Start();
reading_thread_->PostTask([this] {
ProcessMessages<SignalingMessage>(stream_.get(), &session_);
});
return true;
}
webrtc::SignalingInterface* signaling_client() override { return &session_; }
private:
std::shared_ptr<grpc::Channel> channel_;
std::unique_ptr<PeerConnectionSignaling::Stub> stub_;
std::unique_ptr<rtc::Thread> reading_thread_;
grpc::ClientContext context_;
std::unique_ptr<
::grpc::ClientReaderWriter<SignalingMessage, SignalingMessage>>
stream_;
ClientSessionData session_;
};
} // namespace
std::unique_ptr<GrpcSignalingServerInterface>
GrpcSignalingServerInterface::Create(
std::function<void(webrtc::SignalingInterface*)> callback,
int port,
bool oneshot) {
return std::make_unique<GrpcNegotiationServer>(std::move(callback), port,
oneshot);
}
std::unique_ptr<GrpcSignalingClientInterface>
GrpcSignalingClientInterface::Create(const std::string& server) {
return std::make_unique<GrpcNegotiationClient>(server);
}
} // namespace webrtc

View File

@ -0,0 +1,64 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef RTC_TOOLS_DATA_CHANNEL_BENCHMARK_GRPC_SIGNALING_H_
#define RTC_TOOLS_DATA_CHANNEL_BENCHMARK_GRPC_SIGNALING_H_
#include <memory>
#include <string>
#include "api/jsep.h"
#include "rtc_tools/data_channel_benchmark/signaling_interface.h"
namespace webrtc {
// This class defines a server enabling clients to perform a PeerConnection
// negotiation directly over gRPC.
// When a client connects, a callback is run to handle the request.
class GrpcSignalingServerInterface {
public:
virtual ~GrpcSignalingServerInterface() = default;
// Start listening for connections.
virtual void Start() = 0;
// Wait for the gRPC server to terminate.
virtual void Wait() = 0;
// Stop the gRPC server instance.
virtual void Stop() = 0;
// The port the server is listening on.
virtual int SelectedPort() = 0;
// Create a gRPC server listening on |port| that will run |callback| on each
// request. If |oneshot| is true, it will terminate after serving one request.
static std::unique_ptr<GrpcSignalingServerInterface> Create(
std::function<void(webrtc::SignalingInterface*)> callback,
int port,
bool oneshot);
};
// This class defines a client that can connect to a server and perform a
// PeerConnection negotiation directly over gRPC.
class GrpcSignalingClientInterface {
public:
virtual ~GrpcSignalingClientInterface() = default;
// Connect the client to the gRPC server.
virtual bool Start() = 0;
virtual webrtc::SignalingInterface* signaling_client() = 0;
// Create a client to connnect to a server at |server_address|.
static std::unique_ptr<GrpcSignalingClientInterface> Create(
const std::string& server_address);
};
} // namespace webrtc
#endif // RTC_TOOLS_DATA_CHANNEL_BENCHMARK_GRPC_SIGNALING_H_

View File

@ -0,0 +1,301 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "rtc_tools/data_channel_benchmark/peer_connection_client.h"
#include <memory>
#include <string>
#include <utility>
#include "api/audio_codecs/builtin_audio_decoder_factory.h"
#include "api/audio_codecs/builtin_audio_encoder_factory.h"
#include "api/create_peerconnection_factory.h"
#include "api/jsep.h"
#include "api/peer_connection_interface.h"
#include "api/rtc_error.h"
#include "api/scoped_refptr.h"
#include "api/set_remote_description_observer_interface.h"
#include "api/video_codecs/builtin_video_decoder_factory.h"
#include "api/video_codecs/builtin_video_encoder_factory.h"
#include "rtc_base/logging.h"
#include "rtc_base/thread.h"
namespace {
constexpr char kStunServer[] = "stun:stun.l.google.com:19302";
class SetLocalDescriptionObserverAdapter
: public webrtc::SetLocalDescriptionObserverInterface {
public:
using Callback = std::function<void(webrtc::RTCError)>;
static rtc::scoped_refptr<SetLocalDescriptionObserverAdapter> Create(
Callback callback) {
return rtc::scoped_refptr<SetLocalDescriptionObserverAdapter>(
new rtc::RefCountedObject<SetLocalDescriptionObserverAdapter>(
std::move(callback)));
}
explicit SetLocalDescriptionObserverAdapter(Callback callback)
: callback_(std::move(callback)) {}
~SetLocalDescriptionObserverAdapter() override = default;
private:
void OnSetLocalDescriptionComplete(webrtc::RTCError error) override {
callback_(std::move(error));
}
Callback callback_;
};
class SetRemoteDescriptionObserverAdapter
: public webrtc::SetRemoteDescriptionObserverInterface {
public:
using Callback = std::function<void(webrtc::RTCError)>;
static rtc::scoped_refptr<SetRemoteDescriptionObserverAdapter> Create(
Callback callback) {
return rtc::scoped_refptr<SetRemoteDescriptionObserverAdapter>(
new rtc::RefCountedObject<SetRemoteDescriptionObserverAdapter>(
std::move(callback)));
}
explicit SetRemoteDescriptionObserverAdapter(Callback callback)
: callback_(std::move(callback)) {}
~SetRemoteDescriptionObserverAdapter() override = default;
private:
void OnSetRemoteDescriptionComplete(webrtc::RTCError error) override {
callback_(std::move(error));
}
Callback callback_;
};
class CreateSessionDescriptionObserverAdapter
: public webrtc::CreateSessionDescriptionObserver {
public:
using Success = std::function<void(webrtc::SessionDescriptionInterface*)>;
using Failure = std::function<void(webrtc::RTCError)>;
static rtc::scoped_refptr<CreateSessionDescriptionObserverAdapter> Create(
Success success,
Failure failure) {
return rtc::scoped_refptr<CreateSessionDescriptionObserverAdapter>(
new rtc::RefCountedObject<CreateSessionDescriptionObserverAdapter>(
std::move(success), std::move(failure)));
}
CreateSessionDescriptionObserverAdapter(Success success, Failure failure)
: success_(std::move(success)), failure_(std::move(failure)) {}
~CreateSessionDescriptionObserverAdapter() override = default;
private:
void OnSuccess(webrtc::SessionDescriptionInterface* desc) override {
success_(desc);
}
void OnFailure(webrtc::RTCError error) override {
failure_(std::move(error));
}
Success success_;
Failure failure_;
};
} // namespace
namespace webrtc {
PeerConnectionClient::PeerConnectionClient(
webrtc::PeerConnectionFactoryInterface* factory,
webrtc::SignalingInterface* signaling)
: signaling_(signaling) {
signaling_->OnIceCandidate(
[&](std::unique_ptr<webrtc::IceCandidateInterface> candidate) {
AddIceCandidate(std::move(candidate));
});
signaling_->OnRemoteDescription(
[&](std::unique_ptr<webrtc::SessionDescriptionInterface> sdp) {
SetRemoteDescription(std::move(sdp));
});
InitializePeerConnection(factory);
}
PeerConnectionClient::~PeerConnectionClient() {
Disconnect();
}
rtc::scoped_refptr<PeerConnectionFactoryInterface>
PeerConnectionClient::CreateDefaultFactory(rtc::Thread* signaling_thread) {
auto factory = webrtc::CreatePeerConnectionFactory(
/*network_thread=*/nullptr, /*worker_thread=*/nullptr,
/*signaling_thread*/ signaling_thread,
/*default_adm=*/nullptr, webrtc::CreateBuiltinAudioEncoderFactory(),
webrtc::CreateBuiltinAudioDecoderFactory(),
webrtc::CreateBuiltinVideoEncoderFactory(),
webrtc::CreateBuiltinVideoDecoderFactory(),
/*audio_mixer=*/nullptr, /*audio_processing=*/nullptr);
if (!factory) {
RTC_LOG(LS_ERROR) << "Failed to initialize PeerConnectionFactory";
return nullptr;
}
return factory;
}
bool PeerConnectionClient::InitializePeerConnection(
webrtc::PeerConnectionFactoryInterface* factory) {
RTC_CHECK(factory)
<< "Must call InitializeFactory before InitializePeerConnection";
webrtc::PeerConnectionInterface::RTCConfiguration config;
webrtc::PeerConnectionInterface::IceServer server;
server.urls.push_back(kStunServer);
config.servers.push_back(server);
config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;
config.enable_dtls_srtp = true;
webrtc::PeerConnectionDependencies dependencies(this);
auto result =
factory->CreatePeerConnectionOrError(config, std::move(dependencies));
if (!result.ok()) {
RTC_LOG(LS_ERROR) << "Failed to create PeerConnection: "
<< result.error().message();
DeletePeerConnection();
return false;
}
peer_connection_ = result.MoveValue();
RTC_LOG(LS_INFO) << "PeerConnection created successfully";
return true;
}
bool PeerConnectionClient::StartPeerConnection() {
RTC_LOG(LS_INFO) << "Creating offer";
peer_connection_->SetLocalDescription(
SetLocalDescriptionObserverAdapter::Create([this](
webrtc::RTCError error) {
if (error.ok())
signaling_->SendDescription(peer_connection_->local_description());
}));
return true;
}
bool PeerConnectionClient::IsConnected() {
return peer_connection_->peer_connection_state() ==
webrtc::PeerConnectionInterface::PeerConnectionState::kConnected;
}
// Disconnect from the call.
void PeerConnectionClient::Disconnect() {
for (auto& data_channel : data_channels_) {
data_channel->Close();
data_channel.release();
}
data_channels_.clear();
DeletePeerConnection();
}
// Delete the WebRTC PeerConnection.
void PeerConnectionClient::DeletePeerConnection() {
RTC_LOG(LS_INFO);
if (peer_connection_) {
peer_connection_->Close();
}
peer_connection_.release();
}
void PeerConnectionClient::OnIceConnectionChange(
webrtc::PeerConnectionInterface::IceConnectionState new_state) {
if (new_state == webrtc::PeerConnectionInterface::IceConnectionState::
kIceConnectionCompleted) {
RTC_LOG(LS_INFO) << "State is updating to connected";
} else if (new_state == webrtc::PeerConnectionInterface::IceConnectionState::
kIceConnectionDisconnected) {
RTC_LOG(LS_INFO) << "Disconnecting from peer";
Disconnect();
}
}
void PeerConnectionClient::OnIceGatheringChange(
webrtc::PeerConnectionInterface::IceGatheringState new_state) {
if (new_state == webrtc::PeerConnectionInterface::kIceGatheringComplete) {
RTC_LOG(LS_INFO) << "Client is ready to receive remote SDP";
}
}
void PeerConnectionClient::OnIceCandidate(
const webrtc::IceCandidateInterface* candidate) {
signaling_->SendIceCandidate(candidate);
}
void PeerConnectionClient::OnDataChannel(
rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
RTC_LOG(LS_INFO) << __FUNCTION__ << " remote datachannel created";
if (on_data_channel_callback_)
on_data_channel_callback_(channel);
data_channels_.push_back(channel);
}
void PeerConnectionClient::SetOnDataChannel(
std::function<void(rtc::scoped_refptr<webrtc::DataChannelInterface>)>
callback) {
on_data_channel_callback_ = callback;
}
void PeerConnectionClient::OnNegotiationNeededEvent(uint32_t event_id) {
RTC_LOG(LS_INFO) << "OnNegotiationNeededEvent";
peer_connection_->SetLocalDescription(
SetLocalDescriptionObserverAdapter::Create([this](
webrtc::RTCError error) {
if (error.ok())
signaling_->SendDescription(peer_connection_->local_description());
}));
}
bool PeerConnectionClient::SetRemoteDescription(
std::unique_ptr<webrtc::SessionDescriptionInterface> desc) {
RTC_LOG(LS_INFO) << "SetRemoteDescription";
auto type = desc->GetType();
peer_connection_->SetRemoteDescription(
std::move(desc),
SetRemoteDescriptionObserverAdapter::Create([&](webrtc::RTCError) {
RTC_LOG(LS_INFO) << "SetRemoteDescription done";
if (type == webrtc::SdpType::kOffer) {
// Got an offer from the remote, need to set an answer and send it.
peer_connection_->SetLocalDescription(
SetLocalDescriptionObserverAdapter::Create(
[this](webrtc::RTCError error) {
if (error.ok())
signaling_->SendDescription(
peer_connection_->local_description());
}));
}
}));
return true;
}
void PeerConnectionClient::AddIceCandidate(
std::unique_ptr<webrtc::IceCandidateInterface> candidate) {
RTC_LOG(LS_INFO) << "AddIceCandidate";
peer_connection_->AddIceCandidate(
std::move(candidate), [](const webrtc::RTCError& error) {
RTC_LOG(LS_INFO) << "Failed to add candidate: " << error.message();
});
}
} // namespace webrtc

View File

@ -0,0 +1,108 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef RTC_TOOLS_DATA_CHANNEL_BENCHMARK_PEER_CONNECTION_CLIENT_H_
#define RTC_TOOLS_DATA_CHANNEL_BENCHMARK_PEER_CONNECTION_CLIENT_H_
#include <stdint.h>
#include <memory>
#include <string>
#include <vector>
#include "api/jsep.h"
#include "api/peer_connection_interface.h"
#include "api/rtp_receiver_interface.h"
#include "api/scoped_refptr.h"
#include "api/set_local_description_observer_interface.h"
#include "rtc_base/logging.h"
#include "rtc_base/ref_counted_object.h"
#include "rtc_base/thread.h"
#include "rtc_tools/data_channel_benchmark/signaling_interface.h"
namespace webrtc {
// Handles all the details for creating a PeerConnection and negotiation using a
// SignalingInterface object.
class PeerConnectionClient : public webrtc::PeerConnectionObserver {
public:
explicit PeerConnectionClient(webrtc::PeerConnectionFactoryInterface* factory,
webrtc::SignalingInterface* signaling);
~PeerConnectionClient() override;
PeerConnectionClient(const PeerConnectionClient&) = delete;
PeerConnectionClient& operator=(const PeerConnectionClient&) = delete;
// Set the local description and send offer using the SignalingInterface,
// initiating the negotiation process.
bool StartPeerConnection();
// Whether the peer connection is connected to the remote peer.
bool IsConnected();
// Disconnect from the call.
void Disconnect();
rtc::scoped_refptr<webrtc::PeerConnectionInterface> peerConnection() {
return peer_connection_;
}
// Set a callback to run when a DataChannel is created by the remote peer.
void SetOnDataChannel(
std::function<void(rtc::scoped_refptr<webrtc::DataChannelInterface>)>
callback);
std::vector<rtc::scoped_refptr<webrtc::DataChannelInterface>>&
dataChannels() {
return data_channels_;
}
// Creates a default PeerConnectionFactory object.
static rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface>
CreateDefaultFactory(rtc::Thread* signaling_thread);
private:
void AddIceCandidate(
std::unique_ptr<webrtc::IceCandidateInterface> candidate);
bool SetRemoteDescription(
std::unique_ptr<webrtc::SessionDescriptionInterface> desc);
// Initialize the PeerConnection with a given PeerConnectionFactory.
bool InitializePeerConnection(
webrtc::PeerConnectionFactoryInterface* factory);
void DeletePeerConnection();
// PeerConnectionObserver implementation.
void OnSignalingChange(
webrtc::PeerConnectionInterface::SignalingState new_state) override {
RTC_LOG(LS_INFO) << __FUNCTION__ << " new state: " << new_state;
}
void OnDataChannel(
rtc::scoped_refptr<webrtc::DataChannelInterface> channel) override;
void OnNegotiationNeededEvent(uint32_t event_id) override;
void OnIceConnectionChange(
webrtc::PeerConnectionInterface::IceConnectionState new_state) override;
void OnIceGatheringChange(
webrtc::PeerConnectionInterface::IceGatheringState new_state) override;
void OnIceCandidate(const webrtc::IceCandidateInterface* candidate) override;
void OnIceConnectionReceivingChange(bool receiving) override {
RTC_LOG(LS_INFO) << __FUNCTION__ << " receiving? " << receiving;
}
rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_;
std::function<void(rtc::scoped_refptr<webrtc::DataChannelInterface>)>
on_data_channel_callback_;
std::vector<rtc::scoped_refptr<webrtc::DataChannelInterface>> data_channels_;
webrtc::SignalingInterface* signaling_;
};
} // namespace webrtc
#endif // RTC_TOOLS_DATA_CHANNEL_BENCHMARK_PEER_CONNECTION_CLIENT_H_

View File

@ -0,0 +1,29 @@
syntax = "proto3";
package webrtc.GrpcSignaling;
service PeerConnectionSignaling {
rpc Connect(stream SignalingMessage) returns (stream SignalingMessage) {}
}
message SignalingMessage {
oneof Content {
SessionDescription description = 1;
IceCandidate candidate = 2;
}
}
message SessionDescription {
enum SessionDescriptionType {
OFFER = 0;
ANSWER = 1;
}
SessionDescriptionType type = 1;
string content = 2;
}
message IceCandidate {
string mid = 1;
int32 mline_index = 2;
string description = 3;
}

View File

@ -0,0 +1,42 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef RTC_TOOLS_DATA_CHANNEL_BENCHMARK_SIGNALING_INTERFACE_H_
#define RTC_TOOLS_DATA_CHANNEL_BENCHMARK_SIGNALING_INTERFACE_H_
#include <memory>
#include "api/jsep.h"
namespace webrtc {
class SignalingInterface {
public:
virtual ~SignalingInterface() = default;
// Send an ICE candidate over the transport.
virtual void SendIceCandidate(
const webrtc::IceCandidateInterface* candidate) = 0;
// Send a local description over the transport.
virtual void SendDescription(
const webrtc::SessionDescriptionInterface* sdp) = 0;
// Set a callback when receiving a description from the transport.
virtual void OnRemoteDescription(
std::function<void(std::unique_ptr<webrtc::SessionDescriptionInterface>
sdp)> callback) = 0;
// Set a callback when receiving an ICE candidate from the transport.
virtual void OnIceCandidate(
std::function<void(std::unique_ptr<webrtc::IceCandidateInterface>
candidate)> callback) = 0;
};
} // namespace webrtc
#endif // RTC_TOOLS_DATA_CHANNEL_BENCHMARK_SIGNALING_INTERFACE_H_

View File

@ -69,6 +69,9 @@ char kTSanDefaultSuppressions[] =
// http://crbug.com/244856 // http://crbug.com/244856
"race:libpulsecommon*.so\n" "race:libpulsecommon*.so\n"
// https://crbug.com/1158622
"race:absl::synchronization_internal::Waiter::Post\n"
// End of suppressions. // End of suppressions.
; // Please keep this semicolon. ; // Please keep this semicolon.

View File

@ -308,6 +308,9 @@ declare_args() {
# Enable the usrsctp backend for DataChannels and related unittests # Enable the usrsctp backend for DataChannels and related unittests
rtc_build_usrsctp = !build_with_mozilla && rtc_enable_sctp rtc_build_usrsctp = !build_with_mozilla && rtc_enable_sctp
# Enable gRPC used for negotiation in multiprocess tests
rtc_enable_grpc = rtc_enable_protobuf && (is_linux || is_mac)
} }
# Make it possible to provide custom locations for some libraries (move these # Make it possible to provide custom locations for some libraries (move these