diff --git a/webrtc/examples/androidapp/src/org/appspot/apprtc/CallActivity.java b/webrtc/examples/androidapp/src/org/appspot/apprtc/CallActivity.java index ae653da63b..ed7e60f7e6 100644 --- a/webrtc/examples/androidapp/src/org/appspot/apprtc/CallActivity.java +++ b/webrtc/examples/androidapp/src/org/appspot/apprtc/CallActivity.java @@ -139,7 +139,6 @@ public class CallActivity extends Activity private HudFragment hudFragment; private CpuMonitor cpuMonitor; - @Override public void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); @@ -240,8 +239,15 @@ public class CallActivity extends Activity commandLineRun = intent.getBooleanExtra(EXTRA_CMDLINE, false); runTimeMs = intent.getIntExtra(EXTRA_RUNTIME, 0); - // Create connection client and connection parameters. - appRtcClient = new WebSocketRTCClient(this, new LooperExecutor()); + // Create connection client. Use DirectRTCClient if room name is an IP otherwise use the + // standard WebSocketRTCClient. + if (loopback || !DirectRTCClient.IP_PATTERN.matcher(roomId).matches()) { + appRtcClient = new WebSocketRTCClient(this, new LooperExecutor()); + } else { + Log.i(TAG, "Using DirectRTCClient because room name looks like an IP."); + appRtcClient = new DirectRTCClient(this); + } + // Create connection parameters. roomConnectionParameters = new RoomConnectionParameters( roomUri.toString(), roomId, loopback); diff --git a/webrtc/examples/androidapp/src/org/appspot/apprtc/DirectRTCClient.java b/webrtc/examples/androidapp/src/org/appspot/apprtc/DirectRTCClient.java new file mode 100644 index 0000000000..8db38ae830 --- /dev/null +++ b/webrtc/examples/androidapp/src/org/appspot/apprtc/DirectRTCClient.java @@ -0,0 +1,354 @@ +/* + * Copyright 2016 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +package org.appspot.apprtc; + +import android.util.Log; + +import org.appspot.apprtc.util.LooperExecutor; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.webrtc.IceCandidate; +import org.webrtc.PeerConnection; +import org.webrtc.SessionDescription; + +import java.util.LinkedList; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Implementation of AppRTCClient that uses direct TCP connection as the signaling channel. + * This eliminates the need for an external server. This class does not support loopback + * connections. + */ +public class DirectRTCClient implements AppRTCClient, TCPChannelClient.TCPChannelEvents { + private static final String TAG = "DirectRTCClient"; + private static final int DEFAULT_PORT = 8888; + + // Regex pattern used for checking if room id looks like an IP. + static final Pattern IP_PATTERN = Pattern.compile( + "(" + // IPv4 + + "((\\d+\\.){3}\\d+)|" + // IPv6 + + "\\[((([0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{1,4})?::" + + "(([0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{1,4})?)\\]|" + + "\\[(([0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{1,4})\\]|" + // IPv6 without [] + + "((([0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{1,4})?::(([0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{1,4})?)|" + + "(([0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{1,4})|" + // Literals + + "localhost" + + ")" + // Optional port number + + "(:(\\d+))?" + ); + + private final LooperExecutor executor; + private final SignalingEvents events; + private TCPChannelClient tcpClient; + private RoomConnectionParameters connectionParameters; + + private enum ConnectionState { + NEW, CONNECTED, CLOSED, ERROR + }; + + // All alterations of the room state should be done from inside the looper thread. + private ConnectionState roomState; + + public DirectRTCClient(SignalingEvents events) { + this.events = events; + executor = new LooperExecutor(); + + executor.requestStart(); + roomState = ConnectionState.NEW; + } + + /** + * Connects to the room, roomId in connectionsParameters is required. roomId must be a valid + * IP address matching IP_PATTERN. + */ + @Override + public void connectToRoom(RoomConnectionParameters connectionParameters) { + this.connectionParameters = connectionParameters; + + if (connectionParameters.loopback) { + reportError("Loopback connections aren't supported by DirectRTCClient."); + } + + executor.execute(new Runnable() { + @Override + public void run() { + connectToRoomInternal(); + } + }); + } + + @Override + public void disconnectFromRoom() { + executor.execute(new Runnable() { + @Override + public void run() { + disconnectFromRoomInternal(); + } + }); + executor.requestStop(); + } + + /** + * Connects to the room. + * + * Runs on the looper thread. + */ + private void connectToRoomInternal() { + this.roomState = ConnectionState.NEW; + + String endpoint = connectionParameters.roomId; + + Matcher matcher = IP_PATTERN.matcher(endpoint); + if (!matcher.matches()) { + reportError("roomId must match IP_PATTERN for DirectRTCClient."); + return; + } + + String ip = matcher.group(1); + String portStr = matcher.group(matcher.groupCount()); + int port; + + if (portStr != null) { + try { + port = Integer.parseInt(portStr); + } catch (NumberFormatException e) { + reportError("Invalid port number: " + portStr); + return; + } + } else { + port = DEFAULT_PORT; + } + + tcpClient = new TCPChannelClient(executor, this, ip, port); + } + + /** + * Disconnects from the room. + * + * Runs on the looper thread. + */ + private void disconnectFromRoomInternal() { + roomState = ConnectionState.CLOSED; + + if (tcpClient != null) { + tcpClient.disconnect(); + tcpClient = null; + } + } + + @Override + public void sendOfferSdp(final SessionDescription sdp) { + executor.execute(new Runnable() { + @Override + public void run() { + if (roomState != ConnectionState.CONNECTED) { + reportError("Sending offer SDP in non connected state."); + return; + } + JSONObject json = new JSONObject(); + jsonPut(json, "sdp", sdp.description); + jsonPut(json, "type", "offer"); + sendMessage(json.toString()); + } + }); + } + + @Override + public void sendAnswerSdp(final SessionDescription sdp) { + executor.execute(new Runnable() { + @Override + public void run() { + JSONObject json = new JSONObject(); + jsonPut(json, "sdp", sdp.description); + jsonPut(json, "type", "answer"); + sendMessage(json.toString()); + } + }); + } + + @Override + public void sendLocalIceCandidate(final IceCandidate candidate) { + executor.execute(new Runnable() { + @Override + public void run() { + JSONObject json = new JSONObject(); + jsonPut(json, "type", "candidate"); + jsonPut(json, "label", candidate.sdpMLineIndex); + jsonPut(json, "id", candidate.sdpMid); + jsonPut(json, "candidate", candidate.sdp); + + if (roomState != ConnectionState.CONNECTED) { + reportError("Sending ICE candidate in non connected state."); + return; + } + sendMessage(json.toString()); + } + }); + } + + /** Send removed Ice candidates to the other participant. */ + @Override + public void sendLocalIceCandidateRemovals(final IceCandidate[] candidates) { + executor.execute(new Runnable() { + @Override + public void run() { + JSONObject json = new JSONObject(); + jsonPut(json, "type", "remove-candidates"); + JSONArray jsonArray = new JSONArray(); + for (final IceCandidate candidate : candidates) { + jsonArray.put(toJsonCandidate(candidate)); + } + jsonPut(json, "candidates", jsonArray); + + if (roomState != ConnectionState.CONNECTED) { + reportError("Sending ICE candidate removals in non connected state."); + return; + } + sendMessage(json.toString()); + } + }); + } + + // ------------------------------------------------------------------- + // TCPChannelClient event handlers + + /** + * If the client is the server side, this will trigger onConnectedToRoom. + */ + @Override + public void onTCPConnected(boolean isServer) { + if (isServer) { + roomState = ConnectionState.CONNECTED; + + SignalingParameters parameters = new SignalingParameters( + // Ice servers are not needed for direct connections. + new LinkedList(), + isServer, // Server side acts as the initiator on direct connections. + null, // clientId + null, // wssUrl + null, // wwsPostUrl + null, // offerSdp + null // iceCandidates + ); + events.onConnectedToRoom(parameters); + } + } + + @Override + public void onTCPMessage(String msg) { + try { + JSONObject json = new JSONObject(msg); + String type = json.optString("type"); + if (type.equals("candidate")) { + events.onRemoteIceCandidate(toJavaCandidate(json)); + } else if (type.equals("remove-candidates")) { + JSONArray candidateArray = json.getJSONArray("candidates"); + IceCandidate[] candidates = new IceCandidate[candidateArray.length()]; + for (int i = 0; i < candidateArray.length(); ++i) { + candidates[i] = toJavaCandidate(candidateArray.getJSONObject(i)); + } + events.onRemoteIceCandidatesRemoved(candidates); + } else if (type.equals("answer")) { + SessionDescription sdp = new SessionDescription( + SessionDescription.Type.fromCanonicalForm(type), + json.getString("sdp")); + events.onRemoteDescription(sdp); + } else if (type.equals("offer")) { + SessionDescription sdp = new SessionDescription( + SessionDescription.Type.fromCanonicalForm(type), + json.getString("sdp")); + + SignalingParameters parameters = new SignalingParameters( + // Ice servers are not needed for direct connections. + new LinkedList(), + false, // This code will only be run on the client side. So, we are not the initiator. + null, // clientId + null, // wssUrl + null, // wssPostUrl + sdp, // offerSdp + null // iceCandidates + ); + roomState = ConnectionState.CONNECTED; + events.onConnectedToRoom(parameters); + } else { + reportError("Unexpected TCP message: " + msg); + } + } catch (JSONException e) { + reportError("TCP message JSON parsing error: " + e.toString()); + } + } + + @Override + public void onTCPError(String description) { + reportError("TCP connection error: " + description); + } + + @Override + public void onTCPClose() { + events.onChannelClose(); + } + + // -------------------------------------------------------------------- + // Helper functions. + private void reportError(final String errorMessage) { + Log.e(TAG, errorMessage); + executor.execute(new Runnable() { + @Override + public void run() { + if (roomState != ConnectionState.ERROR) { + roomState = ConnectionState.ERROR; + events.onChannelError(errorMessage); + } + } + }); + } + + private void sendMessage(final String message) { + executor.execute(new Runnable() { + @Override + public void run() { + tcpClient.send(message); + } + }); + } + + // Put a |key|->|value| mapping in |json|. + private static void jsonPut(JSONObject json, String key, Object value) { + try { + json.put(key, value); + } catch (JSONException e) { + throw new RuntimeException(e); + } + } + + // Converts a Java candidate to a JSONObject. + private static JSONObject toJsonCandidate(final IceCandidate candidate) { + JSONObject json = new JSONObject(); + jsonPut(json, "label", candidate.sdpMLineIndex); + jsonPut(json, "id", candidate.sdpMid); + jsonPut(json, "candidate", candidate.sdp); + return json; + } + + // Converts a JSON candidate to a Java object. + private static IceCandidate toJavaCandidate(JSONObject json) throws JSONException { + return new IceCandidate(json.getString("id"), + json.getInt("label"), + json.getString("candidate")); + } +} diff --git a/webrtc/examples/androidapp/src/org/appspot/apprtc/TCPChannelClient.java b/webrtc/examples/androidapp/src/org/appspot/apprtc/TCPChannelClient.java new file mode 100644 index 0000000000..996483ec1d --- /dev/null +++ b/webrtc/examples/androidapp/src/org/appspot/apprtc/TCPChannelClient.java @@ -0,0 +1,362 @@ +/* + * Copyright 2016 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +package org.appspot.apprtc; + +import android.util.Log; + +import org.appspot.apprtc.util.LooperExecutor; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.UnknownHostException; + +/** + * Replacement for WebSocketChannelClient for direct communication between two IP addresses. Handles + * the signaling between the two clients using a TCP connection. + * + *

All public methods should be called from a looper executor thread + * passed in a constructor, otherwise exception will be thrown. + * All events are dispatched on the same thread. + */ +public class TCPChannelClient { + private static final String TAG = "TCPChannelClient"; + + private final LooperExecutor executor; + private final TCPChannelEvents eventListener; + private TCPSocket socket; + + /** + * Callback interface for messages delivered on TCP Connection. All callbacks are invoked from the + * looper executor thread. + */ + public interface TCPChannelEvents { + void onTCPConnected(boolean server); + void onTCPMessage(String message); + void onTCPError(String description); + void onTCPClose(); + } + + /** + * Initializes the TCPChannelClient. If IP is a local IP address, starts a listening server on + * that IP. If not, instead connects to the IP. + * + * @param eventListener Listener that will receive events from the client. + * @param ip IP address to listen on or connect to. + * @param port Port to listen on or connect to. + */ + public TCPChannelClient( + LooperExecutor executor, TCPChannelEvents eventListener, String ip, int port) { + this.executor = executor; + this.eventListener = eventListener; + + InetAddress address; + try { + address = InetAddress.getByName(ip); + } catch (UnknownHostException e) { + reportError("Invalid IP address."); + return; + } + + if (address.isAnyLocalAddress()) { + socket = new TCPSocketServer(address, port); + } else { + socket = new TCPSocketClient(address, port); + } + + socket.start(); + } + + /** + * Disconnects the client if not already disconnected. This will fire the onTCPClose event. + */ + public void disconnect() { + checkIfCalledOnValidThread(); + + socket.disconnect(); + } + + /** + * Sends a message on the socket. + * + * @param message Message to be sent. + */ + public void send(String message) { + checkIfCalledOnValidThread(); + + socket.send(message); + } + + /** + * Helper method for firing onTCPError events. Calls onTCPError on the executor thread. + */ + private void reportError(final String message) { + Log.e(TAG, "TCP Error: " + message); + executor.execute(new Runnable() { + @Override + public void run() { + eventListener.onTCPError(message); + } + }); + } + + /** + * Helper method for debugging purposes. + * Ensures that TCPChannelClient method is called on a looper thread. + */ + private void checkIfCalledOnValidThread() { + if (!executor.checkOnLooperThread()) { + throw new IllegalStateException( + "TCPChannelClient method is not called on valid thread"); + } + } + + + /** + * Base class for server and client sockets. Contains a listening thread that will call + * eventListener.onTCPMessage on new messages. + */ + private abstract class TCPSocket extends Thread { + // Lock for editing out and rawSocket + protected final Object rawSocketLock; + private PrintWriter out; + private Socket rawSocket; + + /** + * Connect to the peer, potentially a slow operation. + * + * @return Socket connection, null if connection failed. + */ + public abstract Socket connect(); + /** Returns true if sockets is a server rawSocket. */ + public abstract boolean isServer(); + + TCPSocket() { + rawSocketLock = new Object(); + } + + /** + * The listening thread. + */ + @Override + public void run() { + Log.d(TAG, "Listening thread started..."); + + // Receive connection to temporary variable first, so we don't block. + Socket tempSocket = connect(); + BufferedReader in; + + Log.d(TAG, "TCP connection established."); + + synchronized (rawSocketLock) { + if (rawSocket != null) { + Log.e(TAG, "Socket already existed and will be replaced."); + } + + rawSocket = tempSocket; + + // Connecting failed, error has already been reported, just exit. + if (rawSocket == null) { + return; + } + + try { + out = new PrintWriter(rawSocket.getOutputStream(), true); + in = new BufferedReader(new InputStreamReader(rawSocket.getInputStream())); + } catch (IOException e) { + reportError("Failed to open IO on rawSocket: " + e.getMessage()); + return; + } + } + + Log.v(TAG, "Execute onTCPConnected"); + executor.execute(new Runnable() { + @Override + public void run() { + Log.v(TAG, "Run onTCPConnected"); + eventListener.onTCPConnected(isServer()); + } + }); + + while (true) { + final String message; + try { + message = in.readLine(); + } catch (IOException e) { + synchronized (rawSocketLock) { + // If socket was closed, this is expected. + if (rawSocket == null) { + break; + } + } + + reportError("Failed to read from rawSocket: " + e.getMessage()); + break; + } + + // No data received, rawSocket probably closed. + if (message == null) { + break; + } + + executor.execute(new Runnable() { + @Override + public void run() { + Log.v(TAG, "Receive: " + message); + eventListener.onTCPMessage(message); + } + }); + } + + Log.d(TAG, "Receiving thread exiting..."); + + // Close the rawSocket if it is still open. + disconnect(); + } + + /** + * Closes the rawSocket if it is still open. Also fires the onTCPClose event. + */ + public void disconnect() { + try { + synchronized (rawSocketLock) { + if (rawSocket != null) { + rawSocket.close(); + rawSocket = null; + out = null; + + executor.execute(new Runnable() { + @Override + public void run() { + eventListener.onTCPClose(); + } + }); + } + } + } catch (IOException e) { + reportError("Failed to close rawSocket: " + e.getMessage()); + } + } + + /** + * Sends a message on the socket. Should only be called on the executor thread. + */ + public void send(String message) { + Log.v(TAG, "Send: " + message); + + synchronized (rawSocketLock) { + if (out == null) { + reportError("Sending data on closed socket."); + return; + } + + out.write(message + "\n"); + out.flush(); + } + } + } + + private class TCPSocketServer extends TCPSocket { + // Server socket is also guarded by rawSocketLock. + private ServerSocket serverSocket; + + final private InetAddress address; + final private int port; + + public TCPSocketServer(InetAddress address, int port) { + this.address = address; + this.port = port; + } + + /** Opens a listening socket and waits for a connection. */ + @Override + public Socket connect() { + Log.d(TAG, "Listening on [" + address.getHostAddress() + "]:" + Integer.toString(port)); + + final ServerSocket tempSocket; + try { + tempSocket = new ServerSocket(port, 0, address); + } catch (IOException e) { + reportError("Failed to create server socket: " + e.getMessage()); + return null; + } + + synchronized (rawSocketLock) { + if (serverSocket != null) { + Log.e(TAG, "Server rawSocket was already listening and new will be opened."); + } + + serverSocket = tempSocket; + } + + try { + return tempSocket.accept(); + } catch (IOException e) { + reportError("Failed to receive connection: " + e.getMessage()); + return null; + } + } + + /** Closes the listening socket and calls super. */ + @Override + public void disconnect() { + try { + synchronized (rawSocketLock) { + if (serverSocket != null) { + serverSocket.close(); + serverSocket = null; + } + } + } catch (IOException e) { + reportError("Failed to close server socket: " + e.getMessage()); + } + + super.disconnect(); + } + + @Override + public boolean isServer() { + return true; + } + } + + private class TCPSocketClient extends TCPSocket { + final private InetAddress address; + final private int port; + + public TCPSocketClient(InetAddress address, int port) { + this.address = address; + this.port = port; + } + + /** Connects to the peer. */ + @Override + public Socket connect() { + Log.d(TAG, "Connecting to [" + address.getHostAddress() + "]:" + Integer.toString(port)); + + try { + return new Socket(address, port); + } catch (IOException e) { + reportError("Failed to connect: " + e.getMessage()); + return null; + } + } + + @Override + public boolean isServer() { + return false; + } + } +} diff --git a/webrtc/examples/androidjunit/src/org/appspot/apprtc/TCPChannelClientTest.java b/webrtc/examples/androidjunit/src/org/appspot/apprtc/TCPChannelClientTest.java new file mode 100644 index 0000000000..4c60299456 --- /dev/null +++ b/webrtc/examples/androidjunit/src/org/appspot/apprtc/TCPChannelClientTest.java @@ -0,0 +1,195 @@ +/* + * Copyright 2016 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +package org.appspot.apprtc; + +import org.appspot.apprtc.util.LooperExecutor; +import org.appspot.apprtc.util.RobolectricLooperExecutor; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.robolectric.RobolectricTestRunner; +import org.robolectric.annotation.Config; +import org.robolectric.shadows.ShadowLog; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@RunWith(RobolectricTestRunner.class) +@Config(manifest = Config.NONE) +public class TCPChannelClientTest { + private static final int PORT = 8888; + /** + * How long we wait before trying to connect to the server. Chosen quite arbitrarily and + * could be made smaller if need be. + */ + private static final int SERVER_WAIT = 10; + private static final int CONNECT_TIMEOUT = 100; + private static final int SEND_TIMEOUT = 100; + private static final int DISCONNECT_TIMEOUT = 100; + private static final String TEST_MESSAGE_SERVER = "Hello, Server!"; + private static final String TEST_MESSAGE_CLIENT = "Hello, Client!"; + + @Mock TCPChannelClient.TCPChannelEvents serverEvents; + @Mock TCPChannelClient.TCPChannelEvents clientEvents; + + private RobolectricLooperExecutor executor; + private TCPChannelClient server; + private TCPChannelClient client; + + + @Before + public void setUp() { + ShadowLog.stream = System.out; + + MockitoAnnotations.initMocks(this); + + executor = new RobolectricLooperExecutor(); + executor.requestStart(); + } + + @After + public void tearDown() { + verifyNoMoreEvents(); + + executor.executeAndWait(new Runnable() { + @Override + public void run() { + client.disconnect(); + server.disconnect(); + } + }); + + // Stop the executor thread + executor.requestStop(); + try { + executor.join(); + } catch (InterruptedException e) { + fail(e.getMessage()); + } + } + + @Test + public void testConnectIPv4() { + setUpIPv4Server(); + try { + Thread.sleep(SERVER_WAIT); + } catch (InterruptedException e) { + fail(e.getMessage()); + } + setUpIPv4Client(); + + verify(serverEvents, timeout(CONNECT_TIMEOUT)).onTCPConnected(true); + verify(clientEvents, timeout(CONNECT_TIMEOUT)).onTCPConnected(false); + } + + @Test + public void testConnectIPv6() { + setUpIPv6Server(); + try { + Thread.sleep(SERVER_WAIT); + } catch (InterruptedException e) { + fail(e.getMessage()); + } + setUpIPv6Client(); + + verify(serverEvents, timeout(CONNECT_TIMEOUT)).onTCPConnected(true); + verify(clientEvents, timeout(CONNECT_TIMEOUT)).onTCPConnected(false); + } + + @Test + public void testSendData() { + testConnectIPv4(); + + executor.executeAndWait(new Runnable() { + @Override + public void run() { + client.send(TEST_MESSAGE_SERVER); + server.send(TEST_MESSAGE_CLIENT); + } + }); + + verify(serverEvents, timeout(SEND_TIMEOUT)).onTCPMessage(TEST_MESSAGE_SERVER); + verify(clientEvents, timeout(SEND_TIMEOUT)).onTCPMessage(TEST_MESSAGE_CLIENT); + } + + @Test + public void testDisconnectServer() { + testConnectIPv4(); + executor.executeAndWait(new Runnable() { + @Override + public void run() { + server.disconnect(); + } + }); + + verify(serverEvents, timeout(DISCONNECT_TIMEOUT)).onTCPClose(); + verify(clientEvents, timeout(DISCONNECT_TIMEOUT)).onTCPClose(); + } + + @Test + public void testDisconnectClient() { + testConnectIPv4(); + executor.executeAndWait(new Runnable() { + @Override + public void run() { + client.disconnect(); + } + }); + + verify(serverEvents, timeout(DISCONNECT_TIMEOUT)).onTCPClose(); + verify(clientEvents, timeout(DISCONNECT_TIMEOUT)).onTCPClose(); + } + + private void setUpIPv4Server() { + setUpServer("0.0.0.0", PORT); + } + + private void setUpIPv4Client() { + setUpClient("127.0.0.1", PORT); + } + + private void setUpIPv6Server() { + setUpServer("::", PORT); + } + + private void setUpIPv6Client() { + setUpClient("::1", PORT); + } + + private void setUpServer(String ip, int port) { + server = new TCPChannelClient(executor, serverEvents, ip, port); + } + + private void setUpClient(String ip, int port) { + client = new TCPChannelClient(executor, clientEvents, ip, port); + } + + /** + * Verifies no more server or client events have been issued + */ + private void verifyNoMoreEvents() { + verifyNoMoreInteractions(serverEvents); + verifyNoMoreInteractions(clientEvents); + } +} diff --git a/webrtc/examples/androidjunit/src/org/appspot/apprtc/util/RobolectricLooperExecutor.java b/webrtc/examples/androidjunit/src/org/appspot/apprtc/util/RobolectricLooperExecutor.java new file mode 100644 index 0000000000..19d595d304 --- /dev/null +++ b/webrtc/examples/androidjunit/src/org/appspot/apprtc/util/RobolectricLooperExecutor.java @@ -0,0 +1,118 @@ +/* + * Copyright 2016 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +package org.appspot.apprtc.util; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +import static org.junit.Assert.fail; + +/** + * LooperExecutor that doesn't use Looper because its implementation in Robolectric is not suited + * for our needs. Also implements executeAndWait that can be used to wait until the runnable has + * been executed. + */ +public class RobolectricLooperExecutor extends LooperExecutor { + private volatile boolean running = false; + private static final int RUNNABLE_QUEUE_CAPACITY = 256; + private final BlockingQueue runnableQueue + = new ArrayBlockingQueue<>(RUNNABLE_QUEUE_CAPACITY); + private long threadId; + + /** + * Executes the runnable passed to the constructor and sets isDone flag afterwards. + */ + private static class ExecuteAndWaitRunnable implements Runnable { + public boolean isDone = false; + private final Runnable runnable; + + ExecuteAndWaitRunnable(Runnable runnable) { + this.runnable = runnable; + } + + @Override + public void run() { + runnable.run(); + + synchronized (this) { + isDone = true; + notifyAll(); + } + } + } + + @Override + public void run() { + threadId = Thread.currentThread().getId(); + + while (running) { + final Runnable runnable; + + try { + runnable = runnableQueue.take(); + } catch (InterruptedException e) { + if (running) { + fail(e.getMessage()); + } + return; + } + + runnable.run(); + } + } + + @Override + public synchronized void requestStart() { + if (running) { + return; + } + running = true; + start(); + } + + @Override + public synchronized void requestStop() { + running = false; + interrupt(); + } + + @Override + public synchronized void execute(Runnable runnable) { + try { + runnableQueue.put(runnable); + } catch (InterruptedException e) { + fail(e.getMessage()); + } + } + + /** + * Queues runnable to be run and waits for it to be executed by the executor thread + */ + public void executeAndWait(Runnable runnable) { + ExecuteAndWaitRunnable executeAndWaitRunnable = new ExecuteAndWaitRunnable(runnable); + execute(executeAndWaitRunnable); + + synchronized (executeAndWaitRunnable) { + while (!executeAndWaitRunnable.isDone) { + try { + executeAndWaitRunnable.wait(); + } catch (InterruptedException e) { + fail(e.getMessage()); + } + } + } + } + + @Override + public boolean checkOnLooperThread() { + return (Thread.currentThread().getId() == threadId); + } +}