Align sender/receiver teardown in RtpTransceiver.

This makes SetChannel() consistently make 2 invokes instead of a
multiple of senders+receivers (previous minimum was 4 but could be
larger).

* Stop() doesn't hop to the worker thread.
* SetMediaChannel(), an already-required step on the worker thread for
  senders and *sometimes* for receivers[1], is now consistently required
  for both. This simplifies transceiver teardown and enables the next
  bullet.
* Transceiver stops all senders and receivers in one go rather than
  ping ponging between threads.

[1] When not required, it was done implicitly inside of Stop().
  See changes in `RtpTransceiver::SetChannel`

Bug: webrtc:13540
Change-Id: Ied61636c8ef09d782bf519524fff2a31e15219a8
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/249797
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#36057}
This commit is contained in:
Tommi 2022-02-17 23:36:47 +01:00 committed by WebRTC LUCI CQ
parent 3b393ec991
commit 6589def397
16 changed files with 314 additions and 220 deletions

View File

@ -1278,6 +1278,7 @@ rtc_library("video_track") {
"../api:sequence_checker", "../api:sequence_checker",
"../api/video:video_frame", "../api/video:video_frame",
"../media:rtc_media_base", "../media:rtc_media_base",
"../pc:rtc_pc_base",
"../rtc_base", "../rtc_base",
"../rtc_base:checks", "../rtc_base:checks",
"../rtc_base:refcount", "../rtc_base:refcount",

View File

@ -25,20 +25,24 @@
namespace webrtc { namespace webrtc {
AudioRtpReceiver::AudioRtpReceiver(rtc::Thread* worker_thread, AudioRtpReceiver::AudioRtpReceiver(
std::string receiver_id, rtc::Thread* worker_thread,
std::vector<std::string> stream_ids, std::string receiver_id,
bool is_unified_plan) std::vector<std::string> stream_ids,
bool is_unified_plan,
cricket::VoiceMediaChannel* voice_channel /*= nullptr*/)
: AudioRtpReceiver(worker_thread, : AudioRtpReceiver(worker_thread,
receiver_id, receiver_id,
CreateStreamsFromIds(std::move(stream_ids)), CreateStreamsFromIds(std::move(stream_ids)),
is_unified_plan) {} is_unified_plan,
voice_channel) {}
AudioRtpReceiver::AudioRtpReceiver( AudioRtpReceiver::AudioRtpReceiver(
rtc::Thread* worker_thread, rtc::Thread* worker_thread,
const std::string& receiver_id, const std::string& receiver_id,
const std::vector<rtc::scoped_refptr<MediaStreamInterface>>& streams, const std::vector<rtc::scoped_refptr<MediaStreamInterface>>& streams,
bool is_unified_plan) bool is_unified_plan,
cricket::VoiceMediaChannel* voice_channel /*= nullptr*/)
: worker_thread_(worker_thread), : worker_thread_(worker_thread),
id_(receiver_id), id_(receiver_id),
source_(rtc::make_ref_counted<RemoteAudioSource>( source_(rtc::make_ref_counted<RemoteAudioSource>(
@ -49,7 +53,8 @@ AudioRtpReceiver::AudioRtpReceiver(
track_(AudioTrackProxyWithInternal<AudioTrack>::Create( track_(AudioTrackProxyWithInternal<AudioTrack>::Create(
rtc::Thread::Current(), rtc::Thread::Current(),
AudioTrack::Create(receiver_id, source_))), AudioTrack::Create(receiver_id, source_))),
cached_track_enabled_(track_->enabled()), media_channel_(voice_channel),
cached_track_enabled_(track_->internal()->enabled()),
attachment_id_(GenerateUniqueId()), attachment_id_(GenerateUniqueId()),
worker_thread_safety_(PendingTaskSafetyFlag::CreateDetachedInactive()) { worker_thread_safety_(PendingTaskSafetyFlag::CreateDetachedInactive()) {
RTC_DCHECK(worker_thread_); RTC_DCHECK(worker_thread_);
@ -69,15 +74,15 @@ AudioRtpReceiver::~AudioRtpReceiver() {
void AudioRtpReceiver::OnChanged() { void AudioRtpReceiver::OnChanged() {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_); RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
if (cached_track_enabled_ != track_->enabled()) { const bool enabled = track_->internal()->enabled();
cached_track_enabled_ = track_->enabled(); if (cached_track_enabled_ == enabled)
worker_thread_->PostTask(ToQueuedTask( return;
worker_thread_safety_, cached_track_enabled_ = enabled;
[this, enabled = cached_track_enabled_, volume = cached_volume_]() { worker_thread_->PostTask(
RTC_DCHECK_RUN_ON(worker_thread_); ToQueuedTask(worker_thread_safety_, [this, enabled]() {
Reconfigure(enabled, volume); RTC_DCHECK_RUN_ON(worker_thread_);
})); Reconfigure(enabled);
} }));
} }
// RTC_RUN_ON(worker_thread_) // RTC_RUN_ON(worker_thread_)
@ -97,20 +102,18 @@ void AudioRtpReceiver::OnSetVolume(double volume) {
RTC_DCHECK_GE(volume, 0); RTC_DCHECK_GE(volume, 0);
RTC_DCHECK_LE(volume, 10); RTC_DCHECK_LE(volume, 10);
// Update the cached_volume_ even when stopped, to allow clients to set the bool track_enabled = track_->internal()->enabled();
// volume before starting/restarting, eg see crbug.com/1272566. worker_thread_->Invoke<void>(RTC_FROM_HERE, [&]() {
cached_volume_ = volume; RTC_DCHECK_RUN_ON(worker_thread_);
// Update the cached_volume_ even when stopped, to allow clients to set
// When the track is disabled, the volume of the source, which is the // the volume before starting/restarting, eg see crbug.com/1272566.
// corresponding WebRtc Voice Engine channel will be 0. So we do not allow cached_volume_ = volume;
// setting the volume to the source when the track is disabled. // When the track is disabled, the volume of the source, which is the
if (track_->enabled()) { // corresponding WebRtc Voice Engine channel will be 0. So we do not
worker_thread_->PostTask( // allow setting the volume to the source when the track is disabled.
ToQueuedTask(worker_thread_safety_, [this, volume = cached_volume_]() { if (track_enabled)
RTC_DCHECK_RUN_ON(worker_thread_); SetOutputVolume_w(volume);
SetOutputVolume_w(volume); });
}));
}
} }
rtc::scoped_refptr<DtlsTransportInterface> AudioRtpReceiver::dtls_transport() rtc::scoped_refptr<DtlsTransportInterface> AudioRtpReceiver::dtls_transport()
@ -159,52 +162,49 @@ AudioRtpReceiver::GetFrameDecryptor() const {
void AudioRtpReceiver::Stop() { void AudioRtpReceiver::Stop() {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_); RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
// TODO(deadbeef): Need to do more here to fully stop receiving packets.
source_->SetState(MediaSourceInterface::kEnded); source_->SetState(MediaSourceInterface::kEnded);
worker_thread_->Invoke<void>(RTC_FROM_HERE, [&]() {
RTC_DCHECK_RUN_ON(worker_thread_);
if (media_channel_)
SetOutputVolume_w(0.0);
SetMediaChannel_w(nullptr);
});
}
void AudioRtpReceiver::StopAndEndTrack() {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
Stop();
track_->internal()->set_ended(); track_->internal()->set_ended();
} }
void AudioRtpReceiver::RestartMediaChannel(absl::optional<uint32_t> ssrc) { void AudioRtpReceiver::SetSourceEnded() {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_); RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
source_->SetState(MediaSourceInterface::kEnded);
}
// RTC_RUN_ON(&signaling_thread_checker_)
void AudioRtpReceiver::RestartMediaChannel(absl::optional<uint32_t> ssrc) {
bool enabled = track_->internal()->enabled();
MediaSourceInterface::SourceState state = source_->state(); MediaSourceInterface::SourceState state = source_->state();
worker_thread_->Invoke<void>( worker_thread_->Invoke<void>(RTC_FROM_HERE, [&]() {
RTC_FROM_HERE, RTC_DCHECK_RUN_ON(worker_thread_);
[&, enabled = cached_track_enabled_, volume = cached_volume_]() { RestartMediaChannel_w(std::move(ssrc), enabled, state);
RTC_DCHECK_RUN_ON(worker_thread_); });
if (!media_channel_)
return; // Can't restart.
if (state != MediaSourceInterface::kInitializing) {
if (ssrc_ == ssrc)
return;
source_->Stop(media_channel_, ssrc_);
}
ssrc_ = std::move(ssrc);
source_->Start(media_channel_, ssrc_);
if (ssrc_) {
media_channel_->SetBaseMinimumPlayoutDelayMs(*ssrc_, delay_.GetMs());
}
Reconfigure(enabled, volume);
});
source_->SetState(MediaSourceInterface::kLive); source_->SetState(MediaSourceInterface::kLive);
} }
// RTC_RUN_ON(worker_thread_)
void AudioRtpReceiver::RestartMediaChannel_w(
absl::optional<uint32_t> ssrc,
bool track_enabled,
MediaSourceInterface::SourceState state) {
if (!media_channel_)
return; // Can't restart.
if (state != MediaSourceInterface::kInitializing) {
if (ssrc_ == ssrc)
return;
source_->Stop(media_channel_, ssrc_);
}
ssrc_ = std::move(ssrc);
source_->Start(media_channel_, ssrc_);
if (ssrc_) {
media_channel_->SetBaseMinimumPlayoutDelayMs(*ssrc_, delay_.GetMs());
}
Reconfigure(track_enabled);
}
void AudioRtpReceiver::SetupMediaChannel(uint32_t ssrc) { void AudioRtpReceiver::SetupMediaChannel(uint32_t ssrc) {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_); RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
RestartMediaChannel(ssrc); RestartMediaChannel(ssrc);
@ -284,10 +284,10 @@ void AudioRtpReceiver::SetDepacketizerToDecoderFrameTransformer(
} }
// RTC_RUN_ON(worker_thread_) // RTC_RUN_ON(worker_thread_)
void AudioRtpReceiver::Reconfigure(bool track_enabled, double volume) { void AudioRtpReceiver::Reconfigure(bool track_enabled) {
RTC_DCHECK(media_channel_); RTC_DCHECK(media_channel_);
SetOutputVolume_w(track_enabled ? volume : 0); SetOutputVolume_w(track_enabled ? cached_volume_ : 0);
if (ssrc_ && frame_decryptor_) { if (ssrc_ && frame_decryptor_) {
// Reattach the frame decryptor if we were reconfigured. // Reattach the frame decryptor if we were reconfigured.
@ -318,18 +318,12 @@ void AudioRtpReceiver::SetJitterBufferMinimumDelay(
} }
void AudioRtpReceiver::SetMediaChannel(cricket::MediaChannel* media_channel) { void AudioRtpReceiver::SetMediaChannel(cricket::MediaChannel* media_channel) {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
RTC_DCHECK(media_channel == nullptr || RTC_DCHECK(media_channel == nullptr ||
media_channel->media_type() == media_type()); media_channel->media_type() == media_type());
if (!media_channel && media_channel_)
SetOutputVolume_w(0.0);
worker_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
RTC_DCHECK_RUN_ON(worker_thread_);
SetMediaChannel_w(media_channel);
});
}
// RTC_RUN_ON(worker_thread_)
void AudioRtpReceiver::SetMediaChannel_w(cricket::MediaChannel* media_channel) {
media_channel ? worker_thread_safety_->SetAlive() media_channel ? worker_thread_safety_->SetAlive()
: worker_thread_safety_->SetNotAlive(); : worker_thread_safety_->SetNotAlive();
media_channel_ = static_cast<cricket::VoiceMediaChannel*>(media_channel); media_channel_ = static_cast<cricket::VoiceMediaChannel*>(media_channel);

View File

@ -45,16 +45,24 @@ class AudioRtpReceiver : public ObserverInterface,
public AudioSourceInterface::AudioObserver, public AudioSourceInterface::AudioObserver,
public RtpReceiverInternal { public RtpReceiverInternal {
public: public:
// The constructor supports optionally passing the voice channel to the
// instance at construction time without having to call `SetMediaChannel()`
// on the worker thread straight after construction.
// However, when using that, the assumption is that right after construction,
// a call to either `SetupUnsignaledMediaChannel` or `SetupMediaChannel`
// will be made, which will internally start the source on the worker thread.
AudioRtpReceiver(rtc::Thread* worker_thread, AudioRtpReceiver(rtc::Thread* worker_thread,
std::string receiver_id, std::string receiver_id,
std::vector<std::string> stream_ids, std::vector<std::string> stream_ids,
bool is_unified_plan); bool is_unified_plan,
cricket::VoiceMediaChannel* voice_channel = nullptr);
// TODO(https://crbug.com/webrtc/9480): Remove this when streams() is removed. // TODO(https://crbug.com/webrtc/9480): Remove this when streams() is removed.
AudioRtpReceiver( AudioRtpReceiver(
rtc::Thread* worker_thread, rtc::Thread* worker_thread,
const std::string& receiver_id, const std::string& receiver_id,
const std::vector<rtc::scoped_refptr<MediaStreamInterface>>& streams, const std::vector<rtc::scoped_refptr<MediaStreamInterface>>& streams,
bool is_unified_plan); bool is_unified_plan,
cricket::VoiceMediaChannel* media_channel = nullptr);
virtual ~AudioRtpReceiver(); virtual ~AudioRtpReceiver();
// ObserverInterface implementation // ObserverInterface implementation
@ -90,7 +98,7 @@ class AudioRtpReceiver : public ObserverInterface,
// RtpReceiverInternal implementation. // RtpReceiverInternal implementation.
void Stop() override; void Stop() override;
void StopAndEndTrack() override; void SetSourceEnded() override;
void SetupMediaChannel(uint32_t ssrc) override; void SetupMediaChannel(uint32_t ssrc) override;
void SetupUnsignaledMediaChannel() override; void SetupUnsignaledMediaChannel() override;
uint32_t ssrc() const override; uint32_t ssrc() const override;
@ -114,12 +122,14 @@ class AudioRtpReceiver : public ObserverInterface,
override; override;
private: private:
void RestartMediaChannel(absl::optional<uint32_t> ssrc); void RestartMediaChannel(absl::optional<uint32_t> ssrc)
void Reconfigure(bool track_enabled, double volume) RTC_RUN_ON(&signaling_thread_checker_);
void RestartMediaChannel_w(absl::optional<uint32_t> ssrc,
bool track_enabled,
MediaSourceInterface::SourceState state)
RTC_RUN_ON(worker_thread_); RTC_RUN_ON(worker_thread_);
void Reconfigure(bool track_enabled) RTC_RUN_ON(worker_thread_);
void SetOutputVolume_w(double volume) RTC_RUN_ON(worker_thread_); void SetOutputVolume_w(double volume) RTC_RUN_ON(worker_thread_);
void SetMediaChannel_w(cricket::MediaChannel* media_channel)
RTC_RUN_ON(worker_thread_);
RTC_NO_UNIQUE_ADDRESS SequenceChecker signaling_thread_checker_; RTC_NO_UNIQUE_ADDRESS SequenceChecker signaling_thread_checker_;
rtc::Thread* const worker_thread_; rtc::Thread* const worker_thread_;
@ -132,7 +142,7 @@ class AudioRtpReceiver : public ObserverInterface,
std::vector<rtc::scoped_refptr<MediaStreamInterface>> streams_ std::vector<rtc::scoped_refptr<MediaStreamInterface>> streams_
RTC_GUARDED_BY(&signaling_thread_checker_); RTC_GUARDED_BY(&signaling_thread_checker_);
bool cached_track_enabled_ RTC_GUARDED_BY(&signaling_thread_checker_); bool cached_track_enabled_ RTC_GUARDED_BY(&signaling_thread_checker_);
double cached_volume_ RTC_GUARDED_BY(&signaling_thread_checker_) = 1.0; double cached_volume_ RTC_GUARDED_BY(worker_thread_) = 1.0;
RtpReceiverObserverInterface* observer_ RtpReceiverObserverInterface* observer_
RTC_GUARDED_BY(&signaling_thread_checker_) = nullptr; RTC_GUARDED_BY(&signaling_thread_checker_) = nullptr;
bool received_first_packet_ RTC_GUARDED_BY(&signaling_thread_checker_) = bool received_first_packet_ RTC_GUARDED_BY(&signaling_thread_checker_) =

View File

@ -24,6 +24,7 @@ using ::testing::Mock;
static const int kTimeOut = 100; static const int kTimeOut = 100;
static const double kDefaultVolume = 1; static const double kDefaultVolume = 1;
static const double kVolume = 3.7; static const double kVolume = 3.7;
static const double kVolumeMuted = 0.0;
static const uint32_t kSsrc = 3; static const uint32_t kSsrc = 3;
namespace webrtc { namespace webrtc {
@ -42,8 +43,8 @@ class AudioRtpReceiverTest : public ::testing::Test {
} }
~AudioRtpReceiverTest() { ~AudioRtpReceiverTest() {
EXPECT_CALL(media_channel_, SetOutputVolume(kSsrc, kVolumeMuted));
receiver_->SetMediaChannel(nullptr); receiver_->SetMediaChannel(nullptr);
receiver_->Stop();
} }
rtc::Thread* worker_; rtc::Thread* worker_;

View File

@ -363,7 +363,6 @@ rtc::scoped_refptr<MockRtpSenderInternal> CreateMockSender(
EXPECT_CALL(*sender, AttachmentId()).WillRepeatedly(Return(attachment_id)); EXPECT_CALL(*sender, AttachmentId()).WillRepeatedly(Return(attachment_id));
EXPECT_CALL(*sender, stream_ids()).WillRepeatedly(Return(local_stream_ids)); EXPECT_CALL(*sender, stream_ids()).WillRepeatedly(Return(local_stream_ids));
EXPECT_CALL(*sender, SetTransceiverAsStopped()); EXPECT_CALL(*sender, SetTransceiverAsStopped());
EXPECT_CALL(*sender, Stop());
return sender; return sender;
} }
@ -389,7 +388,7 @@ rtc::scoped_refptr<MockRtpReceiverInternal> CreateMockReceiver(
return params; return params;
})); }));
EXPECT_CALL(*receiver, AttachmentId()).WillRepeatedly(Return(attachment_id)); EXPECT_CALL(*receiver, AttachmentId()).WillRepeatedly(Return(attachment_id));
EXPECT_CALL(*receiver, StopAndEndTrack()); EXPECT_CALL(*receiver, Stop()).WillRepeatedly(Return());
return receiver; return receiver;
} }
@ -460,6 +459,8 @@ class RTCStatsCollectorWrapper {
rtc::scoped_refptr<MockRtpSenderInternal> sender = rtc::scoped_refptr<MockRtpSenderInternal> sender =
CreateMockSender(media_type, track, ssrc, attachment_id, {}); CreateMockSender(media_type, track, ssrc, attachment_id, {});
EXPECT_CALL(*sender, Stop());
EXPECT_CALL(*sender, SetMediaChannel(_));
pc_->AddSender(sender); pc_->AddSender(sender);
return sender; return sender;
} }
@ -490,6 +491,7 @@ class RTCStatsCollectorWrapper {
.WillRepeatedly( .WillRepeatedly(
Return(std::vector<rtc::scoped_refptr<MediaStreamInterface>>( Return(std::vector<rtc::scoped_refptr<MediaStreamInterface>>(
{remote_stream}))); {remote_stream})));
EXPECT_CALL(*receiver, SetMediaChannel(_)).WillRepeatedly(Return());
pc_->AddReceiver(receiver); pc_->AddReceiver(receiver);
return receiver; return receiver;
} }
@ -532,6 +534,7 @@ class RTCStatsCollectorWrapper {
voice_sender_info.local_stats[0].ssrc, voice_sender_info.local_stats[0].ssrc,
voice_sender_info.local_stats[0].ssrc + 10, local_stream_ids); voice_sender_info.local_stats[0].ssrc + 10, local_stream_ids);
EXPECT_CALL(*rtp_sender, SetMediaChannel(_)).WillRepeatedly(Return()); EXPECT_CALL(*rtp_sender, SetMediaChannel(_)).WillRepeatedly(Return());
EXPECT_CALL(*rtp_sender, Stop());
pc_->AddSender(rtp_sender); pc_->AddSender(rtp_sender);
} }
@ -550,7 +553,7 @@ class RTCStatsCollectorWrapper {
voice_receiver_info.local_stats[0].ssrc + 10); voice_receiver_info.local_stats[0].ssrc + 10);
EXPECT_CALL(*rtp_receiver, streams()) EXPECT_CALL(*rtp_receiver, streams())
.WillRepeatedly(Return(remote_streams)); .WillRepeatedly(Return(remote_streams));
EXPECT_CALL(*rtp_receiver, SetMediaChannel(_)); EXPECT_CALL(*rtp_receiver, SetMediaChannel(_)).WillRepeatedly(Return());
pc_->AddReceiver(rtp_receiver); pc_->AddReceiver(rtp_receiver);
} }
@ -569,6 +572,7 @@ class RTCStatsCollectorWrapper {
video_sender_info.local_stats[0].ssrc, video_sender_info.local_stats[0].ssrc,
video_sender_info.local_stats[0].ssrc + 10, local_stream_ids); video_sender_info.local_stats[0].ssrc + 10, local_stream_ids);
EXPECT_CALL(*rtp_sender, SetMediaChannel(_)).WillRepeatedly(Return()); EXPECT_CALL(*rtp_sender, SetMediaChannel(_)).WillRepeatedly(Return());
EXPECT_CALL(*rtp_sender, Stop());
pc_->AddSender(rtp_sender); pc_->AddSender(rtp_sender);
} }
@ -587,7 +591,7 @@ class RTCStatsCollectorWrapper {
video_receiver_info.local_stats[0].ssrc + 10); video_receiver_info.local_stats[0].ssrc + 10);
EXPECT_CALL(*rtp_receiver, streams()) EXPECT_CALL(*rtp_receiver, streams())
.WillRepeatedly(Return(remote_streams)); .WillRepeatedly(Return(remote_streams));
EXPECT_CALL(*rtp_receiver, SetMediaChannel(_)); EXPECT_CALL(*rtp_receiver, SetMediaChannel(_)).WillRepeatedly(Return());
pc_->AddReceiver(rtp_receiver); pc_->AddReceiver(rtp_receiver);
} }
@ -2691,6 +2695,8 @@ TEST_F(RTCStatsCollectorTest, RTCVideoSourceStatsCollectedForSenderWithTrack) {
"LocalVideoTrackID", MediaStreamTrackInterface::kLive, video_source); "LocalVideoTrackID", MediaStreamTrackInterface::kLive, video_source);
rtc::scoped_refptr<MockRtpSenderInternal> sender = CreateMockSender( rtc::scoped_refptr<MockRtpSenderInternal> sender = CreateMockSender(
cricket::MEDIA_TYPE_VIDEO, video_track, kSsrc, kAttachmentId, {}); cricket::MEDIA_TYPE_VIDEO, video_track, kSsrc, kAttachmentId, {});
EXPECT_CALL(*sender, Stop());
EXPECT_CALL(*sender, SetMediaChannel(_));
pc_->AddSender(sender); pc_->AddSender(sender);
rtc::scoped_refptr<const RTCStatsReport> report = stats_->GetStatsReport(); rtc::scoped_refptr<const RTCStatsReport> report = stats_->GetStatsReport();
@ -2734,6 +2740,8 @@ TEST_F(RTCStatsCollectorTest,
"LocalVideoTrackID", MediaStreamTrackInterface::kLive, video_source); "LocalVideoTrackID", MediaStreamTrackInterface::kLive, video_source);
rtc::scoped_refptr<MockRtpSenderInternal> sender = CreateMockSender( rtc::scoped_refptr<MockRtpSenderInternal> sender = CreateMockSender(
cricket::MEDIA_TYPE_VIDEO, video_track, kNoSsrc, kAttachmentId, {}); cricket::MEDIA_TYPE_VIDEO, video_track, kNoSsrc, kAttachmentId, {});
EXPECT_CALL(*sender, Stop());
EXPECT_CALL(*sender, SetMediaChannel(_));
pc_->AddSender(sender); pc_->AddSender(sender);
rtc::scoped_refptr<const RTCStatsReport> report = stats_->GetStatsReport(); rtc::scoped_refptr<const RTCStatsReport> report = stats_->GetStatsReport();
@ -2763,6 +2771,8 @@ TEST_F(RTCStatsCollectorTest,
/*source=*/nullptr); /*source=*/nullptr);
rtc::scoped_refptr<MockRtpSenderInternal> sender = CreateMockSender( rtc::scoped_refptr<MockRtpSenderInternal> sender = CreateMockSender(
cricket::MEDIA_TYPE_VIDEO, video_track, kSsrc, kAttachmentId, {}); cricket::MEDIA_TYPE_VIDEO, video_track, kSsrc, kAttachmentId, {});
EXPECT_CALL(*sender, Stop());
EXPECT_CALL(*sender, SetMediaChannel(_));
pc_->AddSender(sender); pc_->AddSender(sender);
rtc::scoped_refptr<const RTCStatsReport> report = stats_->GetStatsReport(); rtc::scoped_refptr<const RTCStatsReport> report = stats_->GetStatsReport();
@ -2785,6 +2795,8 @@ TEST_F(RTCStatsCollectorTest,
pc_->AddVoiceChannel("AudioMid", "TransportName", voice_media_info); pc_->AddVoiceChannel("AudioMid", "TransportName", voice_media_info);
rtc::scoped_refptr<MockRtpSenderInternal> sender = CreateMockSender( rtc::scoped_refptr<MockRtpSenderInternal> sender = CreateMockSender(
cricket::MEDIA_TYPE_AUDIO, /*track=*/nullptr, kSsrc, kAttachmentId, {}); cricket::MEDIA_TYPE_AUDIO, /*track=*/nullptr, kSsrc, kAttachmentId, {});
EXPECT_CALL(*sender, Stop());
EXPECT_CALL(*sender, SetMediaChannel(_));
pc_->AddSender(sender); pc_->AddSender(sender);
rtc::scoped_refptr<const RTCStatsReport> report = stats_->GetStatsReport(); rtc::scoped_refptr<const RTCStatsReport> report = stats_->GetStatsReport();
@ -3108,6 +3120,8 @@ TEST_F(RTCStatsCollectorTest,
rtc::scoped_refptr<MockRtpSenderInternal> sender = CreateMockSender( rtc::scoped_refptr<MockRtpSenderInternal> sender = CreateMockSender(
cricket::MEDIA_TYPE_VIDEO, /*track=*/nullptr, kSsrc, kAttachmentId, {}); cricket::MEDIA_TYPE_VIDEO, /*track=*/nullptr, kSsrc, kAttachmentId, {});
EXPECT_CALL(*sender, Stop());
EXPECT_CALL(*sender, SetMediaChannel(_));
pc_->AddSender(sender); pc_->AddSender(sender);
rtc::scoped_refptr<const RTCStatsReport> report = stats_->GetStatsReport(); rtc::scoped_refptr<const RTCStatsReport> report = stats_->GetStatsReport();
@ -3257,6 +3271,7 @@ TEST_F(RTCStatsCollectorTest, StatsReportedOnZeroSsrc) {
MediaStreamTrackInterface::kLive); MediaStreamTrackInterface::kLive);
rtc::scoped_refptr<MockRtpSenderInternal> sender = rtc::scoped_refptr<MockRtpSenderInternal> sender =
CreateMockSender(cricket::MEDIA_TYPE_AUDIO, track, 0, 49, {}); CreateMockSender(cricket::MEDIA_TYPE_AUDIO, track, 0, 49, {});
EXPECT_CALL(*sender, Stop());
pc_->AddSender(sender); pc_->AddSender(sender);
rtc::scoped_refptr<const RTCStatsReport> report = stats_->GetStatsReport(); rtc::scoped_refptr<const RTCStatsReport> report = stats_->GetStatsReport();
@ -3276,6 +3291,7 @@ TEST_F(RTCStatsCollectorTest, DoNotCrashOnSsrcChange) {
MediaStreamTrackInterface::kLive); MediaStreamTrackInterface::kLive);
rtc::scoped_refptr<MockRtpSenderInternal> sender = rtc::scoped_refptr<MockRtpSenderInternal> sender =
CreateMockSender(cricket::MEDIA_TYPE_AUDIO, track, 4711, 49, {}); CreateMockSender(cricket::MEDIA_TYPE_AUDIO, track, 4711, 49, {});
EXPECT_CALL(*sender, Stop());
pc_->AddSender(sender); pc_->AddSender(sender);
// We do not generate any matching voice_sender_info stats. // We do not generate any matching voice_sender_info stats.

View File

@ -42,16 +42,27 @@ namespace webrtc {
// Internal class used by PeerConnection. // Internal class used by PeerConnection.
class RtpReceiverInternal : public RtpReceiverInterface { class RtpReceiverInternal : public RtpReceiverInterface {
public: public:
// Stops receiving. The track may be reactivated. // Call on the signaling thread, to let the receiver know that the the
// embedded source object should enter a stopped/ended state and the track's
// state set to `kEnded`, a final state that cannot be reversed.
virtual void Stop() = 0; virtual void Stop() = 0;
// Stops the receiver permanently.
// Causes the associated track to enter kEnded state. Cannot be reversed. // Call on the signaling thread to set the source's state to `ended` before
virtual void StopAndEndTrack() = 0; // clearing the media channel (`SetMediaChannel(nullptr)`) on the worker
// thread.
// The difference between `Stop()` and `SetSourceEnded()` is that the latter
// does not change the state of the associated track.
// NOTE: Calling this function should be followed with a call to
// `SetMediaChannel(nullptr)` on the worker thread, to complete the operation.
virtual void SetSourceEnded() = 0;
// Sets the underlying MediaEngine channel associated with this RtpSender. // Sets the underlying MediaEngine channel associated with this RtpSender.
// A VoiceMediaChannel should be used for audio RtpSenders and // A VoiceMediaChannel should be used for audio RtpSenders and
// a VideoMediaChannel should be used for video RtpSenders. // a VideoMediaChannel should be used for video RtpSenders.
// Must call SetMediaChannel(nullptr) before the media channel is destroyed. // NOTE:
// * SetMediaChannel(nullptr) must be called before the media channel is
// destroyed.
// * This method must be invoked on the worker thread.
virtual void SetMediaChannel(cricket::MediaChannel* media_channel) = 0; virtual void SetMediaChannel(cricket::MediaChannel* media_channel) = 0;
// Configures the RtpReceiver with the underlying media channel, with the // Configures the RtpReceiver with the underlying media channel, with the

View File

@ -222,6 +222,11 @@ class RtpSenderBase : public RtpSenderInternal, public ObserverInterface {
std::vector<std::string> stream_ids_; std::vector<std::string> stream_ids_;
RtpParameters init_parameters_; RtpParameters init_parameters_;
// TODO(tommi): `media_channel_` and several other member variables in this
// class (ssrc_, stopped_, etc) are accessed from more than one thread without
// a guard or lock. Internally there are also several Invoke()s that we could
// remove since the upstream code may already be performing several operations
// on the worker thread.
cricket::MediaChannel* media_channel_ = nullptr; cricket::MediaChannel* media_channel_ = nullptr;
rtc::scoped_refptr<MediaStreamTrackInterface> track_; rtc::scoped_refptr<MediaStreamTrackInterface> track_;

View File

@ -347,7 +347,7 @@ class RtpSenderReceiverTest
void DestroyAudioRtpReceiver() { void DestroyAudioRtpReceiver() {
if (!audio_rtp_receiver_) if (!audio_rtp_receiver_)
return; return;
audio_rtp_receiver_->Stop(); audio_rtp_receiver_->SetMediaChannel(nullptr);
audio_rtp_receiver_ = nullptr; audio_rtp_receiver_ = nullptr;
VerifyVoiceChannelNoOutput(); VerifyVoiceChannelNoOutput();
} }
@ -356,6 +356,7 @@ class RtpSenderReceiverTest
if (!video_rtp_receiver_) if (!video_rtp_receiver_)
return; return;
video_rtp_receiver_->Stop(); video_rtp_receiver_->Stop();
video_rtp_receiver_->SetMediaChannel(nullptr);
video_rtp_receiver_ = nullptr; video_rtp_receiver_ = nullptr;
VerifyVideoChannelNoOutput(); VerifyVideoChannelNoOutput();
} }
@ -1640,7 +1641,7 @@ TEST_F(RtpSenderReceiverTest, AudioReceiverCannotSetFrameDecryptorAfterStop) {
rtc::scoped_refptr<FrameDecryptorInterface> fake_frame_decryptor( rtc::scoped_refptr<FrameDecryptorInterface> fake_frame_decryptor(
new FakeFrameDecryptor()); new FakeFrameDecryptor());
EXPECT_EQ(nullptr, audio_rtp_receiver_->GetFrameDecryptor()); EXPECT_EQ(nullptr, audio_rtp_receiver_->GetFrameDecryptor());
audio_rtp_receiver_->Stop(); audio_rtp_receiver_->SetMediaChannel(nullptr);
audio_rtp_receiver_->SetFrameDecryptor(fake_frame_decryptor); audio_rtp_receiver_->SetFrameDecryptor(fake_frame_decryptor);
// TODO(webrtc:9926) - Validate media channel not set once fakes updated. // TODO(webrtc:9926) - Validate media channel not set once fakes updated.
DestroyAudioRtpReceiver(); DestroyAudioRtpReceiver();
@ -1687,7 +1688,7 @@ TEST_F(RtpSenderReceiverTest, VideoReceiverCannotSetFrameDecryptorAfterStop) {
rtc::scoped_refptr<FrameDecryptorInterface> fake_frame_decryptor( rtc::scoped_refptr<FrameDecryptorInterface> fake_frame_decryptor(
new FakeFrameDecryptor()); new FakeFrameDecryptor());
EXPECT_EQ(nullptr, video_rtp_receiver_->GetFrameDecryptor()); EXPECT_EQ(nullptr, video_rtp_receiver_->GetFrameDecryptor());
video_rtp_receiver_->Stop(); video_rtp_receiver_->SetMediaChannel(nullptr);
video_rtp_receiver_->SetFrameDecryptor(fake_frame_decryptor); video_rtp_receiver_->SetFrameDecryptor(fake_frame_decryptor);
// TODO(webrtc:9926) - Validate media channel not set once fakes updated. // TODO(webrtc:9926) - Validate media channel not set once fakes updated.
DestroyVideoRtpReceiver(); DestroyVideoRtpReceiver();

View File

@ -212,26 +212,34 @@ void RtpTransceiver::SetChannel(
} }
}); });
for (const auto& sender : senders_) {
sender->internal()->SetMediaChannel(channel_ ? channel_->media_channel()
: nullptr);
}
RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(1); RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(1);
for (const auto& receiver : receivers_) { if (!channel_) {
if (!channel_) { for (const auto& receiver : receivers_)
receiver->internal()->Stop(); receiver->internal()->SetSourceEnded();
} else { RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(1); // There should not be an invoke.
receiver->internal()->SetMediaChannel(channel_->media_channel());
}
} }
// Destroy the channel, if we had one, now _after_ updating the receivers who if (channel_to_delete || !senders_.empty() || !receivers_.empty()) {
// might have had references to the previous channel. channel_manager_->worker_thread()->Invoke<void>(RTC_FROM_HERE, [&]() {
if (channel_to_delete) { auto* media_channel = channel_ ? channel_->media_channel() : nullptr;
channel_manager_->DestroyChannel(channel_to_delete); for (const auto& sender : senders_) {
sender->internal()->SetMediaChannel(media_channel);
}
for (const auto& receiver : receivers_) {
receiver->internal()->SetMediaChannel(media_channel);
}
// Destroy the channel, if we had one, now _after_ updating the receivers
// who might have had references to the previous channel.
if (channel_to_delete) {
channel_manager_->DestroyChannel(channel_to_delete);
}
});
} }
RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(2);
} }
void RtpTransceiver::AddSender( void RtpTransceiver::AddSender(
@ -272,6 +280,7 @@ void RtpTransceiver::AddReceiver(
} }
bool RtpTransceiver::RemoveReceiver(RtpReceiverInterface* receiver) { bool RtpTransceiver::RemoveReceiver(RtpReceiverInterface* receiver) {
RTC_DCHECK_RUN_ON(thread_);
RTC_DCHECK(!unified_plan_); RTC_DCHECK(!unified_plan_);
if (receiver) { if (receiver) {
RTC_DCHECK_EQ(media_type(), receiver->media_type()); RTC_DCHECK_EQ(media_type(), receiver->media_type());
@ -280,8 +289,13 @@ bool RtpTransceiver::RemoveReceiver(RtpReceiverInterface* receiver) {
if (it == receivers_.end()) { if (it == receivers_.end()) {
return false; return false;
} }
// `Stop()` will clear the internally cached pointer to the media channel.
(*it)->internal()->Stop(); (*it)->internal()->Stop();
channel_manager_->worker_thread()->Invoke<void>(RTC_FROM_HERE, [&]() {
// `Stop()` will clear the receiver's pointer to the media channel.
(*it)->internal()->SetMediaChannel(nullptr);
});
receivers_.erase(it); receivers_.erase(it);
return true; return true;
} }
@ -399,15 +413,22 @@ void RtpTransceiver::StopSendingAndReceiving() {
// //
// 3. Stop sending media with sender. // 3. Stop sending media with sender.
// //
RTC_DCHECK_RUN_ON(thread_);
// 4. Send an RTCP BYE for each RTP stream that was being sent by sender, as // 4. Send an RTCP BYE for each RTP stream that was being sent by sender, as
// specified in [RFC3550]. // specified in [RFC3550].
RTC_DCHECK_RUN_ON(thread_);
for (const auto& sender : senders_) for (const auto& sender : senders_)
sender->internal()->Stop(); sender->internal()->Stop();
// 5. Stop receiving media with receiver. // Signal to receiver sources that we're stopping.
for (const auto& receiver : receivers_) for (const auto& receiver : receivers_)
receiver->internal()->StopAndEndTrack(); receiver->internal()->Stop();
channel_manager_->worker_thread()->Invoke<void>(RTC_FROM_HERE, [&]() {
// 5 Stop receiving media with receiver.
for (const auto& receiver : receivers_)
receiver->internal()->SetMediaChannel(nullptr);
});
stopping_ = true; stopping_ = true;
direction_ = webrtc::RtpTransceiverDirection::kInactive; direction_ = webrtc::RtpTransceiverDirection::kInactive;

View File

@ -146,7 +146,8 @@ class RtpTransceiverUnifiedPlanTest : public ::testing::Test {
// Basic tests for Stop() // Basic tests for Stop()
TEST_F(RtpTransceiverUnifiedPlanTest, StopSetsDirection) { TEST_F(RtpTransceiverUnifiedPlanTest, StopSetsDirection) {
EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); EXPECT_CALL(*receiver_.get(), Stop());
EXPECT_CALL(*receiver_.get(), SetMediaChannel(_));
EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped());
EXPECT_CALL(*sender_.get(), Stop()); EXPECT_CALL(*sender_.get(), Stop());
@ -204,8 +205,7 @@ class RtpTransceiverTestForHeaderExtensions : public ::testing::Test {
} }
void ClearChannel(cricket::MockChannelInterface& mock_channel) { void ClearChannel(cricket::MockChannelInterface& mock_channel) {
EXPECT_CALL(*sender_.get(), SetMediaChannel(nullptr)); EXPECT_CALL(*sender_.get(), SetMediaChannel(_));
EXPECT_CALL(*receiver_.get(), Stop());
EXPECT_CALL(mock_channel, SetFirstPacketReceivedCallback(_)); EXPECT_CALL(mock_channel, SetFirstPacketReceivedCallback(_));
EXPECT_CALL(channel_manager_, DestroyChannel(&mock_channel)) EXPECT_CALL(channel_manager_, DestroyChannel(&mock_channel))
.WillRepeatedly(testing::Return()); .WillRepeatedly(testing::Return());
@ -221,7 +221,8 @@ class RtpTransceiverTestForHeaderExtensions : public ::testing::Test {
}; };
TEST_F(RtpTransceiverTestForHeaderExtensions, OffersChannelManagerList) { TEST_F(RtpTransceiverTestForHeaderExtensions, OffersChannelManagerList) {
EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); EXPECT_CALL(*receiver_.get(), Stop());
EXPECT_CALL(*receiver_.get(), SetMediaChannel(_));
EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped());
EXPECT_CALL(*sender_.get(), Stop()); EXPECT_CALL(*sender_.get(), Stop());
@ -229,7 +230,8 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, OffersChannelManagerList) {
} }
TEST_F(RtpTransceiverTestForHeaderExtensions, ModifiesDirection) { TEST_F(RtpTransceiverTestForHeaderExtensions, ModifiesDirection) {
EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); EXPECT_CALL(*receiver_.get(), Stop());
EXPECT_CALL(*receiver_.get(), SetMediaChannel(_));
EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped());
EXPECT_CALL(*sender_.get(), Stop()); EXPECT_CALL(*sender_.get(), Stop());
@ -253,7 +255,8 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, ModifiesDirection) {
} }
TEST_F(RtpTransceiverTestForHeaderExtensions, AcceptsStoppedExtension) { TEST_F(RtpTransceiverTestForHeaderExtensions, AcceptsStoppedExtension) {
EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); EXPECT_CALL(*receiver_.get(), Stop());
EXPECT_CALL(*receiver_.get(), SetMediaChannel(_));
EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped());
EXPECT_CALL(*sender_.get(), Stop()); EXPECT_CALL(*sender_.get(), Stop());
@ -265,7 +268,8 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, AcceptsStoppedExtension) {
} }
TEST_F(RtpTransceiverTestForHeaderExtensions, RejectsUnsupportedExtension) { TEST_F(RtpTransceiverTestForHeaderExtensions, RejectsUnsupportedExtension) {
EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); EXPECT_CALL(*receiver_.get(), Stop());
EXPECT_CALL(*receiver_.get(), SetMediaChannel(_));
EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped());
EXPECT_CALL(*sender_.get(), Stop()); EXPECT_CALL(*sender_.get(), Stop());
@ -279,7 +283,8 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, RejectsUnsupportedExtension) {
TEST_F(RtpTransceiverTestForHeaderExtensions, TEST_F(RtpTransceiverTestForHeaderExtensions,
RejectsStoppedMandatoryExtensions) { RejectsStoppedMandatoryExtensions) {
EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); EXPECT_CALL(*receiver_.get(), Stop());
EXPECT_CALL(*receiver_.get(), SetMediaChannel(_));
EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped());
EXPECT_CALL(*sender_.get(), Stop()); EXPECT_CALL(*sender_.get(), Stop());
@ -299,7 +304,8 @@ TEST_F(RtpTransceiverTestForHeaderExtensions,
TEST_F(RtpTransceiverTestForHeaderExtensions, TEST_F(RtpTransceiverTestForHeaderExtensions,
NoNegotiatedHdrExtsWithoutChannel) { NoNegotiatedHdrExtsWithoutChannel) {
EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); EXPECT_CALL(*receiver_.get(), Stop());
EXPECT_CALL(*receiver_.get(), SetMediaChannel(_));
EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped());
EXPECT_CALL(*sender_.get(), Stop()); EXPECT_CALL(*sender_.get(), Stop());
EXPECT_THAT(transceiver_.HeaderExtensionsNegotiated(), ElementsAre()); EXPECT_THAT(transceiver_.HeaderExtensionsNegotiated(), ElementsAre());
@ -308,8 +314,8 @@ TEST_F(RtpTransceiverTestForHeaderExtensions,
TEST_F(RtpTransceiverTestForHeaderExtensions, TEST_F(RtpTransceiverTestForHeaderExtensions,
NoNegotiatedHdrExtsWithChannelWithoutNegotiation) { NoNegotiatedHdrExtsWithChannelWithoutNegotiation) {
const std::string content_name("my_mid"); const std::string content_name("my_mid");
EXPECT_CALL(*receiver_.get(), SetMediaChannel(_)); EXPECT_CALL(*receiver_.get(), SetMediaChannel(_)).WillRepeatedly(Return());
EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); EXPECT_CALL(*receiver_.get(), Stop()).WillRepeatedly(Return());
EXPECT_CALL(*sender_.get(), SetMediaChannel(_)); EXPECT_CALL(*sender_.get(), SetMediaChannel(_));
EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped());
EXPECT_CALL(*sender_.get(), Stop()); EXPECT_CALL(*sender_.get(), Stop());
@ -329,8 +335,8 @@ TEST_F(RtpTransceiverTestForHeaderExtensions,
TEST_F(RtpTransceiverTestForHeaderExtensions, ReturnsNegotiatedHdrExts) { TEST_F(RtpTransceiverTestForHeaderExtensions, ReturnsNegotiatedHdrExts) {
const std::string content_name("my_mid"); const std::string content_name("my_mid");
EXPECT_CALL(*receiver_.get(), SetMediaChannel(_)); EXPECT_CALL(*receiver_.get(), SetMediaChannel(_)).WillRepeatedly(Return());
EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); EXPECT_CALL(*receiver_.get(), Stop()).WillRepeatedly(Return());
EXPECT_CALL(*sender_.get(), SetMediaChannel(_)); EXPECT_CALL(*sender_.get(), SetMediaChannel(_));
EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped());
EXPECT_CALL(*sender_.get(), Stop()); EXPECT_CALL(*sender_.get(), Stop());
@ -362,7 +368,8 @@ TEST_F(RtpTransceiverTestForHeaderExtensions, ReturnsNegotiatedHdrExts) {
TEST_F(RtpTransceiverTestForHeaderExtensions, TEST_F(RtpTransceiverTestForHeaderExtensions,
ReturnsNegotiatedHdrExtsSecondTime) { ReturnsNegotiatedHdrExtsSecondTime) {
EXPECT_CALL(*receiver_.get(), StopAndEndTrack()); EXPECT_CALL(*receiver_.get(), Stop());
EXPECT_CALL(*receiver_.get(), SetMediaChannel(_));
EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped()); EXPECT_CALL(*sender_.get(), SetTransceiverAsStopped());
EXPECT_CALL(*sender_.get(), Stop()); EXPECT_CALL(*sender_.get(), Stop());

View File

@ -459,13 +459,14 @@ void RtpTransmissionManager::CreateAudioReceiver(
// TODO(https://crbug.com/webrtc/9480): When we remove remote_streams(), use // TODO(https://crbug.com/webrtc/9480): When we remove remote_streams(), use
// the constructor taking stream IDs instead. // the constructor taking stream IDs instead.
auto audio_receiver = rtc::make_ref_counted<AudioRtpReceiver>( auto audio_receiver = rtc::make_ref_counted<AudioRtpReceiver>(
worker_thread(), remote_sender_info.sender_id, streams, IsUnifiedPlan()); worker_thread(), remote_sender_info.sender_id, streams, IsUnifiedPlan(),
audio_receiver->SetMediaChannel(voice_media_channel()); voice_media_channel());
if (remote_sender_info.sender_id == kDefaultAudioSenderId) { if (remote_sender_info.sender_id == kDefaultAudioSenderId) {
audio_receiver->SetupUnsignaledMediaChannel(); audio_receiver->SetupUnsignaledMediaChannel();
} else { } else {
audio_receiver->SetupMediaChannel(remote_sender_info.first_ssrc); audio_receiver->SetupMediaChannel(remote_sender_info.first_ssrc);
} }
auto receiver = RtpReceiverProxyWithInternal<RtpReceiverInternal>::Create( auto receiver = RtpReceiverProxyWithInternal<RtpReceiverInternal>::Create(
signaling_thread(), worker_thread(), std::move(audio_receiver)); signaling_thread(), worker_thread(), std::move(audio_receiver));
GetAudioTransceiver()->internal()->AddReceiver(receiver); GetAudioTransceiver()->internal()->AddReceiver(receiver);
@ -483,12 +484,13 @@ void RtpTransmissionManager::CreateVideoReceiver(
// the constructor taking stream IDs instead. // the constructor taking stream IDs instead.
auto video_receiver = rtc::make_ref_counted<VideoRtpReceiver>( auto video_receiver = rtc::make_ref_counted<VideoRtpReceiver>(
worker_thread(), remote_sender_info.sender_id, streams); worker_thread(), remote_sender_info.sender_id, streams);
video_receiver->SetMediaChannel(video_media_channel());
if (remote_sender_info.sender_id == kDefaultVideoSenderId) { video_receiver->SetupMediaChannel(
video_receiver->SetupUnsignaledMediaChannel(); remote_sender_info.sender_id == kDefaultVideoSenderId
} else { ? absl::nullopt
video_receiver->SetupMediaChannel(remote_sender_info.first_ssrc); : absl::optional<uint32_t>(remote_sender_info.first_ssrc),
} video_media_channel());
auto receiver = RtpReceiverProxyWithInternal<RtpReceiverInternal>::Create( auto receiver = RtpReceiverProxyWithInternal<RtpReceiverInternal>::Create(
signaling_thread(), worker_thread(), std::move(video_receiver)); signaling_thread(), worker_thread(), std::move(video_receiver));
GetVideoTransceiver()->internal()->AddReceiver(receiver); GetVideoTransceiver()->internal()->AddReceiver(receiver);

View File

@ -764,9 +764,8 @@ static rtc::scoped_refptr<MockRtpReceiverInternal> CreateMockReceiver(
Return(track->kind() == MediaStreamTrackInterface::kAudioKind Return(track->kind() == MediaStreamTrackInterface::kAudioKind
? cricket::MEDIA_TYPE_AUDIO ? cricket::MEDIA_TYPE_AUDIO
: cricket::MEDIA_TYPE_VIDEO)); : cricket::MEDIA_TYPE_VIDEO));
EXPECT_CALL(*receiver, SetMediaChannel(_)).Times(AtMost(1)); EXPECT_CALL(*receiver, SetMediaChannel(_)).WillRepeatedly(Return());
EXPECT_CALL(*receiver, Stop()); EXPECT_CALL(*receiver, Stop()).WillRepeatedly(Return());
EXPECT_CALL(*receiver, StopAndEndTrack());
return receiver; return receiver;
} }

View File

@ -57,7 +57,7 @@ class MockRtpReceiverInternal : public RtpReceiverInternal {
// RtpReceiverInternal methods. // RtpReceiverInternal methods.
MOCK_METHOD(void, Stop, (), (override)); MOCK_METHOD(void, Stop, (), (override));
MOCK_METHOD(void, StopAndEndTrack, (), (override)); MOCK_METHOD(void, SetSourceEnded, (), (override));
MOCK_METHOD(void, SetMediaChannel, (cricket::MediaChannel*), (override)); MOCK_METHOD(void, SetMediaChannel, (cricket::MediaChannel*), (override));
MOCK_METHOD(void, SetupMediaChannel, (uint32_t), (override)); MOCK_METHOD(void, SetupMediaChannel, (uint32_t), (override));
MOCK_METHOD(void, SetupUnsignaledMediaChannel, (), (override)); MOCK_METHOD(void, SetupUnsignaledMediaChannel, (), (override));

View File

@ -109,76 +109,68 @@ void VideoRtpReceiver::SetDepacketizerToDecoderFrameTransformer(
void VideoRtpReceiver::Stop() { void VideoRtpReceiver::Stop() {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_); RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
// TODO(deadbeef): Need to do more here to fully stop receiving packets.
source_->SetState(MediaSourceInterface::kEnded); source_->SetState(MediaSourceInterface::kEnded);
worker_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
RTC_DCHECK_RUN_ON(worker_thread_);
if (media_channel_) {
SetSink(nullptr);
SetMediaChannel_w(nullptr);
}
source_->ClearCallback();
});
}
void VideoRtpReceiver::StopAndEndTrack() {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
Stop();
track_->internal()->set_ended(); track_->internal()->set_ended();
} }
void VideoRtpReceiver::RestartMediaChannel(absl::optional<uint32_t> ssrc) { void VideoRtpReceiver::SetSourceEnded() {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_); RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
source_->SetState(MediaSourceInterface::kEnded);
}
// RTC_RUN_ON(&signaling_thread_checker_)
void VideoRtpReceiver::RestartMediaChannel(absl::optional<uint32_t> ssrc) {
MediaSourceInterface::SourceState state = source_->state(); MediaSourceInterface::SourceState state = source_->state();
// TODO(tommi): Can we restart the media channel without blocking? // TODO(tommi): Can we restart the media channel without blocking?
worker_thread_->Invoke<void>(RTC_FROM_HERE, [&] { worker_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
RTC_DCHECK_RUN_ON(worker_thread_); RTC_DCHECK_RUN_ON(worker_thread_);
if (!media_channel_) { RestartMediaChannel_w(std::move(ssrc), state);
// Ignore further negotiations if we've already been stopped and don't
// have an associated media channel.
return; // Can't restart.
}
const bool encoded_sink_enabled = saved_encoded_sink_enabled_;
if (state != MediaSourceInterface::kInitializing) {
if (ssrc == ssrc_)
return;
// Disconnect from a previous ssrc.
SetSink(nullptr);
if (encoded_sink_enabled)
SetEncodedSinkEnabled(false);
}
// Set up the new ssrc.
ssrc_ = std::move(ssrc);
SetSink(source_->sink());
if (encoded_sink_enabled) {
SetEncodedSinkEnabled(true);
}
if (frame_transformer_ && media_channel_) {
media_channel_->SetDepacketizerToDecoderFrameTransformer(
ssrc_.value_or(0), frame_transformer_);
}
if (media_channel_ && ssrc_) {
if (frame_decryptor_) {
media_channel_->SetFrameDecryptor(*ssrc_, frame_decryptor_);
}
media_channel_->SetBaseMinimumPlayoutDelayMs(*ssrc_, delay_.GetMs());
}
}); });
source_->SetState(MediaSourceInterface::kLive); source_->SetState(MediaSourceInterface::kLive);
} }
// RTC_RUN_ON(worker_thread_)
void VideoRtpReceiver::RestartMediaChannel_w(
absl::optional<uint32_t> ssrc,
MediaSourceInterface::SourceState state) {
if (!media_channel_) {
return; // Can't restart.
}
const bool encoded_sink_enabled = saved_encoded_sink_enabled_;
if (state != MediaSourceInterface::kInitializing) {
if (ssrc == ssrc_)
return;
// Disconnect from a previous ssrc.
SetSink(nullptr);
if (encoded_sink_enabled)
SetEncodedSinkEnabled(false);
}
// Set up the new ssrc.
ssrc_ = std::move(ssrc);
SetSink(source_->sink());
if (encoded_sink_enabled) {
SetEncodedSinkEnabled(true);
}
if (frame_transformer_ && media_channel_) {
media_channel_->SetDepacketizerToDecoderFrameTransformer(
ssrc_.value_or(0), frame_transformer_);
}
if (media_channel_ && ssrc_) {
if (frame_decryptor_) {
media_channel_->SetFrameDecryptor(*ssrc_, frame_decryptor_);
}
media_channel_->SetBaseMinimumPlayoutDelayMs(*ssrc_, delay_.GetMs());
}
}
// RTC_RUN_ON(worker_thread_) // RTC_RUN_ON(worker_thread_)
void VideoRtpReceiver::SetSink(rtc::VideoSinkInterface<VideoFrame>* sink) { void VideoRtpReceiver::SetSink(rtc::VideoSinkInterface<VideoFrame>* sink) {
if (ssrc_) { if (ssrc_) {
@ -266,14 +258,11 @@ void VideoRtpReceiver::SetJitterBufferMinimumDelay(
} }
void VideoRtpReceiver::SetMediaChannel(cricket::MediaChannel* media_channel) { void VideoRtpReceiver::SetMediaChannel(cricket::MediaChannel* media_channel) {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
RTC_DCHECK(media_channel == nullptr || RTC_DCHECK(media_channel == nullptr ||
media_channel->media_type() == media_type()); media_channel->media_type() == media_type());
worker_thread_->Invoke<void>(RTC_FROM_HERE, [&] { SetMediaChannel_w(media_channel);
RTC_DCHECK_RUN_ON(worker_thread_);
SetMediaChannel_w(media_channel);
});
} }
// RTC_RUN_ON(worker_thread_) // RTC_RUN_ON(worker_thread_)
@ -281,6 +270,10 @@ void VideoRtpReceiver::SetMediaChannel_w(cricket::MediaChannel* media_channel) {
if (media_channel == media_channel_) if (media_channel == media_channel_)
return; return;
if (!media_channel) {
SetSink(nullptr);
}
bool encoded_sink_enabled = saved_encoded_sink_enabled_; bool encoded_sink_enabled = saved_encoded_sink_enabled_;
if (encoded_sink_enabled && media_channel_) { if (encoded_sink_enabled && media_channel_) {
// Turn off the old sink, if any. // Turn off the old sink, if any.
@ -303,6 +296,9 @@ void VideoRtpReceiver::SetMediaChannel_w(cricket::MediaChannel* media_channel) {
ssrc_.value_or(0), frame_transformer_); ssrc_.value_or(0), frame_transformer_);
} }
} }
if (!media_channel)
source_->ClearCallback();
} }
void VideoRtpReceiver::NotifyFirstPacketReceived() { void VideoRtpReceiver::NotifyFirstPacketReceived() {
@ -320,6 +316,19 @@ std::vector<RtpSource> VideoRtpReceiver::GetSources() const {
return media_channel_->GetSources(*ssrc_); return media_channel_->GetSources(*ssrc_);
} }
void VideoRtpReceiver::SetupMediaChannel(absl::optional<uint32_t> ssrc,
cricket::MediaChannel* media_channel) {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
RTC_DCHECK(media_channel);
MediaSourceInterface::SourceState state = source_->state();
worker_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
RTC_DCHECK_RUN_ON(worker_thread_);
SetMediaChannel_w(media_channel);
RestartMediaChannel_w(std::move(ssrc), state);
});
source_->SetState(MediaSourceInterface::kLive);
}
void VideoRtpReceiver::OnGenerateKeyFrame() { void VideoRtpReceiver::OnGenerateKeyFrame() {
RTC_DCHECK_RUN_ON(worker_thread_); RTC_DCHECK_RUN_ON(worker_thread_);
if (!media_channel_) { if (!media_channel_) {

View File

@ -88,7 +88,7 @@ class VideoRtpReceiver : public RtpReceiverInternal {
// RtpReceiverInternal implementation. // RtpReceiverInternal implementation.
void Stop() override; void Stop() override;
void StopAndEndTrack() override; void SetSourceEnded() override;
void SetupMediaChannel(uint32_t ssrc) override; void SetupMediaChannel(uint32_t ssrc) override;
void SetupUnsignaledMediaChannel() override; void SetupUnsignaledMediaChannel() override;
uint32_t ssrc() const override; uint32_t ssrc() const override;
@ -110,8 +110,17 @@ class VideoRtpReceiver : public RtpReceiverInternal {
std::vector<RtpSource> GetSources() const override; std::vector<RtpSource> GetSources() const override;
// Combines SetMediaChannel, SetupMediaChannel and
// SetupUnsignaledMediaChannel.
void SetupMediaChannel(absl::optional<uint32_t> ssrc,
cricket::MediaChannel* media_channel);
private: private:
void RestartMediaChannel(absl::optional<uint32_t> ssrc); void RestartMediaChannel(absl::optional<uint32_t> ssrc)
RTC_RUN_ON(&signaling_thread_checker_);
void RestartMediaChannel_w(absl::optional<uint32_t> ssrc,
MediaSourceInterface::SourceState state)
RTC_RUN_ON(worker_thread_);
void SetSink(rtc::VideoSinkInterface<VideoFrame>* sink) void SetSink(rtc::VideoSinkInterface<VideoFrame>* sink)
RTC_RUN_ON(worker_thread_); RTC_RUN_ON(worker_thread_);
void SetMediaChannel_w(cricket::MediaChannel* media_channel) void SetMediaChannel_w(cricket::MediaChannel* media_channel)

View File

@ -60,13 +60,20 @@ class VideoRtpReceiverTest : public testing::Test {
std::string("receiver"), std::string("receiver"),
std::vector<std::string>({"stream"}))) { std::vector<std::string>({"stream"}))) {
worker_thread_->Start(); worker_thread_->Start();
receiver_->SetMediaChannel(&channel_); SetMediaChannel(&channel_);
} }
~VideoRtpReceiverTest() override { ~VideoRtpReceiverTest() override {
// Clear expectations that tests may have set up before calling Stop(). // Clear expectations that tests may have set up before calling
// SetMediaChannel(nullptr).
Mock::VerifyAndClearExpectations(&channel_); Mock::VerifyAndClearExpectations(&channel_);
receiver_->Stop(); receiver_->Stop();
SetMediaChannel(nullptr);
}
void SetMediaChannel(cricket::MediaChannel* media_channel) {
worker_thread_->Invoke<void>(
RTC_FROM_HERE, [&]() { receiver_->SetMediaChannel(media_channel); });
} }
webrtc::VideoTrackSourceInterface* Source() { webrtc::VideoTrackSourceInterface* Source() {
@ -94,23 +101,24 @@ TEST_F(VideoRtpReceiverTest,
MockVideoMediaChannel channel2(nullptr, cricket::VideoOptions()); MockVideoMediaChannel channel2(nullptr, cricket::VideoOptions());
EXPECT_CALL(channel_, GenerateKeyFrame).Times(0); EXPECT_CALL(channel_, GenerateKeyFrame).Times(0);
EXPECT_CALL(channel2, GenerateKeyFrame).Times(0); EXPECT_CALL(channel2, GenerateKeyFrame).Times(0);
receiver_->SetMediaChannel(&channel2); SetMediaChannel(&channel2);
Mock::VerifyAndClearExpectations(&channel2); Mock::VerifyAndClearExpectations(&channel2);
// Generate a key frame. When we switch channel next time, we will have to // Generate a key frame. When we switch channel next time, we will have to
// re-generate it as we don't know if it was eventually received // re-generate it as we don't know if it was eventually received
EXPECT_CALL(channel2, GenerateKeyFrame).Times(1);
Source()->GenerateKeyFrame(); Source()->GenerateKeyFrame();
MockVideoMediaChannel channel3(nullptr, cricket::VideoOptions()); MockVideoMediaChannel channel3(nullptr, cricket::VideoOptions());
EXPECT_CALL(channel3, GenerateKeyFrame); EXPECT_CALL(channel3, GenerateKeyFrame);
receiver_->SetMediaChannel(&channel3); SetMediaChannel(&channel3);
// Switching to a new channel should now not cause calls to GenerateKeyFrame. // Switching to a new channel should now not cause calls to GenerateKeyFrame.
StrictMock<MockVideoMediaChannel> channel4(nullptr, cricket::VideoOptions()); StrictMock<MockVideoMediaChannel> channel4(nullptr, cricket::VideoOptions());
receiver_->SetMediaChannel(&channel4); SetMediaChannel(&channel4);
// We must call Stop() here since the mock media channels live on the stack // We must call SetMediaChannel(nullptr) here since the mock media channels
// and `receiver_` still has a pointer to those objects. // live on the stack and `receiver_` still has a pointer to those objects.
receiver_->Stop(); SetMediaChannel(nullptr);
} }
TEST_F(VideoRtpReceiverTest, EnablesEncodedOutput) { TEST_F(VideoRtpReceiverTest, EnablesEncodedOutput) {
@ -135,7 +143,7 @@ TEST_F(VideoRtpReceiverTest, DisablesEnablesEncodedOutputOnChannelSwitch) {
Source()->AddEncodedSink(&sink); Source()->AddEncodedSink(&sink);
MockVideoMediaChannel channel2(nullptr, cricket::VideoOptions()); MockVideoMediaChannel channel2(nullptr, cricket::VideoOptions());
EXPECT_CALL(channel2, SetRecordableEncodedFrameCallback); EXPECT_CALL(channel2, SetRecordableEncodedFrameCallback);
receiver_->SetMediaChannel(&channel2); SetMediaChannel(&channel2);
Mock::VerifyAndClearExpectations(&channel2); Mock::VerifyAndClearExpectations(&channel2);
// When clearing encoded frame buffer function, we need channel switches // When clearing encoded frame buffer function, we need channel switches
@ -143,11 +151,11 @@ TEST_F(VideoRtpReceiverTest, DisablesEnablesEncodedOutputOnChannelSwitch) {
EXPECT_CALL(channel2, ClearRecordableEncodedFrameCallback); EXPECT_CALL(channel2, ClearRecordableEncodedFrameCallback);
Source()->RemoveEncodedSink(&sink); Source()->RemoveEncodedSink(&sink);
StrictMock<MockVideoMediaChannel> channel3(nullptr, cricket::VideoOptions()); StrictMock<MockVideoMediaChannel> channel3(nullptr, cricket::VideoOptions());
receiver_->SetMediaChannel(&channel3); SetMediaChannel(&channel3);
// We must call Stop() here since the mock media channels live on the stack // We must call SetMediaChannel(nullptr) here since the mock media channels
// and `receiver_` still has a pointer to those objects. // live on the stack and `receiver_` still has a pointer to those objects.
receiver_->Stop(); SetMediaChannel(nullptr);
} }
TEST_F(VideoRtpReceiverTest, BroadcastsEncodedFramesWhenEnabled) { TEST_F(VideoRtpReceiverTest, BroadcastsEncodedFramesWhenEnabled) {