diff --git a/api/BUILD.gn b/api/BUILD.gn index 7e74c1a164..4aa42d407b 100644 --- a/api/BUILD.gn +++ b/api/BUILD.gn @@ -611,6 +611,7 @@ if (rtc_include_tests) { deps = [ ":libjingle_peerconnection_api", "../rtc_base:checks", + "../rtc_base:rtc_base", ] } diff --git a/api/test/DEPS b/api/test/DEPS index 98b1ad391f..1fc1f7cb30 100644 --- a/api/test/DEPS +++ b/api/test/DEPS @@ -5,4 +5,9 @@ specific_include_rules = { ".*": [ "+video" ], + "loopback_media_transport\.h": [ + "+rtc_base/asyncinvoker.h", + "+rtc_base/criticalsection.h", + "+rtc_base/thread.h", + ], } diff --git a/api/test/loopback_media_transport.h b/api/test/loopback_media_transport.h index 9ce50a7c8c..f3f24d4c98 100644 --- a/api/test/loopback_media_transport.h +++ b/api/test/loopback_media_transport.h @@ -14,6 +14,9 @@ #include #include "api/media_transport_interface.h" +#include "rtc_base/asyncinvoker.h" +#include "rtc_base/criticalsection.h" +#include "rtc_base/thread.h" namespace webrtc { @@ -21,24 +24,36 @@ namespace webrtc { // Currently supports audio only. class MediaTransportPair { public: - MediaTransportPair() - : pipe_{LoopbackMediaTransport(&pipe_[1]), - LoopbackMediaTransport(&pipe_[0])} {} + explicit MediaTransportPair(rtc::Thread* thread) + : first_(thread, &second_), second_(thread, &first_) {} // Ownership stays with MediaTransportPair - MediaTransportInterface* first() { return &pipe_[0]; } - MediaTransportInterface* second() { return &pipe_[1]; } + MediaTransportInterface* first() { return &first_; } + MediaTransportInterface* second() { return &second_; } + + void FlushAsyncInvokes() { + first_.FlushAsyncInvokes(); + second_.FlushAsyncInvokes(); + } private: class LoopbackMediaTransport : public MediaTransportInterface { public: - explicit LoopbackMediaTransport(LoopbackMediaTransport* other) - : other_(other) {} - ~LoopbackMediaTransport() { RTC_CHECK(sink_ == nullptr); } + LoopbackMediaTransport(rtc::Thread* thread, LoopbackMediaTransport* other) + : thread_(thread), other_(other) {} + + ~LoopbackMediaTransport() { + rtc::CritScope lock(&sink_lock_); + RTC_CHECK(sink_ == nullptr); + RTC_CHECK(data_sink_ == nullptr); + } RTCError SendAudioFrame(uint64_t channel_id, MediaTransportEncodedAudioFrame frame) override { - other_->OnData(channel_id, std::move(frame)); + invoker_.AsyncInvoke(RTC_FROM_HERE, thread_, + [this, channel_id, frame] { + other_->OnData(channel_id, std::move(frame)); + }); return RTCError::OK(); }; @@ -53,6 +68,7 @@ class MediaTransportPair { } void SetReceiveAudioSink(MediaTransportAudioSinkInterface* sink) override { + rtc::CritScope lock(&sink_lock_); if (sink) { RTC_CHECK(sink_ == nullptr); } @@ -70,27 +86,69 @@ class MediaTransportPair { RTCError SendData(int channel_id, const SendDataParams& params, const rtc::CopyOnWriteBuffer& buffer) override { - return RTCError(RTCErrorType::UNSUPPORTED_OPERATION, "Not implemented"); + invoker_.AsyncInvoke( + RTC_FROM_HERE, thread_, [this, channel_id, params, buffer] { + other_->OnData(channel_id, params.type, buffer); + }); + return RTCError::OK(); } RTCError CloseChannel(int channel_id) override { - return RTCError(RTCErrorType::UNSUPPORTED_OPERATION, "Not implemented"); + invoker_.AsyncInvoke(RTC_FROM_HERE, thread_, [this, channel_id] { + other_->OnRemoteCloseChannel(channel_id); + rtc::CritScope lock(&sink_lock_); + if (data_sink_) { + data_sink_->OnChannelClosed(channel_id); + } + }); + return RTCError::OK(); } - void SetDataSink(DataChannelSink* sink) override {} + void SetDataSink(DataChannelSink* sink) override { + rtc::CritScope lock(&sink_lock_); + data_sink_ = sink; + } + + void FlushAsyncInvokes() { invoker_.Flush(thread_); } private: void OnData(uint64_t channel_id, MediaTransportEncodedAudioFrame frame) { + rtc::CritScope lock(&sink_lock_); if (sink_) { sink_->OnData(channel_id, frame); } } - MediaTransportAudioSinkInterface* sink_ = nullptr; - LoopbackMediaTransport* other_; + void OnData(int channel_id, + DataMessageType type, + const rtc::CopyOnWriteBuffer& buffer) { + rtc::CritScope lock(&sink_lock_); + if (data_sink_) { + data_sink_->OnDataReceived(channel_id, type, buffer); + } + } + + void OnRemoteCloseChannel(int channel_id) { + rtc::CritScope lock(&sink_lock_); + if (data_sink_) { + data_sink_->OnChannelClosing(channel_id); + data_sink_->OnChannelClosed(channel_id); + } + } + + rtc::Thread* const thread_; + rtc::CriticalSection sink_lock_; + + MediaTransportAudioSinkInterface* sink_ RTC_GUARDED_BY(sink_lock_) = + nullptr; + DataChannelSink* data_sink_ RTC_GUARDED_BY(sink_lock_) = nullptr; + LoopbackMediaTransport* const other_; + + rtc::AsyncInvoker invoker_; }; - LoopbackMediaTransport pipe_[2]; + LoopbackMediaTransport first_; + LoopbackMediaTransport second_; }; } // namespace webrtc diff --git a/api/test/loopback_media_transport_unittest.cc b/api/test/loopback_media_transport_unittest.cc index bff74b8930..f85413c55b 100644 --- a/api/test/loopback_media_transport_unittest.cc +++ b/api/test/loopback_media_transport_unittest.cc @@ -8,6 +8,7 @@ * be found in the AUTHORS file in the root of the source tree. */ +#include #include #include "api/test/loopback_media_transport.h" @@ -23,6 +24,14 @@ class MockMediaTransportAudioSinkInterface MOCK_METHOD2(OnData, void(uint64_t, MediaTransportEncodedAudioFrame)); }; +class MockDataChannelSink : public DataChannelSink { + public: + MOCK_METHOD3(OnDataReceived, + void(int, DataMessageType, const rtc::CopyOnWriteBuffer&)); + MOCK_METHOD1(OnChannelClosing, void(int)); + MOCK_METHOD1(OnChannelClosed, void(int)); +}; + // Test only uses the sequence number. MediaTransportEncodedAudioFrame CreateAudioFrame(int sequence_number) { static constexpr int kSamplingRateHz = 48000; @@ -39,13 +48,18 @@ MediaTransportEncodedAudioFrame CreateAudioFrame(int sequence_number) { } // namespace TEST(LoopbackMediaTransport, AudioWithNoSinkSilentlyIgnored) { - MediaTransportPair transport_pair; + std::unique_ptr thread = rtc::Thread::Create(); + thread->Start(); + MediaTransportPair transport_pair(thread.get()); transport_pair.first()->SendAudioFrame(1, CreateAudioFrame(0)); transport_pair.second()->SendAudioFrame(2, CreateAudioFrame(0)); + transport_pair.FlushAsyncInvokes(); } TEST(LoopbackMediaTransport, AudioDeliveredToSink) { - MediaTransportPair transport_pair; + std::unique_ptr thread = rtc::Thread::Create(); + thread->Start(); + MediaTransportPair transport_pair(thread.get()); testing::StrictMock sink; EXPECT_CALL(sink, OnData(1, testing::Property( @@ -54,7 +68,58 @@ TEST(LoopbackMediaTransport, AudioDeliveredToSink) { transport_pair.second()->SetReceiveAudioSink(&sink); transport_pair.first()->SendAudioFrame(1, CreateAudioFrame(10)); + transport_pair.FlushAsyncInvokes(); transport_pair.second()->SetReceiveAudioSink(nullptr); } +TEST(LoopbackMediaTransport, DataDeliveredToSink) { + std::unique_ptr thread = rtc::Thread::Create(); + thread->Start(); + MediaTransportPair transport_pair(thread.get()); + + MockDataChannelSink sink; + transport_pair.first()->SetDataSink(&sink); + + const int channel_id = 1; + EXPECT_CALL(sink, + OnDataReceived( + channel_id, DataMessageType::kText, + testing::Property( + &rtc::CopyOnWriteBuffer::cdata, testing::StrEq("foo")))); + + SendDataParams params; + params.type = DataMessageType::kText; + rtc::CopyOnWriteBuffer buffer("foo"); + transport_pair.second()->SendData(channel_id, params, buffer); + + transport_pair.FlushAsyncInvokes(); + transport_pair.first()->SetDataSink(nullptr); +} + +TEST(LoopbackMediaTransport, CloseDeliveredToSink) { + std::unique_ptr thread = rtc::Thread::Create(); + thread->Start(); + MediaTransportPair transport_pair(thread.get()); + + MockDataChannelSink first_sink; + transport_pair.first()->SetDataSink(&first_sink); + + MockDataChannelSink second_sink; + transport_pair.second()->SetDataSink(&second_sink); + + const int channel_id = 1; + { + testing::InSequence s; + EXPECT_CALL(second_sink, OnChannelClosing(channel_id)); + EXPECT_CALL(second_sink, OnChannelClosed(channel_id)); + EXPECT_CALL(first_sink, OnChannelClosed(channel_id)); + } + + transport_pair.first()->CloseChannel(channel_id); + + transport_pair.FlushAsyncInvokes(); + transport_pair.first()->SetDataSink(nullptr); + transport_pair.second()->SetDataSink(nullptr); +} + } // namespace webrtc diff --git a/audio/test/media_transport_test.cc b/audio/test/media_transport_test.cc index 7c25a724b2..91677dbd8b 100644 --- a/audio/test/media_transport_test.cc +++ b/audio/test/media_transport_test.cc @@ -65,7 +65,9 @@ class TestRenderer : public TestAudioDeviceModule::Renderer { } // namespace TEST(AudioWithMediaTransport, DeliversAudio) { - MediaTransportPair transport_pair; + std::unique_ptr transport_thread = rtc::Thread::Create(); + transport_thread->Start(); + MediaTransportPair transport_pair(transport_thread.get()); MockTransport rtcp_send_transport; MockTransport send_transport; std::unique_ptr null_event_log = RtcEventLog::CreateNull();