Prepare to avoid hops to worker for network events.
This moves the thread hop for network events, from BaseChannel and into Call. The reason for this is to move the control over those hops (including DeliverPacket[Async]) into the same class where the state is held that is affected by those hops. Once that's done, we can start moving the relevant network state over to the network thread and eventually remove the hops. I'm also adding several TODOs for tracking future steps and give developers a heads up. Bug: webrtc:11993 Change-Id: Ice7ee3b5b6893532df52039324293979196d341d Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/204800 Commit-Queue: Tommi <tommi@webrtc.org> Reviewed-by: Niels Moller <nisse@webrtc.org> Cr-Commit-Position: refs/heads/master@{#33138}
This commit is contained in:
parent
16ab60c4c6
commit
d48a2b14e7
@ -341,6 +341,7 @@ bool AudioReceiveStream::SetMinimumPlayoutDelay(int delay_ms) {
|
||||
}
|
||||
|
||||
void AudioReceiveStream::AssociateSendStream(AudioSendStream* send_stream) {
|
||||
// TODO(bugs.webrtc.org/11993): Expect to be called on the network thread.
|
||||
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
|
||||
channel_receive_->SetAssociatedSendChannel(
|
||||
send_stream ? send_stream->GetChannel() : nullptr);
|
||||
@ -362,6 +363,8 @@ const webrtc::AudioReceiveStream::Config& AudioReceiveStream::config() const {
|
||||
|
||||
const AudioSendStream* AudioReceiveStream::GetAssociatedSendStreamForTesting()
|
||||
const {
|
||||
// TODO(bugs.webrtc.org/11993): Expect to be called on the network thread or
|
||||
// remove test method and |associated_send_stream_| variable.
|
||||
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
|
||||
return associated_send_stream_;
|
||||
}
|
||||
|
||||
@ -787,6 +787,7 @@ int ChannelReceive::ResendPackets(const uint16_t* sequence_numbers,
|
||||
|
||||
void ChannelReceive::SetAssociatedSendChannel(
|
||||
const ChannelSendInterface* channel) {
|
||||
// TODO(bugs.webrtc.org/11993): Expect to be called on the network thread.
|
||||
RTC_DCHECK(worker_thread_checker_.IsCurrent());
|
||||
MutexLock lock(&assoc_send_channel_lock_);
|
||||
associated_send_channel_ = channel;
|
||||
|
||||
107
call/call.cc
107
call/call.cc
@ -335,15 +335,18 @@ class Call final : public webrtc::Call,
|
||||
|
||||
NetworkState audio_network_state_;
|
||||
NetworkState video_network_state_;
|
||||
// TODO(bugs.webrtc.org/11993): Move aggregate_network_up_ over to the
|
||||
// network thread.
|
||||
bool aggregate_network_up_ RTC_GUARDED_BY(worker_thread_);
|
||||
|
||||
// Audio, Video, and FlexFEC receive streams are owned by the client that
|
||||
// creates them.
|
||||
// TODO(bugs.webrtc.org/11993): Move audio_receive_streams_,
|
||||
// video_receive_streams_ and sync_stream_mapping_ over to the network thread.
|
||||
std::set<AudioReceiveStream*> audio_receive_streams_
|
||||
RTC_GUARDED_BY(worker_thread_);
|
||||
std::set<VideoReceiveStream2*> video_receive_streams_
|
||||
RTC_GUARDED_BY(worker_thread_);
|
||||
|
||||
std::map<std::string, AudioReceiveStream*> sync_stream_mapping_
|
||||
RTC_GUARDED_BY(worker_thread_);
|
||||
|
||||
@ -378,6 +381,9 @@ class Call final : public webrtc::Call,
|
||||
// send side BWE are negotiated.
|
||||
const bool use_send_side_bwe;
|
||||
};
|
||||
|
||||
// TODO(bugs.webrtc.org/11993): Move receive_rtp_config_ over to the
|
||||
// network thread.
|
||||
std::map<uint32_t, ReceiveRtpConfig> receive_rtp_config_
|
||||
RTC_GUARDED_BY(worker_thread_);
|
||||
|
||||
@ -800,6 +806,8 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream(
|
||||
audio_send_ssrcs_.end());
|
||||
audio_send_ssrcs_[config.rtp.ssrc] = send_stream;
|
||||
|
||||
// TODO(bugs.webrtc.org/11993): call AssociateSendStream and
|
||||
// UpdateAggregateNetworkState asynchronously on the network thread.
|
||||
for (AudioReceiveStream* stream : audio_receive_streams_) {
|
||||
if (stream->config().rtp.local_ssrc == config.rtp.ssrc) {
|
||||
stream->AssociateSendStream(send_stream);
|
||||
@ -807,6 +815,7 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream(
|
||||
}
|
||||
|
||||
UpdateAggregateNetworkState();
|
||||
|
||||
return send_stream;
|
||||
}
|
||||
|
||||
@ -825,6 +834,8 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) {
|
||||
size_t num_deleted = audio_send_ssrcs_.erase(ssrc);
|
||||
RTC_DCHECK_EQ(1, num_deleted);
|
||||
|
||||
// TODO(bugs.webrtc.org/11993): call AssociateSendStream and
|
||||
// UpdateAggregateNetworkState asynchronously on the network thread.
|
||||
for (AudioReceiveStream* stream : audio_receive_streams_) {
|
||||
if (stream->config().rtp.local_ssrc == ssrc) {
|
||||
stream->AssociateSendStream(nullptr);
|
||||
@ -832,6 +843,7 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) {
|
||||
}
|
||||
|
||||
UpdateAggregateNetworkState();
|
||||
|
||||
delete send_stream;
|
||||
}
|
||||
|
||||
@ -842,11 +854,19 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream(
|
||||
EnsureStarted();
|
||||
event_log_->Log(std::make_unique<RtcEventAudioReceiveStreamConfig>(
|
||||
CreateRtcLogStreamConfig(config)));
|
||||
|
||||
// TODO(bugs.webrtc.org/11993): Move the registration between |receive_stream|
|
||||
// and |audio_receiver_controller_| out of AudioReceiveStream construction and
|
||||
// set it up asynchronously on the network thread (the registration and
|
||||
// |audio_receiver_controller_| need to live on the network thread).
|
||||
AudioReceiveStream* receive_stream = new AudioReceiveStream(
|
||||
clock_, &audio_receiver_controller_, transport_send_ptr_->packet_router(),
|
||||
module_process_thread_->process_thread(), config_.neteq_factory, config,
|
||||
config_.audio_state, event_log_);
|
||||
|
||||
// TODO(bugs.webrtc.org/11993): Update the below on the network thread.
|
||||
// We could possibly set up the audio_receiver_controller_ association up
|
||||
// as part of the async setup.
|
||||
receive_rtp_config_.emplace(config.rtp.remote_ssrc, ReceiveRtpConfig(config));
|
||||
audio_receive_streams_.insert(receive_stream);
|
||||
|
||||
@ -873,8 +893,12 @@ void Call::DestroyAudioReceiveStream(
|
||||
uint32_t ssrc = config.rtp.remote_ssrc;
|
||||
receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
|
||||
->RemoveStream(ssrc);
|
||||
|
||||
// TODO(bugs.webrtc.org/11993): Access the map, rtp config, call ConfigureSync
|
||||
// and UpdateAggregateNetworkState on the network thread.
|
||||
audio_receive_streams_.erase(audio_receive_stream);
|
||||
const std::string& sync_group = audio_receive_stream->config().sync_group;
|
||||
|
||||
const auto it = sync_stream_mapping_.find(sync_group);
|
||||
if (it != sync_stream_mapping_.end() && it->second == audio_receive_stream) {
|
||||
sync_stream_mapping_.erase(it);
|
||||
@ -883,6 +907,9 @@ void Call::DestroyAudioReceiveStream(
|
||||
receive_rtp_config_.erase(ssrc);
|
||||
|
||||
UpdateAggregateNetworkState();
|
||||
// TODO(bugs.webrtc.org/11993): Consider if deleting |audio_receive_stream|
|
||||
// on the network thread would be better or if we'd need to tear down the
|
||||
// state in two phases.
|
||||
delete audio_receive_stream;
|
||||
}
|
||||
|
||||
@ -995,13 +1022,15 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream(
|
||||
|
||||
EnsureStarted();
|
||||
|
||||
TaskQueueBase* current = GetCurrentTaskQueueOrThread();
|
||||
RTC_CHECK(current);
|
||||
// TODO(bugs.webrtc.org/11993): Move the registration between |receive_stream|
|
||||
// and |video_receiver_controller_| out of VideoReceiveStream2 construction
|
||||
// and set it up asynchronously on the network thread (the registration and
|
||||
// |video_receiver_controller_| need to live on the network thread).
|
||||
VideoReceiveStream2* receive_stream = new VideoReceiveStream2(
|
||||
task_queue_factory_, current, &video_receiver_controller_, num_cpu_cores_,
|
||||
transport_send_ptr_->packet_router(), std::move(configuration),
|
||||
module_process_thread_->process_thread(), call_stats_.get(), clock_,
|
||||
new VCMTiming(clock_));
|
||||
task_queue_factory_, worker_thread_, &video_receiver_controller_,
|
||||
num_cpu_cores_, transport_send_ptr_->packet_router(),
|
||||
std::move(configuration), module_process_thread_->process_thread(),
|
||||
call_stats_.get(), clock_, new VCMTiming(clock_));
|
||||
|
||||
const webrtc::VideoReceiveStream::Config& config = receive_stream->config();
|
||||
if (config.rtp.rtx_ssrc) {
|
||||
@ -1134,34 +1163,54 @@ const WebRtcKeyValueConfig& Call::trials() const {
|
||||
}
|
||||
|
||||
void Call::SignalChannelNetworkState(MediaType media, NetworkState state) {
|
||||
RTC_DCHECK_RUN_ON(worker_thread_);
|
||||
switch (media) {
|
||||
case MediaType::AUDIO:
|
||||
audio_network_state_ = state;
|
||||
break;
|
||||
case MediaType::VIDEO:
|
||||
video_network_state_ = state;
|
||||
break;
|
||||
case MediaType::ANY:
|
||||
case MediaType::DATA:
|
||||
RTC_NOTREACHED();
|
||||
break;
|
||||
}
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
RTC_DCHECK(media == MediaType::AUDIO || media == MediaType::VIDEO);
|
||||
|
||||
UpdateAggregateNetworkState();
|
||||
for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) {
|
||||
video_receive_stream->SignalNetworkState(video_network_state_);
|
||||
auto closure = [this, media, state]() {
|
||||
// TODO(bugs.webrtc.org/11993): Move this over to the network thread.
|
||||
RTC_DCHECK_RUN_ON(worker_thread_);
|
||||
if (media == MediaType::AUDIO) {
|
||||
audio_network_state_ = state;
|
||||
} else {
|
||||
RTC_DCHECK_EQ(media, MediaType::VIDEO);
|
||||
video_network_state_ = state;
|
||||
}
|
||||
|
||||
// TODO(tommi): Is it necessary to always do this, including if there
|
||||
// was no change in state?
|
||||
UpdateAggregateNetworkState();
|
||||
|
||||
// TODO(tommi): Is it right to do this if media == AUDIO?
|
||||
for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) {
|
||||
video_receive_stream->SignalNetworkState(video_network_state_);
|
||||
}
|
||||
};
|
||||
|
||||
if (network_thread_ == worker_thread_) {
|
||||
closure();
|
||||
} else {
|
||||
// TODO(bugs.webrtc.org/11993): Remove workaround when we no longer need to
|
||||
// post to the worker thread.
|
||||
worker_thread_->PostTask(ToQueuedTask(task_safety_, std::move(closure)));
|
||||
}
|
||||
}
|
||||
|
||||
void Call::OnAudioTransportOverheadChanged(int transport_overhead_per_packet) {
|
||||
RTC_DCHECK_RUN_ON(worker_thread_);
|
||||
for (auto& kv : audio_send_ssrcs_) {
|
||||
kv.second->SetTransportOverhead(transport_overhead_per_packet);
|
||||
}
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
worker_thread_->PostTask(
|
||||
ToQueuedTask(task_safety_, [this, transport_overhead_per_packet]() {
|
||||
// TODO(bugs.webrtc.org/11993): Move this over to the network thread.
|
||||
RTC_DCHECK_RUN_ON(worker_thread_);
|
||||
for (auto& kv : audio_send_ssrcs_) {
|
||||
kv.second->SetTransportOverhead(transport_overhead_per_packet);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
void Call::UpdateAggregateNetworkState() {
|
||||
// TODO(bugs.webrtc.org/11993): Move this over to the network thread.
|
||||
// RTC_DCHECK_RUN_ON(network_thread_);
|
||||
|
||||
RTC_DCHECK_RUN_ON(worker_thread_);
|
||||
|
||||
bool have_audio =
|
||||
@ -1241,6 +1290,7 @@ void Call::OnAllocationLimitsChanged(BitrateAllocationLimits limits) {
|
||||
}
|
||||
|
||||
void Call::ConfigureSync(const std::string& sync_group) {
|
||||
// TODO(bugs.webrtc.org/11993): Expect to be called on the network thread.
|
||||
// Set sync only if there was no previous one.
|
||||
if (sync_group.empty())
|
||||
return;
|
||||
@ -1452,6 +1502,9 @@ void Call::DeliverPacketAsync(MediaType media_type,
|
||||
}
|
||||
|
||||
void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
|
||||
// TODO(bugs.webrtc.org/11993): Expect to be called on the network thread.
|
||||
// This method is called synchronously via |OnRtpPacket()| (see DeliverRtp)
|
||||
// on the same thread.
|
||||
RTC_DCHECK_RUN_ON(worker_thread_);
|
||||
RtpPacketReceived parsed_packet;
|
||||
if (!parsed_packet.Parse(packet, length))
|
||||
|
||||
@ -1803,7 +1803,7 @@ void WebRtcVideoChannel::BackfillBufferedPackets(
|
||||
}
|
||||
|
||||
void WebRtcVideoChannel::OnReadyToSend(bool ready) {
|
||||
RTC_DCHECK_RUN_ON(&thread_checker_);
|
||||
RTC_DCHECK_RUN_ON(&network_thread_checker_);
|
||||
RTC_LOG(LS_VERBOSE) << "OnReadyToSend: " << (ready ? "Ready." : "Not ready.");
|
||||
call_->SignalChannelNetworkState(
|
||||
webrtc::MediaType::VIDEO,
|
||||
@ -1813,11 +1813,11 @@ void WebRtcVideoChannel::OnReadyToSend(bool ready) {
|
||||
void WebRtcVideoChannel::OnNetworkRouteChanged(
|
||||
const std::string& transport_name,
|
||||
const rtc::NetworkRoute& network_route) {
|
||||
RTC_DCHECK_RUN_ON(&thread_checker_);
|
||||
call_->GetTransportControllerSend()->OnNetworkRouteChanged(transport_name,
|
||||
network_route);
|
||||
call_->GetTransportControllerSend()->OnTransportOverheadChanged(
|
||||
network_route.packet_overhead);
|
||||
RTC_DCHECK_RUN_ON(&network_thread_checker_);
|
||||
webrtc::RtpTransportControllerSendInterface* transport =
|
||||
call_->GetTransportControllerSend();
|
||||
transport->OnNetworkRouteChanged(transport_name, network_route);
|
||||
transport->OnTransportOverheadChanged(network_route.packet_overhead);
|
||||
}
|
||||
|
||||
void WebRtcVideoChannel::SetInterface(NetworkInterface* iface) {
|
||||
|
||||
@ -2290,7 +2290,7 @@ void WebRtcVoiceMediaChannel::OnPacketReceived(rtc::CopyOnWriteBuffer packet,
|
||||
void WebRtcVoiceMediaChannel::OnNetworkRouteChanged(
|
||||
const std::string& transport_name,
|
||||
const rtc::NetworkRoute& network_route) {
|
||||
RTC_DCHECK_RUN_ON(worker_thread_);
|
||||
RTC_DCHECK_RUN_ON(&network_thread_checker_);
|
||||
call_->GetTransportControllerSend()->OnNetworkRouteChanged(transport_name,
|
||||
network_route);
|
||||
call_->OnAudioTransportOverheadChanged(network_route.packet_overhead);
|
||||
@ -2335,7 +2335,7 @@ bool WebRtcVoiceMediaChannel::SetMaxSendBitrate(int bps) {
|
||||
}
|
||||
|
||||
void WebRtcVoiceMediaChannel::OnReadyToSend(bool ready) {
|
||||
RTC_DCHECK_RUN_ON(worker_thread_);
|
||||
RTC_DCHECK_RUN_ON(&network_thread_checker_);
|
||||
RTC_LOG(LS_VERBOSE) << "OnReadyToSend: " << (ready ? "Ready." : "Not ready.");
|
||||
call_->SignalChannelNetworkState(
|
||||
webrtc::MediaType::AUDIO,
|
||||
|
||||
@ -369,7 +369,7 @@ void BaseChannel::OnWritableState(bool writable) {
|
||||
|
||||
void BaseChannel::OnNetworkRouteChanged(
|
||||
absl::optional<rtc::NetworkRoute> network_route) {
|
||||
RTC_LOG(LS_INFO) << "Network route for " << ToString() << " was changed.";
|
||||
RTC_LOG(LS_INFO) << "Network route changed for " << ToString();
|
||||
|
||||
RTC_DCHECK_RUN_ON(network_thread());
|
||||
rtc::NetworkRoute new_route;
|
||||
@ -380,10 +380,7 @@ void BaseChannel::OnNetworkRouteChanged(
|
||||
// use the same transport name and MediaChannel::OnNetworkRouteChanged cannot
|
||||
// work correctly. Intentionally leave it broken to simplify the code and
|
||||
// encourage the users to stop using non-muxing RTCP.
|
||||
worker_thread_->PostTask(ToQueuedTask(alive_, [this, new_route] {
|
||||
RTC_DCHECK_RUN_ON(worker_thread());
|
||||
media_channel_->OnNetworkRouteChanged(transport_name_, new_route);
|
||||
}));
|
||||
media_channel_->OnNetworkRouteChanged(transport_name_, new_route);
|
||||
}
|
||||
|
||||
sigslot::signal1<ChannelInterface*>& BaseChannel::SignalFirstPacketReceived() {
|
||||
@ -399,10 +396,8 @@ sigslot::signal1<const rtc::SentPacket&>& BaseChannel::SignalSentPacket() {
|
||||
}
|
||||
|
||||
void BaseChannel::OnTransportReadyToSend(bool ready) {
|
||||
worker_thread_->PostTask(ToQueuedTask(alive_, [this, ready] {
|
||||
RTC_DCHECK_RUN_ON(worker_thread());
|
||||
media_channel_->OnReadyToSend(ready);
|
||||
}));
|
||||
RTC_DCHECK_RUN_ON(network_thread());
|
||||
media_channel_->OnReadyToSend(ready);
|
||||
}
|
||||
|
||||
bool BaseChannel::SendPacket(bool rtcp,
|
||||
|
||||
@ -1205,11 +1205,13 @@ class ChannelTest : public ::testing::Test, public sigslot::has_slots<> {
|
||||
CreateChannels(0, 0);
|
||||
EXPECT_FALSE(media_channel1_->ready_to_send());
|
||||
|
||||
channel1_->OnTransportReadyToSend(true);
|
||||
network_thread_->PostTask(
|
||||
RTC_FROM_HERE, [this] { channel1_->OnTransportReadyToSend(true); });
|
||||
WaitForThreads();
|
||||
EXPECT_TRUE(media_channel1_->ready_to_send());
|
||||
|
||||
channel1_->OnTransportReadyToSend(false);
|
||||
network_thread_->PostTask(
|
||||
RTC_FROM_HERE, [this] { channel1_->OnTransportReadyToSend(false); });
|
||||
WaitForThreads();
|
||||
EXPECT_FALSE(media_channel1_->ready_to_send());
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user