Implement data channel methods in LoopbackMediaTransport.

This enables PeerConnection tests to use LoopbackMediaTransport to test
data-channel-over-media-transport code.

Also changes LoopbackMediaTransport to invoke callbacks asynchronously.
This is more accurate, as these callbacks are triggered by network
events.  The caller should not block while the callback executes.

Since LoopbackMediaTransport is used for testing, it provides a
FlushAsyncInvokes() method which may be used to ensure that callbacks
occur deterministically (eg. before checking that data has been
received).

Bug: webrtc:9719
Change-Id: Ib8ea9aebf4a0ad3d5934a6fe4ab33432c68523fd
Tbr: stefan@webrtc.org
Reviewed-on: https://webrtc-review.googlesource.com/c/109060
Commit-Queue: Bjorn Mellem <mellem@webrtc.org>
Reviewed-by: Steve Anton <steveanton@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#25489}
This commit is contained in:
Bjorn Mellem 2018-11-01 16:42:44 -07:00 committed by Commit Bot
parent 0367d1a1fb
commit 273d0296a4
5 changed files with 149 additions and 18 deletions

View File

@ -611,6 +611,7 @@ if (rtc_include_tests) {
deps = [
":libjingle_peerconnection_api",
"../rtc_base:checks",
"../rtc_base:rtc_base",
]
}

View File

@ -5,4 +5,9 @@ specific_include_rules = {
".*": [
"+video"
],
"loopback_media_transport\.h": [
"+rtc_base/asyncinvoker.h",
"+rtc_base/criticalsection.h",
"+rtc_base/thread.h",
],
}

View File

@ -14,6 +14,9 @@
#include <utility>
#include "api/media_transport_interface.h"
#include "rtc_base/asyncinvoker.h"
#include "rtc_base/criticalsection.h"
#include "rtc_base/thread.h"
namespace webrtc {
@ -21,24 +24,36 @@ namespace webrtc {
// Currently supports audio only.
class MediaTransportPair {
public:
MediaTransportPair()
: pipe_{LoopbackMediaTransport(&pipe_[1]),
LoopbackMediaTransport(&pipe_[0])} {}
explicit MediaTransportPair(rtc::Thread* thread)
: first_(thread, &second_), second_(thread, &first_) {}
// Ownership stays with MediaTransportPair
MediaTransportInterface* first() { return &pipe_[0]; }
MediaTransportInterface* second() { return &pipe_[1]; }
MediaTransportInterface* first() { return &first_; }
MediaTransportInterface* second() { return &second_; }
void FlushAsyncInvokes() {
first_.FlushAsyncInvokes();
second_.FlushAsyncInvokes();
}
private:
class LoopbackMediaTransport : public MediaTransportInterface {
public:
explicit LoopbackMediaTransport(LoopbackMediaTransport* other)
: other_(other) {}
~LoopbackMediaTransport() { RTC_CHECK(sink_ == nullptr); }
LoopbackMediaTransport(rtc::Thread* thread, LoopbackMediaTransport* other)
: thread_(thread), other_(other) {}
~LoopbackMediaTransport() {
rtc::CritScope lock(&sink_lock_);
RTC_CHECK(sink_ == nullptr);
RTC_CHECK(data_sink_ == nullptr);
}
RTCError SendAudioFrame(uint64_t channel_id,
MediaTransportEncodedAudioFrame frame) override {
other_->OnData(channel_id, std::move(frame));
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, thread_,
[this, channel_id, frame] {
other_->OnData(channel_id, std::move(frame));
});
return RTCError::OK();
};
@ -53,6 +68,7 @@ class MediaTransportPair {
}
void SetReceiveAudioSink(MediaTransportAudioSinkInterface* sink) override {
rtc::CritScope lock(&sink_lock_);
if (sink) {
RTC_CHECK(sink_ == nullptr);
}
@ -70,27 +86,69 @@ class MediaTransportPair {
RTCError SendData(int channel_id,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& buffer) override {
return RTCError(RTCErrorType::UNSUPPORTED_OPERATION, "Not implemented");
invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, thread_, [this, channel_id, params, buffer] {
other_->OnData(channel_id, params.type, buffer);
});
return RTCError::OK();
}
RTCError CloseChannel(int channel_id) override {
return RTCError(RTCErrorType::UNSUPPORTED_OPERATION, "Not implemented");
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, thread_, [this, channel_id] {
other_->OnRemoteCloseChannel(channel_id);
rtc::CritScope lock(&sink_lock_);
if (data_sink_) {
data_sink_->OnChannelClosed(channel_id);
}
});
return RTCError::OK();
}
void SetDataSink(DataChannelSink* sink) override {}
void SetDataSink(DataChannelSink* sink) override {
rtc::CritScope lock(&sink_lock_);
data_sink_ = sink;
}
void FlushAsyncInvokes() { invoker_.Flush(thread_); }
private:
void OnData(uint64_t channel_id, MediaTransportEncodedAudioFrame frame) {
rtc::CritScope lock(&sink_lock_);
if (sink_) {
sink_->OnData(channel_id, frame);
}
}
MediaTransportAudioSinkInterface* sink_ = nullptr;
LoopbackMediaTransport* other_;
void OnData(int channel_id,
DataMessageType type,
const rtc::CopyOnWriteBuffer& buffer) {
rtc::CritScope lock(&sink_lock_);
if (data_sink_) {
data_sink_->OnDataReceived(channel_id, type, buffer);
}
}
void OnRemoteCloseChannel(int channel_id) {
rtc::CritScope lock(&sink_lock_);
if (data_sink_) {
data_sink_->OnChannelClosing(channel_id);
data_sink_->OnChannelClosed(channel_id);
}
}
rtc::Thread* const thread_;
rtc::CriticalSection sink_lock_;
MediaTransportAudioSinkInterface* sink_ RTC_GUARDED_BY(sink_lock_) =
nullptr;
DataChannelSink* data_sink_ RTC_GUARDED_BY(sink_lock_) = nullptr;
LoopbackMediaTransport* const other_;
rtc::AsyncInvoker invoker_;
};
LoopbackMediaTransport pipe_[2];
LoopbackMediaTransport first_;
LoopbackMediaTransport second_;
};
} // namespace webrtc

View File

@ -8,6 +8,7 @@
* be found in the AUTHORS file in the root of the source tree.
*/
#include <memory>
#include <vector>
#include "api/test/loopback_media_transport.h"
@ -23,6 +24,14 @@ class MockMediaTransportAudioSinkInterface
MOCK_METHOD2(OnData, void(uint64_t, MediaTransportEncodedAudioFrame));
};
class MockDataChannelSink : public DataChannelSink {
public:
MOCK_METHOD3(OnDataReceived,
void(int, DataMessageType, const rtc::CopyOnWriteBuffer&));
MOCK_METHOD1(OnChannelClosing, void(int));
MOCK_METHOD1(OnChannelClosed, void(int));
};
// Test only uses the sequence number.
MediaTransportEncodedAudioFrame CreateAudioFrame(int sequence_number) {
static constexpr int kSamplingRateHz = 48000;
@ -39,13 +48,18 @@ MediaTransportEncodedAudioFrame CreateAudioFrame(int sequence_number) {
} // namespace
TEST(LoopbackMediaTransport, AudioWithNoSinkSilentlyIgnored) {
MediaTransportPair transport_pair;
std::unique_ptr<rtc::Thread> thread = rtc::Thread::Create();
thread->Start();
MediaTransportPair transport_pair(thread.get());
transport_pair.first()->SendAudioFrame(1, CreateAudioFrame(0));
transport_pair.second()->SendAudioFrame(2, CreateAudioFrame(0));
transport_pair.FlushAsyncInvokes();
}
TEST(LoopbackMediaTransport, AudioDeliveredToSink) {
MediaTransportPair transport_pair;
std::unique_ptr<rtc::Thread> thread = rtc::Thread::Create();
thread->Start();
MediaTransportPair transport_pair(thread.get());
testing::StrictMock<MockMediaTransportAudioSinkInterface> sink;
EXPECT_CALL(sink,
OnData(1, testing::Property(
@ -54,7 +68,58 @@ TEST(LoopbackMediaTransport, AudioDeliveredToSink) {
transport_pair.second()->SetReceiveAudioSink(&sink);
transport_pair.first()->SendAudioFrame(1, CreateAudioFrame(10));
transport_pair.FlushAsyncInvokes();
transport_pair.second()->SetReceiveAudioSink(nullptr);
}
TEST(LoopbackMediaTransport, DataDeliveredToSink) {
std::unique_ptr<rtc::Thread> thread = rtc::Thread::Create();
thread->Start();
MediaTransportPair transport_pair(thread.get());
MockDataChannelSink sink;
transport_pair.first()->SetDataSink(&sink);
const int channel_id = 1;
EXPECT_CALL(sink,
OnDataReceived(
channel_id, DataMessageType::kText,
testing::Property<rtc::CopyOnWriteBuffer, const char*>(
&rtc::CopyOnWriteBuffer::cdata, testing::StrEq("foo"))));
SendDataParams params;
params.type = DataMessageType::kText;
rtc::CopyOnWriteBuffer buffer("foo");
transport_pair.second()->SendData(channel_id, params, buffer);
transport_pair.FlushAsyncInvokes();
transport_pair.first()->SetDataSink(nullptr);
}
TEST(LoopbackMediaTransport, CloseDeliveredToSink) {
std::unique_ptr<rtc::Thread> thread = rtc::Thread::Create();
thread->Start();
MediaTransportPair transport_pair(thread.get());
MockDataChannelSink first_sink;
transport_pair.first()->SetDataSink(&first_sink);
MockDataChannelSink second_sink;
transport_pair.second()->SetDataSink(&second_sink);
const int channel_id = 1;
{
testing::InSequence s;
EXPECT_CALL(second_sink, OnChannelClosing(channel_id));
EXPECT_CALL(second_sink, OnChannelClosed(channel_id));
EXPECT_CALL(first_sink, OnChannelClosed(channel_id));
}
transport_pair.first()->CloseChannel(channel_id);
transport_pair.FlushAsyncInvokes();
transport_pair.first()->SetDataSink(nullptr);
transport_pair.second()->SetDataSink(nullptr);
}
} // namespace webrtc

View File

@ -65,7 +65,9 @@ class TestRenderer : public TestAudioDeviceModule::Renderer {
} // namespace
TEST(AudioWithMediaTransport, DeliversAudio) {
MediaTransportPair transport_pair;
std::unique_ptr<rtc::Thread> transport_thread = rtc::Thread::Create();
transport_thread->Start();
MediaTransportPair transport_pair(transport_thread.get());
MockTransport rtcp_send_transport;
MockTransport send_transport;
std::unique_ptr<RtcEventLog> null_event_log = RtcEventLog::CreateNull();