diff --git a/talk/app/webrtc/datachannel.cc b/talk/app/webrtc/datachannel.cc index 1897b73fc8..559eec5343 100644 --- a/talk/app/webrtc/datachannel.cc +++ b/talk/app/webrtc/datachannel.cc @@ -109,24 +109,26 @@ DataChannel::DataChannel( state_(kConnecting), data_channel_type_(dct), provider_(provider), - waiting_for_open_ack_(false), - was_ever_writable_(false), + handshake_state_(kHandshakeInit), connected_to_provider_(false), send_ssrc_set_(false), receive_ssrc_set_(false), + writable_(false), send_ssrc_(0), receive_ssrc_(0) { } bool DataChannel::Init(const InternalDataChannelInit& config) { - if (data_channel_type_ == cricket::DCT_RTP && - (config.reliable || - config.id != -1 || - config.maxRetransmits != -1 || - config.maxRetransmitTime != -1)) { - LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to " - << "invalid DataChannelInit."; - return false; + if (data_channel_type_ == cricket::DCT_RTP) { + if (config.reliable || + config.id != -1 || + config.maxRetransmits != -1 || + config.maxRetransmitTime != -1) { + LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to " + << "invalid DataChannelInit."; + return false; + } + handshake_state_ = kHandshakeReady; } else if (data_channel_type_ == cricket::DCT_SCTP) { if (config.id < -1 || config.maxRetransmits < -1 || @@ -142,6 +144,18 @@ bool DataChannel::Init(const InternalDataChannelInit& config) { } config_ = config; + switch (config_.open_handshake_role) { + case webrtc::InternalDataChannelInit::kNone: // pre-negotiated + handshake_state_ = kHandshakeReady; + break; + case webrtc::InternalDataChannelInit::kOpener: + handshake_state_ = kHandshakeShouldSendOpen; + break; + case webrtc::InternalDataChannelInit::kAcker: + handshake_state_ = kHandshakeShouldSendAck; + break; + }; + // Try to connect to the transport in case the transport channel already // exists. OnTransportChannelCreated(); @@ -298,7 +312,7 @@ void DataChannel::OnDataReceived(cricket::DataChannel* channel, if (params.type == cricket::DMT_CONTROL) { ASSERT(data_channel_type_ == cricket::DCT_SCTP); - if (!waiting_for_open_ack_) { + if (handshake_state_ != kHandshakeWaitingForAck) { // Ignore it if we are not expecting an ACK message. LOG(LS_WARNING) << "DataChannel received unexpected CONTROL message, " << "sid = " << params.ssrc; @@ -306,7 +320,7 @@ void DataChannel::OnDataReceived(cricket::DataChannel* channel, } if (ParseDataChannelOpenAckMessage(payload)) { // We can send unordered as soon as we receive the ACK message. - waiting_for_open_ack_ = false; + handshake_state_ = kHandshakeReady; LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = " << params.ssrc; } else { @@ -323,11 +337,13 @@ void DataChannel::OnDataReceived(cricket::DataChannel* channel, // We can send unordered as soon as we receive any DATA message since the // remote side must have received the OPEN (and old clients do not send // OPEN_ACK). - waiting_for_open_ack_ = false; + if (handshake_state_ == kHandshakeWaitingForAck) { + handshake_state_ = kHandshakeReady; + } bool binary = (params.type == cricket::DMT_BINARY); rtc::scoped_ptr buffer(new DataBuffer(payload, binary)); - if (was_ever_writable_ && observer_) { + if (state_ == kOpen && observer_) { observer_->OnMessage(*buffer.get()); } else { if (queued_received_data_.byte_count() + payload.size() > @@ -346,38 +362,14 @@ void DataChannel::OnDataReceived(cricket::DataChannel* channel, } void DataChannel::OnChannelReady(bool writable) { + writable_ = writable; if (!writable) { return; } - // Update the readyState and send the queued control message if the channel - // is writable for the first time; otherwise it means the channel was blocked - // for sending and now unblocked, so send the queued data now. - if (!was_ever_writable_) { - was_ever_writable_ = true; - if (data_channel_type_ == cricket::DCT_SCTP) { - rtc::Buffer payload; - - if (config_.open_handshake_role == InternalDataChannelInit::kOpener) { - WriteDataChannelOpenMessage(label_, config_, &payload); - SendControlMessage(payload); - } else if (config_.open_handshake_role == - InternalDataChannelInit::kAcker) { - WriteDataChannelOpenAckMessage(&payload); - SendControlMessage(payload); - } - } - - UpdateState(); - ASSERT(queued_send_data_.Empty()); - } else if (state_ == kOpen) { - // TODO(jiayl): Sending OPEN message here contradicts with the pre-condition - // that the readyState is open. According to the standard, the channel - // should not become open before the OPEN message is sent. - SendQueuedControlMessages(); - - SendQueuedDataMessages(); - } + SendQueuedControlMessages(); + SendQueuedDataMessages(); + UpdateState(); } void DataChannel::DoClose() { @@ -391,20 +383,34 @@ void DataChannel::DoClose() { } void DataChannel::UpdateState() { + // UpdateState determines what to do from a few state variables. Include + // all conditions required for each state transition here for + // clarity. OnChannelReady(true) will send any queued data and then invoke + // UpdateState(). switch (state_) { case kConnecting: { if (send_ssrc_set_ == receive_ssrc_set_) { if (data_channel_type_ == cricket::DCT_RTP && !connected_to_provider_) { connected_to_provider_ = provider_->ConnectDataChannel(this); } - if (was_ever_writable_) { - // TODO(jiayl): Do not transition to kOpen if we failed to send the - // OPEN message. - SendQueuedControlMessages(); - SetState(kOpen); - // If we have received buffers before the channel got writable. - // Deliver them now. - DeliverQueuedReceivedData(); + if (connected_to_provider_) { + if (handshake_state_ == kHandshakeShouldSendOpen) { + rtc::Buffer payload; + WriteDataChannelOpenMessage(label_, config_, &payload); + SendControlMessage(payload); + } else if (handshake_state_ == kHandshakeShouldSendAck) { + rtc::Buffer payload; + WriteDataChannelOpenAckMessage(&payload); + SendControlMessage(payload); + } + if (writable_ && + (handshake_state_ == kHandshakeReady || + handshake_state_ == kHandshakeWaitingForAck)) { + SetState(kOpen); + // If we have received buffers before the channel got writable. + // Deliver them now. + DeliverQueuedReceivedData(); + } } } break; @@ -413,10 +419,14 @@ void DataChannel::UpdateState() { break; } case kClosing: { - DisconnectFromTransport(); + if (queued_send_data_.Empty() && queued_control_data_.Empty()) { + if (connected_to_provider_) { + DisconnectFromProvider(); + } - if (!send_ssrc_set_ && !receive_ssrc_set_) { - SetState(kClosed); + if (!connected_to_provider_ && !send_ssrc_set_ && !receive_ssrc_set_) { + SetState(kClosed); + } } break; } @@ -435,7 +445,7 @@ void DataChannel::SetState(DataState state) { } } -void DataChannel::DisconnectFromTransport() { +void DataChannel::DisconnectFromProvider() { if (!connected_to_provider_) return; @@ -448,7 +458,7 @@ void DataChannel::DisconnectFromTransport() { } void DataChannel::DeliverQueuedReceivedData() { - if (!was_ever_writable_ || !observer_) { + if (!observer_) { return; } @@ -460,7 +470,11 @@ void DataChannel::DeliverQueuedReceivedData() { } void DataChannel::SendQueuedDataMessages() { - ASSERT(was_ever_writable_ && state_ == kOpen); + if (queued_send_data_.Empty()) { + return; + } + + ASSERT(state_ == kOpen || state_ == kClosing); while (!queued_send_data_.Empty()) { DataBuffer* buffer = queued_send_data_.Front(); @@ -479,8 +493,8 @@ bool DataChannel::SendDataMessage(const DataBuffer& buffer, if (data_channel_type_ == cricket::DCT_SCTP) { send_params.ordered = config_.ordered; - // Send as ordered if it is waiting for the OPEN_ACK message. - if (waiting_for_open_ack_ && !config_.ordered) { + // Send as ordered if it is still going through OPEN/ACK signaling. + if (handshake_state_ != kHandshakeReady && !config_.ordered) { send_params.ordered = true; LOG(LS_VERBOSE) << "Sending data as ordered for unordered DataChannel " << "because the OPEN_ACK message has not been received."; @@ -529,8 +543,6 @@ bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) { } void DataChannel::SendQueuedControlMessages() { - ASSERT(was_ever_writable_); - PacketQueue control_packets; control_packets.Swap(&queued_control_data_); @@ -546,16 +558,18 @@ void DataChannel::QueueControlMessage(const rtc::Buffer& buffer) { } bool DataChannel::SendControlMessage(const rtc::Buffer& buffer) { - bool is_open_message = - (config_.open_handshake_role == InternalDataChannelInit::kOpener); + bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen; ASSERT(data_channel_type_ == cricket::DCT_SCTP && - was_ever_writable_ && + writable_ && config_.id >= 0 && (!is_open_message || !config_.negotiated)); cricket::SendDataParams send_params; send_params.ssrc = config_.id; + // Send data as ordered before we receive any message from the remote peer to + // make sure the remote peer will not receive any data before it receives the + // OPEN message. send_params.ordered = config_.ordered || is_open_message; send_params.type = cricket::DMT_CONTROL; @@ -564,11 +578,10 @@ bool DataChannel::SendControlMessage(const rtc::Buffer& buffer) { if (retval) { LOG(LS_INFO) << "Sent CONTROL message on channel " << config_.id; - if (is_open_message) { - // Send data as ordered before we receive any message from the remote peer - // to make sure the remote peer will not receive any data before it - // receives the OPEN message. - waiting_for_open_ack_ = true; + if (handshake_state_ == kHandshakeShouldSendAck) { + handshake_state_ = kHandshakeReady; + } else if (handshake_state_ == kHandshakeShouldSendOpen) { + handshake_state_ = kHandshakeWaitingForAck; } } else if (send_result == cricket::SDR_BLOCK) { QueueControlMessage(buffer); diff --git a/talk/app/webrtc/datachannel.h b/talk/app/webrtc/datachannel.h index fe8fac1c67..8e58d0664b 100644 --- a/talk/app/webrtc/datachannel.h +++ b/talk/app/webrtc/datachannel.h @@ -204,11 +204,20 @@ class DataChannel : public DataChannelInterface, size_t byte_count_; }; + // The OPEN(_ACK) signaling state. + enum HandshakeState { + kHandshakeInit, + kHandshakeShouldSendOpen, + kHandshakeShouldSendAck, + kHandshakeWaitingForAck, + kHandshakeReady + }; + bool Init(const InternalDataChannelInit& config); void DoClose(); void UpdateState(); void SetState(DataState state); - void DisconnectFromTransport(); + void DisconnectFromProvider(); void DeliverQueuedReceivedData(); @@ -226,11 +235,11 @@ class DataChannel : public DataChannelInterface, DataState state_; cricket::DataChannelType data_channel_type_; DataChannelProviderInterface* provider_; - bool waiting_for_open_ack_; - bool was_ever_writable_; + HandshakeState handshake_state_; bool connected_to_provider_; bool send_ssrc_set_; bool receive_ssrc_set_; + bool writable_; uint32 send_ssrc_; uint32 receive_ssrc_; // Control messages that always have to get sent out before any queued diff --git a/talk/app/webrtc/datachannel_unittest.cc b/talk/app/webrtc/datachannel_unittest.cc index ab5dbe9a1b..bc4f81c09f 100644 --- a/talk/app/webrtc/datachannel_unittest.cc +++ b/talk/app/webrtc/datachannel_unittest.cc @@ -269,6 +269,41 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceiveData) { EXPECT_FALSE(provider_.last_send_data_params().ordered); } +// Tests that the channel can't open until it's successfully sent the OPEN +// message. +TEST_F(SctpDataChannelTest, OpenWaitsForOpenMesssage) { + webrtc::DataBuffer buffer("foo"); + + provider_.set_send_blocked(true); + SetChannelReady(); + EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, + webrtc_data_channel_->state()); + provider_.set_send_blocked(false); + EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, + webrtc_data_channel_->state(), 1000); + EXPECT_EQ(cricket::DMT_CONTROL, provider_.last_send_data_params().type); +} + +// Tests that close first makes sure all queued data gets sent. +TEST_F(SctpDataChannelTest, QueuedCloseFlushes) { + webrtc::DataBuffer buffer("foo"); + + provider_.set_send_blocked(true); + SetChannelReady(); + EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, + webrtc_data_channel_->state()); + provider_.set_send_blocked(false); + EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, + webrtc_data_channel_->state(), 1000); + provider_.set_send_blocked(true); + webrtc_data_channel_->Send(buffer); + webrtc_data_channel_->Close(); + provider_.set_send_blocked(false); + EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kClosed, + webrtc_data_channel_->state(), 1000); + EXPECT_EQ(cricket::DMT_TEXT, provider_.last_send_data_params().type); +} + // Tests that messages are sent with the right ssrc. TEST_F(SctpDataChannelTest, SendDataSsrc) { webrtc_data_channel_->SetSctpSid(1); @@ -369,8 +404,9 @@ TEST_F(SctpDataChannelTest, ClosedWhenSendBufferFull) { EXPECT_TRUE(webrtc_data_channel_->Send(packet)); } - EXPECT_EQ(webrtc::DataChannelInterface::kClosed, - webrtc_data_channel_->state()); + EXPECT_TRUE( + webrtc::DataChannelInterface::kClosed == webrtc_data_channel_->state() || + webrtc::DataChannelInterface::kClosing == webrtc_data_channel_->state()); } // Tests that the DataChannel is closed on transport errors. diff --git a/talk/app/webrtc/test/fakedatachannelprovider.h b/talk/app/webrtc/test/fakedatachannelprovider.h index bf64a94e45..eb86873c90 100644 --- a/talk/app/webrtc/test/fakedatachannelprovider.h +++ b/talk/app/webrtc/test/fakedatachannelprovider.h @@ -91,11 +91,15 @@ class FakeDataChannelProvider : public webrtc::DataChannelProviderInterface { void set_send_blocked(bool blocked) { send_blocked_ = blocked; if (!blocked) { - std::set::iterator it; - for (it = connected_channels_.begin(); - it != connected_channels_.end(); - ++it) { - (*it)->OnChannelReady(true); + // Take a snapshot of the connected channels and check to see whether + // each value is still in connected_channels_ before calling + // OnChannelReady(). This avoids problems where the set gets modified + // in response to OnChannelReady(). + for (webrtc::DataChannel *ch : std::set( + connected_channels_.begin(), connected_channels_.end())) { + if (connected_channels_.count(ch)) { + ch->OnChannelReady(true); + } } } }