From 0edd50ccb34cc2dc4746137fdce1f5cf66808274 Mon Sep 17 00:00:00 2001 From: bemasc Date: Wed, 1 Jul 2015 13:34:33 -0700 Subject: [PATCH] Support for onbufferedamountlow Original review at https://webrtc-codereview.appspot.com/54679004/ BUG=https://code.google.com/p/chromium/issues/detail?id=496700 Review URL: https://codereview.webrtc.org/1207613006 Cr-Commit-Position: refs/heads/master@{#9527} --- talk/app/webrtc/datachannel.cc | 13 ++++++++- talk/app/webrtc/datachannel_unittest.cc | 28 ++++++++++++++++++- talk/app/webrtc/datachannelinterface.h | 2 ++ .../app/webrtc/java/jni/peerconnection_jni.cc | 19 +++++++++---- .../java/src/org/webrtc/DataChannel.java | 2 ++ .../src/org/webrtc/PeerConnectionTest.java | 5 ++++ talk/app/webrtc/objc/RTCDataChannel.mm | 9 ++++++ talk/app/webrtc/objc/public/RTCDataChannel.h | 6 ++++ .../objctests/RTCPeerConnectionSyncObserver.m | 6 ++++ .../webrtc/test/mockpeerconnectionobservers.h | 6 ++-- 10 files changed, 87 insertions(+), 9 deletions(-) diff --git a/talk/app/webrtc/datachannel.cc b/talk/app/webrtc/datachannel.cc index 559eec5343..690ee65d3b 100644 --- a/talk/app/webrtc/datachannel.cc +++ b/talk/app/webrtc/datachannel.cc @@ -476,6 +476,7 @@ void DataChannel::SendQueuedDataMessages() { ASSERT(state_ == kOpen || state_ == kClosing); + uint64 start_buffered_amount = buffered_amount(); while (!queued_send_data_.Empty()) { DataBuffer* buffer = queued_send_data_.Front(); if (!SendDataMessage(*buffer, false)) { @@ -485,6 +486,10 @@ void DataChannel::SendQueuedDataMessages() { queued_send_data_.Pop(); delete buffer; } + + if (observer_ && buffered_amount() < start_buffered_amount) { + observer_->OnBufferedAmountChange(start_buffered_amount); + } } bool DataChannel::SendDataMessage(const DataBuffer& buffer, @@ -534,11 +539,17 @@ bool DataChannel::SendDataMessage(const DataBuffer& buffer, } bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) { - if (queued_send_data_.byte_count() >= kMaxQueuedSendDataBytes) { + size_t start_buffered_amount = buffered_amount(); + if (start_buffered_amount >= kMaxQueuedSendDataBytes) { LOG(LS_ERROR) << "Can't buffer any more data for the data channel."; return false; } queued_send_data_.Push(new DataBuffer(buffer)); + + // The buffer can have length zero, in which case there is no change. + if (observer_ && buffered_amount() > start_buffered_amount) { + observer_->OnBufferedAmountChange(start_buffered_amount); + } return true; } diff --git a/talk/app/webrtc/datachannel_unittest.cc b/talk/app/webrtc/datachannel_unittest.cc index bc4f81c09f..e3c290bd9b 100644 --- a/talk/app/webrtc/datachannel_unittest.cc +++ b/talk/app/webrtc/datachannel_unittest.cc @@ -35,12 +35,18 @@ using webrtc::DataChannel; class FakeDataChannelObserver : public webrtc::DataChannelObserver { public: FakeDataChannelObserver() - : messages_received_(0), on_state_change_count_(0) {} + : messages_received_(0), + on_state_change_count_(0), + on_buffered_amount_change_count_(0) {} void OnStateChange() { ++on_state_change_count_; } + void OnBufferedAmountChange(uint64 previous_amount) { + ++on_buffered_amount_change_count_; + } + void OnMessage(const webrtc::DataBuffer& buffer) { ++messages_received_; } @@ -53,13 +59,22 @@ class FakeDataChannelObserver : public webrtc::DataChannelObserver { on_state_change_count_ = 0; } + void ResetOnBufferedAmountChangeCount() { + on_buffered_amount_change_count_ = 0; + } + size_t on_state_change_count() const { return on_state_change_count_; } + size_t on_buffered_amount_change_count() const { + return on_buffered_amount_change_count_; + } + private: size_t messages_received_; size_t on_state_change_count_; + size_t on_buffered_amount_change_count_; }; class SctpDataChannelTest : public testing::Test { @@ -133,11 +148,13 @@ TEST_F(SctpDataChannelTest, StateTransition) { // Tests that DataChannel::buffered_amount() is correct after the channel is // blocked. TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) { + AddObserver(); SetChannelReady(); webrtc::DataBuffer buffer("abcd"); EXPECT_TRUE(webrtc_data_channel_->Send(buffer)); EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount()); + EXPECT_EQ(0U, observer_->on_buffered_amount_change_count()); provider_.set_send_blocked(true); @@ -147,37 +164,46 @@ TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) { } EXPECT_EQ(buffer.data.size() * number_of_packets, webrtc_data_channel_->buffered_amount()); + EXPECT_EQ(number_of_packets, observer_->on_buffered_amount_change_count()); } // Tests that the queued data are sent when the channel transitions from blocked // to unblocked. TEST_F(SctpDataChannelTest, QueuedDataSentWhenUnblocked) { + AddObserver(); SetChannelReady(); webrtc::DataBuffer buffer("abcd"); provider_.set_send_blocked(true); EXPECT_TRUE(webrtc_data_channel_->Send(buffer)); + EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); + provider_.set_send_blocked(false); SetChannelReady(); EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount()); + EXPECT_EQ(2U, observer_->on_buffered_amount_change_count()); } // Tests that no crash when the channel is blocked right away while trying to // send queued data. TEST_F(SctpDataChannelTest, BlockedWhenSendQueuedDataNoCrash) { + AddObserver(); SetChannelReady(); webrtc::DataBuffer buffer("abcd"); provider_.set_send_blocked(true); EXPECT_TRUE(webrtc_data_channel_->Send(buffer)); + EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); // Set channel ready while it is still blocked. SetChannelReady(); EXPECT_EQ(buffer.size(), webrtc_data_channel_->buffered_amount()); + EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); // Unblock the channel to send queued data again, there should be no crash. provider_.set_send_blocked(false); SetChannelReady(); EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount()); + EXPECT_EQ(2U, observer_->on_buffered_amount_change_count()); } // Tests that the queued control message is sent when channel is ready. diff --git a/talk/app/webrtc/datachannelinterface.h b/talk/app/webrtc/datachannelinterface.h index 63122629f5..90573ebbf3 100644 --- a/talk/app/webrtc/datachannelinterface.h +++ b/talk/app/webrtc/datachannelinterface.h @@ -91,6 +91,8 @@ class DataChannelObserver { virtual void OnStateChange() = 0; // A data buffer was successfully received. virtual void OnMessage(const DataBuffer& buffer) = 0; + // The data channel's buffered_amount has changed. + virtual void OnBufferedAmountChange(uint64 previous_amount){}; protected: virtual ~DataChannelObserver() {} diff --git a/talk/app/webrtc/java/jni/peerconnection_jni.cc b/talk/app/webrtc/java/jni/peerconnection_jni.cc index 69078e6b7e..2b6cbc7377 100644 --- a/talk/app/webrtc/java/jni/peerconnection_jni.cc +++ b/talk/app/webrtc/java/jni/peerconnection_jni.cc @@ -559,16 +559,24 @@ class DataChannelObserverWrapper : public DataChannelObserver { : j_observer_global_(jni, j_observer), j_observer_class_(jni, GetObjectClass(jni, j_observer)), j_buffer_class_(jni, FindClass(jni, "org/webrtc/DataChannel$Buffer")), - j_on_state_change_mid_(GetMethodID(jni, *j_observer_class_, - "onStateChange", "()V")), + j_on_buffered_amount_change_mid_(GetMethodID( + jni, *j_observer_class_, "onBufferedAmountChange", "(J)V")), + j_on_state_change_mid_( + GetMethodID(jni, *j_observer_class_, "onStateChange", "()V")), j_on_message_mid_(GetMethodID(jni, *j_observer_class_, "onMessage", "(Lorg/webrtc/DataChannel$Buffer;)V")), - j_buffer_ctor_(GetMethodID(jni, *j_buffer_class_, - "", "(Ljava/nio/ByteBuffer;Z)V")) { - } + j_buffer_ctor_(GetMethodID(jni, *j_buffer_class_, "", + "(Ljava/nio/ByteBuffer;Z)V")) {} virtual ~DataChannelObserverWrapper() {} + void OnBufferedAmountChange(uint64 previous_amount) override { + ScopedLocalRefFrame local_ref_frame(jni()); + jni()->CallVoidMethod(*j_observer_global_, j_on_buffered_amount_change_mid_, + previous_amount); + CHECK_EXCEPTION(jni()) << "error during CallVoidMethod"; + } + void OnStateChange() override { ScopedLocalRefFrame local_ref_frame(jni()); jni()->CallVoidMethod(*j_observer_global_, j_on_state_change_mid_); @@ -593,6 +601,7 @@ class DataChannelObserverWrapper : public DataChannelObserver { const ScopedGlobalRef j_observer_global_; const ScopedGlobalRef j_observer_class_; const ScopedGlobalRef j_buffer_class_; + const jmethodID j_on_buffered_amount_change_mid_; const jmethodID j_on_state_change_mid_; const jmethodID j_on_message_mid_; const jmethodID j_buffer_ctor_; diff --git a/talk/app/webrtc/java/src/org/webrtc/DataChannel.java b/talk/app/webrtc/java/src/org/webrtc/DataChannel.java index deee84bd35..1866098703 100644 --- a/talk/app/webrtc/java/src/org/webrtc/DataChannel.java +++ b/talk/app/webrtc/java/src/org/webrtc/DataChannel.java @@ -77,6 +77,8 @@ public class DataChannel { /** Java version of C++ DataChannelObserver. */ public interface Observer { + /** The data channel's bufferedAmount has changed. */ + public void onBufferedAmountChange(long previousAmount); /** The data channel state has changed. */ public void onStateChange(); /** diff --git a/talk/app/webrtc/java/testcommon/src/org/webrtc/PeerConnectionTest.java b/talk/app/webrtc/java/testcommon/src/org/webrtc/PeerConnectionTest.java index 224225cdc7..b87f484f2f 100644 --- a/talk/app/webrtc/java/testcommon/src/org/webrtc/PeerConnectionTest.java +++ b/talk/app/webrtc/java/testcommon/src/org/webrtc/PeerConnectionTest.java @@ -258,6 +258,11 @@ public class PeerConnectionTest { assertTrue(expected.data.equals(buffer.data)); } + @Override + public synchronized void onBufferedAmountChange(long previousAmount) { + assertFalse(previousAmount == dataChannel.bufferedAmount()); + } + @Override public synchronized void onStateChange() { assertEquals(expectedStateChanges.removeFirst(), dataChannel.state()); diff --git a/talk/app/webrtc/objc/RTCDataChannel.mm b/talk/app/webrtc/objc/RTCDataChannel.mm index 4fb03c284e..8a9b6b6095 100644 --- a/talk/app/webrtc/objc/RTCDataChannel.mm +++ b/talk/app/webrtc/objc/RTCDataChannel.mm @@ -43,6 +43,15 @@ class RTCDataChannelObserver : public DataChannelObserver { [_channel.delegate channelDidChangeState:_channel]; } + void OnBufferedAmountChange(uint64 previousAmount) override { + RTCDataChannel* channel = _channel; + id delegate = channel.delegate; + if ([delegate + respondsToSelector:@selector(channel:didChangeBufferedAmount:)]) { + [delegate channel:channel didChangeBufferedAmount:previousAmount]; + } + } + void OnMessage(const DataBuffer& buffer) override { if (!_channel.delegate) { return; diff --git a/talk/app/webrtc/objc/public/RTCDataChannel.h b/talk/app/webrtc/objc/public/RTCDataChannel.h index 7c22580245..24a46f655c 100644 --- a/talk/app/webrtc/objc/public/RTCDataChannel.h +++ b/talk/app/webrtc/objc/public/RTCDataChannel.h @@ -82,6 +82,12 @@ typedef enum { - (void)channel:(RTCDataChannel*)channel didReceiveMessageWithBuffer:(RTCDataBuffer*)buffer; +@optional + +// Called when the buffered amount has changed. +- (void)channel:(RTCDataChannel*)channel + didChangeBufferedAmount:(NSUInteger)amount; + @end // ObjectiveC wrapper for a DataChannel object. diff --git a/talk/app/webrtc/objctests/RTCPeerConnectionSyncObserver.m b/talk/app/webrtc/objctests/RTCPeerConnectionSyncObserver.m index 5c76672414..5070b789b8 100644 --- a/talk/app/webrtc/objctests/RTCPeerConnectionSyncObserver.m +++ b/talk/app/webrtc/objctests/RTCPeerConnectionSyncObserver.m @@ -230,6 +230,12 @@ NSAssert(expectedState == channel.state, @"Channel state should match"); } +- (void)channel:(RTCDataChannel*)channel + didChangeBufferedAmount:(NSUInteger)previousAmount { + NSAssert(channel.bufferedAmount != previousAmount, + @"Invalid bufferedAmount change"); +} + - (void)channel:(RTCDataChannel*)channel didReceiveMessageWithBuffer:(RTCDataBuffer*)buffer { NSAssert([_expectedMessages count] > 0, diff --git a/talk/app/webrtc/test/mockpeerconnectionobservers.h b/talk/app/webrtc/test/mockpeerconnectionobservers.h index 580a0fbb85..d2697b4364 100644 --- a/talk/app/webrtc/test/mockpeerconnectionobservers.h +++ b/talk/app/webrtc/test/mockpeerconnectionobservers.h @@ -98,8 +98,10 @@ class MockDataChannelObserver : public webrtc::DataChannelObserver { channel_->UnregisterObserver(); } - virtual void OnStateChange() { state_ = channel_->state(); } - virtual void OnMessage(const DataBuffer& buffer) { + void OnBufferedAmountChange(uint64 previous_amount) override {} + + void OnStateChange() override { state_ = channel_->state(); } + void OnMessage(const DataBuffer& buffer) override { last_message_.assign(buffer.data.data(), buffer.data.size()); ++received_message_count_; }