Commit 97139d8b authored by MozLando's avatar MozLando
Browse files

Merge #5093



5093: Add streaming for large messages r=jonalmeida a=espertus
Co-authored-by: default avatarEllen Spertus <ellen.spertus@gmail.com>
parents d307380a 78b4444d
......@@ -27,9 +27,38 @@ import com.google.android.gms.nearby.connection.Strategy
import mozilla.components.support.base.log.logger.Logger
import mozilla.components.support.base.observer.Observable
import mozilla.components.support.base.observer.ObserverRegistry
import java.nio.charset.StandardCharsets.UTF_8
import java.io.ByteArrayInputStream
import java.nio.charset.Charset
import java.util.concurrent.ConcurrentHashMap
@VisibleForTesting
internal fun Payload.toString(errorReporter: (String) -> Unit = { _: String -> Unit }): String? =
when (type) {
Payload.Type.BYTES -> {
asBytes()?.toString(NearbyConnection.PAYLOAD_ENCODING)
}
Payload.Type.STREAM -> {
asStream()?.asInputStream()?.readBytes()?.toString(NearbyConnection.PAYLOAD_ENCODING)
}
else -> {
errorReporter("Received payload had illegal type: $type")
null
}
}
@VisibleForTesting
internal fun String.toPayload(): Payload {
val bytes = toByteArray(NearbyConnection.PAYLOAD_ENCODING)
if (bytes.size <= ConnectionsClient.MAX_BYTES_DATA_SIZE) {
return Payload.fromBytes(bytes)
} else {
// Logically, it might make more sense to use Payload.fromFile() since we
// know the size of the string, than Payload.fromStream(), but we would
// have to create a file locally to use use the former.
return Payload.fromStream(ByteArrayInputStream(bytes))
}
}
/**
* A class that can be run on two devices to allow them to connect. This supports sending a single
* message at a time in each direction. It contains internal synchronization and may be accessed
......@@ -41,6 +70,7 @@ import java.util.concurrent.ConcurrentHashMap
* @param connectionsClient the underlying client
* @param name a human-readable name for this device
*/
@Suppress("TooManyFunctions")
class NearbyConnection(
private val connectionsClient: ConnectionsClient,
private val name: String = Build.MODEL,
......@@ -198,15 +228,6 @@ class NearbyConnection(
private var connectionState: ConnectionState = ConnectionState.Isolated
// Override all 3 register() methods to notify listener of initial state.
override fun register(
observer: NearbyConnectionObserver,
owner: LifecycleOwner,
autoPause: Boolean
) {
delegate.register(observer, owner, autoPause)
observer.onStateUpdated(connectionState)
}
override fun register(observer: NearbyConnectionObserver) {
delegate.register(observer)
observer.onStateUpdated(connectionState)
......@@ -217,6 +238,15 @@ class NearbyConnection(
observer.onStateUpdated(connectionState)
}
override fun register(
observer: NearbyConnectionObserver,
owner: LifecycleOwner,
autoPause: Boolean
) {
delegate.register(observer, owner, autoPause)
observer.onStateUpdated(connectionState)
}
// This method is called from both the main thread and callbacks.
@Synchronized
private fun updateState(cs: ConnectionState) {
......@@ -321,11 +351,10 @@ class NearbyConnection(
private val payloadCallback = object : PayloadCallback() {
override fun onPayloadReceived(endpointId: String, payload: Payload) {
notifyObservers {
onMessageReceived(
endpointId,
endpointIdsToNames[endpointId],
payload.asBytes()?.let { String(it, UTF_8) } ?: "")
payload.toString(::reportError)?.let {
notifyObservers {
onMessageReceived(endpointId, endpointIdsToNames[endpointId], it)
}
}
}
......@@ -356,7 +385,7 @@ class NearbyConnection(
fun sendMessage(message: String): Long? {
val state = connectionState
if (state is ConnectionState.ReadyToSend) {
val payload: Payload = Payload.fromBytes(message.toByteArray(UTF_8))
val payload = message.toPayload()
connectionsClient.sendPayload(state.neighborId, payload)
updateState(
ConnectionState.Sending(
......@@ -390,6 +419,7 @@ class NearbyConnection(
@VisibleForTesting
internal const val PACKAGE_NAME = "mozilla.components.lib.nearby"
@VisibleForTesting
internal val PAYLOAD_ENCODING: Charset = Charsets.UTF_8
private val STRATEGY = Strategy.P2P_STAR
/**
......
......@@ -4,6 +4,9 @@
package mozilla.components.lib.nearby
import android.view.View
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.LifecycleOwner
import androidx.test.ext.junit.runners.AndroidJUnit4
import com.google.android.gms.common.api.CommonStatusCodes
import com.google.android.gms.common.api.Status
......@@ -44,15 +47,22 @@ class NearbyConnectionTest {
private lateinit var mockConnectionsClient: ConnectionsClient
private lateinit var mockNearbyConnectionObserver: NearbyConnectionObserver
private lateinit var mockNearbyConnection: NearbyConnection
private lateinit var mockConnectionInfo: ConnectionInfo
private lateinit var nearbyConnection: NearbyConnection
private lateinit var stateWatchingObserver: NearbyConnectionObserver
private var state: ConnectionState? = null // mutated by stateWatchingObserver
private lateinit var errors: MutableList<String>
private fun logError(s: String) {
errors.add(s)
}
@Before
fun setup() {
// Clear state.
state = null
errors = mutableListOf<String>()
// Initialize mocks.
mockConnectionsClient = mock<ConnectionsClient>()
......@@ -62,15 +72,31 @@ class NearbyConnectionTest {
state = connectionState
}
}
mockNearbyConnection = NearbyConnection(mockConnectionsClient, DEVICE_NAME)
nearbyConnection = NearbyConnection(mockConnectionsClient, DEVICE_NAME)
mockConnectionInfo = mock<ConnectionInfo>()
whenever(mockConnectionInfo.endpointName).thenReturn(NEIGHBOR_NAME)
whenever(mockConnectionInfo.authenticationToken).thenReturn(AUTHENTICATION_TOKEN)
}
@Test
fun `Should make initial onStatusUpdated() call`() {
mockNearbyConnection.register(stateWatchingObserver)
fun `Should make initial onStatusUpdated() call after 1-arg registration`() {
nearbyConnection.register(stateWatchingObserver)
assertEquals(ConnectionState.Isolated, state)
}
@Test
fun `Should make initial onStatusUpdated() call after 2-arg registration`() {
nearbyConnection.register(stateWatchingObserver, mock<View>())
assertEquals(ConnectionState.Isolated, state)
}
@Test
fun `Should make initial onStatusUpdated() call after 3-arg registration`() {
val mockLifecycleOwner = mock<LifecycleOwner>()
val mockLifecycle = mock<Lifecycle>()
whenever(mockLifecycleOwner.lifecycle).thenReturn(mockLifecycle)
whenever(mockLifecycle.currentState).thenReturn(androidx.lifecycle.Lifecycle.State.RESUMED)
nearbyConnection.register(stateWatchingObserver, mockLifecycleOwner, true)
assertEquals(ConnectionState.Isolated, state)
}
......@@ -95,9 +121,9 @@ class NearbyConnectionTest {
// Should enter advertising state if startAdvertising() succeeds.
whenever(mockConnectionsClient.startAdvertising(anyString(), anyString(), any(), any()))
.thenReturn(VoidTask.SUCCESS)
mockNearbyConnection.register(stateWatchingObserver)
nearbyConnection.register(stateWatchingObserver)
assertState(ConnectionState.Isolated::class)
mockNearbyConnection.startAdvertising()
nearbyConnection.startAdvertising()
assertState(ConnectionState.Advertising::class)
// Should enter authenticating state if a connection is simulated.
......@@ -118,8 +144,8 @@ class NearbyConnectionTest {
assertEquals(AUTHENTICATION_TOKEN, authState?.token)
// Should update endpointIdsToNames.
assertEquals(1, mockNearbyConnection.endpointIdsToNames.size)
assertEquals(NEIGHBOR_NAME, mockNearbyConnection.endpointIdsToNames[NEIGHBOR_ID])
assertEquals(1, nearbyConnection.endpointIdsToNames.size)
assertEquals(NEIGHBOR_NAME, nearbyConnection.endpointIdsToNames[NEIGHBOR_ID])
// If all goes well, ConnectionLifeCycleCallback will be needed.
return connectionLifecycleCallback.value
......@@ -147,7 +173,7 @@ class NearbyConnectionTest {
// Should enter sending state if sendMessage() called.
whenever(mockConnectionsClient.sendPayload(anyString(), any())).thenReturn(VoidTask.SUCCESS)
val returnedPayloadId = mockNearbyConnection.sendMessage(OUTGOING_MESSAGE)
val returnedPayloadId = nearbyConnection.sendMessage(OUTGOING_MESSAGE)
assertState(ConnectionState.Sending::class)
// Should have right payload.
......@@ -206,7 +232,7 @@ class NearbyConnectionTest {
assertState(ConnectionState.ReadyToSend::class)
// Should change state to isolated if disconnected.
mockNearbyConnection.disconnect()
nearbyConnection.disconnect()
assertState(ConnectionState.Isolated::class)
verify(mockConnectionsClient).stopAllEndpoints()
}
......@@ -228,9 +254,9 @@ class NearbyConnectionTest {
fun `Should enter failure state if startAdvertising() fails`() {
whenever(mockConnectionsClient.startAdvertising(anyString(), anyString(), any(), any()))
.thenReturn(VoidTask.FAILURE)
mockNearbyConnection.register(stateWatchingObserver)
nearbyConnection.register(stateWatchingObserver)
assertState(ConnectionState.Isolated::class)
mockNearbyConnection.startAdvertising()
nearbyConnection.startAdvertising()
assertState(ConnectionState.Failure::class)
}
......@@ -238,9 +264,9 @@ class NearbyConnectionTest {
// Should enter discovering state if startDiscovery() succeeds.
whenever(mockConnectionsClient.startDiscovery(anyString(), any(), any()))
.thenReturn(VoidTask.SUCCESS)
mockNearbyConnection.register(stateWatchingObserver)
nearbyConnection.register(stateWatchingObserver)
assertState(ConnectionState.Isolated::class)
mockNearbyConnection.startDiscovering()
nearbyConnection.startDiscovering()
assertState(ConnectionState.Discovering::class)
// Should call startDiscovery() with right arguments.
......@@ -308,9 +334,9 @@ class NearbyConnectionTest {
fun `Should enter failure state if startDiscovery() fails`() {
whenever(mockConnectionsClient.startDiscovery(anyString(), any(), any()))
.thenReturn(VoidTask.FAILURE)
mockNearbyConnection.register(stateWatchingObserver)
nearbyConnection.register(stateWatchingObserver)
assertState(ConnectionState.Isolated::class)
mockNearbyConnection.startDiscovering()
nearbyConnection.startDiscovering()
assertState(ConnectionState.Failure::class)
}
......@@ -327,9 +353,9 @@ class NearbyConnectionTest {
@Test
fun `Should return null if sendMessage() called from wrong state`() {
mockNearbyConnection.register(stateWatchingObserver)
nearbyConnection.register(stateWatchingObserver)
assertState(ConnectionState.Isolated::class)
assertNull(mockNearbyConnection.sendMessage(OUTGOING_MESSAGE))
assertNull(nearbyConnection.sendMessage(OUTGOING_MESSAGE))
}
@Test
......@@ -340,4 +366,40 @@ class NearbyConnectionTest {
connectionLifecycleCallback.onConnectionResult(NEIGHBOR_ID, mockConnectionResolution)
assertState(ConnectionState.Failure::class)
}
@Test
fun `Should encode and decode short message as bytes`() {
val payload = OUTGOING_MESSAGE.toPayload()
assertTrue(payload.type == Payload.Type.BYTES)
assertEquals(OUTGOING_MESSAGE, payload.toString(::logError))
}
@Test
fun `Should encode and decode borderline message as bytes`() {
// This tests the case where the message length is exactly equal to the bytes limit.
val bytes = ByteArray(ConnectionsClient.MAX_BYTES_DATA_SIZE, { _ -> 0 })
val message = String(bytes, NearbyConnection.PAYLOAD_ENCODING)
val payload = message.toPayload()
assertTrue(payload.type == Payload.Type.BYTES)
assertEquals(message, payload.toString(::logError))
assertEquals(0, errors.count())
}
fun `Should encode and decode long message as stream`() {
// This tests the case where the message length is one more than the bytes limit.
val bytes = ByteArray(ConnectionsClient.MAX_BYTES_DATA_SIZE, { _ -> 0 })
val message = String(bytes, NearbyConnection.PAYLOAD_ENCODING)
val payload = message.toPayload()
assertEquals(Payload.Type.STREAM, payload.type)
assertEquals(message, payload.toString(::logError))
assertEquals(0, errors.count())
}
fun `Should report error and return null when decoding invalid payload of type FILE`() {
val mockPayload = mock<Payload>()
whenever(mockPayload.type).thenReturn(Payload.Type.FILE)
val message = mockPayload.toString(::logError)
assertNull(message)
assertEquals(1, errors.count()) // message content may vary
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment