Migrate rtc_base and rtc_tools to absl::AnyInvocable based TaskQueueBase interface

Bug: webrtc:14245
Change-Id: I71abe3db7a23ad33bd175297e23fa8e927fa9628
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268768
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37553}
This commit is contained in:
Danil Chapovalov 2022-07-18 17:04:56 +02:00 committed by WebRTC LUCI CQ
parent 049dde6c8e
commit 5286dcfab6
19 changed files with 116 additions and 143 deletions

View File

@ -948,7 +948,6 @@ rtc_library("threading") {
"../api:sequence_checker",
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:to_queued_task",
"../api/units:time_delta",
"synchronization:mutex",
"system:no_unique_address",
@ -1092,8 +1091,8 @@ rtc_library("rtc_base") {
"../api/numerics",
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:to_queued_task",
"../api/transport:field_trial_based_config",
"../api/units:time_delta",
"../system_wrappers:field_trial",
"memory:always_valid_pointer",
"network:sent_packet",
@ -1378,7 +1377,6 @@ rtc_library("rtc_base_tests_utils") {
":stringutils",
":threading",
":timeutils",
"../api/task_queue:to_queued_task",
"../api/units:time_delta",
"../api/units:timestamp",
"../test:scoped_key_value_config",
@ -1407,11 +1405,14 @@ rtc_library("task_queue_for_test") {
":macromagic",
":rtc_event",
":rtc_task_queue",
"../api:function_view",
"../api/task_queue",
"../api/task_queue:default_task_queue_factory",
"../api/task_queue:to_queued_task",
]
absl_deps = [ "//third_party/abseil-cpp/absl/strings" ]
absl_deps = [
"//third_party/abseil-cpp/absl/cleanup",
"//third_party/abseil-cpp/absl/strings",
]
}
if (rtc_include_tests) {
@ -1594,7 +1595,6 @@ if (rtc_include_tests) {
"../api:make_ref_counted",
"../api:scoped_refptr",
"../api/numerics",
"../api/task_queue:to_queued_task",
"../api/units:time_delta",
"../system_wrappers",
"../test:fileutils",
@ -1631,6 +1631,7 @@ if (rtc_include_tests) {
":rtc_task_queue",
":task_queue_for_test",
":timeutils",
"../api/units:time_delta",
"../test:test_main",
"../test:test_support",
]
@ -1736,7 +1737,6 @@ if (rtc_include_tests) {
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:task_queue_test",
"../api/task_queue:to_queued_task",
"../api/units:time_delta",
"../test:field_trial",
"../test:fileutils",
@ -1770,6 +1770,7 @@ if (rtc_include_tests) {
}
absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",
"//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",

View File

@ -34,7 +34,6 @@
#endif // defined(WEBRTC_POSIX) && !defined(__native_client__)
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/to_queued_task.h"
#include "rtc_base/ip_address.h"
#include "rtc_base/logging.h"
#include "rtc_base/platform_thread.h"
@ -51,17 +50,17 @@ namespace rtc {
namespace {
void GlobalGcdRunTask(void* context) {
std::unique_ptr<webrtc::QueuedTask> task(
static_cast<webrtc::QueuedTask*>(context));
task->Run();
std::unique_ptr<absl::AnyInvocable<void() &&>> task(
static_cast<absl::AnyInvocable<void() &&>*>(context));
std::move (*task)();
}
// Post a task into the system-defined global concurrent queue.
void PostTaskToGlobalQueue(std::unique_ptr<webrtc::QueuedTask> task) {
void PostTaskToGlobalQueue(
std::unique_ptr<absl::AnyInvocable<void() &&>> task) {
dispatch_queue_global_t global_queue =
dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
webrtc::QueuedTask* context = task.release();
dispatch_async_f(global_queue, context, &GlobalGcdRunTask);
dispatch_async_f(global_queue, task.release(), &GlobalGcdRunTask);
}
} // namespace
@ -156,7 +155,7 @@ void AsyncResolver::Start(const SocketAddress& addr) {
int error = ResolveHostname(addr.hostname(), addr.family(), &addresses);
webrtc::MutexLock lock(&state->mutex);
if (state->status == State::Status::kLive) {
caller_task_queue->PostTask(webrtc::ToQueuedTask(
caller_task_queue->PostTask(
[this, error, addresses = std::move(addresses), state] {
bool live;
{
@ -169,11 +168,12 @@ void AsyncResolver::Start(const SocketAddress& addr) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
ResolveDone(std::move(addresses), error);
}
}));
});
}
};
#if defined(WEBRTC_MAC) || defined(WEBRTC_IOS)
PostTaskToGlobalQueue(webrtc::ToQueuedTask(std::move(thread_function)));
PostTaskToGlobalQueue(
std::make_unique<absl::AnyInvocable<void() &&>>(thread_function));
#else
PlatformThread::SpawnDetached(std::move(thread_function), "AsyncResolver");
#endif

View File

@ -16,7 +16,6 @@
#include <string>
#include "absl/strings/string_view.h"
#include "api/task_queue/to_queued_task.h"
#include "rtc_base/ip_address.h"
#include "rtc_base/location.h"
#include "rtc_base/mdns_responder_interface.h"
@ -41,8 +40,7 @@ class FakeMdnsResponder : public MdnsResponderInterface {
name = std::to_string(next_available_id_++) + ".local";
addr_name_map_[addr] = name;
}
thread_->PostTask(
ToQueuedTask([callback, addr, name]() { callback(addr, name); }));
thread_->PostTask([callback, addr, name]() { callback(addr, name); });
}
void RemoveNameForAddress(const rtc::IPAddress& addr,
NameRemovedCallback callback) override {
@ -51,7 +49,7 @@ class FakeMdnsResponder : public MdnsResponderInterface {
addr_name_map_.erase(it);
}
bool result = it != addr_name_map_.end();
thread_->PostTask(ToQueuedTask([callback, result]() { callback(result); }));
thread_->PostTask([callback, result]() { callback(result); });
}
rtc::IPAddress GetMappedAddressForName(absl::string_view name) const {

View File

@ -36,7 +36,6 @@ rtc_library("fifo_buffer") {
"..:rtc_base",
"..:threading",
"../../api/task_queue:pending_task_safety_flag",
"../../api/task_queue:to_queued_task",
"../synchronization:mutex",
]
}

View File

@ -14,7 +14,6 @@
#include <memory>
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/to_queued_task.h"
#include "rtc_base/stream.h"
#include "rtc_base/synchronization/mutex.h"
@ -81,9 +80,9 @@ class FifoBuffer final : public StreamInterface {
private:
void PostEvent(int events, int err) {
owner_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
SignalEvent(this, events, err);
}));
owner_->PostTask(webrtc::SafeTask(
task_safety_.flag(),
[this, events, err]() { SignalEvent(this, events, err); }));
}
// Helper method that implements Read. Caller must acquire a lock

View File

@ -30,8 +30,9 @@
#include "absl/memory/memory.h"
#include "absl/strings/match.h"
#include "absl/strings/string_view.h"
#include "api/task_queue/to_queued_task.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/transport/field_trial_based_config.h"
#include "api/units/time_delta.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/memory/always_valid_pointer.h"
@ -44,6 +45,8 @@
namespace rtc {
namespace {
using ::webrtc::SafeTask;
using ::webrtc::TimeDelta;
// List of MAC addresses of known VPN (for windows).
constexpr uint8_t kVpns[2][6] = {
@ -906,14 +909,14 @@ void BasicNetworkManager::StartUpdating() {
// we should trigger network signal immediately for the new clients
// to start allocating ports.
if (sent_first_update_)
thread_->PostTask(ToQueuedTask(task_safety_flag_, [this] {
thread_->PostTask(SafeTask(task_safety_flag_, [this] {
RTC_DCHECK_RUN_ON(thread_);
SignalNetworksChanged();
}));
} else {
RTC_DCHECK(task_safety_flag_ == nullptr);
task_safety_flag_ = webrtc::PendingTaskSafetyFlag::Create();
thread_->PostTask(ToQueuedTask(task_safety_flag_, [this] {
thread_->PostTask(SafeTask(task_safety_flag_, [this] {
RTC_DCHECK_RUN_ON(thread_);
UpdateNetworksContinually();
}));
@ -1029,12 +1032,12 @@ void BasicNetworkManager::UpdateNetworksOnce() {
void BasicNetworkManager::UpdateNetworksContinually() {
UpdateNetworksOnce();
thread_->PostDelayedTask(ToQueuedTask(task_safety_flag_,
[this] {
RTC_DCHECK_RUN_ON(thread_);
UpdateNetworksContinually();
}),
kNetworksUpdateIntervalMs);
thread_->PostDelayedTask(SafeTask(task_safety_flag_,
[this] {
RTC_DCHECK_RUN_ON(thread_);
UpdateNetworksContinually();
}),
TimeDelta::Millis(kNetworksUpdateIntervalMs));
}
void BasicNetworkManager::DumpNetworks() {

View File

@ -39,7 +39,6 @@
#else
#include "rtc_base/openssl_identity.h"
#endif
#include "api/task_queue/to_queued_task.h"
#include "rtc_base/openssl_utility.h"
#include "rtc_base/ssl_certificate.h"
#include "rtc_base/stream.h"
@ -60,6 +59,7 @@
namespace rtc {
namespace {
using ::webrtc::SafeTask;
// SRTP cipher suite table. `internal_name` is used to construct a
// colon-separated profile strings which is needed by
// SSL_CTX_set_tlsext_use_srtp().
@ -821,8 +821,9 @@ void OpenSSLStreamAdapter::OnEvent(StreamInterface* stream,
}
void OpenSSLStreamAdapter::PostEvent(int events, int err) {
owner_->PostTask(webrtc::ToQueuedTask(
task_safety_, [this, events, err]() { SignalEvent(this, events, err); }));
owner_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() {
SignalEvent(this, events, err);
}));
}
void OpenSSLStreamAdapter::SetTimeout(int delay_ms) {

View File

@ -18,7 +18,6 @@
#include "absl/memory/memory.h"
#include "absl/strings/string_view.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/to_queued_task.h"
#include "rtc_base/buffer_queue.h"
#include "rtc_base/checks.h"
#include "rtc_base/gunit.h"
@ -36,6 +35,7 @@ using ::testing::Combine;
using ::testing::tuple;
using ::testing::Values;
using ::testing::WithParamInterface;
using ::webrtc::SafeTask;
static const int kBlockSize = 4096;
static const char kExporterLabel[] = "label";
@ -220,7 +220,7 @@ class SSLDummyStreamBase : public rtc::StreamInterface,
private:
void PostEvent(int events, int err) {
thread_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
thread_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() {
SignalEvent(this, events, err);
}));
}
@ -292,7 +292,7 @@ class BufferQueueStream : public rtc::StreamInterface {
private:
void PostEvent(int events, int err) {
thread_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
thread_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() {
SignalEvent(this, events, err);
}));
}

View File

@ -13,9 +13,10 @@
#include <utility>
#include "absl/cleanup/cleanup.h"
#include "absl/strings/string_view.h"
#include "api/function_view.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/to_queued_task.h"
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
#include "rtc_base/location.h"
@ -24,13 +25,14 @@
namespace webrtc {
template <typename Closure>
void SendTask(rtc::Location loc, TaskQueueBase* task_queue, Closure&& task) {
inline void SendTask(rtc::Location loc,
TaskQueueBase* task_queue,
rtc::FunctionView<void()> task) {
RTC_CHECK(!task_queue->IsCurrent())
<< "Called SendTask to a queue from the same queue at " << loc.ToString();
rtc::Event event;
task_queue->PostTask(
ToQueuedTask(std::forward<Closure>(task), [&event] { event.Set(); }));
absl::Cleanup cleanup = [&event] { event.Set(); };
task_queue->PostTask([task, cleanup = std::move(cleanup)] { task(); });
RTC_CHECK(event.Wait(/*give_up_after_ms=*/rtc::Event::kForever,
/*warn_after_ms=*/10'000))
<< "Waited too long at " << loc.ToString();
@ -47,24 +49,8 @@ class RTC_LOCKABLE TaskQueueForTest : public rtc::TaskQueue {
// A convenience, test-only method that blocks the current thread while
// a task executes on the task queue.
// This variant is specifically for posting custom QueuedTask derived
// implementations that tests do not want to pass ownership of over to the
// task queue (i.e. the Run() method always returns `false`.).
template <class Closure>
void SendTask(Closure* task) {
RTC_CHECK(!IsCurrent());
rtc::Event event;
PostTask(ToQueuedTask(
[&task] { RTC_CHECK_EQ(false, static_cast<QueuedTask*>(task)->Run()); },
[&event] { event.Set(); }));
event.Wait(rtc::Event::kForever);
}
// A convenience, test-only method that blocks the current thread while
// a task executes on the task queue.
template <class Closure>
void SendTask(Closure&& task, rtc::Location loc) {
::webrtc::SendTask(loc, Get(), std::forward<Closure>(task));
void SendTask(rtc::FunctionView<void()> task, rtc::Location loc) {
::webrtc::SendTask(loc, Get(), task);
}
// Wait for the completion of all tasks posted prior to the

View File

@ -46,7 +46,6 @@ if (rtc_include_tests) {
"..:rtc_task_queue",
"..:task_queue_for_test",
"../../api/task_queue",
"../../api/task_queue:to_queued_task",
"../../api/units:time_delta",
"../../api/units:timestamp",
"../../system_wrappers:system_wrappers",

View File

@ -1039,8 +1039,7 @@ void Thread::QueuedTaskHandler::OnMessage(Message* msg) {
void Thread::AllowInvokesToThread(Thread* thread) {
#if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
if (!IsCurrent()) {
PostTask(webrtc::ToQueuedTask(
[thread, this]() { AllowInvokesToThread(thread); }));
PostTask([thread, this]() { AllowInvokesToThread(thread); });
return;
}
RTC_DCHECK_RUN_ON(this);
@ -1052,7 +1051,7 @@ void Thread::AllowInvokesToThread(Thread* thread) {
void Thread::DisallowAllInvokes() {
#if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
if (!IsCurrent()) {
PostTask(webrtc::ToQueuedTask([this]() { DisallowAllInvokes(); }));
PostTask([this]() { DisallowAllInvokes(); });
return;
}
RTC_DCHECK_RUN_ON(this);

View File

@ -32,7 +32,6 @@
#include "api/function_view.h"
#include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/to_queued_task.h"
#include "api/units/time_delta.h"
#include "rtc_base/checks.h"
#include "rtc_base/deprecated/recursive_critical_section.h"

View File

@ -12,7 +12,6 @@
#include <memory>
#include "api/task_queue/to_queued_task.h"
#include "api/units/time_delta.h"
#include "rtc_base/event.h"
#include "rtc_base/fake_clock.h"
@ -23,6 +22,7 @@
#include "test/gtest.h"
namespace rtc {
using ::webrtc::TimeDelta;
TEST(TimeTest, TimeInMs) {
int64_t ts_earlier = TimeMillis();
@ -270,10 +270,9 @@ TEST(FakeClock, SettingTimeWakesThreads) {
// Post an event that won't be executed for 10 seconds.
Event message_handler_dispatched;
worker->PostDelayedTask(webrtc::ToQueuedTask([&message_handler_dispatched] {
message_handler_dispatched.Set();
}),
/*milliseconds=*/60000);
worker->PostDelayedTask(
[&message_handler_dispatched] { message_handler_dispatched.Set(); },
TimeDelta::Seconds(60));
// Wait for a bit for the worker thread to be started and enter its socket
// select(). Otherwise this test would be trivial since the worker thread

View File

@ -14,8 +14,10 @@
#include <vector>
#include "absl/algorithm/container.h"
#include "absl/functional/any_invocable.h"
#include "api/array_view.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "rtc_base/gunit.h"
#include "rtc_base/helpers.h"
#include "test/gmock.h"
@ -31,9 +33,11 @@ class FakeTaskQueue : public webrtc::TaskQueueBase {
FakeTaskQueue() : task_queue_setter_(this) {}
void Delete() override {}
void PostTask(std::unique_ptr<webrtc::QueuedTask> task) override {}
void PostDelayedTask(std::unique_ptr<webrtc::QueuedTask> task,
uint32_t milliseconds) override {}
void PostTask(absl::AnyInvocable<void() &&> task) override {}
void PostDelayedTask(absl::AnyInvocable<void() &&> task,
webrtc::TimeDelta delay) override {}
void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
webrtc::TimeDelta delay) override {}
private:
CurrentTaskQueueSetter task_queue_setter_;

View File

@ -42,6 +42,7 @@ if (rtc_enable_protobuf) {
"../../api:sequence_checker",
"../../api/task_queue",
"../../api/task_queue:default_task_queue_factory",
"../../api/task_queue:pending_task_safety_flag",
"../../p2p:rtc_p2p",
"../../rtc_base",
"../../rtc_base:checks",
@ -58,7 +59,10 @@ if (rtc_enable_protobuf) {
"../../rtc_base/system:no_unique_address",
"../../rtc_base/third_party/sigslot",
]
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
absl_deps = [
"//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/types:optional",
]
}
network_tester_unittests_resources = [

View File

@ -15,7 +15,8 @@
#include <string>
#include <utility>
#include "api/task_queue/queued_task.h"
#include "absl/functional/any_invocable.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/time_utils.h"
#include "rtc_tools/network_tester/config_reader.h"
@ -25,65 +26,47 @@ namespace webrtc {
namespace {
class SendPacketTask : public QueuedTask {
public:
explicit SendPacketTask(
PacketSender* packet_sender,
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag)
: target_time_ms_(rtc::TimeMillis()),
packet_sender_(packet_sender),
task_safety_flag_(task_safety_flag) {}
private:
bool Run() override {
if (task_safety_flag_->alive() && packet_sender_->IsSending()) {
packet_sender_->SendPacket();
target_time_ms_ += packet_sender_->GetSendIntervalMs();
int64_t delay_ms = std::max(static_cast<int64_t>(0),
target_time_ms_ - rtc::TimeMillis());
absl::AnyInvocable<void() &&> SendPacketTask(
PacketSender* packet_sender,
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag,
int64_t target_time_ms = rtc::TimeMillis()) {
return [target_time_ms, packet_sender,
task_safety_flag = std::move(task_safety_flag)]() mutable {
if (task_safety_flag->alive() && packet_sender->IsSending()) {
packet_sender->SendPacket();
target_time_ms += packet_sender->GetSendIntervalMs();
int64_t delay_ms =
std::max(static_cast<int64_t>(0), target_time_ms - rtc::TimeMillis());
TaskQueueBase::Current()->PostDelayedTask(
std::unique_ptr<QueuedTask>(this), delay_ms);
return false;
} else {
return true;
SendPacketTask(packet_sender, std::move(task_safety_flag),
target_time_ms),
TimeDelta::Millis(delay_ms));
}
}
int64_t target_time_ms_;
PacketSender* const packet_sender_;
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag_;
};
};
}
class UpdateTestSettingTask : public QueuedTask {
public:
UpdateTestSettingTask(
PacketSender* packet_sender,
std::unique_ptr<ConfigReader> config_reader,
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag)
: packet_sender_(packet_sender),
config_reader_(std::move(config_reader)),
task_safety_flag_(task_safety_flag) {}
private:
bool Run() override {
if (!task_safety_flag_->alive()) {
return true;
absl::AnyInvocable<void() &&> UpdateTestSettingTask(
PacketSender* packet_sender,
std::unique_ptr<ConfigReader> config_reader,
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag) {
return [packet_sender, config_reader = std::move(config_reader),
task_safety_flag = std::move(task_safety_flag)]() mutable {
if (!task_safety_flag->alive()) {
return;
}
auto config = config_reader_->GetNextConfig();
if (config) {
packet_sender_->UpdateTestSetting((*config).packet_size,
(*config).packet_send_interval_ms);
if (absl::optional<ConfigReader::Config> config =
config_reader->GetNextConfig()) {
packet_sender->UpdateTestSetting(config->packet_size,
config->packet_send_interval_ms);
TaskQueueBase::Current()->PostDelayedTask(
std::unique_ptr<QueuedTask>(this), (*config).execution_time_ms);
return false;
UpdateTestSettingTask(packet_sender, std::move(config_reader),
std::move(task_safety_flag)),
TimeDelta::Millis(config->execution_time_ms));
} else {
packet_sender_->StopSending();
return true;
packet_sender->StopSending();
}
}
PacketSender* const packet_sender_;
const std::unique_ptr<ConfigReader> config_reader_;
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag_;
};
};
}
} // namespace
@ -105,15 +88,14 @@ PacketSender::~PacketSender() = default;
void PacketSender::StartSending() {
worker_queue_checker_.Detach();
worker_queue_->PostTask(ToQueuedTask(task_safety_flag_, [this]() {
worker_queue_->PostTask(SafeTask(task_safety_flag_, [this]() {
RTC_DCHECK_RUN_ON(&worker_queue_checker_);
sending_ = true;
}));
worker_queue_->PostTask(std::make_unique<UpdateTestSettingTask>(
worker_queue_->PostTask(UpdateTestSettingTask(
this, std::make_unique<ConfigReader>(config_file_path_),
task_safety_flag_));
worker_queue_->PostTask(
std::make_unique<SendPacketTask>(this, task_safety_flag_));
worker_queue_->PostTask(SendPacketTask(this, task_safety_flag_));
}
void PacketSender::StopSending() {

View File

@ -15,10 +15,10 @@
#include <string>
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_factory.h"
#include "rtc_base/ignore_wundef.h"
#include "rtc_base/system/no_unique_address.h"
#include "rtc_base/task_queue.h"
#ifdef WEBRTC_NETWORK_TESTER_PROTO
RTC_PUSH_IGNORING_WUNDEF()

View File

@ -67,7 +67,7 @@ void TestController::SendConnectTo(const std::string& hostname, int port) {
void TestController::SendData(const NetworkTesterPacket& packet,
absl::optional<size_t> data_size) {
if (!packet_sender_thread_->IsCurrent()) {
packet_sender_thread_->PostTask(ToQueuedTask(
packet_sender_thread_->PostTask(SafeTask(
task_safety_flag_,
[this, packet, data_size]() { this->SendData(packet, data_size); }));
return;

View File

@ -355,7 +355,7 @@ class RtpReplayer final {
// Creation of the streams must happen inside a task queue because it is
// resued as a worker thread.
worker_thread->PostTask(ToQueuedTask([&]() {
worker_thread->PostTask([&]() {
call.reset(Call::Create(call_config));
// Attempt to load the configuration
@ -375,7 +375,7 @@ class RtpReplayer final {
receive_stream->Start();
}
sync_event.Set();
}));
});
// Attempt to create an RtpReader from the input file.
std::unique_ptr<test::RtpFileReader> rtp_reader =
@ -392,7 +392,7 @@ class RtpReplayer final {
// Destruction of streams and the call must happen on the same thread as
// their creation.
worker_thread->PostTask(ToQueuedTask([&]() {
worker_thread->PostTask([&]() {
for (const auto& receive_stream : stream_state->receive_streams) {
call->DestroyVideoReceiveStream(receive_stream);
}
@ -401,7 +401,7 @@ class RtpReplayer final {
}
call.reset();
sync_event.Set();
}));
});
sync_event.Wait(/*give_up_after_ms=*/10000);
}
@ -606,12 +606,12 @@ class RtpReplayer final {
++num_packets;
PacketReceiver::DeliveryStatus result = PacketReceiver::DELIVERY_OK;
worker_thread->PostTask(ToQueuedTask([&]() {
worker_thread->PostTask([&]() {
result = call->Receiver()->DeliverPacket(webrtc::MediaType::VIDEO,
std::move(packet_buffer),
/* packet_time_us */ -1);
event.Set();
}));
});
event.Wait(/*give_up_after_ms=*/10000);
switch (result) {
case PacketReceiver::DELIVERY_OK: