/*
 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */

#include "webrtc/libjingle/session/sessionmanager.h"

#include "webrtc/base/common.h"
#include "webrtc/base/helpers.h"
#include "webrtc/base/logging.h"
#include "webrtc/base/scoped_ptr.h"
#include "webrtc/base/stringencode.h"
#include "webrtc/libjingle/session/p2ptransportparser.h"
#include "webrtc/libjingle/session/sessionmessages.h"
#include "webrtc/libjingle/xmpp/constants.h"
#include "webrtc/libjingle/xmpp/jid.h"
#include "webrtc/p2p/base/constants.h"

namespace cricket {

// Fills |err| with the given stanza error |type| and |text| and returns false,
// so callers can write "return BadMessage(...)".
bool BadMessage(const buzz::QName type,
                const std::string& text,
                MessageError* err) {
  err->SetType(type);
  err->SetText(text);
  return false;
}

// Creates a session owned by |session_manager|. The session is the initiator
// when |initiator_name| equals |local_name|. |client| must not be NULL.
Session::Session(SessionManager* session_manager,
                 const std::string& local_name,
                 const std::string& initiator_name,
                 const std::string& sid,
                 const std::string& content_type,
                 SessionClient* client)
    : BaseSession(session_manager->signaling_thread(),
                  session_manager->worker_thread(),
                  session_manager->port_allocator(),
                  sid, content_type, initiator_name == local_name) {
  ASSERT(client != NULL);
  session_manager_ = session_manager;
  local_name_ = local_name;
  initiator_name_ = initiator_name;
  transport_parser_ = new P2PTransportParser();
  client_ = client;
  initiate_acked_ = false;
  // Start in hybrid mode; the protocol is pinned by the first incoming message
  // (see OnIncomingMessage).
  current_protocol_ = PROTOCOL_HYBRID;
}

Session::~Session() {
  delete transport_parser_;
}

// Sends a session-initiate to |to| describing |sdesc|. Only valid from
// STATE_INIT. Returns false (after logging) if the transports cannot be
// created or the message cannot be sent.
bool Session::Initiate(const std::string& to,
                       const SessionDescription* sdesc) {
  ASSERT(signaling_thread()->IsCurrent());
  SessionError error;

  // Only from STATE_INIT
  if (state() != STATE_INIT)
    return false;

  // Setup for signaling.
  set_remote_name(to);
  set_local_description(sdesc);
  if (!CreateTransportProxies(GetEmptyTransportInfos(sdesc->contents()),
                              &error)) {
    LOG(LS_ERROR) << "Could not create transports: " << error.text;
    return false;
  }

  if (!SendInitiateMessage(sdesc, &error)) {
    LOG(LS_ERROR) << "Could not send initiate message: " << error.text;
    return false;
  }

  // We need to connect transport proxy and impl here so that we can process
  // the TransportDescriptions.
  SpeculativelyConnectAllTransportChannels();

  PushdownTransportDescription(CS_LOCAL, CA_OFFER, NULL);
  SetState(Session::STATE_SENTINITIATE);
  return true;
}

// Accepts a received session-initiate with the local description |sdesc|.
// Only valid in STATE_RECEIVEDINITIATE.
bool Session::Accept(const SessionDescription* sdesc) {
  ASSERT(signaling_thread()->IsCurrent());

  // Only if just received initiate
  if (state() != STATE_RECEIVEDINITIATE)
    return false;

  // Setup for signaling.
  set_local_description(sdesc);

  SessionError error;
  if (!SendAcceptMessage(sdesc, &error)) {
    LOG(LS_ERROR) << "Could not send accept message: " << error.text;
    return false;
  }
  // TODO(juberti): Add BUNDLE support to transport-info messages.
  PushdownTransportDescription(CS_LOCAL, CA_ANSWER, NULL);
  MaybeEnableMuxingSupport();  // Enable transport channel mux if supported.
  SetState(Session::STATE_SENTACCEPT);
  return true;
}

// Rejects a pending initiate or modify request, giving |reason|.
bool Session::Reject(const std::string& reason) {
  ASSERT(signaling_thread()->IsCurrent());

  // Reject is sent in response to an initiate or modify, to reject the
  // request
  if (state() != STATE_RECEIVEDINITIATE && state() != STATE_RECEIVEDMODIFY)
    return false;

  SessionError error;
  if (!SendRejectMessage(reason, &error)) {
    LOG(LS_ERROR) << "Could not send reject message: " << error.text;
    return false;
  }

  SetState(STATE_SENTREJECT);
  return true;
}

// Terminates the session with |reason|. Returns false if the session is
// already terminated or the terminate message could not be sent.
bool Session::TerminateWithReason(const std::string& reason) {
  ASSERT(signaling_thread()->IsCurrent());

  // Either side can terminate, at any time.
  switch (state()) {
    case STATE_SENTTERMINATE:
    case STATE_RECEIVEDTERMINATE:
      return false;

    case STATE_SENTREJECT:
    case STATE_RECEIVEDREJECT:
      // We don't need to send terminate if we sent or received a reject...
      // it's implicit.
      break;

    default: {
      SessionError error;
      if (!SendTerminateMessage(reason, &error)) {
        LOG(LS_ERROR) << "Could not send terminate message: " << error.text;
        return false;
      }
      break;
    }
  }

  SetState(STATE_SENTTERMINATE);
  return true;
}

// Sends a session-info message carrying |elems| to |remote_name|.
bool Session::SendInfoMessage(const XmlElements& elems,
                              const std::string& remote_name) {
  ASSERT(signaling_thread()->IsCurrent());
  SessionError error;
  if (!SendMessage(ACTION_SESSION_INFO, elems, remote_name, &error)) {
    LOG(LS_ERROR) << "Could not send info message " << error.text;
    return false;
  }
  return true;
}

// Serializes |contents| as a description-info message in the current protocol
// and sends it to the remote side.
bool Session::SendDescriptionInfoMessage(const ContentInfos& contents) {
  XmlElements elems;
  WriteError write_error;
  if (!WriteDescriptionInfo(current_protocol_,
                            contents,
                            GetContentParsers(),
                            &elems, &write_error)) {
    LOG(LS_ERROR) << "Could not write description info message: "
                  << write_error.text;
    return false;
  }
  SessionError error;
  if (!SendMessage(ACTION_DESCRIPTION_INFO, elems, &error)) {
    LOG(LS_ERROR) << "Could not send description info message: "
                  << error.text;
    return false;
  }
  return true;
}

// Builds one TransportInfo per content in |contents|, each with a transport
// description of this session's transport type and empty strings for the two
// remaining constructor arguments.
TransportInfos Session::GetEmptyTransportInfos(
    const ContentInfos& contents) const {
  TransportInfos tinfos;
  for (ContentInfos::const_iterator content = contents.begin();
       content != contents.end(); ++content) {
    tinfos.push_back(TransportInfo(content->name,
                                   TransportDescription(transport_type(),
                                                        std::string(),
                                                        std::string())));
  }
  return tinfos;
}

// Forwards the candidates of every entry in |tinfos| to BaseSession. Stops at
// the first failure and reports it through |error|.
bool Session::OnRemoteCandidates(
    const TransportInfos& tinfos, ParseError* error) {
  for (TransportInfos::const_iterator tinfo = tinfos.begin();
       tinfo != tinfos.end(); ++tinfo) {
    std::string str_error;
    if (!BaseSession::OnRemoteCandidates(
        tinfo->content_name, tinfo->description.candidates, &str_error)) {
      return BadParse(str_error, error);
    }
  }
  return true;
}

// Ensures a transport proxy exists for each content named in |tinfos|. Fails
// if any entry uses a transport type other than this session's.
bool Session::CreateTransportProxies(const TransportInfos& tinfos,
                                     SessionError* error) {
  for (TransportInfos::const_iterator tinfo = tinfos.begin();
       tinfo != tinfos.end(); ++tinfo) {
    if (tinfo->description.transport_type != transport_type()) {
      error->SetText("No supported transport in offer.");
      return false;
    }

    GetOrCreateTransportProxy(tinfo->content_name);
  }
  return true;
}

// Returns a map with a single entry: this session's transport type mapped to
// its transport parser.
TransportParserMap Session::GetTransportParsers() {
  TransportParserMap parsers;
  parsers[transport_type()] = transport_parser_;
  return parsers;
}

// Returns a map from content name to the transport proxy acting as the
// candidate translator for that content.
CandidateTranslatorMap Session::GetCandidateTranslators() {
  CandidateTranslatorMap translators;
  // NOTE: This technique makes it impossible to parse G-ICE
  // candidates in session-initiate messages because the channels
  // aren't yet created at that point.  Since we don't use candidates
  // in session-initiate messages, we should be OK.  Once we switch to
  // ICE, this translation shouldn't be necessary.
  for (TransportMap::const_iterator iter = transport_proxies().begin();
       iter != transport_proxies().end(); ++iter) {
    translators[iter->first] = iter->second;
  }
  return translators;
}

// Returns the content parsers for this session; the session client parses
// this session's content type.
ContentParserMap Session::GetContentParsers() {
  ContentParserMap parsers;
  parsers[content_type()] = client_;
  // We need to be able parse both RTP-based and SCTP-based Jingle
  // with the same client.
  if (content_type() == NS_JINGLE_RTP) {
    parsers[NS_JINGLE_DRAFT_SCTP] = client_;
  }
  return parsers;
}

// Called when |transport| requests signaling; resets the candidate allocation
// flag on its proxy and re-emits the request.
void Session::OnTransportRequestSignaling(Transport* transport) {
  ASSERT(signaling_thread()->IsCurrent());
  TransportProxy* transproxy = GetTransportProxy(transport);
  ASSERT(transproxy != NULL);
  if (transproxy) {
    // Reset candidate allocation status for the transport proxy.
    transproxy->set_candidates_allocated(false);
  }
  SignalRequestSignaling(this);
}

void Session::OnTransportConnecting(Transport* transport) {
  // This is an indication that we should begin watching the writability
  // state of the transport.
  OnTransportWritable(transport);
}

void Session::OnTransportWritable(Transport* transport) {
  ASSERT(signaling_thread()->IsCurrent());

  // If the transport is not writable, start a timer to make sure that it
  // becomes writable within a reasonable amount of time.  If it does not, we
  // terminate since we can't actually send data.  If the transport is writable,
  // cancel the timer.  Note that writability transitions may occur repeatedly
  // during the lifetime of the session.
  signaling_thread()->Clear(this, MSG_TIMEOUT);
  if (transport->HasChannels() && !transport->writable()) {
    signaling_thread()->PostDelayed(
        session_manager_->session_timeout() * 1000, this, MSG_TIMEOUT);
  }
}

// Called when |transproxy| has gathered |candidates|. The candidates are either
// queued (initiator still waiting for the initiate ack) or sent immediately in
// a transport-info message.
void Session::OnTransportProxyCandidatesReady(TransportProxy* transproxy,
                                              const Candidates& candidates) {
  ASSERT(signaling_thread()->IsCurrent());
  if (transproxy != NULL) {
    if (initiator() && !initiate_acked_) {
      // TODO: This is to work around server re-ordering
      // messages.  We send the candidates once the session-initiate
      // is acked.  Once we have fixed the server to guarantee message
      // order, we can remove this case.
      transproxy->AddUnsentCandidates(candidates);
    } else {
      if (!transproxy->negotiated()) {
        transproxy->AddSentCandidates(candidates);
      }
      SessionError error;
      if (!SendTransportInfoMessage(transproxy, candidates, &error)) {
        LOG(LS_ERROR) << "Could not send transport info message: "
                      << error.text;
        return;
      }
    }
  }
}

// Dispatches an incoming session message to the matching handler, then either
// acknowledges it or signals an error response.
void Session::OnIncomingMessage(const SessionMessage& msg) {
  ASSERT(signaling_thread()->IsCurrent());
  ASSERT(state() == STATE_INIT || msg.from == remote_name());

  // The first message received pins the protocol used for the rest of the
  // session.
  if (current_protocol_ == PROTOCOL_HYBRID) {
    if (msg.protocol == PROTOCOL_GINGLE) {
      current_protocol_ = PROTOCOL_GINGLE;
    } else {
      current_protocol_ = PROTOCOL_JINGLE;
    }
  }

  bool valid = false;
  MessageError error;
  switch (msg.type) {
    case ACTION_SESSION_INITIATE:
      valid = OnInitiateMessage(msg, &error);
      break;
    case ACTION_SESSION_INFO:
      valid = OnInfoMessage(msg);
      break;
    case ACTION_SESSION_ACCEPT:
      valid = OnAcceptMessage(msg, &error);
      break;
    case ACTION_SESSION_REJECT:
      valid = OnRejectMessage(msg, &error);
      break;
    case ACTION_SESSION_TERMINATE:
      valid = OnTerminateMessage(msg, &error);
      break;
    case ACTION_TRANSPORT_INFO:
      valid = OnTransportInfoMessage(msg, &error);
      break;
    case ACTION_TRANSPORT_ACCEPT:
      valid = OnTransportAcceptMessage(msg, &error);
      break;
    case ACTION_DESCRIPTION_INFO:
      valid = OnDescriptionInfoMessage(msg, &error);
      break;
    default:
      valid = BadMessage(buzz::QN_STANZA_BAD_REQUEST,
                         "unknown session message type",
                         &error);
      break;
  }

  if (valid) {
    SendAcknowledgementMessage(msg.stanza);
  } else {
    SignalErrorMessage(this, msg.stanza, error.type,
                       "modify", error.text, NULL);
  }
}

// Handles the response to a message we sent. Only the ack of a
// session-initiate is acted upon.
void Session::OnIncomingResponse(const buzz::XmlElement* orig_stanza,
                                 const buzz::XmlElement* response_stanza,
                                 const SessionMessage& msg) {
  ASSERT(signaling_thread()->IsCurrent());

  if (msg.type == ACTION_SESSION_INITIATE) {
    OnInitiateAcked();
  }
}

void Session::OnInitiateAcked() {
  // TODO: This is to work around server re-ordering
  // messages.  We send the candidates once the session-initiate
  // is acked.  Once we have fixed the server to guarantee message
  // order, we can remove this case.
  if (!initiate_acked_) {
    initiate_acked_ = true;
    SessionError error;
    SendAllUnsentTransportInfoMessages(&error);
  }
}

// Handles a failed send of |orig_stanza|, described by |error_stanza|.
// Redirect errors are followed; other errors put the session into an error
// state unless they concern transport-info or are of type "continue"/"wait".
void Session::OnFailedSend(const buzz::XmlElement* orig_stanza,
                           const buzz::XmlElement* error_stanza) {
  ASSERT(signaling_thread()->IsCurrent());

  SessionMessage msg;
  ParseError parse_error;
  if (!ParseSessionMessage(orig_stanza, &msg, &parse_error)) {
    // Log the stanza contents rather than the pointer value.
    LOG(LS_ERROR) << "Error parsing failed send: " << parse_error.text
                  << ":" << orig_stanza->Str();
    return;
  }

  // If the error is a session redirect, call OnRedirectError, which will
  // continue the session with a new remote JID.
  SessionRedirect redirect;
  if (FindSessionRedirect(error_stanza, &redirect)) {
    SessionError error;
    if (!OnRedirectError(redirect, &error)) {
      // TODO: Should we send a message back?  The standard
      // says nothing about it.
      std::ostringstream desc;
      desc << "Failed to redirect: " << error.text;
      LOG(LS_ERROR) << desc.str();
      SetError(ERROR_RESPONSE, desc.str());
    }
    return;
  }

  std::string error_type = "cancel";

  const buzz::XmlElement* error = error_stanza->FirstNamed(buzz::QN_ERROR);
  if (error) {
    error_type = error->Attr(buzz::QN_TYPE);

    LOG(LS_ERROR) << "Session error:\n" << error->Str() << "\n"
                  << "in response to:\n" << orig_stanza->Str();
  } else {
    // Don't crash if the <error> element is missing.
    LOG(LS_ERROR) << "Session error without element, ignoring";
    return;
  }

  if (msg.type == ACTION_TRANSPORT_INFO) {
    // Transport messages frequently generate errors because they are sent right
    // when we detect a network failure.  For that reason, we ignore such
    // errors, because if we do not establish writability again, we will
    // terminate anyway.  The exceptions are transport-specific error tags,
    // which we pass on to the respective transport.
  } else if ((error_type != "continue") && (error_type != "wait")) {
    // We do not set an error if the other side said it is okay to continue
    // (possibly after waiting).  These errors can be ignored.
    SetError(ERROR_RESPONSE, "");
  }
}

// Handles a session-initiate: parses it, creates the transport proxies, stores
// the remote description and moves to STATE_RECEIVEDINITIATE.
bool Session::OnInitiateMessage(const SessionMessage& msg,
                                MessageError* error) {
  if (!CheckState(STATE_INIT, error))
    return false;

  SessionInitiate init;
  if (!ParseSessionInitiate(msg.protocol, msg.action_elem,
                            GetContentParsers(), GetTransportParsers(),
                            GetCandidateTranslators(),
                            &init, error))
    return false;

  SessionError session_error;
  if (!CreateTransportProxies(init.transports, &session_error)) {
    return BadMessage(buzz::QN_STANZA_NOT_ACCEPTABLE,
                      session_error.text, error);
  }

  set_remote_name(msg.from);
  set_initiator_name(msg.initiator);
  set_remote_description(new SessionDescription(init.ClearContents(),
                                                init.transports,
                                                init.groups));
  // Updating transport with TransportDescription.
  PushdownTransportDescription(CS_REMOTE, CA_OFFER, NULL);
  SetState(STATE_RECEIVEDINITIATE);

  // Users of Session may listen to state change and call Reject().
  if (state() != STATE_SENTREJECT) {
    if (!OnRemoteCandidates(init.transports, error))
      return false;

    // TODO(juberti): Auto-generate and push down the local transport answer.
    // This is necessary for trickling to work with RFC 5245 ICE.
  }
  return true;
}

// Handles a session-accept: parses it, stores the remote description and moves
// to STATE_RECEIVEDACCEPT.
bool Session::OnAcceptMessage(const SessionMessage& msg, MessageError* error) {
  if (!CheckState(STATE_SENTINITIATE, error))
    return false;

  SessionAccept accept;
  if (!ParseSessionAccept(msg.protocol, msg.action_elem,
                          GetContentParsers(), GetTransportParsers(),
                          GetCandidateTranslators(),
                          &accept, error)) {
    return false;
  }

  // If we get an accept, we can assume the initiate has been
  // received, even if we haven't gotten an IQ response.
  OnInitiateAcked();

  set_remote_description(new SessionDescription(accept.ClearContents(),
                                                accept.transports,
                                                accept.groups));
  // Updating transport with TransportDescription.
  PushdownTransportDescription(CS_REMOTE, CA_ANSWER, NULL);
  MaybeEnableMuxingSupport();  // Enable transport channel mux if supported.
  SetState(STATE_RECEIVEDACCEPT);

  if (!OnRemoteCandidates(accept.transports, error))
    return false;

  return true;
}

// Handles a session-reject; only valid after we sent an initiate.
bool Session::OnRejectMessage(const SessionMessage& msg, MessageError* error) {
  if (!CheckState(STATE_SENTINITIATE, error))
    return false;

  SetState(STATE_RECEIVEDREJECT);
  return true;
}

// Forwards the payload of a session-info message to listeners.
bool Session::OnInfoMessage(const SessionMessage& msg) {
  SignalInfoMessage(this, msg.action_elem);
  return true;
}

// Handles a session-terminate: reports the reason and moves to
// STATE_RECEIVEDTERMINATE.
bool Session::OnTerminateMessage(const SessionMessage& msg,
                                 MessageError* error) {
  SessionTerminate term;
  if (!ParseSessionTerminate(msg.protocol, msg.action_elem, &term, error))
    return false;

  SignalReceivedTerminateReason(this, term.reason);
  if (term.debug_reason != buzz::STR_EMPTY) {
    LOG(LS_VERBOSE) << "Received error on call: " << term.debug_reason;
  }

  SetState(STATE_RECEIVEDTERMINATE);
  return true;
}

// Handles a transport-info message by parsing its candidates and handing them
// to OnRemoteCandidates.
bool Session::OnTransportInfoMessage(const SessionMessage& msg,
                                     MessageError* error) {
  TransportInfos tinfos;
  // NOTE(review): initiator_description() is dereferenced without a check;
  // confirm it cannot be NULL when a transport-info arrives.
  if (!ParseTransportInfos(msg.protocol, msg.action_elem,
                           initiator_description()->contents(),
                           GetTransportParsers(), GetCandidateTranslators(),
                           &tinfos, error))
    return false;

  if (!OnRemoteCandidates(tinfos, error))
    return false;

  return true;
}

bool Session::OnTransportAcceptMessage(const SessionMessage& msg,
                                       MessageError* error) {
  // TODO: Currently here only for compatibility with
  // Gingle 1.1 clients (notably, Google Voice).
  return true;
}

// Handles a description-info message: parses the updated contents and signals
// them. The stored remote description is not modified.
bool Session::OnDescriptionInfoMessage(const SessionMessage& msg,
                                       MessageError* error) {
  if (!CheckState(STATE_INPROGRESS, error))
    return false;

  DescriptionInfo description_info;
  if (!ParseDescriptionInfo(msg.protocol, msg.action_elem,
                            GetContentParsers(), GetTransportParsers(),
                            GetCandidateTranslators(),
                            &description_info, error)) {
    return false;
  }

  ContentInfos& updated_contents = description_info.contents;

  // TODO: Currently, reflector sends back
  // video stream updates even for an audio-only call, which causes
  // this to fail.  Put this back once reflector is fixed.
  //
  // ContentInfos::iterator it;
  // First, ensure all updates are valid before modifying remote_description_.
  // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) {
  //   if (remote_description()->GetContentByName(it->name) == NULL) {
  //     return false;
  //   }
  // }

  // TODO: We used to replace contents from an update, but
  // that no longer works with partial updates.  We need to figure out
  // a way to merge partial updates into contents.  For now, users of
  // Session should listen to SignalRemoteDescriptionUpdate and handle
  // updates.  They should not expect remote_description to be the
  // latest value.
  //
  // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) {
  //     remote_description()->RemoveContentByName(it->name);
  //     remote_description()->AddContent(it->name, it->type, it->description);
  //   }
  // }

  SignalRemoteDescriptionUpdate(this, updated_contents);
  return true;
}

// Returns true when both names are valid JIDs whose bare forms are equal.
bool BareJidsEqual(const std::string& name1,
                   const std::string& name2) {
  buzz::Jid jid1(name1);
  buzz::Jid jid2(name2);

  return jid1.IsValid() && jid2.IsValid() && jid1.BareEquals(jid2);
}

// Follows a redirect by pointing the session at |redirect.target| and
// re-sending the initiate and transport-info messages. Only allowed while in
// STATE_SENTINITIATE and when the target has the same bare JID.
bool Session::OnRedirectError(const SessionRedirect& redirect,
                              SessionError* error) {
  MessageError message_error;
  if (!CheckState(STATE_SENTINITIATE, &message_error)) {
    return BadWrite(message_error.text, error);
  }

  if (!BareJidsEqual(remote_name(), redirect.target))
    return BadWrite("Redirection not allowed: must be the same bare jid.",
                    error);

  // When we receive a redirect, we point the session at the new JID
  // and resend the candidates.
  set_remote_name(redirect.target);
  return (SendInitiateMessage(local_description(), error) &&
          ResendAllTransportInfoMessages(error));
}

// Returns true if the session is in state |expected|; otherwise fills |error|
// with a "not allowed" stanza error and returns false.
bool Session::CheckState(State expected, MessageError* error) {
  if (state() != expected) {
    // The server can deliver messages out of order/repeated for various
    // reasons. For example, if the server does not receive our iq response,
    // it could assume that the iq it sent was lost, and will then send
    // it again. Ideally, we should implement reliable messaging with
    // duplicate elimination.
    return BadMessage(buzz::QN_STANZA_NOT_ALLOWED,
                      "message not allowed in current state",
                      error);
  }
  return true;
}

// Records the error in BaseSession and, for real errors, posts MSG_ERROR to
// the signaling thread (handled in OnMessage).
void Session::SetError(Error error, const std::string& error_desc) {
  BaseSession::SetError(error, error_desc);
  if (error != ERROR_NONE)
    signaling_thread()->Post(this, MSG_ERROR);
}

// Handles messages posted to this session on top of BaseSession's handling.
void Session::OnMessage(rtc::Message* pmsg) {
  // preserve this because BaseSession::OnMessage may modify it
  State orig_state = state();

  BaseSession::OnMessage(pmsg);

  switch (pmsg->message_id) {
    case MSG_ERROR:
      TerminateWithReason(STR_TERMINATE_ERROR);
      break;

    case MSG_STATE:
      switch (orig_state) {
        case STATE_SENTREJECT:
        case STATE_RECEIVEDREJECT:
          // Assume clean termination.
          Terminate();
          break;

        case STATE_SENTTERMINATE:
        case STATE_RECEIVEDTERMINATE:
          // DestroySession deletes this object; do not touch members after
          // this call.
          session_manager_->DestroySession(this);
          break;

        default:
          // Explicitly ignoring some states here.
          break;
      }
      break;
  }
}

// Sends a session-initiate built from |sdesc| with empty transport infos.
bool Session::SendInitiateMessage(const SessionDescription* sdesc,
                                  SessionError* error) {
  SessionInitiate init;
  init.contents = sdesc->contents();
  init.transports = GetEmptyTransportInfos(init.contents);
  init.groups = sdesc->groups();
  return SendMessage(ACTION_SESSION_INITIATE, init, error);
}

// Serializes a session-initiate action for |protocol| into |elems|.
bool Session::WriteSessionAction(
    SignalingProtocol protocol, const SessionInitiate& init,
    XmlElements* elems, WriteError* error) {
  return WriteSessionInitiate(protocol, init.contents, init.transports,
                              GetContentParsers(), GetTransportParsers(),
                              GetCandidateTranslators(), init.groups,
                              elems, error);
}

// Sends a session-accept built from |sdesc| with empty transport infos.
bool Session::SendAcceptMessage(const SessionDescription* sdesc,
                                SessionError* error) {
  XmlElements elems;
  if (!WriteSessionAccept(current_protocol_,
                          sdesc->contents(),
                          GetEmptyTransportInfos(sdesc->contents()),
                          GetContentParsers(), GetTransportParsers(),
                          GetCandidateTranslators(), sdesc->groups(),
                          &elems, error)) {
    return false;
  }
  return SendMessage(ACTION_SESSION_ACCEPT, elems, error);
}

// Sends a session-reject carrying |reason|.
bool Session::SendRejectMessage(const std::string& reason,
                                SessionError* error) {
  SessionTerminate term(reason);
  return SendMessage(ACTION_SESSION_REJECT, term, error);
}

// Sends a session-terminate carrying |reason|.
bool Session::SendTerminateMessage(const std::string& reason,
                                   SessionError* error) {
  SessionTerminate term(reason);
  return SendMessage(ACTION_SESSION_TERMINATE, term, error);
}

// Serializes a terminate/reject action for |protocol| into |elems|. Always
// returns true.
bool Session::WriteSessionAction(SignalingProtocol protocol,
                                 const SessionTerminate& term,
                                 XmlElements* elems, WriteError* error) {
  WriteSessionTerminate(protocol, term, elems);
  return true;
}

// Sends a transport-info message for |tinfo|.
bool Session::SendTransportInfoMessage(const TransportInfo& tinfo,
                                       SessionError* error) {
  return SendMessage(ACTION_TRANSPORT_INFO, tinfo, error);
}

// Sends |candidates| for the content handled by |transproxy| in a
// transport-info message.
bool Session::SendTransportInfoMessage(const TransportProxy* transproxy,
                                       const Candidates& candidates,
                                       SessionError* error) {
  return SendTransportInfoMessage(
      TransportInfo(transproxy->content_name(),
                    TransportDescription(transproxy->type(),
                                         std::vector<std::string>(),
                                         std::string(), std::string(),
                                         ICEMODE_FULL,
                                         CONNECTIONROLE_NONE, NULL,
                                         candidates)),
      error);
}

// Serializes a transport-info action for |protocol| into |elems|.
bool Session::WriteSessionAction(SignalingProtocol protocol,
                                 const TransportInfo& tinfo,
                                 XmlElements* elems, WriteError* error) {
  TransportInfos tinfos;
  tinfos.push_back(tinfo);
  return WriteTransportInfos(protocol, tinfos,
                             GetTransportParsers(), GetCandidateTranslators(),
                             elems, error);
}

// Re-sends the already-sent candidates of every transport proxy, clearing each
// proxy's sent list after a successful send. Stops at the first failure.
bool Session::ResendAllTransportInfoMessages(SessionError* error) {
  for (TransportMap::const_iterator iter = transport_proxies().begin();
       iter != transport_proxies().end(); ++iter) {
    TransportProxy* transproxy = iter->second;
    if (transproxy->sent_candidates().size() > 0) {
      if (!SendTransportInfoMessage(
              transproxy, transproxy->sent_candidates(), error)) {
        LOG(LS_ERROR) << "Could not resend transport info messages: "
                      << error->text;
        return false;
      }
      transproxy->ClearSentCandidates();
    }
  }
  return true;
}

// Sends the queued (unsent) candidates of every transport proxy, clearing each
// proxy's unsent list after a successful send. Stops at the first failure.
bool Session::SendAllUnsentTransportInfoMessages(SessionError* error) {
  for (TransportMap::const_iterator iter = transport_proxies().begin();
       iter != transport_proxies().end(); ++iter) {
    TransportProxy* transproxy = iter->second;
    if (transproxy->unsent_candidates().size() > 0) {
      if (!SendTransportInfoMessage(
              transproxy, transproxy->unsent_candidates(), error)) {
        LOG(LS_ERROR) << "Could not send unsent transport info messages: "
                      << error->text;
        return false;
      }
      transproxy->ClearUnsentCandidates();
    }
  }
  return true;
}

// Sends pre-serialized |action_elems| to the current remote name.
bool Session::SendMessage(ActionType type, const XmlElements& action_elems,
                          SessionError* error) {
  return SendMessage(type, action_elems, remote_name(), error);
}

// Wraps |action_elems| in an IQ stanza addressed to |remote_name| and emits it
// through SignalOutgoingMessage. Always returns true.
bool Session::SendMessage(ActionType type, const XmlElements& action_elems,
                          const std::string& remote_name,
                          SessionError* error) {
  rtc::scoped_ptr<buzz::XmlElement> stanza(
      new buzz::XmlElement(buzz::QN_IQ));

  SessionMessage msg(current_protocol_, type, id(), initiator_name());
  msg.to = remote_name;
  WriteSessionMessage(msg, action_elems, stanza.get());

  SignalOutgoingMessage(this, stanza.get());
  return true;
}

// Serializes |action| (via the matching WriteSessionAction overload) into an
// IQ stanza and emits it. Returns false if serialization fails.
template <typename Action>
bool Session::SendMessage(ActionType type, const Action& action,
                          SessionError* error) {
  rtc::scoped_ptr<buzz::XmlElement> stanza(
      new buzz::XmlElement(buzz::QN_IQ));
  if (!WriteActionMessage(type, action, stanza.get(), error))
    return false;

  SignalOutgoingMessage(this, stanza.get());
  return true;
}

// Writes |action| into |stanza| using the current protocol. While still in
// hybrid mode, both the Jingle and the Gingle form are written.
template <typename Action>
bool Session::WriteActionMessage(ActionType type, const Action& action,
                                 buzz::XmlElement* stanza,
                                 WriteError* error) {
  if (current_protocol_ == PROTOCOL_HYBRID) {
    if (!WriteActionMessage(PROTOCOL_JINGLE, type, action, stanza, error))
      return false;
    if (!WriteActionMessage(PROTOCOL_GINGLE, type, action, stanza, error))
      return false;
  } else {
    if (!WriteActionMessage(current_protocol_, type, action, stanza, error))
      return false;
  }
  return true;
}

// Writes |action| into |stanza| in the given |protocol|.
template <typename Action>
bool Session::WriteActionMessage(SignalingProtocol protocol,
                                 ActionType type, const Action& action,
                                 buzz::XmlElement* stanza,
                                 WriteError* error) {
  XmlElements action_elems;
  if (!WriteSessionAction(protocol, action, &action_elems, error))
    return false;

  SessionMessage msg(protocol, type, id(), initiator_name());
  msg.to = remote_name();

  WriteSessionMessage(msg, action_elems, stanza);
  return true;
}

// Sends an IQ "result" to the remote side, echoing the id of |stanza|.
void Session::SendAcknowledgementMessage(const buzz::XmlElement* stanza) {
  rtc::scoped_ptr<buzz::XmlElement> ack(
      new buzz::XmlElement(buzz::QN_IQ));
  ack->SetAttr(buzz::QN_TO, remote_name());
  ack->SetAttr(buzz::QN_ID, stanza->Attr(buzz::QN_ID));
  ack->SetAttr(buzz::QN_TYPE, "result");

  SignalOutgoingMessage(this, ack.get());
}

// The constructing thread becomes the signaling thread. If |worker| is NULL
// the same thread is also used as the worker thread.
SessionManager::SessionManager(PortAllocator *allocator,
                               rtc::Thread *worker) {
  allocator_ = allocator;
  signaling_thread_ = rtc::Thread::Current();
  if (worker == NULL) {
    worker_thread_ = rtc::Thread::Current();
  } else {
    worker_thread_ = worker;
  }
  timeout_ = 50;
}

SessionManager::~SessionManager() {
  // Note: Session::Terminate occurs asynchronously, so it's too late to
  // delete them now.  They better be all gone.
  ASSERT(session_map_.empty());
  // TerminateAll();
  SignalDestroyed();
}

// Registers |client| as the handler for |content_type|. The content type must
// not already be registered.
void SessionManager::AddClient(const std::string& content_type,
                               SessionClient* client) {
  ASSERT(client_map_.find(content_type) == client_map_.end());
  client_map_[content_type] = client;
}

// Unregisters the client for |content_type|, which must be registered.
void SessionManager::RemoveClient(const std::string& content_type) {
  ClientMap::iterator iter = client_map_.find(content_type);
  ASSERT(iter != client_map_.end());
  client_map_.erase(iter);
}

// Returns the client registered for |content_type|, or NULL if there is none.
SessionClient* SessionManager::GetClient(const std::string& content_type) {
  ClientMap::iterator iter = client_map_.find(content_type);
  return (iter != client_map_.end()) ? iter->second : NULL;
}

// Creates an outgoing session with a randomly generated session id.
Session* SessionManager::CreateSession(const std::string& local_name,
                                       const std::string& content_type) {
  std::string id;
  return CreateSession(id, local_name, content_type);
}

// Creates an outgoing session with session id |id|; a random id is generated
// when |id| is empty.
Session* SessionManager::CreateSession(const std::string& id,
                                       const std::string& local_name,
                                       const std::string& content_type) {
  std::string sid =
      id.empty() ? rtc::ToString(rtc::CreateRandomId64()) : id;
  return CreateSession(local_name, local_name, sid, content_type, false);
}

// Creates a session, registers it in the session map, wires its signals to
// this manager and notifies listeners and the client. Returns NULL if no
// client is registered for |content_type|.
Session* SessionManager::CreateSession(
    const std::string& local_name, const std::string& initiator_name,
    const std::string& sid, const std::string& content_type,
    bool received_initiate) {
  SessionClient* client = GetClient(content_type);
  ASSERT(client != NULL);
  if (client == NULL) {
    // Same ASSERT-then-check pattern as OnTransportRequestSignaling: avoid
    // dereferencing a NULL client below.
    LOG(LS_ERROR) << "No client registered for content type: "
                  << content_type;
    return NULL;
  }

  Session* session = new Session(this, local_name, initiator_name,
                                 sid, content_type, client);
  session->SetIdentity(transport_desc_factory_.identity());
  session_map_[session->id()] = session;
  session->SignalRequestSignaling.connect(
      this, &SessionManager::OnRequestSignaling);
  session->SignalOutgoingMessage.connect(
      this, &SessionManager::OnOutgoingMessage);
  session->SignalErrorMessage.connect(this, &SessionManager::OnErrorMessage);
  SignalSessionCreate(session, received_initiate);
  session->client()->OnSessionCreate(session, received_initiate);
  return session;
}

// Notifies listeners and the client, removes |session| from the map and
// deletes it. Does nothing for NULL or for sessions not in the map.
void SessionManager::DestroySession(Session* session) {
  if (session != NULL) {
    SessionMap::iterator it = session_map_.find(session->id());
    if (it != session_map_.end()) {
      SignalSessionDestroy(session);
      session->client()->OnSessionDestroy(session);
      session_map_.erase(it);
      delete session;
    }
  }
}

// Returns the session with id |sid|, or NULL if there is none.
Session* SessionManager::GetSession(const std::string& sid) {
  SessionMap::iterator it = session_map_.find(sid);
  if (it != session_map_.end())
    return it->second;
  return NULL;
}

// Terminates sessions until the session map is empty.
// NOTE(review): this loop only ends if Terminate() removes the session from
// session_map_ before returning. The destructor comment says termination is
// asynchronous, in which case this would spin - confirm before calling.
void SessionManager::TerminateAll() {
  while (session_map_.begin() != session_map_.end()) {
    Session* session = session_map_.begin()->second;
    session->Terminate();
  }
}

bool SessionManager::IsSessionMessage(const buzz::XmlElement* stanza) {
  return cricket::IsSessionMessage(stanza);
}

// Returns the session with id |sid| only if its remote name equals
// |remote_name| when both are compared as JIDs; otherwise NULL.
Session* SessionManager::FindSession(const std::string& sid,
                                     const std::string& remote_name) {
  SessionMap::iterator iter = session_map_.find(sid);
  if (iter == session_map_.end())
    return NULL;

  Session* session = iter->second;
  if (buzz::Jid(remote_name) != buzz::Jid(session->remote_name()))
    return NULL;

  return session;
}

// Routes an incoming session stanza to its session. A session-initiate for an
// unknown session creates a new one; anything else that cannot be routed is
// answered with an error.
void SessionManager::OnIncomingMessage(const buzz::XmlElement* stanza) {
  SessionMessage msg;
  ParseError error;

  if (!ParseSessionMessage(stanza, &msg, &error)) {
    SendErrorMessage(stanza, buzz::QN_STANZA_BAD_REQUEST, "modify",
                     error.text, NULL);
    return;
  }

  Session* session = FindSession(msg.sid, msg.from);
  if (session) {
    session->OnIncomingMessage(msg);
    return;
  }
  if (msg.type != ACTION_SESSION_INITIATE) {
    SendErrorMessage(stanza, buzz::QN_STANZA_BAD_REQUEST, "modify",
                     "unknown session", NULL);
    return;
  }

  std::string content_type;
  if (!ParseContentType(msg.protocol, msg.action_elem,
                        &content_type, &error)) {
    SendErrorMessage(stanza, buzz::QN_STANZA_BAD_REQUEST, "modify",
                     error.text, NULL);
    return;
  }

  if (!GetClient(content_type)) {
    SendErrorMessage(stanza, buzz::QN_STANZA_BAD_REQUEST, "modify",
                     "unknown content type: " + content_type, NULL);
    return;
  }

  // A client exists for content_type (checked above), so CreateSession does
  // not return NULL here.
  session = CreateSession(msg.to, msg.initiator, msg.sid,
                          content_type, true);
  session->OnIncomingMessage(msg);
}

// Routes the response to a stanza we sent back to the session that sent it.
void SessionManager::OnIncomingResponse(const buzz::XmlElement* orig_stanza,
    const buzz::XmlElement* response_stanza) {
  if (orig_stanza == NULL || response_stanza == NULL) {
    return;
  }

  SessionMessage msg;
  ParseError error;
  if (!ParseSessionMessage(orig_stanza, &msg, &error)) {
    // Log the stanza contents rather than the pointer value.
    LOG(LS_WARNING) << "Error parsing incoming response: " << error.text
                    << ":" << orig_stanza->Str();
    return;
  }

  Session* session = FindSession(msg.sid, msg.to);
  if (!session) {
    // Also try the QN_FROM in the response stanza, in case we sent the request
    // to a bare JID but got the response from a full JID.
    std::string ack_from = response_stanza->Attr(buzz::QN_FROM);
    session = FindSession(msg.sid, ack_from);
  }
  if (session) {
    session->OnIncomingResponse(orig_stanza, response_stanza, msg);
  }
}

// Routes a failed send to its session. When |error_stanza| is NULL an error
// stanza is synthesized so the session can handle both cases the same way.
void SessionManager::OnFailedSend(const buzz::XmlElement* orig_stanza,
                                  const buzz::XmlElement* error_stanza) {
  SessionMessage msg;
  ParseError error;
  if (!ParseSessionMessage(orig_stanza, &msg, &error)) {
    return;  // TODO: log somewhere?
  }

  Session* session = FindSession(msg.sid, msg.to);
  if (session) {
    rtc::scoped_ptr<buzz::XmlElement> synthetic_error;
    if (!error_stanza) {
      // A failed send is semantically equivalent to an error response, so we
      // can just turn the former into the latter.
      synthetic_error.reset(
        CreateErrorMessage(orig_stanza, buzz::QN_STANZA_ITEM_NOT_FOUND,
                           "cancel", "Recipient did not respond", NULL));
      error_stanza = synthetic_error.get();
    }

    session->OnFailedSend(orig_stanza, error_stanza);
  }
}

// Builds an error reply to |stanza| and emits it through
// SignalOutgoingMessage.
void SessionManager::SendErrorMessage(const buzz::XmlElement* stanza,
                                      const buzz::QName& name,
                                      const std::string& type,
                                      const std::string& text,
                                      const buzz::XmlElement* extra_info) {
  rtc::scoped_ptr<buzz::XmlElement> msg(
      CreateErrorMessage(stanza, name, type, text, extra_info));
  SignalOutgoingMessage(this, msg.get());
}

// Builds an IQ "error" reply to |stanza| containing the error condition
// |name|, the error |type|, an optional copy of |extra_info| and an optional
// English |text|. The caller owns the returned element.
buzz::XmlElement* SessionManager::CreateErrorMessage(
    const buzz::XmlElement* stanza,
    const buzz::QName& name,
    const std::string& type,
    const std::string& text,
    const buzz::XmlElement* extra_info) {
  buzz::XmlElement* iq = new buzz::XmlElement(buzz::QN_IQ);
  iq->SetAttr(buzz::QN_TO, stanza->Attr(buzz::QN_FROM));
  iq->SetAttr(buzz::QN_ID, stanza->Attr(buzz::QN_ID));
  iq->SetAttr(buzz::QN_TYPE, "error");

  CopyXmlChildren(stanza, iq);

  buzz::XmlElement* error = new buzz::XmlElement(buzz::QN_ERROR);
  error->SetAttr(buzz::QN_TYPE, type);
  iq->AddElement(error);

  // If the error name is not in the standard namespace, we have to first add
  // some error from that namespace.
  if (name.Namespace() != buzz::NS_STANZA) {
    error->AddElement(
        new buzz::XmlElement(buzz::QN_STANZA_UNDEFINED_CONDITION));
  }
  error->AddElement(new buzz::XmlElement(name));

  if (extra_info)
    error->AddElement(new buzz::XmlElement(*extra_info));

  if (text.size() > 0) {
    // It's okay to always use English here.  This text is for debugging
    // purposes only.
    buzz::XmlElement* text_elem = new buzz::XmlElement(buzz::QN_STANZA_TEXT);
    text_elem->SetAttr(buzz::QN_XML_LANG, "en");
    text_elem->SetBodyText(text);
    error->AddElement(text_elem);
  }

  // TODO: Should we include error codes as well for SIP compatibility?

  return iq;
}

// Re-emits a session's outgoing stanza as coming from this manager.
void SessionManager::OnOutgoingMessage(Session* session,
                                       const buzz::XmlElement* stanza) {
  SignalOutgoingMessage(this, stanza);
}

// Turns a session's error signal into an error reply to |stanza|.
void SessionManager::OnErrorMessage(BaseSession* session,
                                    const buzz::XmlElement* stanza,
                                    const buzz::QName& name,
                                    const std::string& type,
                                    const std::string& text,
                                    const buzz::XmlElement* extra_info) {
  SendErrorMessage(stanza, name, type, text, extra_info);
}

// Notifies every session that signaling is ready.
void SessionManager::OnSignalingReady() {
  for (SessionMap::iterator it = session_map_.begin();
      it != session_map_.end();
      ++it) {
    it->second->OnSignalingReady();
  }
}

// Re-emits a session's signaling request from this manager.
void SessionManager::OnRequestSignaling(Session* session) {
  SignalRequestSignaling();
}

}  // namespace cricket