diff --git a/talk/media/sctp/sctpdataengine.cc b/talk/media/sctp/sctpdataengine.cc index 8c8a6a192f..4fc3d43dbc 100644 --- a/talk/media/sctp/sctpdataengine.cc +++ b/talk/media/sctp/sctpdataengine.cc @@ -109,6 +109,8 @@ typedef rtc::ScopedMessageData OutboundPacketMessage; // take off 80 bytes for DTLS/TURN/TCP/IP overhead. static const size_t kSctpMtu = 1200; +// The size of the SCTP association send buffer. 256kB, the usrsctp default. +static const int kSendBufferSize = 262144; enum { MSG_SCTPINBOUNDPACKET = 1, // MessageData is SctpInboundPacket MSG_SCTPOUTBOUNDPACKET = 2, // MessageData is rtc:Buffer @@ -177,11 +179,11 @@ static bool GetDataMediaType( } // Log the packet in text2pcap format, if log level is at LS_VERBOSE. -static void VerboseLogPacket(void *addr, size_t length, int direction) { +static void VerboseLogPacket(void *data, size_t length, int direction) { if (LOG_CHECK_LEVEL(LS_VERBOSE) && length > 0) { char *dump_buf; if ((dump_buf = usrsctp_dumppacket( - addr, length, direction)) != NULL) { + data, length, direction)) != NULL) { LOG(LS_VERBOSE) << dump_buf; usrsctp_freedumpbuffer(dump_buf); } @@ -258,6 +260,13 @@ SctpDataEngine::SctpDataEngine() { // TODO(ldixon): Consider turning this on/off. usrsctp_sysctl_set_sctp_ecn_enable(0); + // This is harmless, but we should find out when the library default + // changes. + int send_size = usrsctp_sysctl_get_sctp_sendspace(); + if (send_size != kSendBufferSize) { + LOG(LS_ERROR) << "Got different send size than expected: " << send_size; + } + // TODO(ldixon): Consider turning this on/off. // This is not needed right now (we don't do dynamic address changes): // If SCTP Auto-ASCONF is enabled, the peer is informed automatically @@ -315,6 +324,44 @@ DataMediaChannel* SctpDataEngine::CreateChannel( return new SctpDataMediaChannel(rtc::Thread::Current()); } +// static +SctpDataMediaChannel* SctpDataEngine::GetChannelFromSocket( + struct socket* sock) { + struct sockaddr* addrs = nullptr; + int naddrs = usrsctp_getladdrs(sock, 0, &addrs); + if (naddrs <= 0 || addrs[0].sa_family != AF_CONN) { + return nullptr; + } + // usrsctp_getladdrs() returns the addresses bound to this socket, which + // contains the SctpDataMediaChannel* as sconn_addr. Read the pointer, + // then free the list of addresses once we have the pointer. We only open + // AF_CONN sockets, and they should all have the sconn_addr set to the + // pointer that created them, so [0] is as good as any other. + struct sockaddr_conn* sconn = + reinterpret_cast(&addrs[0]); + SctpDataMediaChannel* channel = + reinterpret_cast(sconn->sconn_addr); + usrsctp_freeladdrs(addrs); + + return channel; +} + +// static +int SctpDataEngine::SendThresholdCallback(struct socket* sock, + uint32_t sb_free) { + // Fired on our I/O thread. SctpDataMediaChannel::OnPacketReceived() gets + // a packet containing acknowledgments, which goes into usrsctp_conninput, + // and then back here. + SctpDataMediaChannel* channel = GetChannelFromSocket(sock); + if (!channel) { + LOG(LS_ERROR) << "SendThresholdCallback: Failed to get channel for socket " + << sock; + return 0; + } + channel->OnSendThresholdCallback(); + return 0; +} + SctpDataMediaChannel::SctpDataMediaChannel(rtc::Thread* thread) : worker_thread_(thread), local_port_(kSctpDefaultPort), @@ -329,6 +376,11 @@ SctpDataMediaChannel::~SctpDataMediaChannel() { CloseSctpSocket(); } +void SctpDataMediaChannel::OnSendThresholdCallback() { + DCHECK(rtc::Thread::Current() == worker_thread_); + SignalReadyToSend(true); +} + sockaddr_conn SctpDataMediaChannel::GetSctpSockAddr(int port) { sockaddr_conn sconn = {0}; sconn.sconn_family = AF_CONN; @@ -347,8 +399,16 @@ bool SctpDataMediaChannel::OpenSctpSocket() { << "->Ignoring attempt to re-create existing socket."; return false; } + + // If kSendBufferSize isn't reflective of reality, we log an error, but we + // still have to do something reasonable here. Look up what the buffer's + // real size is and set our threshold to something reasonable. + const static int kSendThreshold = usrsctp_sysctl_get_sctp_sendspace() / 2; + sock_ = usrsctp_socket(AF_CONN, SOCK_STREAM, IPPROTO_SCTP, - cricket::OnSctpInboundPacket, NULL, 0, this); + cricket::OnSctpInboundPacket, + &SctpDataEngine::SendThresholdCallback, + kSendThreshold, this); if (!sock_) { LOG_ERRNO(LS_ERROR) << debug_name_ << "Failed to create SCTP socket."; return false; @@ -393,7 +453,7 @@ bool SctpDataMediaChannel::OpenSctpSocket() { } // Disable MTU discovery - struct sctp_paddrparams params = {{0}}; + sctp_paddrparams params = {{0}}; params.spp_assoc_id = 0; params.spp_flags = SPP_PMTUD_DISABLE; params.spp_pathmtu = kSctpMtu; @@ -598,6 +658,7 @@ bool SctpDataMediaChannel::SendData( // Called by network interface when a packet has been received. void SctpDataMediaChannel::OnPacketReceived( rtc::Buffer* packet, const rtc::PacketTime& packet_time) { + DCHECK(rtc::Thread::Current() == worker_thread_); LOG(LS_VERBOSE) << debug_name_ << "->OnPacketReceived(...): " << " length=" << packet->size() << ", sending: " << sending_; // Only give receiving packets to usrsctp after if connected. This enables two @@ -608,7 +669,6 @@ void SctpDataMediaChannel::OnPacketReceived( // Pass received packet to SCTP stack. Once processed by usrsctp, the data // will be will be given to the global OnSctpInboundData, and then, // marshalled by a Post and handled with OnMessage. - VerboseLogPacket(packet->data(), packet->size(), SCTP_DUMP_INBOUND); usrsctp_conninput(this, packet->data(), packet->size(), 0); } else { @@ -904,10 +964,17 @@ bool SctpDataMediaChannel::SetRecvCodecs(const std::vector& codecs) { void SctpDataMediaChannel::OnPacketFromSctpToNetwork( rtc::Buffer* buffer) { - if (buffer->size() > kSctpMtu) { + // usrsctp seems to interpret the MTU we give it strangely -- it seems to + // give us back packets bigger than that MTU, if only by a fixed amount. + // This is that amount that we've observed. + const int kSctpOverhead = 76; + if (buffer->size() > (kSctpOverhead + kSctpMtu)) { LOG(LS_ERROR) << debug_name_ << "->OnPacketFromSctpToNetwork(...): " << "SCTP seems to have made a packet that is bigger " - "than its official MTU."; + << "than its official MTU: " << buffer->size() + << " vs max of " << kSctpMtu + << " even after adding " << kSctpOverhead + << " extra SCTP overhead"; } MediaChannel::SendPacket(buffer); } diff --git a/talk/media/sctp/sctpdataengine.h b/talk/media/sctp/sctpdataengine.h index 86bfa37a48..20d9ed7792 100644 --- a/talk/media/sctp/sctpdataengine.h +++ b/talk/media/sctp/sctpdataengine.h @@ -64,6 +64,8 @@ const uint32 kMaxSctpSid = 1023; // usrsctp.h) const int kSctpDefaultPort = 5000; +class SctpDataMediaChannel; + // A DataEngine that interacts with usrsctp. // // From channel calls, data flows like this: @@ -88,7 +90,7 @@ const int kSctpDefaultPort = 5000; // 14. SctpDataMediaChannel::SignalDataReceived(data) // [from the same thread, methods registered/connected to // SctpDataMediaChannel are called with the recieved data] -class SctpDataEngine : public DataEngineInterface { +class SctpDataEngine : public DataEngineInterface, public sigslot::has_slots<> { public: SctpDataEngine(); virtual ~SctpDataEngine(); @@ -97,9 +99,13 @@ class SctpDataEngine : public DataEngineInterface { virtual const std::vector& data_codecs() { return codecs_; } + static int SendThresholdCallback(struct socket* sock, uint32_t sb_free); + private: static int usrsctp_engines_count; std::vector codecs_; + + static SctpDataMediaChannel* GetChannelFromSocket(struct socket* sock); }; // TODO(ldixon): Make into a special type of TypedMessageData. @@ -183,12 +189,13 @@ class SctpDataMediaChannel : public DataMediaChannel, const rtc::PacketTime& packet_time) {} virtual void OnReadyToSend(bool ready) {} + void OnSendThresholdCallback(); // Helper for debugging. void set_debug_name(const std::string& debug_name) { debug_name_ = debug_name; } const std::string& debug_name() const { return debug_name_; } - + const struct socket* socket() const { return sock_; } private: sockaddr_conn GetSctpSockAddr(int port); diff --git a/talk/media/sctp/sctpdataengine_unittest.cc b/talk/media/sctp/sctpdataengine_unittest.cc index 5b4c09e6a7..d406fa18cd 100644 --- a/talk/media/sctp/sctpdataengine_unittest.cc +++ b/talk/media/sctp/sctpdataengine_unittest.cc @@ -240,10 +240,16 @@ class SctpDataMediaChannelTest : public testing::Test, net2_.reset(new SctpFakeNetworkInterface(rtc::Thread::Current())); recv1_.reset(new SctpFakeDataReceiver()); recv2_.reset(new SctpFakeDataReceiver()); + chan1_ready_to_send_count_ = 0; + chan2_ready_to_send_count_ = 0; chan1_.reset(CreateChannel(net1_.get(), recv1_.get())); chan1_->set_debug_name("chan1/connector"); + chan1_->SignalReadyToSend.connect( + this, &SctpDataMediaChannelTest::OnChan1ReadyToSend); chan2_.reset(CreateChannel(net2_.get(), recv2_.get())); chan2_->set_debug_name("chan2/listener"); + chan2_->SignalReadyToSend.connect( + this, &SctpDataMediaChannelTest::OnChan2ReadyToSend); // Setup two connected channels ready to send and receive. net1_->SetDestination(chan2_.get()); net2_->SetDestination(chan1_.get()); @@ -330,6 +336,8 @@ class SctpDataMediaChannelTest : public testing::Test, SctpFakeDataReceiver* receiver1() { return recv1_.get(); } SctpFakeDataReceiver* receiver2() { return recv2_.get(); } + int channel1_ready_to_send_count() { return chan1_ready_to_send_count_; } + int channel2_ready_to_send_count() { return chan2_ready_to_send_count_; } private: rtc::scoped_ptr engine_; rtc::scoped_ptr net1_; @@ -338,6 +346,18 @@ class SctpDataMediaChannelTest : public testing::Test, rtc::scoped_ptr recv2_; rtc::scoped_ptr chan1_; rtc::scoped_ptr chan2_; + + int chan1_ready_to_send_count_; + int chan2_ready_to_send_count_; + + void OnChan1ReadyToSend(bool send) { + if (send) + ++chan1_ready_to_send_count_; + } + void OnChan2ReadyToSend(bool send) { + if (send) + ++chan2_ready_to_send_count_; + } }; // Verifies that SignalReadyToSend is fired. @@ -486,6 +506,15 @@ TEST_F(SctpDataMediaChannelTest, ClosesStreamsOnBothSides) { EXPECT_TRUE_WAIT(chan_1_sig_receiver.WasStreamClosed(4), 1000); } +TEST_F(SctpDataMediaChannelTest, EngineSignalsRightChannel) { + SetupConnectedChannels(); + EXPECT_TRUE_WAIT(channel1()->socket() != NULL, 1000); + struct socket *sock = const_cast(channel1()->socket()); + int prior_count = channel1_ready_to_send_count(); + cricket::SctpDataEngine::SendThresholdCallback(sock, 0); + EXPECT_GT(channel1_ready_to_send_count(), prior_count); +} + // Flaky on Linux and Windows. See webrtc:4453. #if defined(WEBRTC_WIN) || defined(WEBRTC_LINUX) #define MAYBE_ReusesAStream DISABLED_ReusesAStream