From 273d0296a47f6bf0ff1f0e894bf42543ceff9832 Mon Sep 17 00:00:00 2001 From: Bjorn Mellem Date: Thu, 1 Nov 2018 16:42:44 -0700 Subject: [PATCH] Implement data channel methods in LoopbackMediaTransport. This enables PeerConnection tests to use LoopbackMediaTransport to test data-channel-over-media-transport code. Also changes LoopbackMediaTransport to invoke callbacks asynchronously. This is more accurate, as these callbacks are triggered by network events. The caller should not block while the callback executes. Since LoopbackMediaTransport is used for testing, it provides a FlushAsyncInvokes() method which may be used to ensure that callbacks occur deterministically (eg. before checking that data has been received). Bug: webrtc:9719 Change-Id: Ib8ea9aebf4a0ad3d5934a6fe4ab33432c68523fd Tbr: stefan@webrtc.org Reviewed-on: https://webrtc-review.googlesource.com/c/109060 Commit-Queue: Bjorn Mellem Reviewed-by: Steve Anton Cr-Commit-Position: refs/heads/master@{#25489} --- api/BUILD.gn | 1 + api/test/DEPS | 5 ++ api/test/loopback_media_transport.h | 88 +++++++++++++++---- api/test/loopback_media_transport_unittest.cc | 69 ++++++++++++++- audio/test/media_transport_test.cc | 4 +- 5 files changed, 149 insertions(+), 18 deletions(-) diff --git a/api/BUILD.gn b/api/BUILD.gn index 7e74c1a164..4aa42d407b 100644 --- a/api/BUILD.gn +++ b/api/BUILD.gn @@ -611,6 +611,7 @@ if (rtc_include_tests) { deps = [ ":libjingle_peerconnection_api", "../rtc_base:checks", + "../rtc_base:rtc_base", ] } diff --git a/api/test/DEPS b/api/test/DEPS index 98b1ad391f..1fc1f7cb30 100644 --- a/api/test/DEPS +++ b/api/test/DEPS @@ -5,4 +5,9 @@ specific_include_rules = { ".*": [ "+video" ], + "loopback_media_transport\.h": [ + "+rtc_base/asyncinvoker.h", + "+rtc_base/criticalsection.h", + "+rtc_base/thread.h", + ], } diff --git a/api/test/loopback_media_transport.h b/api/test/loopback_media_transport.h index 9ce50a7c8c..f3f24d4c98 100644 --- a/api/test/loopback_media_transport.h +++ b/api/test/loopback_media_transport.h @@ -14,6 +14,9 @@ #include #include "api/media_transport_interface.h" +#include "rtc_base/asyncinvoker.h" +#include "rtc_base/criticalsection.h" +#include "rtc_base/thread.h" namespace webrtc { @@ -21,24 +24,36 @@ namespace webrtc { // Currently supports audio only. class MediaTransportPair { public: - MediaTransportPair() - : pipe_{LoopbackMediaTransport(&pipe_[1]), - LoopbackMediaTransport(&pipe_[0])} {} + explicit MediaTransportPair(rtc::Thread* thread) + : first_(thread, &second_), second_(thread, &first_) {} // Ownership stays with MediaTransportPair - MediaTransportInterface* first() { return &pipe_[0]; } - MediaTransportInterface* second() { return &pipe_[1]; } + MediaTransportInterface* first() { return &first_; } + MediaTransportInterface* second() { return &second_; } + + void FlushAsyncInvokes() { + first_.FlushAsyncInvokes(); + second_.FlushAsyncInvokes(); + } private: class LoopbackMediaTransport : public MediaTransportInterface { public: - explicit LoopbackMediaTransport(LoopbackMediaTransport* other) - : other_(other) {} - ~LoopbackMediaTransport() { RTC_CHECK(sink_ == nullptr); } + LoopbackMediaTransport(rtc::Thread* thread, LoopbackMediaTransport* other) + : thread_(thread), other_(other) {} + + ~LoopbackMediaTransport() { + rtc::CritScope lock(&sink_lock_); + RTC_CHECK(sink_ == nullptr); + RTC_CHECK(data_sink_ == nullptr); + } RTCError SendAudioFrame(uint64_t channel_id, MediaTransportEncodedAudioFrame frame) override { - other_->OnData(channel_id, std::move(frame)); + invoker_.AsyncInvoke(RTC_FROM_HERE, thread_, + [this, channel_id, frame] { + other_->OnData(channel_id, std::move(frame)); + }); return RTCError::OK(); }; @@ -53,6 +68,7 @@ class MediaTransportPair { } void SetReceiveAudioSink(MediaTransportAudioSinkInterface* sink) override { + rtc::CritScope lock(&sink_lock_); if (sink) { RTC_CHECK(sink_ == nullptr); } @@ -70,27 +86,69 @@ class MediaTransportPair { RTCError SendData(int channel_id, const SendDataParams& params, const rtc::CopyOnWriteBuffer& buffer) override { - return RTCError(RTCErrorType::UNSUPPORTED_OPERATION, "Not implemented"); + invoker_.AsyncInvoke( + RTC_FROM_HERE, thread_, [this, channel_id, params, buffer] { + other_->OnData(channel_id, params.type, buffer); + }); + return RTCError::OK(); } RTCError CloseChannel(int channel_id) override { - return RTCError(RTCErrorType::UNSUPPORTED_OPERATION, "Not implemented"); + invoker_.AsyncInvoke(RTC_FROM_HERE, thread_, [this, channel_id] { + other_->OnRemoteCloseChannel(channel_id); + rtc::CritScope lock(&sink_lock_); + if (data_sink_) { + data_sink_->OnChannelClosed(channel_id); + } + }); + return RTCError::OK(); } - void SetDataSink(DataChannelSink* sink) override {} + void SetDataSink(DataChannelSink* sink) override { + rtc::CritScope lock(&sink_lock_); + data_sink_ = sink; + } + + void FlushAsyncInvokes() { invoker_.Flush(thread_); } private: void OnData(uint64_t channel_id, MediaTransportEncodedAudioFrame frame) { + rtc::CritScope lock(&sink_lock_); if (sink_) { sink_->OnData(channel_id, frame); } } - MediaTransportAudioSinkInterface* sink_ = nullptr; - LoopbackMediaTransport* other_; + void OnData(int channel_id, + DataMessageType type, + const rtc::CopyOnWriteBuffer& buffer) { + rtc::CritScope lock(&sink_lock_); + if (data_sink_) { + data_sink_->OnDataReceived(channel_id, type, buffer); + } + } + + void OnRemoteCloseChannel(int channel_id) { + rtc::CritScope lock(&sink_lock_); + if (data_sink_) { + data_sink_->OnChannelClosing(channel_id); + data_sink_->OnChannelClosed(channel_id); + } + } + + rtc::Thread* const thread_; + rtc::CriticalSection sink_lock_; + + MediaTransportAudioSinkInterface* sink_ RTC_GUARDED_BY(sink_lock_) = + nullptr; + DataChannelSink* data_sink_ RTC_GUARDED_BY(sink_lock_) = nullptr; + LoopbackMediaTransport* const other_; + + rtc::AsyncInvoker invoker_; }; - LoopbackMediaTransport pipe_[2]; + LoopbackMediaTransport first_; + LoopbackMediaTransport second_; }; } // namespace webrtc diff --git a/api/test/loopback_media_transport_unittest.cc b/api/test/loopback_media_transport_unittest.cc index bff74b8930..f85413c55b 100644 --- a/api/test/loopback_media_transport_unittest.cc +++ b/api/test/loopback_media_transport_unittest.cc @@ -8,6 +8,7 @@ * be found in the AUTHORS file in the root of the source tree. */ +#include #include #include "api/test/loopback_media_transport.h" @@ -23,6 +24,14 @@ class MockMediaTransportAudioSinkInterface MOCK_METHOD2(OnData, void(uint64_t, MediaTransportEncodedAudioFrame)); }; +class MockDataChannelSink : public DataChannelSink { + public: + MOCK_METHOD3(OnDataReceived, + void(int, DataMessageType, const rtc::CopyOnWriteBuffer&)); + MOCK_METHOD1(OnChannelClosing, void(int)); + MOCK_METHOD1(OnChannelClosed, void(int)); +}; + // Test only uses the sequence number. MediaTransportEncodedAudioFrame CreateAudioFrame(int sequence_number) { static constexpr int kSamplingRateHz = 48000; @@ -39,13 +48,18 @@ MediaTransportEncodedAudioFrame CreateAudioFrame(int sequence_number) { } // namespace TEST(LoopbackMediaTransport, AudioWithNoSinkSilentlyIgnored) { - MediaTransportPair transport_pair; + std::unique_ptr thread = rtc::Thread::Create(); + thread->Start(); + MediaTransportPair transport_pair(thread.get()); transport_pair.first()->SendAudioFrame(1, CreateAudioFrame(0)); transport_pair.second()->SendAudioFrame(2, CreateAudioFrame(0)); + transport_pair.FlushAsyncInvokes(); } TEST(LoopbackMediaTransport, AudioDeliveredToSink) { - MediaTransportPair transport_pair; + std::unique_ptr thread = rtc::Thread::Create(); + thread->Start(); + MediaTransportPair transport_pair(thread.get()); testing::StrictMock sink; EXPECT_CALL(sink, OnData(1, testing::Property( @@ -54,7 +68,58 @@ TEST(LoopbackMediaTransport, AudioDeliveredToSink) { transport_pair.second()->SetReceiveAudioSink(&sink); transport_pair.first()->SendAudioFrame(1, CreateAudioFrame(10)); + transport_pair.FlushAsyncInvokes(); transport_pair.second()->SetReceiveAudioSink(nullptr); } +TEST(LoopbackMediaTransport, DataDeliveredToSink) { + std::unique_ptr thread = rtc::Thread::Create(); + thread->Start(); + MediaTransportPair transport_pair(thread.get()); + + MockDataChannelSink sink; + transport_pair.first()->SetDataSink(&sink); + + const int channel_id = 1; + EXPECT_CALL(sink, + OnDataReceived( + channel_id, DataMessageType::kText, + testing::Property( + &rtc::CopyOnWriteBuffer::cdata, testing::StrEq("foo")))); + + SendDataParams params; + params.type = DataMessageType::kText; + rtc::CopyOnWriteBuffer buffer("foo"); + transport_pair.second()->SendData(channel_id, params, buffer); + + transport_pair.FlushAsyncInvokes(); + transport_pair.first()->SetDataSink(nullptr); +} + +TEST(LoopbackMediaTransport, CloseDeliveredToSink) { + std::unique_ptr thread = rtc::Thread::Create(); + thread->Start(); + MediaTransportPair transport_pair(thread.get()); + + MockDataChannelSink first_sink; + transport_pair.first()->SetDataSink(&first_sink); + + MockDataChannelSink second_sink; + transport_pair.second()->SetDataSink(&second_sink); + + const int channel_id = 1; + { + testing::InSequence s; + EXPECT_CALL(second_sink, OnChannelClosing(channel_id)); + EXPECT_CALL(second_sink, OnChannelClosed(channel_id)); + EXPECT_CALL(first_sink, OnChannelClosed(channel_id)); + } + + transport_pair.first()->CloseChannel(channel_id); + + transport_pair.FlushAsyncInvokes(); + transport_pair.first()->SetDataSink(nullptr); + transport_pair.second()->SetDataSink(nullptr); +} + } // namespace webrtc diff --git a/audio/test/media_transport_test.cc b/audio/test/media_transport_test.cc index 7c25a724b2..91677dbd8b 100644 --- a/audio/test/media_transport_test.cc +++ b/audio/test/media_transport_test.cc @@ -65,7 +65,9 @@ class TestRenderer : public TestAudioDeviceModule::Renderer { } // namespace TEST(AudioWithMediaTransport, DeliversAudio) { - MediaTransportPair transport_pair; + std::unique_ptr transport_thread = rtc::Thread::Create(); + transport_thread->Start(); + MediaTransportPair transport_pair(transport_thread.get()); MockTransport rtcp_send_transport; MockTransport send_transport; std::unique_ptr null_event_log = RtcEventLog::CreateNull();