Adds network thread to rtc::BaseChannel

BaseChannel do calls to transport_channel on network_thread,
while keep calls to media_engine on worker_thread.
It still works when network_thread == worker_thread.

BUG=webrtc:5645
R=pthatcher@webrtc.org

Review URL: https://codereview.webrtc.org/1903393004 .

Cr-Commit-Position: refs/heads/master@{#12690}
This commit is contained in:
Danil Chapovalov 2016-05-11 19:55:27 +02:00
parent 3108fc933b
commit 33b01f2162
12 changed files with 1854 additions and 915 deletions

View File

@ -172,8 +172,9 @@ bool PeerConnectionFactory::Initialize() {
worker_thread_->Invoke<cricket::MediaEngineInterface*>(rtc::Bind(
&PeerConnectionFactory::CreateMediaEngine_w, this));
channel_manager_.reset(
new cricket::ChannelManager(media_engine, worker_thread_));
rtc::Thread* const network_thread = worker_thread_;
channel_manager_.reset(new cricket::ChannelManager(
media_engine, worker_thread_, network_thread));
channel_manager_->SetVideoRtxEnabled(true);
if (!channel_manager_->Init()) {

View File

@ -496,12 +496,15 @@ class StatsCollectorForTest : public webrtc::StatsCollector {
class StatsCollectorTest : public testing::Test {
protected:
StatsCollectorTest()
: media_engine_(new cricket::FakeMediaEngine()),
channel_manager_(
new cricket::ChannelManager(media_engine_, rtc::Thread::Current())),
: worker_thread_(rtc::Thread::Current()),
network_thread_(rtc::Thread::Current()),
media_engine_(new cricket::FakeMediaEngine()),
channel_manager_(new cricket::ChannelManager(media_engine_,
worker_thread_,
network_thread_)),
media_controller_(
webrtc::MediaControllerInterface::Create(cricket::MediaConfig(),
rtc::Thread::Current(),
worker_thread_,
channel_manager_.get())),
session_(media_controller_.get()) {
// By default, we ignore session GetStats calls.
@ -760,6 +763,8 @@ class StatsCollectorTest : public testing::Test {
srtp_crypto_suite);
}
rtc::Thread* const worker_thread_;
rtc::Thread* const network_thread_;
cricket::FakeMediaEngine* media_engine_;
std::unique_ptr<cricket::ChannelManager> channel_manager_;
std::unique_ptr<webrtc::MediaControllerInterface> media_controller_;
@ -828,8 +833,9 @@ TEST_F(StatsCollectorTest, BytesCounterHandles64Bits) {
Return(true)));
MockVideoMediaChannel* media_channel = new MockVideoMediaChannel();
cricket::VideoChannel video_channel(rtc::Thread::Current(), media_channel,
nullptr, kVideoChannelName, false);
cricket::VideoChannel video_channel(worker_thread_, network_thread_,
media_channel, nullptr, kVideoChannelName,
false);
StatsReports reports; // returned values.
cricket::VideoSenderInfo video_sender_info;
cricket::VideoMediaInfo stats_read;
@ -874,8 +880,9 @@ TEST_F(StatsCollectorTest, BandwidthEstimationInfoIsReported) {
Return(true)));
MockVideoMediaChannel* media_channel = new MockVideoMediaChannel();
cricket::VideoChannel video_channel(rtc::Thread::Current(), media_channel,
nullptr, kVideoChannelName, false);
cricket::VideoChannel video_channel(worker_thread_, network_thread_,
media_channel, nullptr, kVideoChannelName,
false);
StatsReports reports; // returned values.
cricket::VideoSenderInfo video_sender_info;
@ -949,8 +956,8 @@ TEST_F(StatsCollectorTest, TrackObjectExistsWithoutUpdateStats) {
StatsCollectorForTest stats(&pc_);
MockVideoMediaChannel* media_channel = new MockVideoMediaChannel();
cricket::VideoChannel video_channel(rtc::Thread::Current(), media_channel,
nullptr, "video", false);
cricket::VideoChannel video_channel(worker_thread_, network_thread_,
media_channel, nullptr, "video", false);
AddOutgoingVideoTrackStats();
stats.AddStream(stream_);
@ -985,8 +992,9 @@ TEST_F(StatsCollectorTest, TrackAndSsrcObjectExistAfterUpdateSsrcStats) {
Return(true)));
MockVideoMediaChannel* media_channel = new MockVideoMediaChannel();
cricket::VideoChannel video_channel(rtc::Thread::Current(), media_channel,
nullptr, kVideoChannelName, false);
cricket::VideoChannel video_channel(worker_thread_, network_thread_,
media_channel, nullptr, kVideoChannelName,
false);
AddOutgoingVideoTrackStats();
stats.AddStream(stream_);
@ -1053,8 +1061,8 @@ TEST_F(StatsCollectorTest, TransportObjectLinkedFromSsrcObject) {
MockVideoMediaChannel* media_channel = new MockVideoMediaChannel();
// The transport_name known by the video channel.
const std::string kVcName("vcname");
cricket::VideoChannel video_channel(rtc::Thread::Current(), media_channel,
nullptr, kVcName, false);
cricket::VideoChannel video_channel(worker_thread_, network_thread_,
media_channel, nullptr, kVcName, false);
AddOutgoingVideoTrackStats();
stats.AddStream(stream_);
@ -1111,8 +1119,8 @@ TEST_F(StatsCollectorTest, RemoteSsrcInfoIsAbsent) {
MockVideoMediaChannel* media_channel = new MockVideoMediaChannel();
// The transport_name known by the video channel.
const std::string kVcName("vcname");
cricket::VideoChannel video_channel(rtc::Thread::Current(), media_channel,
nullptr, kVcName, false);
cricket::VideoChannel video_channel(worker_thread_, network_thread_,
media_channel, nullptr, kVcName, false);
AddOutgoingVideoTrackStats();
stats.AddStream(stream_);
@ -1137,8 +1145,8 @@ TEST_F(StatsCollectorTest, RemoteSsrcInfoIsPresent) {
MockVideoMediaChannel* media_channel = new MockVideoMediaChannel();
// The transport_name known by the video channel.
const std::string kVcName("vcname");
cricket::VideoChannel video_channel(rtc::Thread::Current(), media_channel,
nullptr, kVcName, false);
cricket::VideoChannel video_channel(worker_thread_, network_thread_,
media_channel, nullptr, kVcName, false);
AddOutgoingVideoTrackStats();
stats.AddStream(stream_);
@ -1192,8 +1200,9 @@ TEST_F(StatsCollectorTest, ReportsFromRemoteTrack) {
Return(true)));
MockVideoMediaChannel* media_channel = new MockVideoMediaChannel();
cricket::VideoChannel video_channel(rtc::Thread::Current(), media_channel,
nullptr, kVideoChannelName, false);
cricket::VideoChannel video_channel(worker_thread_, network_thread_,
media_channel, nullptr, kVideoChannelName,
false);
AddIncomingVideoTrackStats();
stats.AddStream(stream_);
@ -1504,8 +1513,9 @@ TEST_F(StatsCollectorTest, GetStatsFromLocalAudioTrack) {
MockVoiceMediaChannel* media_channel = new MockVoiceMediaChannel();
// The transport_name known by the voice channel.
const std::string kVcName("vcname");
cricket::VoiceChannel voice_channel(rtc::Thread::Current(), media_engine_,
media_channel, nullptr, kVcName, false);
cricket::VoiceChannel voice_channel(worker_thread_, network_thread_,
media_engine_, media_channel, nullptr,
kVcName, false);
AddOutgoingAudioTrackStats();
stats.AddStream(stream_);
stats.AddLocalAudioTrack(audio_track_, kSsrcOfTrack);
@ -1539,8 +1549,9 @@ TEST_F(StatsCollectorTest, GetStatsFromRemoteStream) {
MockVoiceMediaChannel* media_channel = new MockVoiceMediaChannel();
// The transport_name known by the voice channel.
const std::string kVcName("vcname");
cricket::VoiceChannel voice_channel(rtc::Thread::Current(), media_engine_,
media_channel, nullptr, kVcName, false);
cricket::VoiceChannel voice_channel(worker_thread_, network_thread_,
media_engine_, media_channel, nullptr,
kVcName, false);
AddIncomingAudioTrackStats();
stats.AddStream(stream_);
@ -1568,8 +1579,9 @@ TEST_F(StatsCollectorTest, GetStatsAfterRemoveAudioStream) {
MockVoiceMediaChannel* media_channel = new MockVoiceMediaChannel();
// The transport_name known by the voice channel.
const std::string kVcName("vcname");
cricket::VoiceChannel voice_channel(rtc::Thread::Current(), media_engine_,
media_channel, nullptr, kVcName, false);
cricket::VoiceChannel voice_channel(worker_thread_, network_thread_,
media_engine_, media_channel, nullptr,
kVcName, false);
AddOutgoingAudioTrackStats();
stats.AddStream(stream_);
stats.AddLocalAudioTrack(audio_track_.get(), kSsrcOfTrack);
@ -1629,8 +1641,9 @@ TEST_F(StatsCollectorTest, LocalAndRemoteTracksWithSameSsrc) {
MockVoiceMediaChannel* media_channel = new MockVoiceMediaChannel();
// The transport_name known by the voice channel.
const std::string kVcName("vcname");
cricket::VoiceChannel voice_channel(rtc::Thread::Current(), media_engine_,
media_channel, nullptr, kVcName, false);
cricket::VoiceChannel voice_channel(worker_thread_, network_thread_,
media_engine_, media_channel, nullptr,
kVcName, false);
// Create a local stream with a local audio track and adds it to the stats.
AddOutgoingAudioTrackStats();
@ -1716,8 +1729,9 @@ TEST_F(StatsCollectorTest, TwoLocalTracksWithSameSsrc) {
MockVoiceMediaChannel* media_channel = new MockVoiceMediaChannel();
// The transport_name known by the voice channel.
const std::string kVcName("vcname");
cricket::VoiceChannel voice_channel(rtc::Thread::Current(), media_engine_,
media_channel, nullptr, kVcName, false);
cricket::VoiceChannel voice_channel(worker_thread_, network_thread_,
media_engine_, media_channel, nullptr,
kVcName, false);
// Create a local stream with a local audio track and adds it to the stats.
AddOutgoingAudioTrackStats();

View File

@ -1797,8 +1797,8 @@ bool WebRtcSession::CreateVoiceChannel(const cricket::ContentInfo* content) {
this, &WebRtcSession::OnDtlsSetupFailure);
SignalVoiceChannelCreated();
voice_channel_->transport_channel()->SignalSentPacket.connect(
this, &WebRtcSession::OnSentPacket_w);
voice_channel_->SignalSentPacket.connect(this,
&WebRtcSession::OnSentPacket_w);
return true;
}
@ -1814,8 +1814,8 @@ bool WebRtcSession::CreateVideoChannel(const cricket::ContentInfo* content) {
this, &WebRtcSession::OnDtlsSetupFailure);
SignalVideoChannelCreated();
video_channel_->transport_channel()->SignalSentPacket.connect(
this, &WebRtcSession::OnSentPacket_w);
video_channel_->SignalSentPacket.connect(this,
&WebRtcSession::OnSentPacket_w);
return true;
}
@ -1836,8 +1836,7 @@ bool WebRtcSession::CreateDataChannel(const cricket::ContentInfo* content) {
this, &WebRtcSession::OnDtlsSetupFailure);
SignalDataChannelCreated();
data_channel_->transport_channel()->SignalSentPacket.connect(
this, &WebRtcSession::OnSentPacket_w);
data_channel_->SignalSentPacket.connect(this, &WebRtcSession::OnSentPacket_w);
return true;
}
@ -2155,8 +2154,7 @@ void WebRtcSession::ReportNegotiatedCiphers(
}
}
void WebRtcSession::OnSentPacket_w(cricket::TransportChannel* channel,
const rtc::SentPacket& sent_packet) {
void WebRtcSession::OnSentPacket_w(const rtc::SentPacket& sent_packet) {
RTC_DCHECK(worker_thread()->IsCurrent());
media_controller_->call_w()->OnSentPacket(sent_packet);
}

View File

@ -465,8 +465,7 @@ class WebRtcSession : public AudioProviderInterface,
void ReportNegotiatedCiphers(const cricket::TransportStats& stats);
void OnSentPacket_w(cricket::TransportChannel* channel,
const rtc::SentPacket& sent_packet);
void OnSentPacket_w(const rtc::SentPacket& sent_packet);
rtc::Thread* const signaling_thread_;
rtc::Thread* const worker_thread_;

View File

@ -1316,6 +1316,8 @@ class WebRtcSessionTest
SetupLoopbackCall();
// Wait for channel to be ready for sending.
EXPECT_TRUE_WAIT(media_engine_->GetVideoChannel(0)->sending(), 100);
uint8_t test_packet[15] = {0};
rtc::PacketOptions options;
options.packet_id = 10;

View File

@ -55,7 +55,9 @@ template <class Base> class RtpHelper : public Base {
const std::list<std::string>& rtp_packets() const { return rtp_packets_; }
const std::list<std::string>& rtcp_packets() const { return rtcp_packets_; }
bool SendRtp(const void* data, int len, const rtc::PacketOptions& options) {
bool SendRtp(const void* data,
size_t len,
const rtc::PacketOptions& options) {
if (!sending_) {
return false;
}
@ -63,13 +65,13 @@ template <class Base> class RtpHelper : public Base {
kMaxRtpPacketLen);
return Base::SendPacket(&packet, options);
}
bool SendRtcp(const void* data, int len) {
bool SendRtcp(const void* data, size_t len) {
rtc::CopyOnWriteBuffer packet(reinterpret_cast<const uint8_t*>(data), len,
kMaxRtpPacketLen);
return Base::SendRtcp(&packet, rtc::PacketOptions());
}
bool CheckRtp(const void* data, int len) {
bool CheckRtp(const void* data, size_t len) {
bool success = !rtp_packets_.empty();
if (success) {
std::string packet = rtp_packets_.front();
@ -78,7 +80,7 @@ template <class Base> class RtpHelper : public Base {
}
return success;
}
bool CheckRtcp(const void* data, int len) {
bool CheckRtcp(const void* data, size_t len) {
bool success = !rtcp_packets_.empty();
if (success) {
std::string packet = rtcp_packets_.front();

File diff suppressed because it is too large Load Diff

View File

@ -19,6 +19,7 @@
#include <vector>
#include "webrtc/audio_sink.h"
#include "webrtc/base/asyncinvoker.h"
#include "webrtc/base/asyncudpsocket.h"
#include "webrtc/base/criticalsection.h"
#include "webrtc/base/network.h"
@ -47,14 +48,17 @@ namespace cricket {
struct CryptoParams;
class MediaContentDescription;
enum SinkType {
SINK_PRE_CRYPTO, // Sink packets before encryption or after decryption.
SINK_POST_CRYPTO // Sink packets after encryption or before decryption.
};
// BaseChannel contains logic common to voice and video, including
// enable, marshaling calls to a worker thread, and
// enable, marshaling calls to a worker and network threads, and
// connection and media monitors.
// BaseChannel assumes signaling and other threads are allowed to make
// synchronous calls to the worker thread, the worker thread makes synchronous
// calls only to the network thread, and the network thread can't be blocked by
// other threads.
// All methods with _n suffix must be called on network thread,
// methods with _w suffix - on worker thread
// and methods with _s suffix on signaling thread.
// Network and worker threads may be the same thread.
//
// WARNING! SUBCLASSES MUST CALL Deinit() IN THEIR DESTRUCTORS!
// This is required to avoid a data race between the destructor modifying the
@ -66,26 +70,22 @@ class BaseChannel
public MediaChannel::NetworkInterface,
public ConnectionStatsGetter {
public:
BaseChannel(rtc::Thread* thread,
BaseChannel(rtc::Thread* worker_thread,
rtc::Thread* network_thread,
MediaChannel* channel,
TransportController* transport_controller,
const std::string& content_name,
bool rtcp);
virtual ~BaseChannel();
bool Init();
// Deinit may be called multiple times and is simply ignored if it's alreay
bool Init_w();
// Deinit may be called multiple times and is simply ignored if it's already
// done.
void Deinit();
rtc::Thread* worker_thread() const { return worker_thread_; }
rtc::Thread* network_thread() const { return network_thread_; }
const std::string& content_name() const { return content_name_; }
const std::string& transport_name() const { return transport_name_; }
TransportChannel* transport_channel() const {
return transport_channel_;
}
TransportChannel* rtcp_transport_channel() const {
return rtcp_transport_channel_;
}
bool enabled() const { return enabled_; }
// This function returns true if we are using SRTP.
@ -143,18 +143,28 @@ class BaseChannel
}
sigslot::signal2<BaseChannel*, bool> SignalDtlsSetupFailure;
void SignalDtlsSetupFailure_w(bool rtcp);
void SignalDtlsSetupFailure_n(bool rtcp);
void SignalDtlsSetupFailure_s(bool rtcp);
// Used for latency measurements.
sigslot::signal1<BaseChannel*> SignalFirstPacketReceived;
// Forward TransportChannel SignalSentPacket to worker thread.
sigslot::signal1<const rtc::SentPacket&> SignalSentPacket;
// Only public for unit tests. Otherwise, consider private.
TransportChannel* transport_channel() const { return transport_channel_; }
TransportChannel* rtcp_transport_channel() const {
return rtcp_transport_channel_;
}
// Made public for easier testing.
void SetReadyToSend(bool rtcp, bool ready);
// Only public for unit tests. Otherwise, consider protected.
int SetOption(SocketType type, rtc::Socket::Option o, int val)
override;
int SetOption_n(SocketType type, rtc::Socket::Option o, int val);
SrtpFilter* srtp_filter() { return &srtp_filter_; }
@ -162,11 +172,11 @@ class BaseChannel
virtual MediaChannel* media_channel() const { return media_channel_; }
// Sets the |transport_channel_| (and |rtcp_transport_channel_|, if |rtcp_| is
// true). Gets the transport channels from |transport_controller_|.
bool SetTransport_w(const std::string& transport_name);
bool SetTransport_n(const std::string& transport_name);
void set_transport_channel(TransportChannel* transport);
void set_rtcp_transport_channel(TransportChannel* transport,
bool update_writablity);
void SetTransportChannel_n(TransportChannel* transport);
void SetRtcpTransportChannel_n(TransportChannel* transport,
bool update_writablity);
bool was_ever_writable() const { return was_ever_writable_; }
void set_local_content_direction(MediaContentDirection direction) {
@ -178,8 +188,8 @@ class BaseChannel
void set_secure_required(bool secure_required) {
secure_required_ = secure_required;
}
bool IsReadyToReceive() const;
bool IsReadyToSend() const;
bool IsReadyToReceive_w() const;
bool IsReadyToSend_w() const;
rtc::Thread* signaling_thread() {
return transport_controller_->signaling_thread();
}
@ -188,7 +198,7 @@ class BaseChannel
void ConnectToTransportChannel(TransportChannel* tc);
void DisconnectFromTransportChannel(TransportChannel* tc);
void FlushRtcpMessages();
void FlushRtcpMessages_n();
// NetworkInterface implementation, called by MediaEngine
bool SendPacket(rtc::CopyOnWriteBuffer* packet,
@ -217,28 +227,33 @@ class BaseChannel
bool SendPacket(bool rtcp,
rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options);
virtual bool WantsPacket(bool rtcp, const rtc::CopyOnWriteBuffer* packet);
void HandlePacket(bool rtcp, rtc::CopyOnWriteBuffer* packet,
const rtc::PacketTime& packet_time);
void OnPacketReceived(bool rtcp,
const rtc::CopyOnWriteBuffer& packet,
const rtc::PacketTime& packet_time);
void EnableMedia_w();
void DisableMedia_w();
void UpdateWritableState_w();
void ChannelWritable_w();
void ChannelNotWritable_w();
void UpdateWritableState_n();
void ChannelWritable_n();
void ChannelNotWritable_n();
bool AddRecvStream_w(const StreamParams& sp);
bool RemoveRecvStream_w(uint32_t ssrc);
bool AddSendStream_w(const StreamParams& sp);
bool RemoveSendStream_w(uint32_t ssrc);
virtual bool ShouldSetupDtlsSrtp() const;
virtual bool ShouldSetupDtlsSrtp_n() const;
// Do the DTLS key expansion and impose it on the SRTP/SRTCP filters.
// |rtcp_channel| indicates whether to set up the RTP or RTCP filter.
bool SetupDtlsSrtp(bool rtcp_channel);
void MaybeSetupDtlsSrtp_w();
bool SetupDtlsSrtp_n(bool rtcp_channel);
void MaybeSetupDtlsSrtp_n();
// Set the DTLS-SRTP cipher policy on this channel as appropriate.
bool SetDtlsSrtpCryptoSuites(TransportChannel* tc, bool rtcp);
bool SetDtlsSrtpCryptoSuites_n(TransportChannel* tc, bool rtcp);
virtual void ChangeState() = 0;
void ChangeState();
virtual void ChangeState_w() = 0;
// Gets the content info appropriate to the channel (audio or video).
virtual const ContentInfo* GetFirstContent(
@ -255,25 +270,29 @@ class BaseChannel
virtual bool SetRemoteContent_w(const MediaContentDescription* content,
ContentAction action,
std::string* error_desc) = 0;
bool SetRtpTransportParameters_w(const MediaContentDescription* content,
bool SetRtpTransportParameters(const MediaContentDescription* content,
ContentAction action,
ContentSource src,
std::string* error_desc);
bool SetRtpTransportParameters_n(const MediaContentDescription* content,
ContentAction action,
ContentSource src,
std::string* error_desc);
// Helper method to get RTP Absoulute SendTime extension header id if
// present in remote supported extensions list.
void MaybeCacheRtpAbsSendTimeHeaderExtension(
void MaybeCacheRtpAbsSendTimeHeaderExtension_w(
const std::vector<RtpHeaderExtension>& extensions);
bool CheckSrtpConfig(const std::vector<CryptoParams>& cryptos,
bool* dtls,
std::string* error_desc);
bool SetSrtp_w(const std::vector<CryptoParams>& params,
bool CheckSrtpConfig_n(const std::vector<CryptoParams>& cryptos,
bool* dtls,
std::string* error_desc);
bool SetSrtp_n(const std::vector<CryptoParams>& params,
ContentAction action,
ContentSource src,
std::string* error_desc);
void ActivateRtcpMux_w();
bool SetRtcpMux_w(bool enable,
void ActivateRtcpMux_n();
bool SetRtcpMux_n(bool enable,
ContentAction action,
ContentSource src,
std::string* error_desc);
@ -283,7 +302,7 @@ class BaseChannel
// Handled in derived classes
// Get the SRTP crypto suites to use for RTP media
virtual void GetSrtpCryptoSuites(std::vector<int>* crypto_suites) const = 0;
virtual void GetSrtpCryptoSuites_n(std::vector<int>* crypto_suites) const = 0;
virtual void OnConnectionMonitorUpdate(ConnectionMonitor* monitor,
const std::vector<ConnectionInfo>& infos) = 0;
@ -294,13 +313,23 @@ class BaseChannel
}
private:
rtc::Thread* worker_thread_;
TransportController* transport_controller_;
MediaChannel* media_channel_;
std::vector<StreamParams> local_streams_;
std::vector<StreamParams> remote_streams_;
bool InitNetwork_n();
void DeinitNetwork_n();
void SignalSentPacket_n(TransportChannel* channel,
const rtc::SentPacket& sent_packet);
void SignalSentPacket_w(const rtc::SentPacket& sent_packet);
bool IsTransportReadyToSend_n() const;
void CacheRtpAbsSendTimeHeaderExtension_n(int rtp_abs_sendtime_extn_id);
rtc::Thread* const worker_thread_;
rtc::Thread* const network_thread_;
rtc::AsyncInvoker invoker_;
const std::string content_name_;
std::unique_ptr<ConnectionMonitor> connection_monitor_;
// Transport related members that should be accessed from network thread.
TransportController* const transport_controller_;
std::string transport_name_;
bool rtcp_transport_enabled_;
TransportChannel* transport_channel_;
@ -310,32 +339,40 @@ class BaseChannel
SrtpFilter srtp_filter_;
RtcpMuxFilter rtcp_mux_filter_;
BundleFilter bundle_filter_;
std::unique_ptr<ConnectionMonitor> connection_monitor_;
bool enabled_;
bool writable_;
bool rtp_ready_to_send_;
bool rtcp_ready_to_send_;
bool writable_;
bool was_ever_writable_;
MediaContentDirection local_content_direction_;
MediaContentDirection remote_content_direction_;
bool has_received_packet_;
bool dtls_keyed_;
bool secure_required_;
int rtp_abs_sendtime_extn_id_;
// MediaChannel related members that should be access from worker thread.
MediaChannel* const media_channel_;
// Currently enabled_ flag accessed from signaling thread too, but it can
// be changed only when signaling thread does sunchronious call to worker
// thread, so it should be safe.
bool enabled_;
std::vector<StreamParams> local_streams_;
std::vector<StreamParams> remote_streams_;
MediaContentDirection local_content_direction_;
MediaContentDirection remote_content_direction_;
};
// VoiceChannel is a specialization that adds support for early media, DTMF,
// and input/output level monitoring.
class VoiceChannel : public BaseChannel {
public:
VoiceChannel(rtc::Thread* thread,
VoiceChannel(rtc::Thread* worker_thread,
rtc::Thread* network_thread,
MediaEngineInterface* media_engine,
VoiceMediaChannel* channel,
TransportController* transport_controller,
const std::string& content_name,
bool rtcp);
~VoiceChannel();
bool Init();
bool Init_w();
// Configure sending media on the stream with SSRC |ssrc|
// If there is only one sending stream SSRC 0 can be used.
@ -345,7 +382,7 @@ class VoiceChannel : public BaseChannel {
AudioSource* source);
// downcasts a MediaChannel
virtual VoiceMediaChannel* media_channel() const {
VoiceMediaChannel* media_channel() const override {
return static_cast<VoiceMediaChannel*>(BaseChannel::media_channel());
}
@ -393,29 +430,31 @@ class VoiceChannel : public BaseChannel {
private:
// overrides from BaseChannel
virtual void OnChannelRead(TransportChannel* channel,
const char* data, size_t len,
const rtc::PacketTime& packet_time,
int flags);
virtual void ChangeState();
virtual const ContentInfo* GetFirstContent(const SessionDescription* sdesc);
virtual bool SetLocalContent_w(const MediaContentDescription* content,
ContentAction action,
std::string* error_desc);
virtual bool SetRemoteContent_w(const MediaContentDescription* content,
ContentAction action,
std::string* error_desc);
void OnChannelRead(TransportChannel* channel,
const char* data,
size_t len,
const rtc::PacketTime& packet_time,
int flags) override;
void ChangeState_w() override;
const ContentInfo* GetFirstContent(const SessionDescription* sdesc) override;
bool SetLocalContent_w(const MediaContentDescription* content,
ContentAction action,
std::string* error_desc) override;
bool SetRemoteContent_w(const MediaContentDescription* content,
ContentAction action,
std::string* error_desc) override;
void HandleEarlyMediaTimeout();
bool InsertDtmf_w(uint32_t ssrc, int event, int duration);
bool SetOutputVolume_w(uint32_t ssrc, double volume);
bool GetStats_w(VoiceMediaInfo* stats);
virtual void OnMessage(rtc::Message* pmsg);
virtual void GetSrtpCryptoSuites(std::vector<int>* crypto_suites) const;
virtual void OnConnectionMonitorUpdate(
ConnectionMonitor* monitor, const std::vector<ConnectionInfo>& infos);
virtual void OnMediaMonitorUpdate(
VoiceMediaChannel* media_channel, const VoiceMediaInfo& info);
void OnMessage(rtc::Message* pmsg) override;
void GetSrtpCryptoSuites_n(std::vector<int>* crypto_suites) const override;
void OnConnectionMonitorUpdate(
ConnectionMonitor* monitor,
const std::vector<ConnectionInfo>& infos) override;
void OnMediaMonitorUpdate(VoiceMediaChannel* media_channel,
const VoiceMediaInfo& info);
void OnAudioMonitorUpdate(AudioMonitor* monitor, const AudioInfo& info);
static const int kEarlyMediaTimeout = 1000;
@ -435,16 +474,17 @@ class VoiceChannel : public BaseChannel {
// VideoChannel is a specialization for video.
class VideoChannel : public BaseChannel {
public:
VideoChannel(rtc::Thread* thread,
VideoChannel(rtc::Thread* worker_thread,
rtc::Thread* netwokr_thread,
VideoMediaChannel* channel,
TransportController* transport_controller,
const std::string& content_name,
bool rtcp);
~VideoChannel();
bool Init();
bool Init_w();
// downcasts a MediaChannel
virtual VideoMediaChannel* media_channel() const {
VideoMediaChannel* media_channel() const override {
return static_cast<VideoMediaChannel*>(BaseChannel::media_channel());
}
@ -469,24 +509,25 @@ class VideoChannel : public BaseChannel {
private:
// overrides from BaseChannel
virtual void ChangeState();
virtual const ContentInfo* GetFirstContent(const SessionDescription* sdesc);
virtual bool SetLocalContent_w(const MediaContentDescription* content,
ContentAction action,
std::string* error_desc);
virtual bool SetRemoteContent_w(const MediaContentDescription* content,
ContentAction action,
std::string* error_desc);
void ChangeState_w() override;
const ContentInfo* GetFirstContent(const SessionDescription* sdesc) override;
bool SetLocalContent_w(const MediaContentDescription* content,
ContentAction action,
std::string* error_desc) override;
bool SetRemoteContent_w(const MediaContentDescription* content,
ContentAction action,
std::string* error_desc) override;
bool GetStats_w(VideoMediaInfo* stats);
webrtc::RtpParameters GetRtpParameters_w(uint32_t ssrc) const;
bool SetRtpParameters_w(uint32_t ssrc, webrtc::RtpParameters parameters);
virtual void OnMessage(rtc::Message* pmsg);
virtual void GetSrtpCryptoSuites(std::vector<int>* crypto_suites) const;
virtual void OnConnectionMonitorUpdate(
ConnectionMonitor* monitor, const std::vector<ConnectionInfo>& infos);
virtual void OnMediaMonitorUpdate(
VideoMediaChannel* media_channel, const VideoMediaInfo& info);
void OnMessage(rtc::Message* pmsg) override;
void GetSrtpCryptoSuites_n(std::vector<int>* crypto_suites) const override;
void OnConnectionMonitorUpdate(
ConnectionMonitor* monitor,
const std::vector<ConnectionInfo>& infos) override;
void OnMediaMonitorUpdate(VideoMediaChannel* media_channel,
const VideoMediaInfo& info);
std::unique_ptr<VideoMediaMonitor> media_monitor_;
@ -501,13 +542,14 @@ class VideoChannel : public BaseChannel {
// DataChannel is a specialization for data.
class DataChannel : public BaseChannel {
public:
DataChannel(rtc::Thread* thread,
DataChannel(rtc::Thread* worker_thread,
rtc::Thread* network_thread,
DataMediaChannel* media_channel,
TransportController* transport_controller,
const std::string& content_name,
bool rtcp);
~DataChannel();
bool Init();
bool Init_w();
virtual bool SendData(const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
@ -535,7 +577,7 @@ class DataChannel : public BaseChannel {
protected:
// downcasts a MediaChannel.
virtual DataMediaChannel* media_channel() const {
DataMediaChannel* media_channel() const override {
return static_cast<DataMediaChannel*>(BaseChannel::media_channel());
}
@ -572,7 +614,7 @@ class DataChannel : public BaseChannel {
typedef rtc::TypedMessageData<bool> DataChannelReadyToSendMessageData;
// overrides from BaseChannel
virtual const ContentInfo* GetFirstContent(const SessionDescription* sdesc);
const ContentInfo* GetFirstContent(const SessionDescription* sdesc) override;
// If data_channel_type_ is DCT_NONE, set it. Otherwise, check that
// it's the same as what was set previously. Returns false if it's
// set to one type one type and changed to another type later.
@ -582,22 +624,23 @@ class DataChannel : public BaseChannel {
// DataContentDescription.
bool SetDataChannelTypeFromContent(const DataContentDescription* content,
std::string* error_desc);
virtual bool SetLocalContent_w(const MediaContentDescription* content,
ContentAction action,
std::string* error_desc);
virtual bool SetRemoteContent_w(const MediaContentDescription* content,
ContentAction action,
std::string* error_desc);
virtual void ChangeState();
virtual bool WantsPacket(bool rtcp, const rtc::CopyOnWriteBuffer* packet);
bool SetLocalContent_w(const MediaContentDescription* content,
ContentAction action,
std::string* error_desc) override;
bool SetRemoteContent_w(const MediaContentDescription* content,
ContentAction action,
std::string* error_desc) override;
void ChangeState_w() override;
bool WantsPacket(bool rtcp, const rtc::CopyOnWriteBuffer* packet) override;
virtual void OnMessage(rtc::Message* pmsg);
virtual void GetSrtpCryptoSuites(std::vector<int>* crypto_suites) const;
virtual void OnConnectionMonitorUpdate(
ConnectionMonitor* monitor, const std::vector<ConnectionInfo>& infos);
virtual void OnMediaMonitorUpdate(
DataMediaChannel* media_channel, const DataMediaInfo& info);
virtual bool ShouldSetupDtlsSrtp() const;
void OnMessage(rtc::Message* pmsg) override;
void GetSrtpCryptoSuites_n(std::vector<int>* crypto_suites) const override;
void OnConnectionMonitorUpdate(
ConnectionMonitor* monitor,
const std::vector<ConnectionInfo>& infos) override;
void OnMediaMonitorUpdate(DataMediaChannel* media_channel,
const DataMediaInfo& info);
bool ShouldSetupDtlsSrtp_n() const override;
void OnDataReceived(
const ReceiveDataParams& params, const char* data, size_t len);
void OnDataChannelError(uint32_t ssrc, DataMediaChannel::Error error);

File diff suppressed because it is too large Load Diff

View File

@ -44,25 +44,26 @@ static DataEngineInterface* ConstructDataEngine() {
ChannelManager::ChannelManager(MediaEngineInterface* me,
DataEngineInterface* dme,
rtc::Thread* worker_thread) {
Construct(me, dme, worker_thread);
rtc::Thread* thread) {
Construct(me, dme, thread, thread);
}
ChannelManager::ChannelManager(MediaEngineInterface* me,
rtc::Thread* worker_thread) {
Construct(me,
ConstructDataEngine(),
worker_thread);
rtc::Thread* worker_thread,
rtc::Thread* network_thread) {
Construct(me, ConstructDataEngine(), worker_thread, network_thread);
}
void ChannelManager::Construct(MediaEngineInterface* me,
DataEngineInterface* dme,
rtc::Thread* worker_thread) {
rtc::Thread* worker_thread,
rtc::Thread* network_thread) {
media_engine_.reset(me);
data_media_engine_.reset(dme);
initialized_ = false;
main_thread_ = rtc::Thread::Current();
worker_thread_ = worker_thread;
network_thread_ = network_thread;
audio_output_volume_ = kNotSetOutputVolume;
capturing_ = false;
enable_rtx_ = false;
@ -144,18 +145,16 @@ bool ChannelManager::Init() {
if (initialized_) {
return false;
}
ASSERT(worker_thread_ != NULL);
if (!worker_thread_) {
return false;
}
if (worker_thread_ != rtc::Thread::Current()) {
// Do not allow invoking calls to other threads on the worker thread.
worker_thread_->Invoke<bool>(rtc::Bind(
&rtc::Thread::SetAllowBlockingCalls, worker_thread_, false));
RTC_DCHECK(network_thread_);
RTC_DCHECK(worker_thread_);
if (!network_thread_->IsCurrent()) {
// Do not allow invoking calls to other threads on the network thread.
network_thread_->Invoke<bool>(
rtc::Bind(&rtc::Thread::SetAllowBlockingCalls, network_thread_, false));
}
initialized_ = worker_thread_->Invoke<bool>(Bind(
&ChannelManager::InitMediaEngine_w, this));
initialized_ = worker_thread_->Invoke<bool>(
Bind(&ChannelManager::InitMediaEngine_w, this));
ASSERT(initialized_);
if (!initialized_) {
return false;
@ -228,9 +227,9 @@ VoiceChannel* ChannelManager::CreateVoiceChannel_w(
return nullptr;
VoiceChannel* voice_channel =
new VoiceChannel(worker_thread_, media_engine_.get(), media_channel,
transport_controller, content_name, rtcp);
if (!voice_channel->Init()) {
new VoiceChannel(worker_thread_, network_thread_, media_engine_.get(),
media_channel, transport_controller, content_name, rtcp);
if (!voice_channel->Init_w()) {
delete voice_channel;
return nullptr;
}
@ -286,9 +285,10 @@ VideoChannel* ChannelManager::CreateVideoChannel_w(
return NULL;
}
VideoChannel* video_channel = new VideoChannel(
worker_thread_, media_channel, transport_controller, content_name, rtcp);
if (!video_channel->Init()) {
VideoChannel* video_channel =
new VideoChannel(worker_thread_, network_thread_, media_channel,
transport_controller, content_name, rtcp);
if (!video_channel->Init_w()) {
delete video_channel;
return NULL;
}
@ -344,9 +344,10 @@ DataChannel* ChannelManager::CreateDataChannel_w(
return NULL;
}
DataChannel* data_channel = new DataChannel(
worker_thread_, media_channel, transport_controller, content_name, rtcp);
if (!data_channel->Init()) {
DataChannel* data_channel =
new DataChannel(worker_thread_, network_thread_, media_channel,
transport_controller, content_name, rtcp);
if (!data_channel->Init_w()) {
LOG(LS_WARNING) << "Failed to init data channel.";
delete data_channel;
return NULL;

View File

@ -42,20 +42,31 @@ class ChannelManager {
// ownership of these objects.
ChannelManager(MediaEngineInterface* me,
DataEngineInterface* dme,
rtc::Thread* worker);
rtc::Thread* worker_and_network);
// Same as above, but gives an easier default DataEngine.
ChannelManager(MediaEngineInterface* me,
rtc::Thread* worker);
rtc::Thread* worker,
rtc::Thread* network);
~ChannelManager();
// Accessors for the worker thread, allowing it to be set after construction,
// but before Init. set_worker_thread will return false if called after Init.
rtc::Thread* worker_thread() const { return worker_thread_; }
bool set_worker_thread(rtc::Thread* thread) {
if (initialized_) return false;
if (initialized_) {
return false;
}
worker_thread_ = thread;
return true;
}
rtc::Thread* network_thread() const { return network_thread_; }
bool set_network_thread(rtc::Thread* thread) {
if (initialized_) {
return false;
}
network_thread_ = thread;
return true;
}
MediaEngineInterface* media_engine() { return media_engine_.get(); }
@ -138,7 +149,8 @@ class ChannelManager {
void Construct(MediaEngineInterface* me,
DataEngineInterface* dme,
rtc::Thread* worker_thread);
rtc::Thread* worker_thread,
rtc::Thread* network_thread);
bool InitMediaEngine_w();
void DestructorDeletes_w();
void Terminate_w();
@ -167,6 +179,7 @@ class ChannelManager {
bool initialized_;
rtc::Thread* main_thread_;
rtc::Thread* worker_thread_;
rtc::Thread* network_thread_;
VoiceChannels voice_channels_;
VideoChannels video_channels_;

View File

@ -56,6 +56,7 @@ class ChannelManagerTest : public testing::Test {
fme_ = NULL;
}
rtc::Thread network_;
rtc::Thread worker_;
cricket::FakeMediaEngine* fme_;
cricket::FakeDataEngine* fdme_;
@ -77,14 +78,18 @@ TEST_F(ChannelManagerTest, StartupShutdown) {
// Test that we startup/shutdown properly with a worker thread.
TEST_F(ChannelManagerTest, StartupShutdownOnThread) {
network_.Start();
worker_.Start();
EXPECT_FALSE(cm_->initialized());
EXPECT_EQ(rtc::Thread::Current(), cm_->worker_thread());
EXPECT_TRUE(cm_->set_network_thread(&network_));
EXPECT_EQ(&network_, cm_->network_thread());
EXPECT_TRUE(cm_->set_worker_thread(&worker_));
EXPECT_EQ(&worker_, cm_->worker_thread());
EXPECT_TRUE(cm_->Init());
EXPECT_TRUE(cm_->initialized());
// Setting the worker thread while initialized should fail.
// Setting the network or worker thread while initialized should fail.
EXPECT_FALSE(cm_->set_network_thread(rtc::Thread::Current()));
EXPECT_FALSE(cm_->set_worker_thread(rtc::Thread::Current()));
cm_->Terminate();
EXPECT_FALSE(cm_->initialized());
@ -112,12 +117,14 @@ TEST_F(ChannelManagerTest, CreateDestroyChannels) {
// Test that we can create and destroy a voice and video channel with a worker.
TEST_F(ChannelManagerTest, CreateDestroyChannelsOnThread) {
network_.Start();
worker_.Start();
EXPECT_TRUE(cm_->set_worker_thread(&worker_));
EXPECT_TRUE(cm_->set_network_thread(&network_));
EXPECT_TRUE(cm_->Init());
delete transport_controller_;
transport_controller_ =
new cricket::FakeTransportController(&worker_, ICEROLE_CONTROLLING);
new cricket::FakeTransportController(&network_, ICEROLE_CONTROLLING);
cricket::VoiceChannel* voice_channel =
cm_->CreateVoiceChannel(&fake_mc_, transport_controller_,
cricket::CN_AUDIO, false, AudioOptions());