diff --git a/media/sctp/dcsctp_transport.cc b/media/sctp/dcsctp_transport.cc index c8e6da4f00..062360d251 100644 --- a/media/sctp/dcsctp_transport.cc +++ b/media/sctp/dcsctp_transport.cc @@ -218,16 +218,17 @@ bool DcSctpTransport::Start(int local_sctp_port, } bool DcSctpTransport::OpenStream(int sid) { + RTC_DCHECK_RUN_ON(network_thread_); RTC_DLOG(LS_INFO) << debug_name_ << "->OpenStream(" << sid << ")."; - if (!socket_) { - RTC_LOG(LS_ERROR) << debug_name_ << "->OpenStream(sid=" << sid - << "): Transport is not started."; - return false; - } + + StreamState stream_state; + stream_states_.insert_or_assign(dcsctp::StreamID(static_cast(sid)), + stream_state); return true; } bool DcSctpTransport::ResetStream(int sid) { + RTC_DCHECK_RUN_ON(network_thread_); RTC_DLOG(LS_INFO) << debug_name_ << "->ResetStream(" << sid << ")."; if (!socket_) { RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid @@ -237,14 +238,21 @@ bool DcSctpTransport::ResetStream(int sid) { dcsctp::StreamID streams[1] = {dcsctp::StreamID(static_cast(sid))}; - StreamClosingState& closing_state = closing_states_[streams[0]]; - if (closing_state.closure_initiated || closing_state.incoming_reset_done || - closing_state.outgoing_reset_done) { + auto it = stream_states_.find(streams[0]); + if (it == stream_states_.end()) { + RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid + << "): Stream is not open."; + return false; + } + + StreamState& stream_state = it->second; + if (stream_state.closure_initiated || stream_state.incoming_reset_done || + stream_state.outgoing_reset_done) { // The closing procedure was already initiated by the remote, don't do // anything. return false; } - closing_state.closure_initiated = true; + stream_state.closure_initiated = true; socket_->ResetStreams(streams); return true; } @@ -265,6 +273,30 @@ bool DcSctpTransport::SendData(int sid, return false; } + // It is possible for a message to be sent from the signaling thread at the + // same time a data-channel is closing, but before the signaling thread is + // aware of it. So we need to keep track of currently active data channels and + // skip sending messages for the ones that are not open or closing. + // The sending errors are not impacting the data channel API contract as + // it is allowed to discard queued messages when the channel is closing. + auto stream_state = + stream_states_.find(dcsctp::StreamID(static_cast(sid))); + if (stream_state == stream_states_.end()) { + RTC_LOG(LS_VERBOSE) << "Skipping message on non-open stream with sid: " + << sid; + *result = cricket::SDR_ERROR; + return false; + } + + if (stream_state->second.closure_initiated || + stream_state->second.incoming_reset_done || + stream_state->second.outgoing_reset_done) { + RTC_LOG(LS_VERBOSE) << "Skipping message on closing stream with sid: " + << sid; + *result = cricket::SDR_ERROR; + return false; + } + auto max_message_size = socket_->options().max_message_size; if (max_message_size > 0 && payload.size() > max_message_size) { RTC_LOG(LS_WARNING) << debug_name_ @@ -519,16 +551,23 @@ void DcSctpTransport::OnStreamsResetPerformed( RTC_LOG(LS_INFO) << debug_name_ << "->OnStreamsResetPerformed(...): Outgoing stream reset" << ", sid=" << stream_id.value(); - StreamClosingState& closing_state = closing_states_[stream_id]; - closing_state.outgoing_reset_done = true; - if (closing_state.incoming_reset_done) { + auto it = stream_states_.find(stream_id); + if (it == stream_states_.end()) { + // Ignoring an outgoing stream reset for a closed stream + return; + } + + StreamState& stream_state = it->second; + stream_state.outgoing_reset_done = true; + + if (stream_state.incoming_reset_done) { // When the close was not initiated locally, we can signal the end of the // data channel close procedure when the remote ACKs the reset. if (data_channel_sink_) { data_channel_sink_->OnChannelClosed(stream_id.value()); } - closing_states_.erase(stream_id); + stream_states_.erase(stream_id); } } } @@ -540,10 +579,15 @@ void DcSctpTransport::OnIncomingStreamsReset( RTC_LOG(LS_INFO) << debug_name_ << "->OnIncomingStreamsReset(...): Incoming stream reset" << ", sid=" << stream_id.value(); - StreamClosingState& closing_state = closing_states_[stream_id]; - closing_state.incoming_reset_done = true; - if (!closing_state.closure_initiated) { + auto it = stream_states_.find(stream_id); + if (it == stream_states_.end()) + return; + + StreamState& stream_state = it->second; + stream_state.incoming_reset_done = true; + + if (!stream_state.closure_initiated) { // When receiving an incoming stream reset event for a non local close // procedure, the transport needs to reset the stream in the other // direction too. @@ -554,13 +598,13 @@ void DcSctpTransport::OnIncomingStreamsReset( } } - if (closing_state.outgoing_reset_done) { + if (stream_state.outgoing_reset_done) { // The close procedure that was initiated locally is complete when we // receive and incoming reset event. if (data_channel_sink_) { data_channel_sink_->OnChannelClosed(stream_id.value()); } - closing_states_.erase(stream_id); + stream_states_.erase(stream_id); } } } diff --git a/media/sctp/dcsctp_transport.h b/media/sctp/dcsctp_transport.h index 1f71db87c4..f86ac5a23a 100644 --- a/media/sctp/dcsctp_transport.h +++ b/media/sctp/dcsctp_transport.h @@ -114,10 +114,10 @@ class DcSctpTransport : public cricket::SctpTransportInternal, std::string debug_name_ = "DcSctpTransport"; rtc::CopyOnWriteBuffer receive_buffer_; - // Used to keep track of the closing state of the data channel. + // Used to keep track of the state of data channels. // Reset needs to happen both ways before signaling the transport // is closed. - struct StreamClosingState { + struct StreamState { // True when the local connection has initiated the reset. // If a connection receives a reset for a stream that isn't // already being reset locally, it needs to fire the signal @@ -129,7 +129,9 @@ class DcSctpTransport : public cricket::SctpTransportInternal, bool outgoing_reset_done = false; }; - flat_map closing_states_; + // Map of all currently open or closing data channels + flat_map stream_states_ + RTC_GUARDED_BY(network_thread_); bool ready_to_send_data_ = false; std::function on_connected_callback_ RTC_GUARDED_BY(network_thread_); DataChannelSink* data_channel_sink_ RTC_GUARDED_BY(network_thread_) = nullptr; diff --git a/media/sctp/dcsctp_transport_unittest.cc b/media/sctp/dcsctp_transport_unittest.cc index 270b06a63f..08dc2ec0b6 100644 --- a/media/sctp/dcsctp_transport_unittest.cc +++ b/media/sctp/dcsctp_transport_unittest.cc @@ -18,6 +18,7 @@ #include "p2p/base/fake_packet_transport.h" #include "test/gtest.h" +using ::testing::_; using ::testing::ByMove; using ::testing::DoAll; using ::testing::ElementsAre; @@ -25,6 +26,7 @@ using ::testing::InSequence; using ::testing::Invoke; using ::testing::NiceMock; using ::testing::Return; +using ::testing::ReturnPointee; namespace webrtc { @@ -112,6 +114,7 @@ TEST(DcSctpTransportTest, CloseSequence) { peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024); peer_b.sctp_transport_->Start(5000, 5000, 256 * 1024); peer_a.sctp_transport_->OpenStream(1); + peer_b.sctp_transport_->OpenStream(1); peer_a.sctp_transport_->ResetStream(1); // Simulate the callbacks from the stream resets @@ -153,6 +156,7 @@ TEST(DcSctpTransportTest, CloseSequenceSimultaneous) { peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024); peer_b.sctp_transport_->Start(5000, 5000, 256 * 1024); peer_a.sctp_transport_->OpenStream(1); + peer_b.sctp_transport_->OpenStream(1); peer_a.sctp_transport_->ResetStream(1); peer_b.sctp_transport_->ResetStream(1); @@ -168,4 +172,62 @@ TEST(DcSctpTransportTest, CloseSequenceSimultaneous) { ->OnIncomingStreamsReset(streams); } +TEST(DcSctpTransportTest, DiscardMessageClosedChannel) { + rtc::AutoThread main_thread; + Peer peer_a; + + EXPECT_CALL(*peer_a.socket_, Send(_, _)).Times(0); + + peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024); + + cricket::SendDataResult result; + SendDataParams params; + rtc::CopyOnWriteBuffer payload; + bool send_data_return = + peer_a.sctp_transport_->SendData(1, params, payload, &result); + EXPECT_FALSE(send_data_return); + EXPECT_EQ(cricket::SDR_ERROR, result); +} + +TEST(DcSctpTransportTest, DiscardMessageClosingChannel) { + rtc::AutoThread main_thread; + Peer peer_a; + + EXPECT_CALL(*peer_a.socket_, Send(_, _)).Times(0); + + peer_a.sctp_transport_->OpenStream(1); + peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024); + peer_a.sctp_transport_->ResetStream(1); + + cricket::SendDataResult result; + SendDataParams params; + rtc::CopyOnWriteBuffer payload; + + bool send_data_return = + peer_a.sctp_transport_->SendData(1, params, payload, &result); + EXPECT_FALSE(send_data_return); + EXPECT_EQ(cricket::SDR_ERROR, result); +} + +TEST(DcSctpTransportTest, SendDataOpenChannel) { + rtc::AutoThread main_thread; + Peer peer_a; + dcsctp::DcSctpOptions options; + + EXPECT_CALL(*peer_a.socket_, Send(_, _)).Times(1); + EXPECT_CALL(*peer_a.socket_, options()).WillOnce(ReturnPointee(&options)); + + peer_a.sctp_transport_->OpenStream(1); + peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024); + + cricket::SendDataResult result; + SendDataParams params; + rtc::CopyOnWriteBuffer payload; + + bool send_data_return = + peer_a.sctp_transport_->SendData(1, params, payload, &result); + EXPECT_TRUE(send_data_return); + EXPECT_EQ(cricket::SDR_SUCCESS, result); +} + } // namespace webrtc