diff --git a/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling.cc b/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling.cc index bd09ac2515..87023e8f88 100644 --- a/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling.cc +++ b/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling.cc @@ -39,7 +39,8 @@ namespace webrtc { enum { - MSG_SEND_QUEUED_OFFER = 301, + MSG_SEND_QUEUED_OFFER = 1, + MSG_GENERATE_ANSWER = 2, }; static const int kGlareMinWaitTime = 2 * 1000; // 2 sec static const int kGlareWaitIntervall = 1 * 1000; // 1 sec @@ -75,8 +76,9 @@ static bool VerifyAnswer(const cricket::SessionDescription* answer_desc) { scoped_refptr PeerConnectionMessage::Create( PeerConnectionMessageType type, - const cricket::SessionDescription* desc) { - return new RefCountImpl (type, desc); + const cricket::SessionDescription* desc, + const cricket::Candidates& candidates) { + return new RefCountImpl (type, desc, candidates); } scoped_refptr PeerConnectionMessage::CreateErrorMessage( @@ -86,22 +88,25 @@ scoped_refptr PeerConnectionMessage::CreateErrorMessage( PeerConnectionMessage::PeerConnectionMessage( PeerConnectionMessageType type, - const cricket::SessionDescription* desc) + const cricket::SessionDescription* desc, + const cricket::Candidates& candidates) : type_(type), + error_code_(kNoError), desc_(desc), - error_code_(kNoError) { + candidates_(candidates) { } PeerConnectionMessage::PeerConnectionMessage(ErrorCode error) : type_(kError), - desc_(NULL), - error_code_(error) { + error_code_(error), + desc_(NULL) { } PeerConnectionSignaling::PeerConnectionSignaling( - cricket::ChannelManager* channel_manager) - : signaling_thread_(talk_base::Thread::Current()), - state_(kIdle), + cricket::ChannelManager* channel_manager, + talk_base::Thread* signaling_thread) + : signaling_thread_(signaling_thread), + state_(kInitializing), ssrc_counter_(0), session_description_factory_(channel_manager) { } @@ -109,11 +114,39 @@ PeerConnectionSignaling::PeerConnectionSignaling( PeerConnectionSignaling::~PeerConnectionSignaling() { } +void PeerConnectionSignaling::Initialize( + const cricket::Candidates& candidates) { + ASSERT(state_ == kInitializing); + if (state_ != kInitializing) + return; + // Store the candidates. + candidates_ = candidates; + // If we have a queued remote offer we need to handle this first. + if (queued_received_offer_.first != NULL) { + state_ = kIdle; + signaling_thread_->Post(this, MSG_GENERATE_ANSWER); + } else if (queued_offers_.size() >0) { + // Else if we have local queued offers. + state_ = PeerConnectionSignaling::kWaitingForAnswer; + signaling_thread_->Post(this, MSG_SEND_QUEUED_OFFER); + } else { + state_ = kIdle; + } +} + void PeerConnectionSignaling::ProcessSignalingMessage( PeerConnectionMessage* message, StreamCollection* local_streams) { + ASSERT(talk_base::Thread::Current() == signaling_thread_); + switch (message->type()) { case PeerConnectionMessage::kOffer: { + queued_received_offer_ = RemoteOfferPair(message, local_streams); + // If we are still Initializing we need to wait before we can handle + // the offer. Queue it and handle it when the state change. + if (state_ == kInitializing) { + break; + } // Don't handle offers when we are waiting for an answer. if (state_ == kWaitingForAnswer) { state_ = kGlare; @@ -131,11 +164,7 @@ void PeerConnectionSignaling::ProcessSignalingMessage( if (state_ == kGlare) { state_ = kIdle; } - // Reset all pending offers. Instead, send the new streams in the answer. - signaling_thread_->Clear(this, MSG_SEND_QUEUED_OFFER, NULL); - queued_offers_.clear(); - GenerateAnswer(message, local_streams); - UpdateRemoteStreams(message->desc()); + signaling_thread_->Post(this, MSG_GENERATE_ANSWER); break; } case PeerConnectionMessage::kAnswer: { @@ -147,11 +176,16 @@ void PeerConnectionSignaling::ProcessSignalingMessage( queued_offers_.pop_front(); UpdateSendingLocalStreams(message->desc(), streams); // Check if we have more offers waiting in the queue. - if (queued_offers_.size() > 0) + if (queued_offers_.size() > 0) { // Send the next offer. signaling_thread_->Post(this, MSG_SEND_QUEUED_OFFER); - else + } else { state_ = PeerConnectionSignaling::kIdle; + } + // Signal the resulting local and remote session description. + SignalUpdateSessionDescription(last_send_offer_->desc(), + message->desc(), + streams.get()); break; } case PeerConnectionMessage::kError: { @@ -168,6 +202,7 @@ void PeerConnectionSignaling::ProcessSignalingMessage( } void PeerConnectionSignaling::CreateOffer(StreamCollection* local_streams) { + ASSERT(talk_base::Thread::Current() == signaling_thread_); queued_offers_.push_back(local_streams); if (state_ == kIdle) { // Check if we can sent a new offer. @@ -177,6 +212,18 @@ void PeerConnectionSignaling::CreateOffer(StreamCollection* local_streams) { } } +// Implement talk_base::MessageHandler. +void PeerConnectionSignaling::OnMessage(talk_base::Message* msg) { + switch (msg->message_id) { + case MSG_SEND_QUEUED_OFFER: + CreateOffer_s(); + break; + case MSG_GENERATE_ANSWER: + CreateAnswer_s(); + break; + } +} + void PeerConnectionSignaling::CreateOffer_s() { ASSERT(queued_offers_.size() > 0); scoped_refptr local_streams(queued_offers_.front()); @@ -189,11 +236,13 @@ void PeerConnectionSignaling::CreateOffer_s() { scoped_refptr offer_message = PeerConnectionMessage::Create(PeerConnectionMessage::kOffer, - offer.release()); + offer.release(), + candidates_); // If we are updating with new streams we need to get an answer // before we can handle a remote offer. // We also need the response before we are allowed to start send media. + last_send_offer_ = offer_message; SignalNewPeerConnectionMessage(offer_message); } @@ -201,17 +250,16 @@ PeerConnectionSignaling::State PeerConnectionSignaling::GetState() { return state_; } -// Implement talk_base::MessageHandler. -void PeerConnectionSignaling::OnMessage(talk_base::Message* msg) { - switch (msg->message_id) { - case MSG_SEND_QUEUED_OFFER: - CreateOffer_s(); - break; - } -} +void PeerConnectionSignaling::CreateAnswer_s() { + scoped_refptr message( + queued_received_offer_.first.release()); + scoped_refptr local_streams( + queued_received_offer_.second.release()); + + // Reset all pending offers. Instead, send the new streams in the answer. + signaling_thread_->Clear(this, MSG_SEND_QUEUED_OFFER, NULL); + queued_offers_.clear(); -void PeerConnectionSignaling::GenerateAnswer(PeerConnectionMessage* message, - StreamCollection* local_streams) { // Create a MediaSessionOptions object with the sources we want to send. cricket::MediaSessionOptions options; options.is_video = true; @@ -224,13 +272,15 @@ void PeerConnectionSignaling::GenerateAnswer(PeerConnectionMessage* message, scoped_refptr answer_message; if (VerifyAnswer(answer.get())) { answer_message = PeerConnectionMessage::Create( - PeerConnectionMessage::kAnswer, answer.release()); + PeerConnectionMessage::kAnswer, answer.release(), candidates_); } else { answer_message = PeerConnectionMessage::CreateErrorMessage( PeerConnectionMessage::kOfferNotAcceptable); } + UpdateRemoteStreams(message->desc()); + // Signal that the new answer is ready to be sent. SignalNewPeerConnectionMessage(answer_message); @@ -239,6 +289,11 @@ void PeerConnectionSignaling::GenerateAnswer(PeerConnectionMessage* message, // have time to receive the signaling message before media arrives? // This is under debate. UpdateSendingLocalStreams(answer_message->desc(), local_streams); + + // Signal the resulting local and remote session description. + SignalUpdateSessionDescription(answer.get(), + message->desc(), + local_streams); } // Fills a MediaSessionOptions struct with the MediaTracks we want to sent given diff --git a/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling.h b/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling.h index 701d47eb3e..cbaa82fff7 100644 --- a/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling.h +++ b/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling.h @@ -34,6 +34,8 @@ #include #include #include +#include +#include #include "talk/app/webrtc_dev/mediastreamimpl.h" #include "talk/app/webrtc_dev/peerconnection.h" @@ -48,6 +50,8 @@ namespace cricket { class ChannelManager; +class Candidate; +typedef std::vector Candidates; } namespace webrtc { @@ -74,7 +78,8 @@ class PeerConnectionMessage : public RefCount { static scoped_refptr Create( PeerConnectionMessageType type, - const cricket::SessionDescription* desc); + const cricket::SessionDescription* desc, + const cricket::Candidates& candidates); static scoped_refptr CreateErrorMessage( ErrorCode error); @@ -82,18 +87,21 @@ class PeerConnectionMessage : public RefCount { PeerConnectionMessageType type() {return type_;} ErrorCode error() {return error_code_;} const cricket::SessionDescription* desc() {return desc_.get();} + const cricket::Candidates& candidates() {return candidates_;} // TODO(perkj): Add functions for serializing and deserializing this class. protected: PeerConnectionMessage(PeerConnectionMessageType type, - const cricket::SessionDescription* desc); + const cricket::SessionDescription* desc, + const cricket::Candidates& candidates); explicit PeerConnectionMessage(ErrorCode error); private: PeerConnectionMessageType type_; ErrorCode error_code_; talk_base::scoped_ptr desc_; + cricket::Candidates candidates_; }; // PeerConnectionSignaling is a class responsible for handling signaling @@ -102,12 +110,18 @@ class PeerConnectionMessage : public RefCount { // to send a new MediaStream. // It changes the state of local MediaStreams and tracks // when a remote peer is ready to receive media. +// Call Initialize when local Candidates are ready. // Call CreateOffer to negotiate new local streams to send. // Call ProcessSignalingMessage when a new PeerConnectionMessage have been // received from the remote peer. +// Before PeerConnectionSignaling can process an answer or create an offer, +// Initialize have to be called. The last request to create an offer or process +// an answer will be processed after Initialize have been called. class PeerConnectionSignaling : public talk_base::MessageHandler { public: enum State { + // Awaiting the local candidates. + kInitializing, // Ready to sent new offer or receive a new offer. kIdle, // We have sent an offer and expect an answer, or we want to update @@ -118,9 +132,12 @@ class PeerConnectionSignaling : public talk_base::MessageHandler { kGlare }; - explicit PeerConnectionSignaling(cricket::ChannelManager* channel_manager); + PeerConnectionSignaling(cricket::ChannelManager* channel_manager, + talk_base::Thread* signaling_thread); ~PeerConnectionSignaling(); + void Initialize(const cricket::Candidates& candidates); + // Process a received offer/answer from the remote peer. void ProcessSignalingMessage(PeerConnectionMessage* message, StreamCollection* local_streams); @@ -148,15 +165,22 @@ class PeerConnectionSignaling : public talk_base::MessageHandler { // Remote PeerConnection sent an error message. sigslot::signal1 SignalErrorMessageReceived; + // Informs that a new Offer/Answer have been exchanged. + // The parameters are local session description, + // remote session_description, + // local StreamCollection. + sigslot::signal3 SignalUpdateSessionDescription; + private: // Implement talk_base::MessageHandler. virtual void OnMessage(talk_base::Message* msg); void CreateOffer_s(); - void GenerateAnswer(PeerConnectionMessage* message, - StreamCollection* local_streams); + void CreateAnswer_s(); void InitMediaSessionOptions(cricket::MediaSessionOptions* options, - StreamCollection* local_streams); + StreamCollection* local_streams); void UpdateRemoteStreams(const cricket::SessionDescription* remote_desc); void UpdateSendingLocalStreams( @@ -166,6 +190,10 @@ class PeerConnectionSignaling : public talk_base::MessageHandler { typedef std::list > StreamCollectionList; StreamCollectionList queued_offers_; + typedef std::pair, + scoped_refptr > RemoteOfferPair; + RemoteOfferPair queued_received_offer_; + talk_base::Thread* signaling_thread_; State state_; uint32 ssrc_counter_; @@ -177,6 +205,9 @@ class PeerConnectionSignaling : public talk_base::MessageHandler { LocalStreamMap; LocalStreamMap local_streams_; cricket::MediaSessionDescriptionFactory session_description_factory_; + + scoped_refptr last_send_offer_; + cricket::Candidates candidates_; }; } // namespace webrtc diff --git a/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling_unittest.cc b/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling_unittest.cc index df49d89801..3a72503f3f 100644 --- a/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling_unittest.cc +++ b/third_party_mods/libjingle/source/talk/app/webrtc_dev/peerconnectionsignaling_unittest.cc @@ -82,7 +82,8 @@ class MockMediaStreamObserver : public webrtc::Observer { class MockSignalingObserver : public sigslot::has_slots<> { public: MockSignalingObserver() - : remote_peer_(NULL) { + : update_session_description_counter_(0), + remote_peer_(NULL) { } // New remote stream have been discovered. @@ -103,12 +104,21 @@ class MockSignalingObserver : public sigslot::has_slots<> { void OnSignalingMessage(PeerConnectionMessage* message) { if (remote_peer_) { remote_peer_->ProcessSignalingMessage(message, remote_local_collection_); + // Process posted messages to allow the remote peer to process + // the message. + talk_base::Thread::Current()->ProcessMessages(1); } if (message->type() != PeerConnectionMessage::kError) { last_message = message; } } + void OnUpdateSessionDescription(const cricket::SessionDescription* local, + const cricket::SessionDescription* remote, + StreamCollection* local_streams) { + update_session_description_counter_++; + } + // Tell this object to answer the remote_peer. // remote_local_collection is the local collection the remote peer want to // send in an answer. @@ -133,6 +143,7 @@ class MockSignalingObserver : public sigslot::has_slots<> { virtual ~MockSignalingObserver() {} scoped_refptr last_message; + int update_session_description_counter_; private: MediaStreamMap remote_media_streams_; @@ -147,25 +158,32 @@ class PeerConnectionSignalingTest: public testing::Test { talk_base::Thread::Current())); EXPECT_TRUE(channel_manager_->Init()); - signaling1_.reset(new PeerConnectionSignaling(channel_manager_.get())); + signaling1_.reset(new PeerConnectionSignaling( + channel_manager_.get(), talk_base::Thread::Current())); observer1_.reset(new MockSignalingObserver()); signaling1_->SignalNewPeerConnectionMessage.connect( observer1_.get(), &MockSignalingObserver::OnSignalingMessage); + signaling1_->SignalUpdateSessionDescription.connect( + observer1_.get(), &MockSignalingObserver::OnUpdateSessionDescription); signaling1_->SignalRemoteStreamAdded.connect( observer1_.get(), &MockSignalingObserver::OnRemoteStreamAdded); signaling1_->SignalRemoteStreamRemoved.connect( observer1_.get(), &MockSignalingObserver::OnRemoteStreamRemoved); - signaling2_.reset(new PeerConnectionSignaling(channel_manager_.get())); + signaling2_.reset(new PeerConnectionSignaling( + channel_manager_.get(), talk_base::Thread::Current())); observer2_.reset(new MockSignalingObserver()); signaling2_->SignalNewPeerConnectionMessage.connect( observer2_.get(), &MockSignalingObserver::OnSignalingMessage); + signaling2_->SignalUpdateSessionDescription.connect( + observer2_.get(), &MockSignalingObserver::OnUpdateSessionDescription); signaling2_->SignalRemoteStreamAdded.connect( observer2_.get(), &MockSignalingObserver::OnRemoteStreamAdded); signaling2_->SignalRemoteStreamRemoved.connect( observer2_.get(), &MockSignalingObserver::OnRemoteStreamRemoved); } + cricket::Candidates candidates_; talk_base::scoped_ptr observer1_; talk_base::scoped_ptr observer2_; talk_base::scoped_ptr signaling1_; @@ -203,12 +221,30 @@ TEST_F(PeerConnectionSignalingTest, SimpleOneWayCall) { // Connect all messages sent from Peer2 to be received on Peer1 observer2_->AnswerPeer(signaling1_.get(), local_collection1); - // Peer 1 generates the offer and and send it to Peer2. + // Peer 1 generates the offer. It is not sent since there is no + // local candidates ready. signaling1_->CreateOffer(local_collection1); // Process posted messages. talk_base::Thread::Current()->ProcessMessages(1); + EXPECT_EQ(PeerConnectionSignaling::kInitializing, signaling1_->GetState()); + // Initialize signaling1_ by providing the candidates. + signaling1_->Initialize(candidates_); + EXPECT_EQ(PeerConnectionSignaling::kWaitingForAnswer, + signaling1_->GetState()); + // Process posted messages to allow signaling_1 to send the offer. + talk_base::Thread::Current()->ProcessMessages(1); + + // Verify that signaling_2 is still not initialized. + // Even though it have received an offer. + EXPECT_EQ(PeerConnectionSignaling::kInitializing, signaling2_->GetState()); + + // Provide the candidates to signaling_2 and let it process the offer. + signaling2_->Initialize(candidates_); + talk_base::Thread::Current()->ProcessMessages(1); + + // Verify that the offer/answer have been exchanged and the state is good. EXPECT_EQ(PeerConnectionSignaling::kIdle, signaling1_->GetState()); EXPECT_EQ(PeerConnectionSignaling::kIdle, signaling2_->GetState()); @@ -219,9 +255,16 @@ TEST_F(PeerConnectionSignalingTest, SimpleOneWayCall) { // Verify that PeerConnection2 is aware of the sending stream. EXPECT_TRUE(observer2_->RemoteStream(label) != NULL); + + // Verify that both peers have updated the session descriptions. + EXPECT_EQ(1u, observer1_->update_session_description_counter_); + EXPECT_EQ(1u, observer2_->update_session_description_counter_); } TEST_F(PeerConnectionSignalingTest, Glare) { + // Initialize signaling1_ and signaling_2 by providing the candidates. + signaling1_->Initialize(candidates_); + signaling2_->Initialize(candidates_); // Create a local stream. std::string label(kStreamLabel1); scoped_refptr stream(CreateLocalMediaStream(label)); @@ -275,9 +318,16 @@ TEST_F(PeerConnectionSignalingTest, Glare) { // Verify that PeerConnection2 is aware of the sending stream. EXPECT_TRUE(observer2_->RemoteStream(label) != NULL); + + // Verify that both peers have updated the session descriptions. + EXPECT_EQ(1u, observer1_->update_session_description_counter_); + EXPECT_EQ(1u, observer2_->update_session_description_counter_); } TEST_F(PeerConnectionSignalingTest, AddRemoveStream) { + // Initialize signaling1_ and signaling_2 by providing the candidates. + signaling1_->Initialize(candidates_); + signaling2_->Initialize(candidates_); // Create a local stream. std::string label(kStreamLabel1); scoped_refptr stream(CreateLocalMediaStream(label)); @@ -310,10 +360,13 @@ TEST_F(PeerConnectionSignalingTest, AddRemoveStream) { // Peer 1 creates an empty offer and send it to Peer2. signaling1_->CreateOffer(local_collection1); - // Process posted messages. talk_base::Thread::Current()->ProcessMessages(1); + // Verify that both peers have updated the session descriptions. + EXPECT_EQ(1u, observer1_->update_session_description_counter_); + EXPECT_EQ(1u, observer2_->update_session_description_counter_); + // Peer2 add a stream. local_collection2->AddStream(stream); @@ -327,6 +380,10 @@ TEST_F(PeerConnectionSignalingTest, AddRemoveStream) { // Verify that PeerConnection1 is aware of the sending stream. EXPECT_TRUE(observer1_->RemoteStream(label) != NULL); + // Verify that both peers have updated the session descriptions. + EXPECT_EQ(2u, observer1_->update_session_description_counter_); + EXPECT_EQ(2u, observer2_->update_session_description_counter_); + // Remove the stream local_collection2->RemoveStream(stream); @@ -339,6 +396,10 @@ TEST_F(PeerConnectionSignalingTest, AddRemoveStream) { // Verify that the PeerConnection 2 local stream is now ended. EXPECT_EQ(MediaStream::kEnded, stream_observer1.ready_state); EXPECT_EQ(MediaStreamTrack::kEnded, track_observer1.track_state); + + // Verify that both peers have updated the session descriptions. + EXPECT_EQ(3u, observer1_->update_session_description_counter_); + EXPECT_EQ(3u, observer2_->update_session_description_counter_); } } // namespace webrtc