From ee2c391c153dffab925218b6c475a9b70b24b31f Mon Sep 17 00:00:00 2001 From: "mallinath@webrtc.org" Date: Mon, 3 Oct 2011 20:33:06 +0000 Subject: [PATCH] more webrtc session changes. Transport and TransportChannel handling is complete. Need work on session state. Review URL: http://webrtc-codereview.appspot.com/183005 git-svn-id: http://webrtc.googlecode.com/svn/trunk@679 4adac7df-926f-26a2-2b94-8c16560cd09d --- .../app/webrtc_dev/peerconnectionmessage.h | 1 + .../app/webrtc_dev/peerconnectionsignaling.cc | 18 +- .../app/webrtc_dev/peerconnectionsignaling.h | 2 +- .../peerconnectionsignaling_unittest.cc | 2 +- .../talk/app/webrtc_dev/webrtcsession.cc | 179 +++++++++++++++++- .../talk/app/webrtc_dev/webrtcsession.h | 29 ++- .../app/webrtc_dev/webrtcsession_unittest.cc | 22 ++- .../talk/p2p/client/fakeportallocator.h | 92 +++++++++ 8 files changed, 317 insertions(+), 28 deletions(-) create mode 100644 third_party_mods/libjingle/source/talk/p2p/client/fakeportallocator.h diff --git a/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionmessage.h b/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionmessage.h index e46f772740..6f71067d76 100644 --- a/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionmessage.h +++ b/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionmessage.h @@ -79,6 +79,7 @@ class PeerConnectionMessage : public RefCount { const cricket::SessionDescription* desc() {return desc_.get();} bool Serialize(std::string* message); + std::vector& candidates() { return candidates_; } protected: PeerConnectionMessage(PeerConnectionMessageType type, diff --git a/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling.cc b/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling.cc index 55cb5b5a44..069c2b4476 100644 --- a/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling.cc +++ b/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling.cc @@ -142,6 +142,11 @@ void PeerConnectionSignaling::ProcessSignalingMessage( ASSERT(state_ != PeerConnectionSignaling::kIdle); if (state_ == PeerConnectionSignaling::kIdle) return; + // Signal the resulting local and remote session description. + SignalUpdateSessionDescription(last_send_offer_->desc(), + message->desc(), + message->candidates()); + UpdateRemoteStreams(message->desc()); scoped_refptr streams(queued_offers_.front()); queued_offers_.pop_front(); @@ -153,10 +158,6 @@ void PeerConnectionSignaling::ProcessSignalingMessage( } else { state_ = PeerConnectionSignaling::kIdle; } - // Signal the resulting local and remote session description. - SignalUpdateSessionDescription(last_send_offer_->desc(), - message->desc(), - streams.get()); break; } case PeerConnectionMessage::kError: { @@ -249,6 +250,10 @@ void PeerConnectionSignaling::CreateAnswer_s() { answer_message = PeerConnectionMessage::CreateErrorMessage( PeerConnectionMessage::kOfferNotAcceptable); } + // Signal the resulting local and remote session description. + SignalUpdateSessionDescription(answer.get(), + message->desc(), + message->candidates()); // remote candidates UpdateRemoteStreams(message->desc()); @@ -260,11 +265,6 @@ void PeerConnectionSignaling::CreateAnswer_s() { // have time to receive the signaling message before media arrives? // This is under debate. UpdateSendingLocalStreams(answer_message->desc(), local_streams); - - // Signal the resulting local and remote session description. - SignalUpdateSessionDescription(answer.get(), - message->desc(), - local_streams); } // Fills a MediaSessionOptions struct with the MediaTracks we want to sent given diff --git a/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling.h b/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling.h index 23d07fa3b2..6acad308d4 100644 --- a/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling.h +++ b/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling.h @@ -124,7 +124,7 @@ class PeerConnectionSignaling : public talk_base::MessageHandler { // local StreamCollection. sigslot::signal3 SignalUpdateSessionDescription; + const cricket::Candidates&> SignalUpdateSessionDescription; private: // Implement talk_base::MessageHandler. diff --git a/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling_unittest.cc b/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling_unittest.cc index 3a72503f3f..1de0d04789 100644 --- a/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling_unittest.cc +++ b/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling_unittest.cc @@ -115,7 +115,7 @@ class MockSignalingObserver : public sigslot::has_slots<> { void OnUpdateSessionDescription(const cricket::SessionDescription* local, const cricket::SessionDescription* remote, - StreamCollection* local_streams) { + const cricket::Candidates& candidates) { update_session_description_counter_++; } diff --git a/third_party_mods/libjingle/source/talk/app/webrtc_dev/webrtcsession.cc b/third_party_mods/libjingle/source/talk/app/webrtc_dev/webrtcsession.cc index ec2f652537..0a1c1721cb 100644 --- a/third_party_mods/libjingle/source/talk/app/webrtc_dev/webrtcsession.cc +++ b/third_party_mods/libjingle/source/talk/app/webrtc_dev/webrtcsession.cc @@ -27,12 +27,15 @@ #include "talk/app/webrtc_dev/webrtcsession.h" +#include "talk/app/webrtc_dev/mediastream.h" +#include "talk/app/webrtc_dev/peerconnection.h" +#include "talk/app/webrtc_dev/peerconnectionsignaling.h" #include "talk/base/helpers.h" #include "talk/base/logging.h" #include "talk/session/phone/channel.h" #include "talk/session/phone/channelmanager.h" -#include "talk/app/webrtc_dev/mediastream.h" -#include "talk/app/webrtc_dev/peerconnectionsignaling.h" + +using cricket::MediaContentDescription; namespace webrtc { @@ -42,14 +45,22 @@ enum { // We allow 30 seconds to establish a connection, otherwise it's an error. static const int kCallSetupTimeout = 30 * 1000; +// Session will accept one candidate per transport channel and dropping other +// candidates generated for that channel. During the session initialization +// one cricket::VoiceChannel and one cricket::VideoChannel will be created with +// rtcp enabled. +static const int kAllowedCandidates = 4; +// TODO(mallinath) - These are magic string used by cricket::VideoChannel. +// These should be moved to a common place. +static const std::string kRtpVideoChannelStr = "video_rtp"; +static const std::string kRtcpVideoChannelStr = "video_rtcp"; WebRtcSession::WebRtcSession(cricket::ChannelManager* channel_manager, + talk_base::Thread* signaling_thread, talk_base::Thread* worker_thread, cricket::PortAllocator* port_allocator, PeerConnectionSignaling* pc_signaling) - // TODO(ronghuawu): get the signaling thread from PeerConnectionSignaling - : cricket::BaseSession(worker_thread, worker_thread, - port_allocator, + : cricket::BaseSession(signaling_thread, worker_thread, port_allocator, talk_base::ToString(talk_base::CreateRandomId()), cricket::NS_JINGLE_RTP, true) { // TODO(mallinath) - Remove initiator flag from BaseSession. As it's being @@ -81,11 +92,73 @@ void WebRtcSession::Terminate() { void WebRtcSession::OnSignalUpdateSessionDescription( const cricket::SessionDescription* local_desc, const cricket::SessionDescription* remote_desc, - StreamCollection* streams) { + const cricket::Candidates& remote_candidates) { + // Session updates are not supported yet. If session is in progress state + // ignore this callback. if (state() == STATE_INPROGRESS) { - // TODO(mallinath) - Handling of session updates is not ready yet. + ProcessSessionUpdate(local_desc, remote_desc); + if (remote_candidates.size() > 0) { + SetRemoteCandidates(remote_candidates); + } return; } + // Local session and remote session descriptions are available before + // any state change. So for session it's doesn't matter it's initiator + // or receiver of the call. Session will be treated as initiator. + // Apply first local description + set_local_description(local_desc); + SetState(STATE_SENTINITIATE); + // Applying remote description on the session + set_remote_description(const_cast(remote_desc)); + SetState(STATE_RECEIVEDACCEPT); + // Set remote candidates + SetRemoteCandidates(remote_candidates); +} + +void WebRtcSession::SetRemoteCandidates( + const cricket::Candidates& candidates) { + // First partition the candidates for the proxies. During creation of channels + // we created CN_AUDIO (audio) and CN_VIDEO (video) proxies. + cricket::Candidates audio_candidates; + cricket::Candidates video_candidates; + for (cricket::Candidates::const_iterator citer = candidates.begin(); + citer != candidates.end(); ++citer) { + if (((*citer).name().compare(kRtpVideoChannelStr) == 0) || + ((*citer).name().compare(kRtcpVideoChannelStr)) == 0) { + // Candidate names for video rtp and rtcp channel + video_candidates.push_back(*citer); + } else { + // Candidates for audio rtp and rtcp channel + // Channel name will be "rtp" and "rtcp" + audio_candidates.push_back(*citer); + } + } + + if (!audio_candidates.empty()) { + cricket::TransportProxy* audio_proxy = GetTransportProxy(cricket::CN_AUDIO); + if (audio_proxy) { + // CompleteNegotiation will set actual impl's in Proxy. + audio_proxy->CompleteNegotiation(); + // TODO(mallinath) - Add a interface to TransportProxy to accept + // remote candidate list. + audio_proxy->impl()->OnRemoteCandidates(audio_candidates); + } else { + LOG(LS_INFO) << "No audio TransportProxy exists"; + } + } + + if (!video_candidates.empty()) { + cricket::TransportProxy* video_proxy = GetTransportProxy(cricket::CN_VIDEO); + if (video_proxy) { + // CompleteNegotiation will set actual impl's in Proxy. + video_proxy->CompleteNegotiation(); + // TODO(mallinath) - Add a interface to TransportProxy to accept + // remote candidate list. + video_proxy->impl()->OnRemoteCandidates(audio_candidates); + } else { + LOG(LS_INFO) << "No video TransportProxy exists"; + } + } } bool WebRtcSession::CreateChannels() { @@ -102,6 +175,12 @@ bool WebRtcSession::CreateChannels() { LOG(LS_ERROR) << "Failed to create video channel"; return false; } + + // TransportProxies and TransportChannels will be created when + // CreateVoiceChannel and CreateVideoChannel are called. + // Try connecting all transport channels. This is necessary to generate + // ICE candidates. + SpeculativelyConnectAllTransportChannels(); return true; } @@ -132,7 +211,13 @@ void WebRtcSession::OnTransportWritable(cricket::Transport* transport) { void WebRtcSession::OnTransportCandidatesReady( cricket::Transport* transport, const cricket::Candidates& candidates) { ASSERT(signaling_thread()->IsCurrent()); - pc_signaling_->Initialize(candidates); + // Drop additional candidates for the same channel; + // local_candidates_ will have one candidate per channel. + if (local_candidates_.size() == kAllowedCandidates) + return; + InsertTransportCandidates(candidates); + if (local_candidates_.size() == kAllowedCandidates) + pc_signaling_->Initialize(candidates); } void WebRtcSession::OnTransportChannelGone(cricket::Transport* transport) { @@ -142,10 +227,88 @@ void WebRtcSession::OnTransportChannelGone(cricket::Transport* transport) { void WebRtcSession::OnMessage(talk_base::Message* msg) { switch (msg->message_id) { case MSG_CANDIDATE_TIMEOUT: + LOG(LS_ERROR) << "Transport is not in writable state."; + SignalError(); break; default: break; } } +void WebRtcSession::InsertTransportCandidates( + const cricket::Candidates& candidates) { + for (cricket::Candidates::const_iterator citer = candidates.begin(); + citer != candidates.end(); ++citer) { + // Find candidates by name, if this channel name not exists in local + // candidate list, store it. + if (!CheckCandidate((*citer).name())) { + local_candidates_.push_back(*citer); + } + } +} + +// Check transport candidate already available for transport channel as only +// one cricket::Candidate allower per channel. +bool WebRtcSession::CheckCandidate(const std::string& name) { + bool ret = false; + for (cricket::Candidates::iterator iter = local_candidates_.begin(); + iter != local_candidates_.end(); ++iter) { + if ((*iter).name().compare(name) == 0) { + ret = true; + break; + } + } + return ret; +} + +void WebRtcSession::ProcessSessionUpdate( + const cricket::SessionDescription* local_desc, + const cricket::SessionDescription* remote_desc) { + + if (local_desc) { + ProcessLocalMediaChanges(local_desc); + } + if (remote_desc) { + ProcessRemoteMediaChanges(remote_desc); + } +} + +bool WebRtcSession::GetAudioSourceParamInfo( + const cricket::SessionDescription* sdesc, + cricket::Sources* sources) { + bool ret = false; + const cricket::ContentInfo* content = GetFirstAudioContent(sdesc); + if (content) { + const MediaContentDescription* audio_desc = + static_cast (content->description); + *sources = audio_desc->sources(); + ret = true; + } + return ret; +} + +bool WebRtcSession::GetVideoSourceParamInfo( + const cricket::SessionDescription* sdesc, + cricket::Sources* sources) { + bool ret = false; + const cricket::ContentInfo* content = GetFirstVideoContent(sdesc); + if (content) { + const MediaContentDescription* video_desc = + static_cast (content->description); + *sources = video_desc->sources(); + ret = true; + } + return ret; +} + +void WebRtcSession::ProcessLocalMediaChanges( + const cricket::SessionDescription* sdesc) { + //TODO(mallinath) - Handling of local media stream changes in active session +} + +void WebRtcSession::ProcessRemoteMediaChanges( + const cricket::SessionDescription* sdesc) { + //TODO(mallinath) - Handling of remote media stream changes in active session +} + } // namespace webrtc diff --git a/third_party_mods/libjingle/source/talk/app/webrtc_dev/webrtcsession.h b/third_party_mods/libjingle/source/talk/app/webrtc_dev/webrtcsession.h index c6c347bfc0..2baf211b3f 100644 --- a/third_party_mods/libjingle/source/talk/app/webrtc_dev/webrtcsession.h +++ b/third_party_mods/libjingle/source/talk/app/webrtc_dev/webrtcsession.h @@ -28,9 +28,13 @@ #ifndef TALK_APP_WEBRTC_WEBRTCSESSION_H_ #define TALK_APP_WEBRTC_WEBRTCSESSION_H_ +#include +#include + #include "talk/base/sigslot.h" #include "talk/base/thread.h" #include "talk/p2p/base/session.h" +#include "talk/session/phone/mediasession.h" namespace cricket { class ChannelManager; @@ -48,6 +52,7 @@ class StreamCollection; class WebRtcSession : public cricket::BaseSession { public: WebRtcSession(cricket::ChannelManager* channel_manager, + talk_base::Thread* signaling_thread, talk_base::Thread* worker_thread, cricket::PortAllocator* port_allocator, PeerConnectionSignaling* pc_signaling); @@ -62,12 +67,19 @@ class WebRtcSession : public cricket::BaseSession { return video_channel_.get(); } + // Generic error message callback from WebRtcSession. + // TODO(mallinath) - It may be necessary to supply error code as well. + sigslot::signal0<> SignalError; + + void ProcessSessionUpdate(const cricket::SessionDescription* local_desc, + const cricket::SessionDescription* remote_desc); + private: // Callback handling from PeerConnectionSignaling void OnSignalUpdateSessionDescription( const cricket::SessionDescription* local_desc, const cricket::SessionDescription* remote_desc, - StreamCollection* streams); + const cricket::Candidates& remote_candidates); // Transport related callbacks, override from cricket::BaseSession. virtual void OnTransportRequestSignaling(cricket::Transport* transport); @@ -80,14 +92,27 @@ class WebRtcSession : public cricket::BaseSession { // Creates channels for voice and video. bool CreateChannels(); virtual void OnMessage(talk_base::Message* msg); - + void InsertTransportCandidates(const cricket::Candidates& candidates); void Terminate(); + // Get candidate from the local candidates list by the name. + bool CheckCandidate(const std::string& name); + void SetRemoteCandidates(const cricket::Candidates& candidates); + + // Helper methods to get handle to the MediaContentDescription sources param. + bool GetAudioSourceParamInfo(const cricket::SessionDescription* sdesc, + cricket::Sources* sources); + bool GetVideoSourceParamInfo(const cricket::SessionDescription* sdesc, + cricket::Sources* sources); + + void ProcessLocalMediaChanges(const cricket::SessionDescription* sdesc); + void ProcessRemoteMediaChanges(const cricket::SessionDescription* sdesc); private: PeerConnectionSignaling* pc_signaling_; talk_base::scoped_ptr voice_channel_; talk_base::scoped_ptr video_channel_; cricket::ChannelManager* channel_manager_; + cricket::Candidates local_candidates_; }; } // namespace webrtc diff --git a/third_party_mods/libjingle/source/talk/app/webrtc_dev/webrtcsession_unittest.cc b/third_party_mods/libjingle/source/talk/app/webrtc_dev/webrtcsession_unittest.cc index 30c0763837..9be0a106bb 100644 --- a/third_party_mods/libjingle/source/talk/app/webrtc_dev/webrtcsession_unittest.cc +++ b/third_party_mods/libjingle/source/talk/app/webrtc_dev/webrtcsession_unittest.cc @@ -32,21 +32,27 @@ #include "talk/session/phone/channelmanager.h" #include "talk/p2p/client/fakeportallocator.h" +class MockPeerConnectionSignaling { + +}; + class WebRtcSessionTest : public testing::Test { public: WebRtcSessionTest() { + } + + ~WebRtcSessionTest() { + } + + virtual void SetUp() { signaling_thread_ = talk_base::Thread::Current(); worker_thread_ = talk_base::Thread::Current(); channel_manager_.reset(new cricket::ChannelManager(worker_thread_)); port_allocator_.reset( new cricket::FakePortAllocator(worker_thread_, NULL)); pc_signaling_.reset( - new webrtc::PeerConnectionSignaling(channel_manager_.get())); - ASSERT_TRUE(channel_manager_.get() != NULL); - ASSERT_TRUE(session_.get() == NULL); - } - - ~WebRtcSessionTest() { + new webrtc::PeerConnectionSignaling(channel_manager_.get(), + signaling_thread_)); } bool InitializeSession() { @@ -58,9 +64,11 @@ class WebRtcSessionTest : public testing::Test { } void Init() { + ASSERT_TRUE(channel_manager_.get() != NULL); + ASSERT_TRUE(session_.get() == NULL); EXPECT_TRUE(channel_manager_.get()->Init()); session_.reset(new webrtc::WebRtcSession( - channel_manager_.get(), worker_thread_, + channel_manager_.get(), worker_thread_, signaling_thread_, port_allocator_.get(), pc_signaling_.get())); EXPECT_TRUE(InitializeSession()); EXPECT_TRUE(CheckChannels()); diff --git a/third_party_mods/libjingle/source/talk/p2p/client/fakeportallocator.h b/third_party_mods/libjingle/source/talk/p2p/client/fakeportallocator.h new file mode 100644 index 0000000000..9df5ccaa8f --- /dev/null +++ b/third_party_mods/libjingle/source/talk/p2p/client/fakeportallocator.h @@ -0,0 +1,92 @@ +// Copyright 2010 Google Inc. All Rights Reserved, +// +// Author: Justin Uberti (juberti@google.com) + +#ifndef TALK_P2P_CLIENT_FAKEPORTALLOCATOR_H_ +#define TALK_P2P_CLIENT_FAKEPORTALLOCATOR_H_ + +#include +#include "talk/base/basicpacketsocketfactory.h" +#include "talk/base/scoped_ptr.h" +#include "talk/p2p/base/portallocator.h" +#include "talk/p2p/base/udpport.h" + +namespace talk_base { +class SocketFactory; +class Thread; +} + +namespace cricket { + +class FakePortAllocatorSession : public PortAllocatorSession { + public: + FakePortAllocatorSession(talk_base::Thread* worker_thread, + talk_base::PacketSocketFactory* factory, + const std::string& name, + const std::string& session_type) + : PortAllocatorSession(0), worker_thread_(worker_thread), + factory_(factory), name_(name), + network_("network", "unittest", 0x7F000001, 0), + port_(NULL), running_(false) { + } + + virtual void GetInitialPorts() { + if (!port_.get()) { + port_.reset(cricket::UDPPort::Create(worker_thread_, factory_, + &network_, network_.ip(), 0, 0)); + AddPort(port_.get()); + } + } + virtual void StartGetAllPorts() { running_ = true; } + virtual void StopGetAllPorts() { running_ = false; } + virtual bool IsGettingAllPorts() { return running_; } + + void AddPort(cricket::Port* port) { + port->set_name(name_); + port->set_preference(1.0); + port->set_generation(0); + port->SignalAddressReady.connect( + this, &FakePortAllocatorSession::OnAddressReady); + port->PrepareAddress(); + SignalPortReady(this, port); + } + void OnAddressReady(cricket::Port* port) { + SignalCandidatesReady(this, port->candidates()); + } + + private: + talk_base::Thread* worker_thread_; + talk_base::PacketSocketFactory* factory_; + std::string name_; + talk_base::Network network_; + talk_base::scoped_ptr port_; + bool running_; +}; + +class FakePortAllocator : public cricket::PortAllocator { + public: + FakePortAllocator(talk_base::Thread* worker_thread, + talk_base::PacketSocketFactory* factory) + : worker_thread_(worker_thread), factory_(factory) { + if (factory_ == NULL) { + owned_factory_.reset(new talk_base::BasicPacketSocketFactory( + worker_thread_)); + factory_ = owned_factory_.get(); + } + } + + virtual cricket::PortAllocatorSession* CreateSession( + const std::string &name, const std::string &session_type) { + return new FakePortAllocatorSession(worker_thread_, factory_, name, + session_type); + } + + private: + talk_base::Thread* worker_thread_; + talk_base::PacketSocketFactory* factory_; + talk_base::scoped_ptr owned_factory_; +}; + +} // namespace cricket + +#endif // TALK_P2P_CLIENT_FAKEPORTALLOCATOR_H_